Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,39 @@ var (
"pending_tasks",
WithDescription("A histogram across history shards for the number of in-memory pending history tasks."),
)
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
SchedulerActiveWorkers = NewGaugeDef(
"scheduler_active_workers",
WithDescription("Number of active workers processing tasks in the scheduler."),
)
SchedulerBusyWorkers = NewGaugeDef(
"scheduler_busy_workers",
WithDescription("Number of workers currently executing a task."),
)
SchedulerAssignedWorkers = NewGaugeDef(
"scheduler_assigned_workers",
WithDescription("Number of workers currently assigned to a task queue."),
)
SchedulerQueueDepth = NewGaugeDef(
"scheduler_queue_depth",
WithDescription("Number of tasks waiting in the scheduler queue."),
)
SchedulerActiveQueues = NewGaugeDef(
"scheduler_active_queues",
WithDescription("Number of active task queues (for sequential and IWRR schedulers)."),
)
SchedulerQueueLatency = NewTimerDef(
"scheduler_queue_latency",
WithDescription("Time a task spent waiting in the scheduler queue before execution."),
)
SchedulerTasksSubmitted = NewCounterDef(
"scheduler_tasks_submitted",
WithDescription("Total number of tasks submitted to the scheduler."),
)
SchedulerTasksCompleted = NewCounterDef(
"scheduler_tasks_completed",
WithDescription("Total number of tasks completed by the scheduler."),
)
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
Expand Down
2 changes: 2 additions & 0 deletions common/tasks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) {
},
Scheduler[*noopTask](&noopScheduler{}),
logger,
nil,
)
scheduler.Start()
defer scheduler.Stop()
Expand All @@ -66,6 +67,7 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) {
},
Scheduler[*noopTask](&noopScheduler{}),
logger,
nil,
)
scheduler.Start()
defer scheduler.Stop()
Expand Down
1 change: 1 addition & 0 deletions common/tasks/fifo_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
defaultMonitorTickerDuration = time.Minute
defaultMonitorTickerJitter = 0.15
metricsExportInterval = 5 * time.Second
)

var _ Scheduler[Task] = (*FIFOScheduler[Task])(nil)
Expand Down
73 changes: 73 additions & 0 deletions common/tasks/interleaved_weighted_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
)

const (
Expand All @@ -30,6 +31,8 @@ type (
ChannelWeightUpdateCh chan struct{}
// Optional, if specified, delete inactive channels after this duration
InactiveChannelDeletionDelay dynamicconfig.DurationPropertyFn
// Optional, if specified, used to convert channel key to string for metrics tagging
ChannelKeyToStringFn func(K) string
}

// TaskChannelKeyFn is the function for mapping a task to its task channel (key)
Expand Down Expand Up @@ -67,13 +70,17 @@ type (
// 3 -> 1
// then iwrrChannels will contain chan [0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 3] (ID-ed by channel key)
iwrrChannels atomic.Value // []*WeightedChannel

// Metrics fields
metricsHandler metrics.Handler
}
)

func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
options InterleavedWeightedRoundRobinSchedulerOptions[T, K],
fifoScheduler Scheduler[T],
logger log.Logger,
metricsHandler metrics.Handler,
) *InterleavedWeightedRoundRobinScheduler[T, K] {
iwrrChannels := atomic.Value{}
iwrrChannels.Store(WeightedChannels[T]{})
Expand All @@ -93,6 +100,8 @@ func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
numInflightTask: 0,
weightedChannels: make(map[K]*WeightedChannel[T]),
iwrrChannels: iwrrChannels,

metricsHandler: metricsHandler,
}
}

Expand All @@ -113,6 +122,11 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Start() {
s.shutdownWG.Add(1)
go s.cleanupLoop()

if s.metricsHandler != nil {
s.shutdownWG.Add(1)
go s.exportMetricsLoop()
}

s.logger.Info("interleaved weighted round robin task scheduler started")
}

