diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 689bb8cefd..60f72e001e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -591,7 +591,7 @@ private synchronized CompletableFuture changeToFollower( throw new IllegalStateException("Unexpected role " + old); } CompletableFuture future = CompletableFuture.completedFuture(null); - if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) { + if (shouldSetFollower(old, force)) { setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { future = role.shutdownLeaderState(false) @@ -607,7 +607,7 @@ private synchronized CompletableFuture changeToFollower( state.setLeader(null, reason); } else if (old == RaftPeerRole.CANDIDATE) { future = role.shutdownLeaderElection(); - } else if (old == RaftPeerRole.FOLLOWER) { + } else if (old == RaftPeerRole.FOLLOWER || old == RaftPeerRole.LISTENER) { future = role.shutdownFollowerState(); } @@ -620,6 +620,14 @@ private synchronized CompletableFuture changeToFollower( return future; } + private boolean shouldSetFollower(RaftPeerRole old, boolean force) { + if (old == RaftPeerRole.LISTENER) { + final RaftConfigurationImpl conf = state.getRaftConf(); + return conf.isStable() && conf.containsInConf(getId()); + } + return old != RaftPeerRole.FOLLOWER || force; + } + synchronized CompletableFuture changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index ee1b7d37b7..bcf11baf7a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -376,10 +376,12 @@ boolean isConfCommitted() { return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex(); } - void setRaftConf(LogEntryProto entry) { + private boolean setRaftConf(LogEntryProto entry) { if (entry.hasConfigurationEntry()) { setRaftConf(LogProtoUtils.toRaftConfiguration(entry)); + return true; } + return false; } void setRaftConf(RaftConfiguration conf) { @@ -397,10 +399,19 @@ void truncate(long logIndex) { configurationManager.removeConfigurations(logIndex); } - void updateConfiguration(List entries) { - if (entries != null && !entries.isEmpty()) { - configurationManager.removeConfigurations(entries.get(0).getIndex()); - entries.forEach(this::setRaftConf); + void updateConfiguration(List entries) throws IOException { + if (entries == null || entries.isEmpty()) { + return; + } + configurationManager.removeConfigurations(entries.get(0).getIndex()); + + boolean changed = false; + for(LogEntryProto entry : entries) { + changed |= setRaftConf(entry); + } + + if (changed && server.getRole().getCurrentRole() == RaftPeerRole.LISTENER) { + server.changeToFollowerAndPersistMetadata(getCurrentTerm(), true, "setRaftConf").join(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index eac690feb2..46b6aaf87f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -145,7 +146,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt state.truncate(proto.getIndex()); if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) { LOG.info("{}: set new configuration {} from snapshot", getMemberId(), ProtoUtils.shortDebugString(proto)); - state.setRaftConf(proto); + state.updateConfiguration(Collections.singletonList(proto)); state.writeRaftConfiguration(proto); server.getStateMachine().event().notifyConfigurationChanged( proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry()); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 456d2ad2a2..6959bd3422 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -556,6 +556,10 @@ public void testChangeListenerToFollower() throws Exception { assertTrue(reply.isSuccess()); Collection peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); assertEquals(0, peer.size()); + + listeners = cluster.getListeners() + .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList()); + assertEquals(0, listeners.size()); } cluster.shutdown(); }