diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java index 2b316977d9..12ad569f22 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java @@ -22,9 +22,12 @@ import org.apache.ratis.metrics.RatisMetricRegistry; import org.apache.ratis.metrics.RatisMetrics; import org.apache.ratis.metrics.Timekeeper; +import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.server.raftlog.RaftLogIndex; import org.apache.ratis.statemachine.StateMachine; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.LongSupplier; /** @@ -39,6 +42,11 @@ public final class StateMachineMetrics extends RatisMetrics { public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE = "applyCompletedIndex"; public static final String STATEMACHINE_TAKE_SNAPSHOT_TIMER = "takeSnapshot"; + /** Time taken for the State Machine applyLog operation to complete execution. */ + public static final String STATEMACHINE_APPLY_LOG_EXECUTION_TIME = "%sApplyLogExecutionTime"; + + private final Map applyLogTimers = new ConcurrentHashMap<>(); + public static StateMachineMetrics getStateMachineMetrics( RaftServerImpl server, RaftLogIndex appliedIndex, StateMachine stateMachine) { @@ -72,4 +80,12 @@ public Timekeeper getTakeSnapshotTimer() { return takeSnapshotTimer; } + private Timekeeper newApplyLogExecutionTimer(LogEntryBodyCase logType) { + return getRegistry().timer(String.format(STATEMACHINE_APPLY_LOG_EXECUTION_TIME, + logType.name().toLowerCase())); + } + + public Timekeeper getApplyLogExecutionTimer(LogEntryBodyCase logType) { + return applyLogTimers.computeIfAbsent(logType, this::newApplyLogExecutionTimer); + } } \ No newline at end of file diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index bd7f26a8a2..a516ad5efb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -250,7 +250,8 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures } else { LOG.debug("{}: applying nextIndex={}", this, nextIndex); } - + final Timekeeper.Context applyLogTimerContext = stateMachineMetrics.get().getApplyLogExecutionTimer( + next.getLogEntryBodyCase()).time(); final CompletableFuture f = server.applyLogToStateMachine(next); final long incremented = appliedIndex.incrementAndGet(debugIndexChange); Preconditions.assertTrue(incremented == nextIndex); @@ -259,10 +260,12 @@ private CompletableFuture applyLog(CompletableFuture applyLogFutures LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex, LogProtoUtils.toLogEntryString(next), ex); return null; - }); + }) + .whenComplete((m, e) -> applyLogTimerContext.stop()); applyLogFutures = applyLogFutures.thenCombine(exceptionHandledFuture, (v, message) -> null); f.thenAccept(m -> notifyAppliedIndex(incremented)); } else { + applyLogTimerContext.stop(); notifyAppliedIndex(incremented); } } else {