From 0a64c4b735d8dc8a857fbfe9c6be0a1d5ed6d87e Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Tue, 29 Sep 2020 21:48:10 +0900 Subject: [PATCH 1/2] wip --- pkg/backend/redis/queue.go | 3 +++ pkg/backend/redis/task.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..9b0e955 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -20,6 +20,7 @@ package redis import ( "context" "encoding/json" + "fmt" "reflect" "time" @@ -271,8 +272,10 @@ func (b *Backend) ensureQueueExistsByUID(rds redis.Cmdable, uid string) (*taskqu rawQueue, err := rds.Get(b.queueKey(uid)).Result() switch { case err == redis.Nil: + b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") return nil, iface.TaskQueueNotFound case err != nil: + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") return nil, err } diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index bc46448..212c76f 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -398,9 +398,11 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( toPop := false numWorkerPending, err := tx.LLen(workerPendingQueueKey).Result() if err == redis.Nil { + b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") toPop = true } if err != nil { + b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") return err } if numWorkerPending == 0 { @@ -413,6 +415,7 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( return nil }) if err == redis.Nil { + return backoff.Permanent(iface.TaskQueueEmptyError) } return err From c28533ba0e2e7ccfc4ff652354c7e870019adfaa Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Tue, 29 Sep 2020 22:34:47 +0900 Subject: [PATCH 2/2] detailed log in code path from NextTask() --- pkg/backend/redis/queue.go | 2 +- pkg/backend/redis/task.go | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 9b0e955..02a1c58 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -272,7 +272,7 @@ func (b *Backend) ensureQueueExistsByUID(rds redis.Cmdable, uid string) (*taskqu rawQueue, err := rds.Get(b.queueKey(uid)).Result() switch { case err == redis.Nil: - b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") return nil, iface.TaskQueueNotFound case err != nil: b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index 212c76f..d6ae9c5 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -398,11 +398,11 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( toPop := false numWorkerPending, err := tx.LLen(workerPendingQueueKey).Result() if err == redis.Nil { - b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") toPop = true } if err != nil { - b.Logger.Error().Err(err).Stack().Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") return err } if numWorkerPending == 0 { @@ -415,12 +415,13 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( return nil }) if err == redis.Nil { - + b.Logger.Error().Stack().Err(err).Msg("debug-next-task: this should not happen.") return backoff.Permanent(iface.TaskQueueEmptyError) } return err }, queueKey, workerPendingQueueKey) if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("MULTI; RPOPLPUSH %s %s; EXEC", pendingQueueKey, workerPendingQueueKey)).Msg("debug-next-task") return nil, err } @@ -457,9 +458,11 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( // peak worker pending queue taskUIDs, err := tx.LRange(workerPendingQueueKey, -1, -1).Result() if err == redis.Nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LRANGE %s -1 -1", workerPendingQueueKey)).Msg("debug-next-task") return backoff.Permanent(iface.TaskQueueEmptyError) } if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LRANGE %s -1 -1", workerPendingQueueKey)).Msg("debug-next-task") return err } if len(taskUIDs) == 0 { @@ -469,6 +472,7 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( // get Task and transit to Received raw, err := tx.Get(b.taskKey(queue.UID.String(), taskUIDs[0])).Result() if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("Get %s", b.taskKey(queue.UID.String(), taskUIDs[0]))).Msg("debug-next-task") return err } t = &task.Task{} @@ -507,6 +511,7 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( workerPendingQueueKey, workerTasksKey, ) if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("MULTI; RPOP %s; SADD %s %s; Set %s -1; EXEC", workerPendingQueueKey, workerTasksKey, t.UID, b.taskKey(queue.UID.String(), t.UID))).Msg("debug-next-task") return nil, err } return t, nil