requestRef) {
try {
// for backward compatibility
- return submitClientRequestAsync(requestRef.retain())
- .whenComplete((r, e) -> requestRef.release());
+ return submitClientRequestAsync(requestRef.retain());
} catch (Exception e) {
- requestRef.release();
return JavaUtils.completeExceptionally(e);
+ } finally {
+ requestRef.release();
}
}
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ed41f1ea2c..18c157130b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -474,7 +474,13 @@ public long getTimeoutMs() {
@Override
public String toString() {
- return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
- + type + ", " + getMessage();
+ return toStringShort() + ", " + getMessage();
+ }
+
+ /**
+ * @return a short string which does not include {@link #message}.
+ */
+ public String toStringShort() {
+ return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", " + type;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index 8098039d1b..8db842d734 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.protocol;
-import javax.annotation.concurrent.Immutable;
import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
@@ -34,7 +33,6 @@
*
* This is a value-based class.
*/
-@Immutable
public final class RaftPeerId {
private static final Map BYTE_STRING_MAP = new ConcurrentHashMap<>();
private static final Map STRING_MAP = new ConcurrentHashMap<>();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
index 2ea2059b51..269fdfc591 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SnapshotManagementRequest.java
@@ -24,7 +24,16 @@ public final class SnapshotManagementRequest extends RaftClientRequest {
public abstract static class Op {
}
- public static class Create extends Op {
+
+ public static final class Create extends Op {
+ private final long creationGap;
+ private Create(long creationGap) {
+ this.creationGap = creationGap;
+ }
+
+ public long getCreationGap() {
+ return creationGap;
+ }
@Override
public String toString() {
@@ -35,8 +44,13 @@ public String toString() {
public static SnapshotManagementRequest newCreate(ClientId clientId,
RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
+ return newCreate(clientId, serverId, groupId, callId, timeoutMs, 0);
+ }
+
+ public static SnapshotManagementRequest newCreate(ClientId clientId,
+ RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs, long creationGap) {
return new SnapshotManagementRequest(clientId,
- serverId, groupId, callId, timeoutMs,new SnapshotManagementRequest.Create());
+ serverId, groupId, callId, timeoutMs, new SnapshotManagementRequest.Create(creationGap));
}
private final Op op;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotLeaderException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotLeaderException.java
index 8d5c2cb4e9..c7dc6a3961 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotLeaderException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotLeaderException.java
@@ -30,7 +30,8 @@ public class NotLeaderException extends RaftException {
private final Collection peers;
public NotLeaderException(RaftGroupMemberId memberId, RaftPeer suggestedLeader, Collection peers) {
- super("Server " + memberId + " is not the leader" + (suggestedLeader != null? " " + suggestedLeader: ""));
+ super("Server " + memberId + " is not the leader" +
+ (suggestedLeader != null ? ", suggested leader is: " + suggestedLeader : ""));
this.suggestedLeader = suggestedLeader;
this.peers = peers != null? Collections.unmodifiableCollection(peers): Collections.emptyList();
Preconditions.assertUnique(this.peers);
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotReplicatedException.java
index 5f48654eec..37ff816245 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotReplicatedException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/NotReplicatedException.java
@@ -17,12 +17,17 @@
*/
package org.apache.ratis.protocol.exceptions;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import java.util.Collection;
+
public class NotReplicatedException extends RaftException {
private final long callId;
private final ReplicationLevel requiredReplication;
private final long logIndex;
+ /** This is only populated on client-side since RaftClientReply already has commitInfos */
+ private Collection commitInfos;
public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) {
super("Request with call Id " + callId + " and log index " + logIndex
@@ -32,6 +37,12 @@ public NotReplicatedException(long callId, ReplicationLevel requiredReplication,
this.logIndex = logIndex;
}
+ public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex,
+ Collection commitInfos) {
+ this(callId, requiredReplication, logIndex);
+ this.commitInfos = commitInfos;
+ }
+
public long getCallId() {
return callId;
}
@@ -43,4 +54,8 @@ public ReplicationLevel getRequiredReplication() {
public long getLogIndex() {
return logIndex;
}
+
+ public Collection getCommitInfos() {
+ return commitInfos;
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
index 9ccd66ad71..38dad5c499 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/BatchLogger.java
@@ -35,7 +35,13 @@ public final class BatchLogger {
private BatchLogger() {
}
- public interface Key {}
+ public interface Key {
+ TimeDuration DEFAULT_DURATION = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+
+ default TimeDuration getBatchDuration() {
+ return DEFAULT_DURATION;
+ }
+ }
private static final class UniqueId {
private final Key key;
@@ -93,6 +99,10 @@ private synchronized boolean tryStartBatch(Consumer op) {
private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance();
private static final ConcurrentMap LOG_CACHE = new ConcurrentHashMap<>();
+ public static void warn(Key key, String name, Consumer op) {
+ warn(key, name, op, key.getBatchDuration(), true);
+ }
+
public static void warn(Key key, String name, Consumer op, TimeDuration batchDuration) {
warn(key, name, op, batchDuration, true);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index 842b8f1549..fb0f0715c5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -29,6 +29,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.ToLongFunction;
/**
@@ -46,6 +47,8 @@ public class DataBlockingQueue extends DataQueue {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
+ private boolean closed = false;
+
public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
@@ -72,10 +75,34 @@ public void clear() {
}
}
+ /** Apply the given handler to each element and then {@link #clear()}. */
+ public void clear(Consumer handler) {
+ try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ for(E e : this) {
+ handler.accept(e);
+ }
+ super.clear();
+ }
+ }
+
+ /**
+ * Close this queue to stop accepting new elements, i.e. the offer(…) methods always return false.
+ * Note that closing the queue will not clear the existing elements.
+ * The existing elements can be peeked, polled or cleared after close.
+ */
+ public void close() {
+ try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
+ closed = true;
+ }
+ }
+
@Override
public boolean offer(E element) {
Objects.requireNonNull(element, "element == null");
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ if (closed) {
+ return false;
+ }
if (super.offer(element)) {
notEmpty.signal();
return true;
@@ -95,6 +122,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio
long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(;;) {
+ if (closed) {
+ return false;
+ }
if (super.offer(element)) {
notEmpty.signal();
return true;
@@ -162,4 +192,11 @@ public List pollList(long timeoutM
return results;
}
}
+
+ @Override
+ public E peek() {
+ try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
+ return super.peek();
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 3db06f56e6..38762caa17 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -154,6 +154,11 @@ public E poll() {
return polled;
}
+ /** Peek the head element from this queue. */
+ public E peek() {
+ return q.peek();
+ }
+
/** The same as {@link java.util.Collection#remove(Object)}. */
public boolean remove(E e) {
final boolean removed = q.remove(e);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 00725903a7..7d1d75309a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -229,7 +229,7 @@ static RETURN attempt(
}
if (log != null && log.isWarnEnabled()) {
log.warn("FAILED \"" + name.get() + "\", attempt #" + i + "/" + numAttempts
- + ": " + t + ", sleep " + sleepTime + " and then retry.", t);
+ + ", sleep " + sleepTime + " and then retry: " + t);
}
}
@@ -257,7 +257,6 @@ static void attemptUntilTrue(
}, numAttempts, sleepTime, name, log);
}
-
static Timer runRepeatedly(Runnable runnable, long delay, long period, TimeUnit unit) {
final Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
@@ -283,6 +282,10 @@ static CompletableFuture completeExceptionally(Throwable t) {
return future;
}
+ static boolean isCompletedNormally(CompletableFuture> future) {
+ return future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally();
+ }
+
static Throwable unwrapCompletionException(Throwable t) {
Objects.requireNonNull(t, "t == null");
return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java b/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
index 54f7989245..4554410488 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JmxRegister.java
@@ -46,15 +46,16 @@ static ObjectName tryRegister(String name, Object mBean) {
}
/**
- * Try registering the mBean with the names one by one.
+ * Try registering the mxBean with the names one by one.
* @return the registered name, or, if it fails, return null.
*/
- public synchronized String register(Object mBean, Iterable> names) {
+ public synchronized String register(Object mxBean, Iterable> names) {
if (registeredName == null) {
for (Supplier supplier : names) {
final String name = supplier.get();
- registeredName = tryRegister(name, mBean);
+ registeredName = tryRegister(name, mxBean);
if (registeredName != null) {
+ LOG.info("register mxBean {} as {}", mxBean.getClass(), name);
return name;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
new file mode 100644
index 0000000000..ccaa981eb9
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
+ * observe resource object life-cycle and assert proper resource closure before they are GCed.
+ *
+ *
+ * Example usage:
+ *
+ *
{@code
+ * class MyResource implements AutoClosable {
+ * static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource");
+ *
+ * private final UncheckedAutoCloseable leakTracker = LEAK_DETECTOR.track(this, () -> {
+ * // report leaks, don't refer to the original object (MyResource) here.
+ * System.out.println("MyResource is not closed before being discarded.");
+ * });
+ *
+ * @Override
+ * public void close() {
+ * // proper resources cleanup...
+ * // inform tracker that this object is closed properly.
+ * leakTracker.close();
+ * }
+ * }
+ *
+ * }
+ */
+public class LeakDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);
+
+ private static class LeakTrackerSet {
+ private final Set set = Collections.newSetFromMap(new HashMap<>());
+
+ synchronized boolean remove(LeakTracker tracker) {
+ return set.remove(tracker);
+ }
+
+ synchronized void removeExisting(LeakTracker tracker) {
+ final boolean removed = set.remove(tracker);
+ Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker);
+ }
+
+ synchronized LeakTracker add(Object referent, ReferenceQueue