Expand Down Expand Up @@ -151,6 +165,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Submit(
// or exceeding rate limit
channel, releaseFn := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task))
defer releaseFn()
s.setEnqueueTime(task)
channel.Chan() <- task
s.notifyDispatcher()
}
Expand All @@ -166,6 +181,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit(
// there are tasks pending dispatching, need to respect round roubin weight
channel, releaseFn := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task))
defer releaseFn()
s.setEnqueueTime(task)
select {
case channel.Chan() <- task:
s.notifyDispatcher()
Expand Down Expand Up @@ -349,6 +365,7 @@ LoopDispatch:
select {
case task := <-channel.Chan():
channel.UpdateLastActiveTime(now)
s.recordQueueLatency(task)
s.fifoScheduler.Submit(task)
numTasks++
default:
Expand Down Expand Up @@ -403,3 +420,59 @@ DrainLoop:
func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isStopped() bool {
return atomic.LoadInt32(&s.status) == common.DaemonStatusStopped
}

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) exportMetricsLoop() {
defer s.shutdownWG.Done()

ticker := time.NewTicker(metricsExportInterval)
defer ticker.Stop()

for {
select {
case <-s.shutdownChan:
return
case <-ticker.C:
s.emitGaugeMetrics()
}
}
}

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) emitGaugeMetrics() {
s.RLock()
defer s.RUnlock()

for key, channel := range s.weightedChannels {
depth := channel.Len()
var tags []metrics.Tag
if s.options.ChannelKeyToStringFn != nil {
tags = []metrics.Tag{metrics.StringTag("channel_key", s.options.ChannelKeyToStringFn(key))}
}
metrics.SchedulerQueueDepth.With(s.metricsHandler).Record(float64(depth), tags...)
}

metrics.SchedulerActiveQueues.With(s.metricsHandler).Record(float64(len(s.weightedChannels)))
}

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setEnqueueTime(task T) {
if timestamped, ok := any(task).(SchedulerTimestampedTask); ok {
timestamped.SetSchedulerEnqueueTime(s.ts.Now())
}
}

func (s *InterleavedWeightedRoundRobinScheduler[T, K]) recordQueueLatency(task T) {
if s.metricsHandler == nil {
return
}
if timestamped, ok := any(task).(SchedulerTimestampedTask); ok {
enqueueTime := timestamped.GetSchedulerEnqueueTime()
if !enqueueTime.IsZero() {
latency := s.ts.Now().Sub(enqueueTime)
var tags []metrics.Tag
if s.options.ChannelKeyToStringFn != nil {
key := s.options.TaskChannelKeyFn(task)
tags = []metrics.Tag{metrics.StringTag("channel_key", s.options.ChannelKeyToStringFn(key))}
}
metrics.SchedulerQueueLatency.With(s.metricsHandler).Record(latency, tags...)
}
}
}
3 changes: 3 additions & 0 deletions common/tasks/interleaved_weighted_round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() {
},
Scheduler[*testTask](s.mockFIFOScheduler),
logger,
nil,
)
s.scheduler.ts = s.ts
}
Expand Down Expand Up @@ -452,6 +453,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestInactiveChannelDeletio
},
Scheduler[*testTask](s.mockFIFOScheduler),
log.NewTestLogger(),
nil,
)
s.scheduler.ts = s.ts
s.mockFIFOScheduler.EXPECT().Start()
Expand Down Expand Up @@ -528,6 +530,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestInactiveChannelDeletio
},
Scheduler[*testTask](s.mockFIFOScheduler),
log.NewTestLogger(),
nil,
)
s.mockFIFOScheduler.EXPECT().Start()
s.scheduler.Start()
Expand Down
11 changes: 11 additions & 0 deletions common/tasks/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package tasks

import (
"time"
)

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination scheduler_mock.go
type (
// Scheduler is the generic interface for scheduling & processing tasks
Expand All @@ -9,4 +13,11 @@ type (
Start()
Stop()
}

// SchedulerTimestampedTask is an optional interface that tasks can implement
// to enable queue wait latency tracking in schedulers.
SchedulerTimestampedTask interface {
SetSchedulerEnqueueTime(time.Time)
GetSchedulerEnqueueTime() time.Time
}
)
51 changes: 51 additions & 0 deletions common/tasks/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading