From 651f7633a7c9944ca35d7e5dc9a2d0d1397c43f6 Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Tue, 30 Dec 2025 16:01:24 +0800 Subject: [PATCH 1/8] fix listener role transition --- .../ratis/server/impl/RaftServerImpl.java | 26 +++++++++++++++++-- .../server/impl/LeaderElectionTests.java | 5 ++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 689bb8cefd..0427490757 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1570,6 +1570,7 @@ private CompletableFuture appendEntriesAsync(RaftPeerId final Optional followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); final CompletableFuture future; + final CompletableFuture checkListenerFuture; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1610,8 +1611,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde } state.updateConfiguration(entries); + checkListenerFuture = checkAndUpdateListenerState(); } - future.join(); + CompletableFuture.allOf(future, checkListenerFuture).join(); final CompletableFuture appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null) : appendLog(entries); @@ -1645,7 +1647,27 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } - private CompletableFuture appendLog(List entries) { + + /** + * The listener checks whether it can become a follower. + * + * @return + */ + private CompletableFuture checkAndUpdateListenerState() { + CompletableFuture future = CompletableFuture.completedFuture(null); + if (role.getCurrentRole() == RaftPeerRole.LISTENER) { + if (state.getRaftConf().isStable() && state.getRaftConf().getPeer(getId()) != null) { + Object reason = "Listener transitionRole"; + setRole(RaftPeerRole.FOLLOWER, reason); + future = role.shutdownFollowerState(); + role.startFollowerState(this, reason); + setFirstElection(reason); + } + } + return future; + } + + private CompletableFuture appendLog(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { // index already exists, return the last future diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 456d2ad2a2..4764b3b812 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -61,6 +61,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.ratis.RaftTestUtil.getPeersWithPriority; import static org.apache.ratis.RaftTestUtil.waitForLeader; @@ -556,6 +557,10 @@ public void testChangeListenerToFollower() throws Exception { assertTrue(reply.isSuccess()); Collection peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); assertEquals(0, peer.size()); + + listeners = cluster.getListeners() + .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList()); + assertEquals(0, listeners.size()); } cluster.shutdown(); } From 78cfec73b787454d468f17e73b41c61b79fdbd34 Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Tue, 30 Dec 2025 16:14:51 +0800 Subject: [PATCH 2/8] update --- .../java/org/apache/ratis/server/impl/LeaderElectionTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 4764b3b812..6959bd3422 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -61,7 +61,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.ratis.RaftTestUtil.getPeersWithPriority; import static org.apache.ratis.RaftTestUtil.waitForLeader; From 7c81d7043852f0e480a712179dcdca16ad84a787 Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Wed, 31 Dec 2025 10:52:17 +0800 Subject: [PATCH 3/8] update --- .../ratis/server/impl/RaftServerImpl.java | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0427490757..da9ff15ce1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -591,7 +591,7 @@ private synchronized CompletableFuture changeToFollower( throw new IllegalStateException("Unexpected role " + old); } CompletableFuture future = CompletableFuture.completedFuture(null); - if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) { + if (shouldSetFollower(old, force)) { setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { future = role.shutdownLeaderState(false) @@ -607,7 +607,7 @@ private synchronized CompletableFuture changeToFollower( state.setLeader(null, reason); } else if (old == RaftPeerRole.CANDIDATE) { future = role.shutdownLeaderElection(); - } else if (old == RaftPeerRole.FOLLOWER) { + } else if (old == RaftPeerRole.FOLLOWER || old == RaftPeerRole.LISTENER) { future = role.shutdownFollowerState(); } @@ -620,6 +620,14 @@ private synchronized CompletableFuture changeToFollower( return future; } + private boolean shouldSetFollower(RaftPeerRole old, boolean force) { + if (old == RaftPeerRole.LISTENER) { + final RaftConfigurationImpl conf = state.getRaftConf(); + return conf.isStable() && conf.containsInConf(getId()); + } + return old != RaftPeerRole.FOLLOWER || force; + } + synchronized CompletableFuture changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, @@ -1570,7 +1578,6 @@ private CompletableFuture appendEntriesAsync(RaftPeerId final Optional followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); final CompletableFuture future; - final CompletableFuture checkListenerFuture; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1611,9 +1618,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde } state.updateConfiguration(entries); - checkListenerFuture = checkAndUpdateListenerState(); } - CompletableFuture.allOf(future, checkListenerFuture).join(); + future.join(); final CompletableFuture appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null) : appendLog(entries); @@ -1648,25 +1654,6 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde }); } - /** - * The listener checks whether it can become a follower. - * - * @return - */ - private CompletableFuture checkAndUpdateListenerState() { - CompletableFuture future = CompletableFuture.completedFuture(null); - if (role.getCurrentRole() == RaftPeerRole.LISTENER) { - if (state.getRaftConf().isStable() && state.getRaftConf().getPeer(getId()) != null) { - Object reason = "Listener transitionRole"; - setRole(RaftPeerRole.FOLLOWER, reason); - future = role.shutdownFollowerState(); - role.startFollowerState(this, reason); - setFirstElection(reason); - } - } - return future; - } - private CompletableFuture appendLog(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { From 61c50e923154c72ec708c7317a68b17548035beb Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Wed, 31 Dec 2025 10:53:22 +0800 Subject: [PATCH 4/8] update --- .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index da9ff15ce1..60f72e001e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1653,8 +1653,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } - - private CompletableFuture appendLog(List entries) { + private CompletableFuture appendLog(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { // index already exists, return the last future From c442e38bfa314cac83647ef7ec58fc4f442b151e Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Wed, 31 Dec 2025 14:18:07 +0800 Subject: [PATCH 5/8] update unit tests --- .../org/apache/ratis/server/impl/LeaderElectionTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 6959bd3422..51d218f843 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -557,6 +557,11 @@ public void testChangeListenerToFollower() throws Exception { Collection peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); assertEquals(0, peer.size()); + // multiple messages to trigger listener change role to follower + client.io().send(new RaftTestUtil.SimpleMessage("message")); + client.io().send(new RaftTestUtil.SimpleMessage("message")); + client.io().send(new RaftTestUtil.SimpleMessage("message")); + listeners = cluster.getListeners() .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList()); assertEquals(0, listeners.size()); From b69dd4abea7381fc3a7baed87a7bd810aee50222 Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Wed, 31 Dec 2025 15:06:29 +0800 Subject: [PATCH 6/8] update setRaftConf --- .../ratis/server/impl/RaftServerImpl.java | 18 ++++++++++++++++++ .../apache/ratis/server/impl/ServerState.java | 1 + .../ratis/server/impl/LeaderElectionTests.java | 5 ----- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 60f72e001e..085b098723 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1653,6 +1653,24 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } + + + /** + * check whether listener should change to follower + */ + CompletableFuture checkAndUpdateListenerState() { + CompletableFuture future = CompletableFuture.completedFuture(null); + if (role.getCurrentRole() == RaftPeerRole.LISTENER) { + RaftConfigurationImpl currentConf = getRaftConf(); + if (currentConf.isStable() && currentConf.containsInConf(getId())) { + Object reason = "Listener transitionRole"; + final AtomicBoolean termUpdated = new AtomicBoolean(); + future = changeToFollower(state.getCurrentTerm(), false, true, reason, termUpdated); + } + } + return future; + } + private CompletableFuture appendLog(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index ee1b7d37b7..d3b430414e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -389,6 +389,7 @@ void setRaftConf(RaftConfiguration conf) { if (!listeners.isEmpty()) { server.getServerRpc().addRaftPeers(listeners); } + server.checkAndUpdateListenerState().join(); LOG.info("{}: set configuration {}", getMemberId(), conf); LOG.trace("{}: {}", getMemberId(), configurationManager); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 51d218f843..6959bd3422 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -557,11 +557,6 @@ public void testChangeListenerToFollower() throws Exception { Collection peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); assertEquals(0, peer.size()); - // multiple messages to trigger listener change role to follower - client.io().send(new RaftTestUtil.SimpleMessage("message")); - client.io().send(new RaftTestUtil.SimpleMessage("message")); - client.io().send(new RaftTestUtil.SimpleMessage("message")); - listeners = cluster.getListeners() .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList()); assertEquals(0, listeners.size()); From 32b4b7e2f75f730fe7942bd5b1d3b3bde505f4ab Mon Sep 17 00:00:00 2001 From: "kongzhi.jy" Date: Wed, 31 Dec 2025 15:27:32 +0800 Subject: [PATCH 7/8] update --- .../org/apache/ratis/server/impl/RaftServerImpl.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 085b098723..0cd83d4d89 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1661,12 +1661,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde CompletableFuture checkAndUpdateListenerState() { CompletableFuture future = CompletableFuture.completedFuture(null); if (role.getCurrentRole() == RaftPeerRole.LISTENER) { - RaftConfigurationImpl currentConf = getRaftConf(); - if (currentConf.isStable() && currentConf.containsInConf(getId())) { - Object reason = "Listener transitionRole"; - final AtomicBoolean termUpdated = new AtomicBoolean(); - future = changeToFollower(state.getCurrentTerm(), false, true, reason, termUpdated); - } + Object reason = "Listener transitionRole"; + final AtomicBoolean termUpdated = new AtomicBoolean(); + future = changeToFollower(state.getCurrentTerm(), false, true, reason, termUpdated); } return future; } From 9738f770b12c05a71d700b067651cec08ee22a5a Mon Sep 17 00:00:00 2001 From: joecqupt Date: Sat, 3 Jan 2026 23:02:26 +0800 Subject: [PATCH 8/8] update --- .../ratis/server/impl/RaftServerImpl.java | 15 ------------- .../apache/ratis/server/impl/ServerState.java | 22 ++++++++++++++----- .../impl/SnapshotInstallationHandler.java | 3 ++- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0cd83d4d89..60f72e001e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1653,21 +1653,6 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } - - - /** - * check whether listener should change to follower - */ - CompletableFuture checkAndUpdateListenerState() { - CompletableFuture future = CompletableFuture.completedFuture(null); - if (role.getCurrentRole() == RaftPeerRole.LISTENER) { - Object reason = "Listener transitionRole"; - final AtomicBoolean termUpdated = new AtomicBoolean(); - future = changeToFollower(state.getCurrentTerm(), false, true, reason, termUpdated); - } - return future; - } - private CompletableFuture appendLog(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index d3b430414e..bcf11baf7a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -376,10 +376,12 @@ boolean isConfCommitted() { return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex(); } - void setRaftConf(LogEntryProto entry) { + private boolean setRaftConf(LogEntryProto entry) { if (entry.hasConfigurationEntry()) { setRaftConf(LogProtoUtils.toRaftConfiguration(entry)); + return true; } + return false; } void setRaftConf(RaftConfiguration conf) { @@ -389,7 +391,6 @@ void setRaftConf(RaftConfiguration conf) { if (!listeners.isEmpty()) { server.getServerRpc().addRaftPeers(listeners); } - server.checkAndUpdateListenerState().join(); LOG.info("{}: set configuration {}", getMemberId(), conf); LOG.trace("{}: {}", getMemberId(), configurationManager); } @@ -398,10 +399,19 @@ void truncate(long logIndex) { configurationManager.removeConfigurations(logIndex); } - void updateConfiguration(List entries) { - if (entries != null && !entries.isEmpty()) { - configurationManager.removeConfigurations(entries.get(0).getIndex()); - entries.forEach(this::setRaftConf); + void updateConfiguration(List entries) throws IOException { + if (entries == null || entries.isEmpty()) { + return; + } + configurationManager.removeConfigurations(entries.get(0).getIndex()); + + boolean changed = false; + for(LogEntryProto entry : entries) { + changed |= setRaftConf(entry); + } + + if (changed && server.getRole().getCurrentRole() == RaftPeerRole.LISTENER) { + server.changeToFollowerAndPersistMetadata(getCurrentTerm(), true, "setRaftConf").join(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index eac690feb2..46b6aaf87f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -145,7 +146,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt state.truncate(proto.getIndex()); if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) { LOG.info("{}: set new configuration {} from snapshot", getMemberId(), ProtoUtils.shortDebugString(proto)); - state.setRaftConf(proto); + state.updateConfiguration(Collections.singletonList(proto)); state.writeRaftConfiguration(proto); server.getStateMachine().event().notifyConfigurationChanged( proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());