diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..02a1c58 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -20,6 +20,7 @@ package redis import ( "context" "encoding/json" + "fmt" "reflect" "time" @@ -271,8 +272,10 @@ func (b *Backend) ensureQueueExistsByUID(rds redis.Cmdable, uid string) (*taskqu rawQueue, err := rds.Get(b.queueKey(uid)).Result() switch { case err == redis.Nil: + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") return nil, iface.TaskQueueNotFound case err != nil: + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("GET %s", b.queueKey(uid))).Msg("debug-next-task") return nil, err } diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index bc46448..d6ae9c5 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -398,9 +398,11 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( toPop := false numWorkerPending, err := tx.LLen(workerPendingQueueKey).Result() if err == redis.Nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") toPop = true } if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LLEN %s", workerPendingQueueKey)).Msg("debug-next-task") return err } if numWorkerPending == 0 { @@ -413,11 +415,13 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( return nil }) if err == redis.Nil { + b.Logger.Error().Stack().Err(err).Msg("debug-next-task: this should not happen.") return backoff.Permanent(iface.TaskQueueEmptyError) } return err }, queueKey, workerPendingQueueKey) if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("MULTI; RPOPLPUSH %s %s; EXEC", pendingQueueKey, workerPendingQueueKey)).Msg("debug-next-task") return nil, err } @@ -454,9 +458,11 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( // peak worker pending queue taskUIDs, err := tx.LRange(workerPendingQueueKey, -1, -1).Result() if err == redis.Nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LRANGE %s -1 -1", workerPendingQueueKey)).Msg("debug-next-task") return backoff.Permanent(iface.TaskQueueEmptyError) } if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("LRANGE %s -1 -1", workerPendingQueueKey)).Msg("debug-next-task") return err } if len(taskUIDs) == 0 { @@ -466,6 +472,7 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( // get Task and transit to Received raw, err := tx.Get(b.taskKey(queue.UID.String(), taskUIDs[0])).Result() if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("Get %s", b.taskKey(queue.UID.String(), taskUIDs[0]))).Msg("debug-next-task") return err } t = &task.Task{} @@ -504,6 +511,7 @@ func (b *Backend) NextTask(ctx context.Context, queueUID, workerUID uuid.UUID) ( workerPendingQueueKey, workerTasksKey, ) if err != nil { + b.Logger.Error().Stack().Err(err).Str("redis cmd", fmt.Sprintf("MULTI; RPOP %s; SADD %s %s; Set %s -1; EXEC", workerPendingQueueKey, workerTasksKey, t.UID, b.taskKey(queue.UID.String(), t.UID))).Msg("debug-next-task") return nil, err } return t, nil