diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index fb84c17dd0..5a645e46eb 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -854,7 +854,39 @@ var ( "pending_tasks", WithDescription("A histogram across history shards for the number of in-memory pending history tasks."), ) - TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled") + TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled") + SchedulerActiveWorkers = NewGaugeDef( + "scheduler_active_workers", + WithDescription("Number of active workers processing tasks in the scheduler."), + ) + SchedulerBusyWorkers = NewGaugeDef( + "scheduler_busy_workers", + WithDescription("Number of workers currently executing a task."), + ) + SchedulerAssignedWorkers = NewGaugeDef( + "scheduler_assigned_workers", + WithDescription("Number of workers currently assigned to a task queue."), + ) + SchedulerQueueDepth = NewGaugeDef( + "scheduler_queue_depth", + WithDescription("Number of tasks waiting in the scheduler queue."), + ) + SchedulerActiveQueues = NewGaugeDef( + "scheduler_active_queues", + WithDescription("Number of active task queues (for sequential and IWRR schedulers)."), + ) + SchedulerQueueLatency = NewTimerDef( + "scheduler_queue_latency", + WithDescription("Time a task spent waiting in the scheduler queue before execution."), + ) + SchedulerTasksSubmitted = NewCounterDef( + "scheduler_tasks_submitted", + WithDescription("Total number of tasks submitted to the scheduler."), + ) + SchedulerTasksCompleted = NewCounterDef( + "scheduler_tasks_completed", + WithDescription("Total number of tasks completed by the scheduler."), + ) QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count") QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count") diff --git a/common/tasks/benchmark_test.go b/common/tasks/benchmark_test.go index 88f5a787e0..b02e86a223 100644 --- a/common/tasks/benchmark_test.go +++ b/common/tasks/benchmark_test.go @@ -40,6 +40,7 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) { }, Scheduler[*noopTask](&noopScheduler{}), logger, + nil, ) scheduler.Start() defer scheduler.Stop() @@ -66,6 +67,7 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) { }, Scheduler[*noopTask](&noopScheduler{}), logger, + nil, ) scheduler.Start() defer scheduler.Stop() diff --git a/common/tasks/fifo_scheduler.go b/common/tasks/fifo_scheduler.go index 44f57f8499..674f410fbb 100644 --- a/common/tasks/fifo_scheduler.go +++ b/common/tasks/fifo_scheduler.go @@ -15,6 +15,7 @@ import ( const ( defaultMonitorTickerDuration = time.Minute defaultMonitorTickerJitter = 0.15 + metricsExportInterval = 5 * time.Second ) var _ Scheduler[Task] = (*FIFOScheduler[Task])(nil) diff --git a/common/tasks/interleaved_weighted_round_robin.go b/common/tasks/interleaved_weighted_round_robin.go index 12a5f58560..6a56dfe523 100644 --- a/common/tasks/interleaved_weighted_round_robin.go +++ b/common/tasks/interleaved_weighted_round_robin.go @@ -10,6 +10,7 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" ) const ( @@ -30,6 +31,8 @@ type ( ChannelWeightUpdateCh chan struct{} // Optional, if specified, delete inactive channels after this duration InactiveChannelDeletionDelay 
dynamicconfig.DurationPropertyFn + // Optional, if specified, used to convert channel key to string for metrics tagging + ChannelKeyToStringFn func(K) string } // TaskChannelKeyFn is the function for mapping a task to its task channel (key) @@ -67,6 +70,9 @@ type ( // 3 -> 1 // then iwrrChannels will contain chan [0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 3] (ID-ed by channel key) iwrrChannels atomic.Value // []*WeightedChannel + + // Metrics fields + metricsHandler metrics.Handler } ) @@ -74,6 +80,7 @@ func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable]( options InterleavedWeightedRoundRobinSchedulerOptions[T, K], fifoScheduler Scheduler[T], logger log.Logger, + metricsHandler metrics.Handler, ) *InterleavedWeightedRoundRobinScheduler[T, K] { iwrrChannels := atomic.Value{} iwrrChannels.Store(WeightedChannels[T]{}) @@ -93,6 +100,8 @@ func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable]( numInflightTask: 0, weightedChannels: make(map[K]*WeightedChannel[T]), iwrrChannels: iwrrChannels, + + metricsHandler: metricsHandler, } } @@ -113,6 +122,11 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Start() { s.shutdownWG.Add(1) go s.cleanupLoop() + if s.metricsHandler != nil { + s.shutdownWG.Add(1) + go s.exportMetricsLoop() + } + s.logger.Info("interleaved weighted round robin task scheduler started") } @@ -151,6 +165,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Submit( // or exceeding rate limit channel, releaseFn := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task)) defer releaseFn() + s.setEnqueueTime(task) channel.Chan() <- task s.notifyDispatcher() } @@ -166,6 +181,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit( // there are tasks pending dispatching, need to respect round roubin weight channel, releaseFn := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task)) defer releaseFn() + s.setEnqueueTime(task) select { case channel.Chan() <- task: s.notifyDispatcher() @@ -349,6 +365,7 @@ LoopDispatch: select { case task := <-channel.Chan(): channel.UpdateLastActiveTime(now) + s.recordQueueLatency(task) s.fifoScheduler.Submit(task) numTasks++ default: @@ -403,3 +420,59 @@ DrainLoop: func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isStopped() bool { return atomic.LoadInt32(&s.status) == common.DaemonStatusStopped } + +func (s *InterleavedWeightedRoundRobinScheduler[T, K]) exportMetricsLoop() { + defer s.shutdownWG.Done() + + ticker := time.NewTicker(metricsExportInterval) + defer ticker.Stop() + + for { + select { + case <-s.shutdownChan: + return + case <-ticker.C: + s.emitGaugeMetrics() + } + } +} + +func (s *InterleavedWeightedRoundRobinScheduler[T, K]) emitGaugeMetrics() { + s.RLock() + defer s.RUnlock() + + for key, channel := range s.weightedChannels { + depth := channel.Len() + var tags []metrics.Tag + if s.options.ChannelKeyToStringFn != nil { + tags = []metrics.Tag{metrics.StringTag("channel_key", s.options.ChannelKeyToStringFn(key))} + } + metrics.SchedulerQueueDepth.With(s.metricsHandler).Record(float64(depth), tags...) 
+ } + + metrics.SchedulerActiveQueues.With(s.metricsHandler).Record(float64(len(s.weightedChannels))) +} + +func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setEnqueueTime(task T) { + if timestamped, ok := any(task).(SchedulerTimestampedTask); ok { + timestamped.SetSchedulerEnqueueTime(s.ts.Now()) + } +} + +func (s *InterleavedWeightedRoundRobinScheduler[T, K]) recordQueueLatency(task T) { + if s.metricsHandler == nil { + return + } + if timestamped, ok := any(task).(SchedulerTimestampedTask); ok { + enqueueTime := timestamped.GetSchedulerEnqueueTime() + if !enqueueTime.IsZero() { + latency := s.ts.Now().Sub(enqueueTime) + var tags []metrics.Tag + if s.options.ChannelKeyToStringFn != nil { + key := s.options.TaskChannelKeyFn(task) + tags = []metrics.Tag{metrics.StringTag("channel_key", s.options.ChannelKeyToStringFn(key))} + } + metrics.SchedulerQueueLatency.With(s.metricsHandler).Record(latency, tags...) + } + } +} diff --git a/common/tasks/interleaved_weighted_round_robin_test.go b/common/tasks/interleaved_weighted_round_robin_test.go index 7cb08d2c76..e91825517d 100644 --- a/common/tasks/interleaved_weighted_round_robin_test.go +++ b/common/tasks/interleaved_weighted_round_robin_test.go @@ -75,6 +75,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() { }, Scheduler[*testTask](s.mockFIFOScheduler), logger, + nil, ) s.scheduler.ts = s.ts } @@ -452,6 +453,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestInactiveChannelDeletio }, Scheduler[*testTask](s.mockFIFOScheduler), log.NewTestLogger(), + nil, ) s.scheduler.ts = s.ts s.mockFIFOScheduler.EXPECT().Start() @@ -528,6 +530,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestInactiveChannelDeletio }, Scheduler[*testTask](s.mockFIFOScheduler), log.NewTestLogger(), + nil, ) s.mockFIFOScheduler.EXPECT().Start() s.scheduler.Start() diff --git a/common/tasks/scheduler.go b/common/tasks/scheduler.go index 133372da30..2de874c07d 100644 --- a/common/tasks/scheduler.go +++ b/common/tasks/scheduler.go @@ -1,5 +1,9 @@ package tasks +import ( + "time" +) + //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination scheduler_mock.go type ( // Scheduler is the generic interface for scheduling & processing tasks @@ -9,4 +13,11 @@ type ( Start() Stop() } + + // SchedulerTimestampedTask is an optional interface that tasks can implement + // to enable queue wait latency tracking in schedulers. + SchedulerTimestampedTask interface { + SetSchedulerEnqueueTime(time.Time) + GetSchedulerEnqueueTime() time.Time + } ) diff --git a/common/tasks/scheduler_mock.go b/common/tasks/scheduler_mock.go index 4be9a6aa04..4eec125299 100644 --- a/common/tasks/scheduler_mock.go +++ b/common/tasks/scheduler_mock.go @@ -11,6 +11,7 @@ package tasks import ( reflect "reflect" + time "time" gomock "go.uber.org/mock/gomock" ) @@ -88,3 +89,53 @@ func (mr *MockSchedulerMockRecorder[T]) TrySubmit(task any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrySubmit", reflect.TypeOf((*MockScheduler[T])(nil).TrySubmit), task) } + +// MockSchedulerTimestampedTask is a mock of SchedulerTimestampedTask interface. +type MockSchedulerTimestampedTask struct { + ctrl *gomock.Controller + recorder *MockSchedulerTimestampedTaskMockRecorder + isgomock struct{} +} + +// MockSchedulerTimestampedTaskMockRecorder is the mock recorder for MockSchedulerTimestampedTask. 
+type MockSchedulerTimestampedTaskMockRecorder struct { + mock *MockSchedulerTimestampedTask +} + +// NewMockSchedulerTimestampedTask creates a new mock instance. +func NewMockSchedulerTimestampedTask(ctrl *gomock.Controller) *MockSchedulerTimestampedTask { + mock := &MockSchedulerTimestampedTask{ctrl: ctrl} + mock.recorder = &MockSchedulerTimestampedTaskMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSchedulerTimestampedTask) EXPECT() *MockSchedulerTimestampedTaskMockRecorder { + return m.recorder +} + +// GetSchedulerEnqueueTime mocks base method. +func (m *MockSchedulerTimestampedTask) GetSchedulerEnqueueTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSchedulerEnqueueTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetSchedulerEnqueueTime indicates an expected call of GetSchedulerEnqueueTime. +func (mr *MockSchedulerTimestampedTaskMockRecorder) GetSchedulerEnqueueTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSchedulerEnqueueTime", reflect.TypeOf((*MockSchedulerTimestampedTask)(nil).GetSchedulerEnqueueTime)) +} + +// SetSchedulerEnqueueTime mocks base method. +func (m *MockSchedulerTimestampedTask) SetSchedulerEnqueueTime(arg0 time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetSchedulerEnqueueTime", arg0) +} + +// SetSchedulerEnqueueTime indicates an expected call of SetSchedulerEnqueueTime. +func (mr *MockSchedulerTimestampedTaskMockRecorder) SetSchedulerEnqueueTime(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSchedulerEnqueueTime", reflect.TypeOf((*MockSchedulerTimestampedTask)(nil).SetSchedulerEnqueueTime), arg0) +} diff --git a/common/tasks/sequential_scheduler.go b/common/tasks/sequential_scheduler.go index 7475c88900..b1875b0c64 100644 --- a/common/tasks/sequential_scheduler.go +++ b/common/tasks/sequential_scheduler.go @@ -7,10 +7,12 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" ) var _ Scheduler[Task] = (*SequentialScheduler[Task])(nil) @@ -37,6 +39,12 @@ type ( queueChan chan SequentialTaskQueue[T] logger log.Logger + + // Metrics fields + metricsHandler metrics.Handler + metricTagsFn MetricTagsFn[T] + timeSource clock.TimeSource + assignedWorkerCount int64 } ) @@ -45,6 +53,8 @@ func NewSequentialScheduler[T Task]( taskQueueHashFn collection.HashFunc, taskQueueFactory SequentialTaskQueueFactory[T], logger log.Logger, + metricsHandler metrics.Handler, + metricTagsFn MetricTagsFn[T], ) *SequentialScheduler[T] { return &SequentialScheduler[T]{ status: common.DaemonStatusInitialized, @@ -56,6 +66,10 @@ func NewSequentialScheduler[T Task]( queueFactory: taskQueueFactory, queueChan: make(chan SequentialTaskQueue[T], options.QueueSize), queues: collection.NewShardedConcurrentTxMap(1024, taskQueueHashFn), + + metricsHandler: metricsHandler, + metricTagsFn: metricTagsFn, + timeSource: clock.NewRealTimeSource(), } } @@ -72,6 +86,11 @@ func (s *SequentialScheduler[T]) Start() { s.workerCountSubscriptionCancelFn = workerCountSubscriptionCancelFn s.updateWorkerCount(initialWorkerCount) + if s.metricsHandler != nil { + s.shutdownWG.Add(1) + go s.exportMetricsLoop() + } + 
s.logger.Info("sequential scheduler started") } @@ -99,6 +118,9 @@ func (s *SequentialScheduler[T]) Stop() { } func (s *SequentialScheduler[T]) Submit(task T) { + s.setEnqueueTime(task) + s.recordTaskSubmitted(task) + queue := s.queueFactory(task) queue.Add(task) @@ -135,6 +157,8 @@ func (s *SequentialScheduler[T]) Submit(task T) { } func (s *SequentialScheduler[T]) TrySubmit(task T) bool { + s.setEnqueueTime(task) + queue := s.queueFactory(task) queue.Add(task) @@ -150,6 +174,7 @@ func (s *SequentialScheduler[T]) TrySubmit(task T) bool { panic("Error is not expected as the evaluation function returns nil") } if fnEvaluated { + s.recordTaskSubmitted(task) if s.isStopped() { s.drainTasks() } @@ -158,6 +183,7 @@ func (s *SequentialScheduler[T]) TrySubmit(task T) bool { select { case s.queueChan <- queue: + s.recordTaskSubmitted(task) if s.isStopped() { s.drainTasks() } @@ -240,6 +266,9 @@ func (s *SequentialScheduler[T]) processTaskQueue( queue SequentialTaskQueue[T], workerShutdownCh <-chan struct{}, ) { + atomic.AddInt64(&s.assignedWorkerCount, 1) + defer atomic.AddInt64(&s.assignedWorkerCount, -1) + for { select { case <-s.shutdownChan: @@ -282,6 +311,8 @@ func (s *SequentialScheduler[T]) executeTask(queue SequentialTaskQueue[T]) { shouldRetry := true task := queue.Remove() + s.recordQueueLatency(task) + operation := func() (retErr error) { var executePanic error defer func() { @@ -307,10 +338,12 @@ func (s *SequentialScheduler[T]) executeTask(queue SequentialTaskQueue[T]) { } task.Nack(err) + s.recordTaskCompleted(task) return } task.Ack() + s.recordTaskCompleted(task) } func (s *SequentialScheduler[T]) drainTasks() { @@ -339,3 +372,74 @@ LoopDrainQueues: func (s *SequentialScheduler[T]) isStopped() bool { return atomic.LoadInt32(&s.status) == common.DaemonStatusStopped } + +func (s *SequentialScheduler[T]) exportMetricsLoop() { + defer s.shutdownWG.Done() + + ticker := time.NewTicker(metricsExportInterval) + defer ticker.Stop() + + for { + select { + case <-s.shutdownChan: + return + case <-ticker.C: + s.emitGaugeMetrics() + } + } +} + +func (s *SequentialScheduler[T]) emitGaugeMetrics() { + s.workerLock.Lock() + activeWorkers := len(s.workerShutdownCh) + s.workerLock.Unlock() + + assignedWorkers := atomic.LoadInt64(&s.assignedWorkerCount) + queueDepth := len(s.queueChan) + activeQueues := s.queues.Len() + + metrics.SchedulerActiveWorkers.With(s.metricsHandler).Record(float64(activeWorkers)) + metrics.SchedulerAssignedWorkers.With(s.metricsHandler).Record(float64(assignedWorkers)) + metrics.SchedulerQueueDepth.With(s.metricsHandler).Record(float64(queueDepth)) + metrics.SchedulerActiveQueues.With(s.metricsHandler).Record(float64(activeQueues)) +} + +func (s *SequentialScheduler[T]) setEnqueueTime(task T) { + if timestamped, ok := any(task).(SchedulerTimestampedTask); ok { + timestamped.SetSchedulerEnqueueTime(s.timeSource.Now()) + } +} + +func (s *SequentialScheduler[T]) recordQueueLatency(task T) { + if s.metricsHandler == nil { + return + } + if timestamped, ok := any(task).(SchedulerTimestampedTask); ok { + enqueueTime := timestamped.GetSchedulerEnqueueTime() + if !enqueueTime.IsZero() { + latency := s.timeSource.Now().Sub(enqueueTime) + metrics.SchedulerQueueLatency.With(s.metricsHandler).Record(latency, s.getMetricTags(task)...) + } + } +} + +func (s *SequentialScheduler[T]) recordTaskSubmitted(task T) { + if s.metricsHandler == nil { + return + } + metrics.SchedulerTasksSubmitted.With(s.metricsHandler).Record(1, s.getMetricTags(task)...) 
+} + +func (s *SequentialScheduler[T]) recordTaskCompleted(task T) { + if s.metricsHandler == nil { + return + } + metrics.SchedulerTasksCompleted.With(s.metricsHandler).Record(1, s.getMetricTags(task)...) +} + +func (s *SequentialScheduler[T]) getMetricTags(task T) []metrics.Tag { + if s.metricTagsFn != nil { + return s.metricTagsFn(task) + } + return nil +} diff --git a/common/tasks/sequential_scheduler_test.go b/common/tasks/sequential_scheduler_test.go index 09e41091a8..b6caa7a3aa 100644 --- a/common/tasks/sequential_scheduler_test.go +++ b/common/tasks/sequential_scheduler_test.go @@ -223,5 +223,7 @@ func (s *sequentialSchedulerSuite) newTestProcessor() *SequentialScheduler[*Mock hashFn, factory, log.NewNoopLogger(), + nil, + nil, ) } diff --git a/service/history/archival_queue_factory.go b/service/history/archival_queue_factory.go index d722a83ef3..6fd0ae540d 100644 --- a/service/history/archival_queue_factory.go +++ b/service/history/archival_queue_factory.go @@ -76,6 +76,7 @@ func newHostScheduler(params ArchivalQueueFactoryParams) queues.Scheduler { }, params.NamespaceRegistry, params.Logger, + nil, // metrics disabled for archival queue ) } diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index feeeb66831..5155c2ddff 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -86,6 +86,7 @@ func (s *scheduledQueueSuite) SetupTest() { }, s.mockShard.GetNamespaceRegistry(), logger, + metrics.NoopMetricsHandler, ) scheduler = NewRateLimitedScheduler( scheduler, diff --git a/service/history/queues/scheduler.go b/service/history/queues/scheduler.go index 8ab4e09322..542e4b1144 100644 --- a/service/history/queues/scheduler.go +++ b/service/history/queues/scheduler.go @@ -80,6 +80,7 @@ func NewScheduler( options SchedulerOptions, namespaceRegistry namespace.Registry, logger log.Logger, + metricsHandler metrics.Handler, ) Scheduler { var scheduler tasks.Scheduler[Executable] @@ -140,6 +141,7 @@ func NewScheduler( logger, )), logger, + metricsHandler, ) return &schedulerImpl{ diff --git a/service/history/replication/batchable_task.go b/service/history/replication/batchable_task.go index 6fde607434..ed88c69641 100644 --- a/service/history/replication/batchable_task.go +++ b/service/history/replication/batchable_task.go @@ -33,6 +33,8 @@ type ( individualTaskHandler func(task TrackableExecutableTask) logger log.Logger metricsHandler metrics.Handler + // schedulerEnqueueTime tracks when this task was enqueued in the scheduler + schedulerEnqueueTime time.Time } batchState int @@ -133,6 +135,27 @@ func (w *batchedTask) ReplicationTask() *replicationspb.ReplicationTask { return w.batchedTask.ReplicationTask() } +// SetSchedulerEnqueueTime implements SchedulerTimestampedTask interface +func (w *batchedTask) SetSchedulerEnqueueTime(t time.Time) { + w.schedulerEnqueueTime = t +} + +// GetSchedulerEnqueueTime implements SchedulerTimestampedTask interface +// Returns the enqueue time of the first individual task if this batchedTask's +// enqueue time is not set (which happens when the task is wrapped after Submit). 
+func (w *batchedTask) GetSchedulerEnqueueTime() time.Time { + if !w.schedulerEnqueueTime.IsZero() { + return w.schedulerEnqueueTime + } + // Delegate to the first individual task's enqueue time + if len(w.individualTasks) > 0 { + if timestamped, ok := w.individualTasks[0].(interface{ GetSchedulerEnqueueTime() time.Time }); ok { + return timestamped.GetSchedulerEnqueueTime() + } + } + return time.Time{} +} + func (w *batchedTask) callIndividual(f func(task TrackableExecutableTask)) { for _, task := range w.individualTasks { f(task) diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index 772669ad39..76ac7bab88 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -102,6 +102,9 @@ type ( MarkExecutionStart() GetPriority() enumsspb.TaskPriority NamespaceName() string + // SchedulerTimestampedTask methods for queue latency tracking + SetSchedulerEnqueueTime(time.Time) + GetSchedulerEnqueueTime() time.Time } ExecutableTaskImpl struct { ProcessToolBox @@ -123,6 +126,7 @@ type ( markPoisonPillAttempts int isDuplicated bool taskExecuteStartTime time.Time + schedulerEnqueueTime time.Time } ) @@ -300,6 +304,18 @@ func (e *ExecutableTaskImpl) NamespaceName() string { return "" } +// SetSchedulerEnqueueTime implements SchedulerTimestampedTask interface +// to enable queue wait latency tracking in schedulers. +func (e *ExecutableTaskImpl) SetSchedulerEnqueueTime(t time.Time) { + e.schedulerEnqueueTime = t +} + +// GetSchedulerEnqueueTime implements SchedulerTimestampedTask interface +// to enable queue wait latency tracking in schedulers. +func (e *ExecutableTaskImpl) GetSchedulerEnqueueTime() time.Time { + return e.schedulerEnqueueTime +} + func (e *ExecutableTaskImpl) emitFinishMetrics( now time.Time, ) { diff --git a/service/history/replication/executable_task_mock.go b/service/history/replication/executable_task_mock.go index e3cfc75ca6..81d1cc649a 100644 --- a/service/history/replication/executable_task_mock.go +++ b/service/history/replication/executable_task_mock.go @@ -155,6 +155,20 @@ func (mr *MockExecutableTaskMockRecorder) GetPriority() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPriority", reflect.TypeOf((*MockExecutableTask)(nil).GetPriority)) } +// GetSchedulerEnqueueTime mocks base method. +func (m *MockExecutableTask) GetSchedulerEnqueueTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSchedulerEnqueueTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetSchedulerEnqueueTime indicates an expected call of GetSchedulerEnqueueTime. +func (mr *MockExecutableTaskMockRecorder) GetSchedulerEnqueueTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSchedulerEnqueueTime", reflect.TypeOf((*MockExecutableTask)(nil).GetSchedulerEnqueueTime)) +} + // IsRetryableError mocks base method. func (m *MockExecutableTask) IsRetryableError(err error) bool { m.ctrl.T.Helper() @@ -288,6 +302,18 @@ func (mr *MockExecutableTaskMockRecorder) RetryPolicy() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RetryPolicy", reflect.TypeOf((*MockExecutableTask)(nil).RetryPolicy)) } +// SetSchedulerEnqueueTime mocks base method. +func (m *MockExecutableTask) SetSchedulerEnqueueTime(arg0 time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetSchedulerEnqueueTime", arg0) +} + +// SetSchedulerEnqueueTime indicates an expected call of SetSchedulerEnqueueTime. 
+func (mr *MockExecutableTaskMockRecorder) SetSchedulerEnqueueTime(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSchedulerEnqueueTime", reflect.TypeOf((*MockExecutableTask)(nil).SetSchedulerEnqueueTime), arg0) +} + // SourceClusterName mocks base method. func (m *MockExecutableTask) SourceClusterName() string { m.ctrl.T.Helper() diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go index 8bceb6d142..ebf7f8bb12 100644 --- a/service/history/replication/fx.go +++ b/service/history/replication/fx.go @@ -130,9 +130,20 @@ func replicationTaskExecutorProvider() TaskExecutorProvider { func replicationStreamHighPrioritySchedulerProvider( config *configs.Config, logger log.Logger, + metricsHandler metrics.Handler, queueFactory ctasks.SequentialTaskQueueFactory[TrackableExecutableTask], lc fx.Lifecycle, ) ctasks.Scheduler[TrackableExecutableTask] { + iwrrMetricsHandler := metricsHandler.WithTags( + metrics.StringTag("queue_type", "replication"), + metrics.StringTag("scheduler_type", "iwrr"), + metrics.StringTag("priority", "high"), + ) + seqMetricsHandler := metricsHandler.WithTags( + metrics.StringTag("queue_type", "replication"), + metrics.StringTag("scheduler_type", "sequential_scheduler"), + metrics.StringTag("priority", "high"), + ) // SequentialScheduler has panic wrapper when executing task, // if changing the executor, please make sure other executor has panic wrapper scheduler := ctasks.NewSequentialScheduler[TrackableExecutableTask]( @@ -143,6 +154,8 @@ func replicationStreamHighPrioritySchedulerProvider( WorkflowKeyHashFn, queueFactory, logger, + seqMetricsHandler, + nil, ) taskChannelKeyFn := func(e TrackableExecutableTask) ClusterChannelKey { return ClusterChannelKey{ @@ -156,11 +169,13 @@ func replicationStreamHighPrioritySchedulerProvider( // They share the same weight so it just does a round-robin on all clusters' tasks. 
rrScheduler := ctasks.NewInterleavedWeightedRoundRobinScheduler( ctasks.InterleavedWeightedRoundRobinSchedulerOptions[TrackableExecutableTask, ClusterChannelKey]{ - TaskChannelKeyFn: taskChannelKeyFn, - ChannelWeightFn: channelWeightFn, + TaskChannelKeyFn: taskChannelKeyFn, + ChannelWeightFn: channelWeightFn, + ChannelKeyToStringFn: func(key ClusterChannelKey) string { return key.ClusterName }, }, scheduler, logger, + iwrrMetricsHandler, ) lc.Append(fx.StartStopHook(rrScheduler.Start, rrScheduler.Stop)) return rrScheduler @@ -175,6 +190,16 @@ func replicationStreamLowPrioritySchedulerProvider( metricsHandler metrics.Handler, lc fx.Lifecycle, ) ctasks.Scheduler[TrackableExecutableTask] { + iwrrMetricsHandler := metricsHandler.WithTags( + metrics.StringTag("queue_type", "replication"), + metrics.StringTag("scheduler_type", "iwrr"), + metrics.StringTag("priority", "low"), + ) + seqMetricsHandler := metricsHandler.WithTags( + metrics.StringTag("queue_type", "replication"), + metrics.StringTag("scheduler_type", "sequential_scheduler"), + metrics.StringTag("priority", "low"), + ) queueFactory := func(task TrackableExecutableTask) ctasks.SequentialTaskQueue[TrackableExecutableTask] { item := task.QueueID() workflowKey, ok := item.(definition.WorkflowKey) @@ -202,6 +227,8 @@ func replicationStreamLowPrioritySchedulerProvider( taskQueueHashFunc, queueFactory, logger, + seqMetricsHandler, + nil, ) taskChannelKeyFn := func(e TrackableExecutableTask) ClusterChannelKey { return ClusterChannelKey{ @@ -260,11 +287,13 @@ func replicationStreamLowPrioritySchedulerProvider( // They share the same weight so it just does a round-robin on all clusters' tasks. rrScheduler := ctasks.NewInterleavedWeightedRoundRobinScheduler( ctasks.InterleavedWeightedRoundRobinSchedulerOptions[TrackableExecutableTask, ClusterChannelKey]{ - TaskChannelKeyFn: taskChannelKeyFn, - ChannelWeightFn: channelWeightFn, + TaskChannelKeyFn: taskChannelKeyFn, + ChannelWeightFn: channelWeightFn, + ChannelKeyToStringFn: func(key ClusterChannelKey) string { return key.ClusterName }, }, scheduler, logger, + iwrrMetricsHandler, ) ts := ctasks.NewRateLimitedScheduler[TrackableExecutableTask]( rrScheduler, diff --git a/service/history/timer_queue_factory.go b/service/history/timer_queue_factory.go index 4bbb450a05..f8777a6ac8 100644 --- a/service/history/timer_queue_factory.go +++ b/service/history/timer_queue_factory.go @@ -51,6 +51,7 @@ func NewTimerQueueFactory( }, params.NamespaceRegistry, params.Logger, + nil, // metrics disabled for timer queue ), HostPriorityAssigner: queues.NewPriorityAssigner( params.NamespaceRegistry, diff --git a/service/history/transfer_queue_factory.go b/service/history/transfer_queue_factory.go index 9b0d254d3e..b5549552ec 100644 --- a/service/history/transfer_queue_factory.go +++ b/service/history/transfer_queue_factory.go @@ -54,6 +54,7 @@ func NewTransferQueueFactory( }, params.NamespaceRegistry, params.Logger, + nil, // metrics disabled for transfer queue ), HostPriorityAssigner: queues.NewPriorityAssigner( params.NamespaceRegistry, diff --git a/service/history/visibility_queue_factory.go b/service/history/visibility_queue_factory.go index 69fbe0c56f..4fd1e8f828 100644 --- a/service/history/visibility_queue_factory.go +++ b/service/history/visibility_queue_factory.go @@ -47,6 +47,7 @@ func NewVisibilityQueueFactory( }, params.NamespaceRegistry, params.Logger, + nil, // metrics disabled for visibility queue ), HostPriorityAssigner: queues.NewPriorityAssigner( params.NamespaceRegistry, diff --git 
a/tests/xdc/replication_scheduler_metrics_test.go b/tests/xdc/replication_scheduler_metrics_test.go new file mode 100644 index 0000000000..1655af2906 --- /dev/null +++ b/tests/xdc/replication_scheduler_metrics_test.go @@ -0,0 +1,319 @@ +package xdc + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/suite" + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics/metricstest" +) + +type ( + replicationSchedulerMetricsTestSuite struct { + xdcBaseSuite + namespaceName string + namespaceID string + } +) + +func TestReplicationSchedulerMetricsTestSuite(t *testing.T) { + t.Parallel() + s := &replicationSchedulerMetricsTestSuite{ + namespaceName: "scheduler-metrics-test-" + common.GenerateRandomString(5), + } + suite.Run(t, s) +} + +func (s *replicationSchedulerMetricsTestSuite) SetupSuite() { + s.dynamicConfigOverrides = map[dynamicconfig.Key]any{ + dynamicconfig.EnableReplicationStream.Key(): true, + dynamicconfig.EnableReplicationTaskBatching.Key(): false, + } + s.logger = log.NewTestLogger() + s.setupSuite() +} + +func (s *replicationSchedulerMetricsTestSuite) TearDownSuite() { + s.tearDownSuite() +} + +func (s *replicationSchedulerMetricsTestSuite) SetupTest() { + s.setupTest() +} + +// simpleWorkflow is a minimal workflow that completes immediately. +// This generates replication events with minimal overhead. +func simpleWorkflow(ctx workflow.Context) error { + return nil +} + +func (s *replicationSchedulerMetricsTestSuite) TestSchedulerMetricsUnderLoad() { + const ( + testDuration = 2 * time.Minute + metricsInterval = 10 * time.Second + numConcurrentUsers = 5 + taskQueueName = "scheduler-metrics-test-tq" + ) + + ctx, cancel := context.WithTimeout(context.Background(), testDuration+30*time.Second) + defer cancel() + + // Create global namespace (handles replication waiting automatically) + s.namespaceName = s.createGlobalNamespace() + + // Start metrics capture on standby cluster (cluster 1) + standbyCapture := s.clusters[1].Host().CaptureMetricsHandler() + s.Require().NotNil(standbyCapture, "standby cluster should have CaptureMetricsHandler enabled") + capture := standbyCapture.StartCapture() + defer standbyCapture.StopCapture(capture) + + // Also capture metrics on active cluster for comparison + activeCapture := s.clusters[0].Host().CaptureMetricsHandler() + s.Require().NotNil(activeCapture, "active cluster should have CaptureMetricsHandler enabled") + activeMetrics := activeCapture.StartCapture() + defer activeCapture.StopCapture(activeMetrics) + + // Create SDK client for active cluster + sdkClient, err := sdkclient.Dial(sdkclient.Options{ + HostPort: s.clusters[0].Host().FrontendGRPCAddress(), + Namespace: s.namespaceName, + }) + s.Require().NoError(err) + defer sdkClient.Close() + + // Start worker on active cluster + w := worker.New(sdkClient, taskQueueName, worker.Options{}) + w.RegisterWorkflowWithOptions(simpleWorkflow, workflow.RegisterOptions{Name: "simple-workflow"}) + err = w.Start() + s.Require().NoError(err) + defer w.Stop() + + // Track workflow stats + var workflowsStarted atomic.Int64 + var workflowsCompleted atomic.Int64 + var workflowErrors atomic.Int64 + + // Create a done channel to signal workflow generators to stop + done := make(chan struct{}) + + // Start concurrent workflow generators + var wg sync.WaitGroup + for 
i := 0; i < numConcurrentUsers; i++ { + workerID := i + wg.Go(func() { + s.runWorkflowGenerator(ctx, sdkClient, taskQueueName, workerID, done, &workflowsStarted, &workflowsCompleted, &workflowErrors) + }) + } + + // Metrics collection loop + ticker := time.NewTicker(metricsInterval) + defer ticker.Stop() + + testStart := time.Now() + iteration := 0 + + s.T().Logf("Starting scheduler metrics test - will run for %v with %d concurrent workflow generators", testDuration, numConcurrentUsers) + + for { + select { + case <-ctx.Done(): + s.T().Logf("Context done, stopping test") + close(done) + wg.Wait() + return + + case <-ticker.C: + iteration++ + elapsed := time.Since(testStart) + + // Log workflow progress + started := workflowsStarted.Load() + completed := workflowsCompleted.Load() + errors := workflowErrors.Load() + s.T().Logf("Iteration %d (elapsed: %v) - Workflows: started=%d, completed=%d, errors=%d", + iteration, elapsed.Round(time.Second), started, completed, errors) + + // Log standby cluster metrics snapshot + snapshot := capture.Snapshot() + s.logSchedulerMetrics("standby", snapshot) + + // Check if we've run long enough + if elapsed >= testDuration { + s.T().Logf("Test duration reached, stopping workflow generators") + close(done) + wg.Wait() + + // Wait for replication to catch up + s.T().Logf("Waiting for replication to catch up...") + s.waitForClusterSynced() + + // Final metrics verification + finalSnapshot := capture.Snapshot() + s.T().Logf("\n=== FINAL METRICS VERIFICATION ===") + s.verifySchedulerMetrics(finalSnapshot) + return + } + } + } +} + +// runWorkflowGenerator continuously starts and waits for workflows until done channel is closed +func (s *replicationSchedulerMetricsTestSuite) runWorkflowGenerator( + ctx context.Context, + client sdkclient.Client, + taskQueue string, + workerID int, + done <-chan struct{}, + started *atomic.Int64, + completed *atomic.Int64, + errors *atomic.Int64, +) { + workflowNum := 0 + for { + select { + case <-done: + return + case <-ctx.Done(): + return + default: + workflowID := common.GenerateRandomString(10) + workflowNum++ + + started.Add(1) + + run, err := client.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: taskQueue, + WorkflowRunTimeout: 30 * time.Second, + }, "simple-workflow") + + if err != nil { + errors.Add(1) + // Don't log every error to avoid spam, but track them + if workflowNum%100 == 0 { + s.T().Logf("Worker %d: workflow start error (count: %d): %v", workerID, errors.Load(), err) + } + continue + } + + // Wait for workflow to complete + err = run.Get(ctx, nil) + if err != nil { + errors.Add(1) + continue + } + + completed.Add(1) + + // Small delay to avoid overwhelming the system + time.Sleep(10 * time.Millisecond) //nolint:forbidigo + } + } +} + +// logSchedulerMetrics logs all scheduler-related metrics from the snapshot +func (s *replicationSchedulerMetricsTestSuite) logSchedulerMetrics(clusterName string, snapshot map[string][]*metricstest.CapturedRecording) { + schedulerMetrics := []string{ + "scheduler_tasks_submitted", + "scheduler_tasks_completed", + "scheduler_queue_latency", + "scheduler_queue_depth", + "scheduler_active_queues", + "scheduler_active_workers", + "scheduler_assigned_workers", + "scheduler_busy_workers", + } + + s.T().Logf("--- %s cluster scheduler metrics ---", clusterName) + for _, metricName := range schedulerMetrics { + recordings := snapshot[metricName] + if len(recordings) == 0 { + continue + } + + // Filter for replication queue type + for _, r := range 
recordings { + if r.Tags["queue_type"] == "replication" { + s.T().Logf(" %s: value=%v tags=%v", metricName, r.Value, r.Tags) + } + } + } +} + +// verifySchedulerMetrics asserts that scheduler metrics are being emitted for the replication queue +func (s *replicationSchedulerMetricsTestSuite) verifySchedulerMetrics(snapshot map[string][]*metricstest.CapturedRecording) { + // Log all metrics for debugging + s.logSchedulerMetrics("standby-final", snapshot) + + // Collect metrics by name filtered for replication + metricsFound := make(map[string]bool) + metricsValues := make(map[string][]interface{}) + + schedulerMetrics := []string{ + "scheduler_tasks_submitted", + "scheduler_tasks_completed", + "scheduler_queue_latency", + "scheduler_queue_depth", + "scheduler_active_queues", + "scheduler_active_workers", + "scheduler_assigned_workers", + } + + for _, metricName := range schedulerMetrics { + recordings := snapshot[metricName] + for _, r := range recordings { + if r.Tags["queue_type"] == "replication" { + metricsFound[metricName] = true + metricsValues[metricName] = append(metricsValues[metricName], r.Value) + } + } + } + + // Report what we found + s.T().Logf("\nReplication scheduler metrics found:") + for name, found := range metricsFound { + if found { + s.T().Logf(" [OK] %s: %v", name, metricsValues[name]) + } + } + + // Assert key metrics exist + // Note: We check that metrics are being recorded, but some gauges might be 0 if no tasks are queued + s.True(metricsFound["scheduler_tasks_submitted"], "scheduler_tasks_submitted should be recorded for replication queue") + s.True(metricsFound["scheduler_tasks_completed"], "scheduler_tasks_completed should be recorded for replication queue") + + // Verify that tasks were actually processed + submittedValues := metricsValues["scheduler_tasks_submitted"] + s.NotEmpty(submittedValues, "should have scheduler_tasks_submitted recordings") + + completedValues := metricsValues["scheduler_tasks_completed"] + s.NotEmpty(completedValues, "should have scheduler_tasks_completed recordings") + + // Sum up counter values + var totalSubmitted int64 + for _, v := range submittedValues { + if intVal, ok := v.(int64); ok { + totalSubmitted += intVal + } + } + + var totalCompleted int64 + for _, v := range completedValues { + if intVal, ok := v.(int64); ok { + totalCompleted += intVal + } + } + + s.T().Logf("\nTotal replication scheduler tasks: submitted=%d, completed=%d", totalSubmitted, totalCompleted) + s.Positive(totalSubmitted, "should have submitted some replication tasks") + s.Positive(totalCompleted, "should have completed some replication tasks") +}
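
Reviewer note (not part of the patch): the core mechanism this change introduces is the optional `SchedulerTimestampedTask` interface in `common/tasks/scheduler.go` — schedulers stamp an enqueue time on `Submit`/`TrySubmit` and, if the task exposes the two timestamp methods, emit `scheduler_queue_latency` when the task is handed off. The self-contained sketch below re-states that contract outside the server codebase so it can be read in isolation; `demoTask` and `reportLatency` are hypothetical stand-ins, and the interface body mirrors the one added in this diff (the real schedulers do the equivalent inside `recordQueueLatency`, recording via `metrics.SchedulerQueueLatency.With(handler)`).

```go
package main

import (
	"fmt"
	"time"
)

// SchedulerTimestampedTask mirrors the optional interface added in
// common/tasks/scheduler.go for queue wait-latency tracking.
type SchedulerTimestampedTask interface {
	SetSchedulerEnqueueTime(time.Time)
	GetSchedulerEnqueueTime() time.Time
}

// demoTask is a hypothetical stand-in for a scheduler task; in the server,
// ExecutableTaskImpl stores schedulerEnqueueTime the same way.
type demoTask struct {
	enqueueTime time.Time
}

func (t *demoTask) SetSchedulerEnqueueTime(ts time.Time) { t.enqueueTime = ts }
func (t *demoTask) GetSchedulerEnqueueTime() time.Time   { return t.enqueueTime }

// reportLatency follows the same pattern as the schedulers' recordQueueLatency:
// the timestamp methods are discovered via a type assertion, so tasks that do
// not implement the interface are silently skipped, as is a zero enqueue time.
func reportLatency(task any, now time.Time) {
	timestamped, ok := task.(SchedulerTimestampedTask)
	if !ok {
		return
	}
	enqueueTime := timestamped.GetSchedulerEnqueueTime()
	if enqueueTime.IsZero() {
		return
	}
	// In the server this value is recorded as the scheduler_queue_latency timer.
	fmt.Printf("scheduler_queue_latency: %v\n", now.Sub(enqueueTime))
}

func main() {
	task := &demoTask{}
	// On Submit/TrySubmit the scheduler stamps the enqueue time...
	task.SetSchedulerEnqueueTime(time.Now())
	time.Sleep(25 * time.Millisecond)
	// ...and when the task is dispatched to a worker, the wait is reported.
	reportLatency(task, time.Now())
}
```

In the patch itself, `ExecutableTaskImpl` and `batchedTask` implement these two methods, and the replication providers in `service/history/replication/fx.go` wire metrics handlers tagged with `queue_type`, `scheduler_type`, and `priority`, which is what the new xdc test asserts on.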