Conversation
📝 WalkthroughWalkthroughLarge-scale refactoring migrating streaming APIs from channel-based ServerStream interfaces to iterator-based iter.Seq2 sequences across BYOC providers and CloudWatch Logs library. Includes new ALB log retrieval for AWS, updated log parsing, and adjusted deployment status tracking with boolean done indicators. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.5.0)level=warning msg="[linters_context] running gomodguard failed: unable to read module file go.mod: current working directory must have a go.mod file: if you are not using go modules it is suggested to disable this linter" Comment |
fceecd9 to
212c710
Compare
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@src/cmd/cli/command/commands.go`:
- Around line 49-53: The tracking call is sending the raw error object
(track.Evt("CLI Error", P("err", err))) which may leak sensitive data or fail
serialization; update the error payload in the RootCmd.ExecuteContext error
handling (and the other instance around lines 380-383) to send a nil-safe,
sanitized string (e.g., use client.PrettyError(err) or fmt.Sprintf("%v", err) or
err.Error() with a nil check) and optionally add an error code field instead of
the raw err object so track.Evt receives only primitives/strings.
In `@src/pkg/cli/cd.go`:
- Around line 126-128: In TailAndWaitForCD do not convert a context.Canceled
error to nil; that hides user-initiated cancellation (e.g., Ctrl+C). Locate the
cdErr handling in TailAndWaitForCD (the block with "if errors.Is(cdErr,
context.Canceled) { cdErr = nil }") and remove or change it so that
context.Canceled is returned up the call stack unchanged; if you need to
distinguish internal shutdowns, introduce and check a specific internal sentinel
error (not context.Canceled) instead, and only suppress that sentinel while
preserving any context.Canceled errors.
In `@src/pkg/cli/client/byoc/aws/subscribe.go`:
- Around line 20-25: The loop over logSeq yields errors to the consumer but then
continues to process the associated events even when err != nil; update the
error branch inside the for events, err := range logSeq loop so that after
calling yield(nil, err) (and returning if it returns false) you immediately
continue to the next iteration instead of falling through to process events —
modify the block handling err != nil around the yield call (referencing logSeq,
events, err, and yield) to add a continue after a successful yield.
In `@src/pkg/cli/client/byoc/do/byoc.go`:
- Around line 510-527: The loop inside the Subscribe closure repeatedly calls
b.client.Apps.GetDeployment without any wait or cancellation check, causing a
tight busy loop; update the closure returned by the function to break or return
when ctx is cancelled (check ctx.Done()) and add a small configurable delay
(e.g. time.Ticker or time.Sleep) between iterations to throttle polling and
reduce CPU/API usage while keeping the existing yield behavior (reference the
returned func(yield func(*defangv1.SubscribeResponse, error) bool), the call to
b.client.Apps.GetDeployment, and phaseToState).
In `@src/pkg/cli/client/byoc/do/stream.go`:
- Around line 65-77: The code assumes strings.SplitN(data.Data, " ", 3) yields
three parts and directly indexes parts[0..2], which can panic on malformed
WebSocket messages; update the handler (the block that builds
defangv1.TailResponse / LogEntry using parts, timestamppb.New(ts), data.Op and
etag) to first validate len(parts) >= 3 and if not either skip/return a safe
error log or construct a fallback response, and also check the time.Parse result
for errors before calling timestamppb.New(ts) (use a fallback timestamp like
time.Now() or omit the timestamp on parse failure); ensure any skipped/malformed
messages are logged for debugging rather than causing a panic.
In `@src/pkg/cli/composeUp_test.go`:
- Around line 81-88: The mock GetDeploymentStatus currently always returns
false; change mockDeployProvider.GetDeploymentStatus so the first return value
indicates completion (true) when m.deploymentStatus != nil and false when it is
nil, preserving ctx handling: on ctx.Done() return true (or false?) — actually
return true with the context.Cause(ctx) error to indicate the operation
completed due to context cancellation, otherwise return (m.deploymentStatus !=
nil, m.deploymentStatus) so tests can signal completion via setting
m.deploymentStatus (e.g., io.EOF) and pending via nil.
In `@src/pkg/clouds/aws/cw/stream_test.go`:
- Around line 36-43: The test panics because CloudWatch events may have nil
Message fields; in the inner loop (the shadowed loop variable "for _, evt :=
range evt" inside the outer "for evt, err := range evts") add a nil guard before
dereferencing: ensure the loop item itself is non-nil and that evt.Message !=
nil, and only then call fmt.Println(*evt.Message); optionally rename the inner
loop variable to avoid shadowing (e.g., use "e" or "entry") for clarity.
In `@src/pkg/clouds/aws/cw/stream.go`:
- Around line 36-48: QueryLogGroup errors are currently ignored; change the
logic after calling QueryLogGroup(ctx, cwClient, lgi, start, end, 0) to surface
failures to the consumer by yielding the error (or returning) instead of
dropping it: if err != nil then call yield(nil, err) and stop if it returns
false (or return), otherwise when err == nil continue to iterate over querySeq
as before; update the branch around QueryLogGroup in stream handling to use this
pattern so callers can decide how to proceed.
In `@src/pkg/clouds/aws/ecs/status.go`:
- Around line 54-66: Guard dereferences of task.StoppedReason before using them:
compute a local reason string (e.g., reason := "<unknown>" or use
*task.StoppedReason when non-nil) and use that in both the default return
TaskFailure{task.StopCode, reason} and the EssentialContainerExited branch where
you build the container exit message (fmt.Sprintf("%s with code %d", reason,
*c.ExitCode)). Ensure you check task.StoppedReason != nil before dereferencing
in both places (the default case and inside the loop) and fall back to a safe
placeholder when nil.
🧹 Nitpick comments (6)
src/pkg/clouds/aws/ecs/tail.go (1)
99-101: Consider returning a sentinel error for "task already done" instead of nil iterator.When
done=trueanderr=nil, this returns(nil, nil). Ranging over aniliterator in Go is safe (it simply doesn't iterate), but returningnilto indicate "task completed, no logs to tail" is implicit behavior that callers must understand.Consider returning an explicit sentinel error (e.g.,
ErrTaskAlreadyComplete) to make this state explicit to callers.💡 Optional: Make the "task done" state explicit
// Define at package level: // var ErrTaskAlreadyComplete = errors.New("task already complete") // Then in this block: done, err := getTaskStatus(ctx, a.Region, a.ClusterName, taskID) if done { if err != nil { return nil, err } return nil, ErrTaskAlreadyComplete // explicit signal } if err != nil { return nil, err // TODO: handle transient errors }Caller would then handle:
tailIter, err := a.TailTaskID(ctx, cwClient, taskId) if errors.Is(err, ErrTaskAlreadyComplete) { // Task done before we could tail - just wait for result }src/pkg/clouds/aws/ecs/status.go (1)
16-17: Update the docstrings to reflect the new(done, err)contract.The comment above
GetTaskStatusstill describesnil/io.EOFreturns; now the function returns(bool, error). Please update it (and the similar comment on Line 33) to avoid confusion.✍️ Suggested doc update
-// GetTaskStatus returns nil if the task is still running, io.EOF if the task is stopped successfully, or an error if the task failed. +// GetTaskStatus returns (done=false, err=nil) while running; done=true with io.EOF on success or a TaskFailure on failure. ... -// getTaskStatus returns nil if the task is still running, io.EOF if the task is stopped successfully, or an error if the task failed. +// getTaskStatus returns (done=false, err=nil) while running; done=true with io.EOF on success or a TaskFailure on failure.src/pkg/cli/client/byoc/aws/alb_logs_test.go (1)
28-38: Mock ListObjectsV2 ignores pagination and returns error alongside contents.The mock returns both
contentsanderrfromos.ReadDir. Iferris non-nil,contentsmay be partially populated or invalid. Per learnings, mocks can panic to surface issues in tests, but silently returning both could mask test failures.Also,
params.Prefixis ignored, which may cause tests to behave differently from production where prefix filtering matters.src/pkg/cli/subscribe_test.go (1)
299-304: Test assertions may have inverted logic.Line 299:
!tt.expectRetry && isTransientError(err) && provider.retry > 5- if we don't expect retry but got a transient error and retried more than 5 times, that's unexpected. This seems correct.Line 302:
tt.expectRetry && err == nil && provider.retry < 5- if we expect retry but got no error and retried less than 5 times, that's unexpected. However, when retries succeed after 5 attempts,errshould be the canceled error (not nil), so this condition may never trigger correctly.The test logic appears to work but could be clearer.
src/pkg/cli/client/byoc/aws/alb_logs.go (1)
169-175: Error handling on object read could be more resilient.When
GetObjectorreadAlbLogsfails for one object, the entire group fails. The comments ask "or continue with other objects?" - for better resilience, consider logging the error and continuing with remaining objects, since partial results may be better than none.Proposed change for resilience
for _, obj := range group { content, err := s3Client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &bucketName, Key: obj.Key, }) if err != nil { - return nil, err // or continue with other objects? + term.Debugf("Failed to read ALB log object %s: %v", *obj.Key, err) + continue // Continue with other objects } entries, err := readAlbLogs(content.Body, since, end, pattern) if err != nil { - return nil, err // or continue with other objects? + term.Debugf("Failed to parse ALB log object %s: %v", *obj.Key, err) + continue // Continue with other objects }src/pkg/cli/client/byoc/gcp/stream.go (1)
163-191: Consider limit counting behavior.The
countincrements per yielded response, butparseAndFiltercan return multiple responses per log entry. This means the actual number of yielded items may slightly exceedlimitif a single entry produces multiple responses. If strict limiting is required, consider checking the limit before yielding each response rather than after incrementing.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/pkg/cli/client/byoc/do/stream.go`:
- Around line 42-53: The websocket read loop in the returned function uses
conn.ReadMessage and never observes ctx after dialing, so cancellation can block
indefinitely; modify the function (the goroutine that runs the for loop using
conn.ReadMessage and yield) to honor ctx by either: a) wrapping the ReadMessage
call with a select (e.g., run ReadMessage in a goroutine and select between its
result and ctx.Done()), or b) spawn a watcher goroutine that waits on ctx.Done()
and calls conn.Close() to unblock ReadMessage; ensure you close the connection
and return from the loop when ctx is done so the function exits cleanly.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/pkg/cli/client/byoc/do/byoc.go (1)
465-470:⚠️ Potential issue | 🟠 MajorGuard empty LiveURL before calling
streamLogs.When only build logs are requested,
processServiceLogscan return"", which then gets passed tostreamLogsand results in a dial error. Prefer treating empty as “no live logs.”🛠️ Suggested fix
appLiveURL, err := b.processServiceLogs(ctx, req.Project, logType) if err != nil { return nil, err } - return streamLogs(ctx, appLiveURL, req.Etag) + if appLiveURL == "" { + appLiveURL = "none" + } + return streamLogs(ctx, appLiveURL, req.Etag)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pkg/cli/client/byoc/do/byoc.go` around lines 465 - 470, processServiceLogs can return an empty appLiveURL which is then passed to streamLogs causing a dial error; update the caller to guard against empty by checking the appLiveURL returned from processServiceLogs and treating it as “no live logs” (e.g., return nil, nil or the appropriate no-logs response) instead of calling streamLogs when appLiveURL == "". Locate the code around the call to processServiceLogs and streamLogs (variables appLiveURL, req.Etag) and add a simple conditional: if appLiveURL == "" { return nil, nil } before invoking streamLogs.src/pkg/cli/subscribe.go (1)
50-98:⚠️ Potential issue | 🟠 MajorReorder message processing before error handling in iter.Seq2 loop.
With
iter.Pull2, the final batch can arrive withmsg != nilanderr == io.EOF. Sinceio.EOFis not a transient error, the current code returns immediately without processing that final message, silently dropping critical service state updates.Suggested fix
msg, err, ok := next() if !ok { return serviceStates, nil } - if err != nil { - // Reconnect on transient errors - if isTransientError(err) { - if err := provider.DelayBeforeRetry(ctx); err != nil { - return serviceStates, err - } - stop() // stop the old iterator - logs, err = provider.Subscribe(ctx, &subscribeRequest) - if err != nil { - return serviceStates, err - } - next, stop = iter.Pull2(logs) - continue - } - return serviceStates, err - } - - if msg == nil { - continue - } - - term.Debugf("service %s with state ( %s ) and status: %s\n", msg.Name, msg.State, msg.Status) // TODO: don't print in Go-routine - - if _, ok := serviceStates[msg.Name]; !ok { - term.Debugf("unexpected service %s update", msg.Name) // TODO: don't print in Go-routine - continue - } - if msg.State == defangv1.ServiceState_NOT_SPECIFIED { - // We might get task/service states that do not map to a ServiceState; ignore those - continue - } - - if serviceStates[msg.Name] != targetState { - serviceStates[msg.Name] = msg.State - - // exit early on detecting a FAILED state - switch msg.State { - case defangv1.ServiceState_BUILD_FAILED, defangv1.ServiceState_DEPLOYMENT_FAILED: - return serviceStates, client.ErrDeploymentFailed{Service: msg.Name, Message: msg.Status} - } - } - - if allInState(targetState, serviceStates) { - return serviceStates, nil // all services are in the target state - } + if msg != nil { + term.Debugf("service %s with state ( %s ) and status: %s\n", msg.Name, msg.State, msg.Status) // TODO: don't print in Go-routine + + if _, ok := serviceStates[msg.Name]; !ok { + term.Debugf("unexpected service %s update", msg.Name) // TODO: don't print in Go-routine + } else if msg.State != defangv1.ServiceState_NOT_SPECIFIED { + if serviceStates[msg.Name] != targetState { + serviceStates[msg.Name] = msg.State + + // exit early on detecting a FAILED state + switch msg.State { + case defangv1.ServiceState_BUILD_FAILED, defangv1.ServiceState_DEPLOYMENT_FAILED: + return serviceStates, client.ErrDeploymentFailed{Service: msg.Name, Message: msg.Status} + } + } + } + + if allInState(targetState, serviceStates) { + return serviceStates, nil // all services are in the target state + } + } + + if err != nil { + // Reconnect on transient errors + if isTransientError(err) { + if err := provider.DelayBeforeRetry(ctx); err != nil { + return serviceStates, err + } + stop() // stop the old iterator + logs, err = provider.Subscribe(ctx, &subscribeRequest) + if err != nil { + return serviceStates, err + } + next, stop = iter.Pull2(logs) + continue + } + return serviceStates, err + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pkg/cli/subscribe.go` around lines 50 - 98, The loop currently returns on any non-transient err before processing a concurrently-delivered final msg from next(); to fix, process msg (when msg != nil) immediately after calling next()/next, handling state updates (serviceStates, targetState, allInState, and FAILED state checks) before treating errors, and only then handle err: if isTransientError(err) perform the existing DelayBeforeRetry/stop()/provider.Subscribe/iter.Pull2 retry flow, if err == io.EOF treat it as stream end after processing the message (i.e., break/return normally) and for other non-transient errors return the error; keep references to next(), iter.Pull2, stop(), provider.DelayBeforeRetry, provider.Subscribe, client.ErrDeploymentFailed and defangv1.ServiceState_* so you update the right sections.
🧹 Nitpick comments (3)
src/pkg/clouds/aws/ecs/status.go (2)
16-17: Documentation comment is outdated for the new signature.The comment still describes the old
error-only return. With the new(bool, error)signature, consider updating to clarify the semantics:
(false, nil): task still running or not found(true, io.EOF): task stopped successfully(true, TaskFailure): task failed(false, err): unable to check status📝 Suggested documentation update
-// GetTaskStatus returns nil if the task is still running, io.EOF if the task is stopped successfully, or an error if the task failed. -func GetTaskStatus(ctx context.Context, taskArn TaskArn) (bool, error) { +// GetTaskStatus returns (done, err) where done indicates if the task reached a terminal state. +// Returns (false, nil) if still running, (true, io.EOF) on success, or (true, TaskFailure) on failure. +func GetTaskStatus(ctx context.Context, taskArn TaskArn) (done bool, err error) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pkg/clouds/aws/ecs/status.go` around lines 16 - 17, Update the doc comment for GetTaskStatus to match the new (bool, error) signature: describe the return semantics clearly (e.g., (false, nil) means task still running or not found, (true, io.EOF) means task stopped successfully, (true, TaskFailure) means task failed, and (false, err) means an error occurred while checking status) and mention the types referenced (TaskArn, TaskFailure, io.EOF) so callers understand the possible error values.
33-34: Same documentation update needed here.The internal function has the same stale comment as the public one.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pkg/clouds/aws/ecs/status.go` around lines 33 - 34, The comment above getTaskStatus is stale; update the doc comment for the internal function getTaskStatus(ctx context.Context, region aws.Region, cluster, taskId string) (bool, error) to match the actual behavior (describe what the bool represents and which error values are returned instead of the outdated "nil/io.EOF" wording), ensuring the comment accurately documents return semantics and error cases for consumers and maintainers.src/pkg/clouds/aws/cw/stream.go (1)
84-139: Consider lazily starting fan-out goroutines inside the iterator.The goroutines and channel are created before returning the seq, so if the caller never iterates, the tails can run indefinitely. Moving setup into the returned function would align lifecycle with consumption.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pkg/clouds/aws/cw/stream.go` around lines 84 - 139, The current QueryAndTailLogGroups eagerly starts goroutines and a channel so tails run even if the returned seq is never consumed; change it to defer creating ch, wg, and starting the per-log-group goroutines until the returned iterator function is invoked: move the channel (ch), sync.WaitGroup and the loop that calls QueryAndTailLogGroup and launches goroutines into the returned func closure, preserving the existing logic for collecting lastErr before starting goroutines (if lastErr != nil return it immediately), ensure context cancellation (cancel()) is still deferred at the start of the iterator, and keep the same behavior for yielding results and stopping on errors so QueryAndTailLogGroups and QueryAndTailLogGroup identifiers are used to locate the code to move.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/pkg/cli/client/byoc/aws/byoc.go`:
- Around line 790-806: When ALB logs will be merged (len(req.Services) == 0) we
must avoid applying req.Limit twice; change the call site that invokes
QueryLogGroups to pass 0 (unlimited) for the CloudWatch query when req.Limit > 0
and alb merging will occur so that cw.MergeLogEvents can merge full streams and
then you apply the final limit once via cw.TakeFirstN/cw.TakeLastN; locate the
QueryLogGroups invocation and conditionally set its limit argument to 0 when
fetchAndStreamAlbLogs will be used (referencing req.Limit,
fetchAndStreamAlbLogs, cw.MergeLogEvents, cw.TakeFirstN and cw.TakeLastN).
In `@src/pkg/cli/client/byoc/do/stream.go`:
- Around line 61-70: The code currently proceeds to parse and use `data` after a
failed `json.Unmarshal`, which can reuse stale values; modify the error handling
in the loop around `json.Unmarshal(message, &data)` so that when unmarshalling
fails you call `yield(nil, err)` and if it returns true you skip further
processing of this message by continuing the loop (i.e., do not fall through to
splitting `data.Data` and forming `parts`), referencing the existing
`json.Unmarshal`, `message`, `data`, `yield`, and `parts` symbols to locate and
update the logic.
---
Outside diff comments:
In `@src/pkg/cli/client/byoc/do/byoc.go`:
- Around line 465-470: processServiceLogs can return an empty appLiveURL which
is then passed to streamLogs causing a dial error; update the caller to guard
against empty by checking the appLiveURL returned from processServiceLogs and
treating it as “no live logs” (e.g., return nil, nil or the appropriate no-logs
response) instead of calling streamLogs when appLiveURL == "". Locate the code
around the call to processServiceLogs and streamLogs (variables appLiveURL,
req.Etag) and add a simple conditional: if appLiveURL == "" { return nil, nil }
before invoking streamLogs.
In `@src/pkg/cli/subscribe.go`:
- Around line 50-98: The loop currently returns on any non-transient err before
processing a concurrently-delivered final msg from next(); to fix, process msg
(when msg != nil) immediately after calling next()/next, handling state updates
(serviceStates, targetState, allInState, and FAILED state checks) before
treating errors, and only then handle err: if isTransientError(err) perform the
existing DelayBeforeRetry/stop()/provider.Subscribe/iter.Pull2 retry flow, if
err == io.EOF treat it as stream end after processing the message (i.e.,
break/return normally) and for other non-transient errors return the error; keep
references to next(), iter.Pull2, stop(), provider.DelayBeforeRetry,
provider.Subscribe, client.ErrDeploymentFailed and defangv1.ServiceState_* so
you update the right sections.
---
Duplicate comments:
In `@src/pkg/cli/client/byoc/do/stream.go`:
- Around line 42-53: The websocket read loop in the returned func (the closure
around conn.ReadMessage) doesn’t honor ctx cancellation and can block; add logic
to close the websocket when ctx is done so the ReadMessage loop unblocks. Inside
the same function that defines conn and the for loop (the closure returned by
the function), start a goroutine that selects on ctx.Done() and calls
conn.Close() (or conn.WriteMessage with CloseMessage then conn.Close()) to force
ReadMessage to return, ensuring the existing defer still runs; update references
around the for loop and the conn.ReadMessage handling to rely on that shutdown
path.
---
Nitpick comments:
In `@src/pkg/clouds/aws/cw/stream.go`:
- Around line 84-139: The current QueryAndTailLogGroups eagerly starts
goroutines and a channel so tails run even if the returned seq is never
consumed; change it to defer creating ch, wg, and starting the per-log-group
goroutines until the returned iterator function is invoked: move the channel
(ch), sync.WaitGroup and the loop that calls QueryAndTailLogGroup and launches
goroutines into the returned func closure, preserving the existing logic for
collecting lastErr before starting goroutines (if lastErr != nil return it
immediately), ensure context cancellation (cancel()) is still deferred at the
start of the iterator, and keep the same behavior for yielding results and
stopping on errors so QueryAndTailLogGroups and QueryAndTailLogGroup identifiers
are used to locate the code to move.
In `@src/pkg/clouds/aws/ecs/status.go`:
- Around line 16-17: Update the doc comment for GetTaskStatus to match the new
(bool, error) signature: describe the return semantics clearly (e.g., (false,
nil) means task still running or not found, (true, io.EOF) means task stopped
successfully, (true, TaskFailure) means task failed, and (false, err) means an
error occurred while checking status) and mention the types referenced (TaskArn,
TaskFailure, io.EOF) so callers understand the possible error values.
- Around line 33-34: The comment above getTaskStatus is stale; update the doc
comment for the internal function getTaskStatus(ctx context.Context, region
aws.Region, cluster, taskId string) (bool, error) to match the actual behavior
(describe what the bool represents and which error values are returned instead
of the outdated "nil/io.EOF" wording), ensuring the comment accurately documents
return semantics and error cases for consumers and maintainers.
Description
Logs refactoring.
QueryLogs. This works, but had open issues:limitfor ALB logsLinked Issues
Checklist
Summary by CodeRabbit
New Features
Improvements