Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private synchronized CompletableFuture<Void> changeToFollower(
throw new IllegalStateException("Unexpected role " + old);
}
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
if (shouldSetFollower(old, force)) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
future = role.shutdownLeaderState(false)
Expand All @@ -607,7 +607,7 @@ private synchronized CompletableFuture<Void> changeToFollower(
state.setLeader(null, reason);
} else if (old == RaftPeerRole.CANDIDATE) {
future = role.shutdownLeaderElection();
} else if (old == RaftPeerRole.FOLLOWER) {
} else if (old == RaftPeerRole.FOLLOWER || old == RaftPeerRole.LISTENER) {
future = role.shutdownFollowerState();
}

Expand All @@ -620,6 +620,14 @@ private synchronized CompletableFuture<Void> changeToFollower(
return future;
}

private boolean shouldSetFollower(RaftPeerRole old, boolean force) {
if (old == RaftPeerRole.LISTENER) {
final RaftConfigurationImpl conf = state.getRaftConf();
return conf.isStable() && conf.containsInConf(getId());
}
return old != RaftPeerRole.FOLLOWER || force;
}

synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
long newTerm,
boolean allowListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,12 @@ boolean isConfCommitted() {
return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex();
}

void setRaftConf(LogEntryProto entry) {
private boolean setRaftConf(LogEntryProto entry) {
if (entry.hasConfigurationEntry()) {
setRaftConf(LogProtoUtils.toRaftConfiguration(entry));
return true;
}
return false;
}

void setRaftConf(RaftConfiguration conf) {
Expand All @@ -397,10 +399,19 @@ void truncate(long logIndex) {
configurationManager.removeConfigurations(logIndex);
}

void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
entries.forEach(this::setRaftConf);
void updateConfiguration(List<LogEntryProto> entries) throws IOException {
if (entries == null || entries.isEmpty()) {
return;
}
configurationManager.removeConfigurations(entries.get(0).getIndex());

boolean changed = false;
for(LogEntryProto entry : entries) {
changed |= setRaftConf(entry);
}

if (changed && server.getRole().getCurrentRole() == RaftPeerRole.LISTENER) {
server.changeToFollowerAndPersistMetadata(getCurrentTerm(), true, "setRaftConf").join();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -145,7 +146,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
state.truncate(proto.getIndex());
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), ProtoUtils.shortDebugString(proto));
state.setRaftConf(proto);
state.updateConfiguration(Collections.singletonList(proto));
state.writeRaftConfiguration(proto);
server.getStateMachine().event().notifyConfigurationChanged(
proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ public void testChangeListenerToFollower() throws Exception {
assertTrue(reply.isSuccess());
Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
assertEquals(0, peer.size());

listeners = cluster.getListeners()
.stream().map(RaftServer.Division::getPeer).collect(Collectors.toList());
assertEquals(0, listeners.size());
}
cluster.shutdown();
}
Expand Down