[BB-1221] Add job sync for users #25
base: main
Conversation
pkg/connector/client/auth0.go
Outdated
```go
var target Job

// TODO(golds): we need a better way to handle cache invalidation in uhttp.
err := uhttp.ClearCaches(ctx)
```
We can't do it this way.
The issue was related to getting the job status: since the response is cached, the status never shows as completed.
uhttp configures its cache via environment variables, so we aren't able to create a client without caching.
Maybe create an http.Client manually and handle the status the way uhttp does.
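A minimal sketch of that last idea, assuming a plain net/http client; pollJobStatus and the trimmed-down Job struct are illustrative names, not this repo's API. Because nothing here goes through uhttp, no cached response can hide the completed status:

```go
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Job holds just the fields polled from GET /api/v2/jobs/{id}.
type Job struct {
	Id     string `json:"id"`
	Status string `json:"status"`
}

// pollJobStatus fetches the job with a plain, cache-free http.Client.
func pollJobStatus(ctx context.Context, domain, token, jobID string) (*Job, error) {
	httpClient := &http.Client{Timeout: 30 * time.Second}

	url := fmt.Sprintf("https://%s/api/v2/jobs/%s", domain, jobID)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}

	var job Job
	if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
		return nil, err
	}
	return &job, nil
}
```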
pkg/connector/client/job.go
Outdated
```go
var users []User

for len(allData) > index {
```
Is this a JSON Lines reader? I think bufio.NewScanner is more idiomatic and covers more edge cases.
Yes, each line is a JSON object. I will take a look at bufio.NewScanner.
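A sketch of what that could look like (readNDJSON is a hypothetical name; User is the model from this PR). Note the enlarged buffer and the scanner.Err() check, since Scanner's default 64KB token limit would otherwise truncate long lines silently:

```go
import (
	"bufio"
	"encoding/json"
	"io"
)

// readNDJSON decodes one User per line from r.
func readNDJSON(r io.Reader) ([]User, error) {
	scanner := bufio.NewScanner(r)
	// Raise the per-line limit; bufio.Scanner defaults to 64KB per token.
	scanner.Buffer(make([]byte, 0, 1<<20), 16<<20)

	var users []User
	for scanner.Scan() {
		var u User
		if err := json.Unmarshal(scanner.Bytes(), &u); err != nil {
			return nil, err
		}
		users = append(users, u)
	}
	// Scan returns false on both EOF and error; surface the error case.
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return users, nil
}
```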
```go
logger.Debug("Sync job completed", zap.String("job_id", state.Id))

usersJob, err := o.client.ProcessUserJob(ctx, job)
```
How large can this be? Can it be large enough to break our 4MB gRPC responses?
Yes, it looks like we need to split processing of the file.
I was thinking of:
- Processing the file each time and advancing an offset via pagination
- Processing it once and returning a batch per request, storing the rest in memory; but then we won't be able to run this connector in a container until Cache lands in the SDK
Note: CodeRabbit detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Introduces job-based user synchronization. Adds config flags and connector plumbing to create, poll, and process Auth0 user export jobs. Implements job models, endpoints, client methods, and gzip NDJSON processing. Adjusts request handling for optional cache control. Updates dependencies and tests. Extends user resources with CreatedAt.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant S as Sync Loop
    participant Conn as Connector
    participant UB as userBuilder
    participant C as Client
    participant A as Auth0 API
    participant H as Job File Host
    S->>Conn: ResourceSyncers()
    Conn->>UB: newUserBuilder(client, byJob, limit)
    S->>UB: List(state)
    alt First page (no token)
        UB->>C: CreateJob(limit)
        C->>A: POST /api/v2/jobs/users-exports\nbody: {fields:[...], limit}
        A-->>C: 200 Job{id}
        C-->>UB: Job + rate limit
        UB-->>S: Pending token (jobId), retry later
    else Polling
        UB->>C: GetJob(jobId)
        C->>A: GET /api/v2/jobs/{id}
        A-->>C: 200 Job{status}
        C-->>UB: Job + rate limit
        alt status != completed
            UB-->>S: Pending token (jobId), retry later
        else completed with location
            UB->>C: ProcessUserJob(job)
            C->>H: GET job.location (gzip NDJSON)
            H-->>C: 200 stream
            C-->>UB: []User
            UB-->>S: Emit Resources (with CreatedAt)
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning) · ✅ Passed checks (2 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/connector/client/request.go (1)
207-209: Fix possible nil-pointer dereference on request error. If response is nil, accessing response.Body will panic. Mirror the guard used in doRequest.
Apply:

```diff
- if err != nil {
-     return nil, &ratelimitData, fmt.Errorf("error doing request: %w, body: %v", err, logBody(response.Body))
- }
+ if err != nil {
+     if response != nil {
+         return nil, &ratelimitData, fmt.Errorf("error doing request: %w, body: %v", err, logBody(response.Body))
+     }
+     return nil, &ratelimitData, fmt.Errorf("error doing request: %w", err)
+ }
```
🧹 Nitpick comments (6)
go.mod (1)
13-14: Directly pinning grpc/protobuf: ensure generators are aligned. Good to pin runtime versions. Make sure protoc-gen-go and protoc-gen-go-grpc are compatibly pinned in your toolchain to avoid mismatches with runtime libraries.
Suggested pins for local/CI generators:
- protoc-gen-go: google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.5
- protoc-gen-go-grpc: google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.4.0
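One common way to keep those generator versions pinned in go.mod is a tools file; a sketch of that convention (this repo does not necessarily have one yet):

```go
//go:build tools

package tools

// Blank imports record the generators in go.mod, so `go install` of
// these package paths builds the pinned versions locally and in CI.
import (
	_ "google.golang.org/grpc/cmd/protoc-gen-go-grpc"
	_ "google.golang.org/protobuf/cmd/protoc-gen-go"
)
```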
pkg/connector/client/job_test.go (1)
10-40: Use os.Open for readability (mode is ignored for reads). Minor: replace os.OpenFile(..., os.O_RDONLY, 0600) with os.Open.

Apply:

```diff
- file, err := os.OpenFile(s.file, os.O_RDONLY, 0600)
+ file, err := os.Open(s.file)
```

pkg/connector/client/auth0.go (1)
- file, err := os.OpenFile(s.file, os.O_RDONLY, 0600) + file, err := os.Open(s.file)pkg/connector/client/auth0.go (1)
425-477: DRY the exported fields list for CreateJob.Extract the fields slice to a package-level var/const to avoid duplication and make future changes easier.
Add above functions:
var userExportFields = []JobField{ {Name: "user_id"}, {Name: "name"}, {Name: "email"}, {Name: "nickname"}, {Name: "created_at"}, {Name: "updated_at"}, }Then update here:
- Fields: []JobField{ - { Name: "user_id" }, - { Name: "name" }, - { Name: "email" }, - { Name: "nickname" }, - { Name: "created_at" }, - { Name: "updated_at" }, - }, + Fields: userExportFields,pkg/config/schema.go (1)
27-34: Add defaults and minimum validation for job-sync fields (no upper bound on limit)SyncUsersByJob = field.BoolField( "sync-users-by-job", field.WithDescription("Sync users by job (only applicable for Auth0 tenants with more than 1000 users https://auth0.com/docs/users/search/v3/view-search-results-by-page#limitation)"), + field.WithDefault(false), ) SyncUsersByJobLimit = field.IntField( "sync-users-by-job-limit", field.WithDescription("Number of users to fetch per job (only applicable if sync-users-by-job is true)"), + field.WithDefault(1000), + field.WithMinimum(1), )Auth0 imposes no documented maximum on
limit(omit it to export all users); valid export formats are"csv"and"json"(NDJSON).pkg/connector/client/job.go (1)
49-50: Avoid accumulating the entire export in memory; expose a streaming/page API. Returning []User for large exports risks high memory use and downstream 4MB gRPC payloads. Consider a paged variant like ProcessUserJobPage(ctx, job, offset, pageSize) that decodes and returns a slice plus the next offset, or a callback-based streaming API.
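A sketch of the callback-based variant, with streamUsers as a hypothetical helper; it assumes the same gzip-compressed NDJSON body and this package's User type, and lets callers emit each user as it is decoded instead of accumulating a slice:

```go
import (
	"compress/gzip"
	"encoding/json"
	"errors"
	"io"
)

// streamUsers decodes gzip-compressed NDJSON from r and hands each User
// to fn, so the full export never has to sit in memory at once.
func streamUsers(r io.Reader, fn func(User) error) error {
	gr, err := gzip.NewReader(r)
	if err != nil {
		return err
	}
	defer gr.Close()

	dec := json.NewDecoder(gr)
	for {
		var u User
		if err := dec.Decode(&u); err != nil {
			if errors.Is(err, io.EOF) {
				return nil // clean end of stream
			}
			return err
		}
		if err := fn(u); err != nil {
			return err
		}
	}
}
```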
pkg/connector/users.go (1)
140-154: Split results to respect 4MB gRPC limits and reduce memory pressure. Processing the entire export into one response can exceed gRPC message limits and spike memory. Page the results using the state bag (offset/pageSize) and a paged decode from the gz file on each invocation. This avoids holding all users at once and keeps responses under 4MB.
Example approach:
- Extend syncJobPage with Offset and PageSize.
- Add a client method ProcessUserJobPage(ctx, job, offset, pageSize) (or a callback) that decodes JSON objects from the gzip stream, skips offset users, and returns up to pageSize users plus next offset.
Pseudo-usage here:
```go
type syncJobPage struct {
	Id       string
	Attempt  int
	Offset   int
	PageSize int
}

if state.PageSize == 0 {
	state.PageSize = 400 // tune to stay <4MB
}
users, nextOffset, done, err := o.client.ProcessUserJobPage(ctx, job, state.Offset, state.PageSize)
...
if !done {
	bag.Push(syncJobPage{Id: state.Id, Attempt: 0, Offset: nextOffset, PageSize: state.PageSize})
	nextToken, _ := bag.Marshal()
	return outputResources, nextToken, outputAnnotations, nil
}
```

Note: Implement ProcessUserJobPage with json.Decoder to skip offset items and decode the next page efficiently.
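A sketch of such a paged decoder, under the same gzip/NDJSON assumptions; processUserJobPage is a hypothetical name and the caller supplies the already-opened export body:

```go
import (
	"compress/gzip"
	"encoding/json"
	"errors"
	"io"
)

// processUserJobPage decodes up to pageSize users starting at offset.
// It returns the page, the next offset, and done=true once EOF is hit.
func processUserJobPage(r io.Reader, offset, pageSize int) ([]User, int, bool, error) {
	gr, err := gzip.NewReader(r)
	if err != nil {
		return nil, 0, false, err
	}
	defer gr.Close()

	dec := json.NewDecoder(gr)

	// Skip the objects already emitted on earlier pages.
	for i := 0; i < offset; i++ {
		var skip json.RawMessage
		if err := dec.Decode(&skip); err != nil {
			if errors.Is(err, io.EOF) {
				return nil, offset, true, nil
			}
			return nil, 0, false, err
		}
	}

	users := make([]User, 0, pageSize)
	for len(users) < pageSize {
		var u User
		if err := dec.Decode(&u); err != nil {
			if errors.Is(err, io.EOF) {
				return users, offset + len(users), true, nil
			}
			return nil, 0, false, err
		}
		users = append(users, u)
	}
	return users, offset + len(users), false, nil
}
```

Re-decoding from the start of the file on each call keeps the connector stateless between invocations, at the cost of re-reading skipped objects; that trade-off is what the in-memory option discussed earlier avoids.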
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (100)
- cmd/baton-auth0/main.go is excluded by none and included by none
- go.sum is excluded by !**/*.sum and included by none
- pkg/connector/client/asset/empty.json.gz, test.json.gz, and test2.json.gz are excluded by !**/*.gz and included by pkg/**
- ~90 files under vendor/github.com/conductorone/baton-sdk/... and vendor/github.com/klauspost/compress/... (including generated *.pb.go files) are excluded by !vendor/** (and !**/*.pb.go where applicable) and included by none
- vendor/modules.txt is excluded by !vendor/** and included by none
📒 Files selected for processing (11)
- go.mod (2 hunks)
- pkg/config/schema.go (1 hunks)
- pkg/connector/client/auth0.go (1 hunks)
- pkg/connector/client/job.go (1 hunks)
- pkg/connector/client/job_test.go (1 hunks)
- pkg/connector/client/models.go (1 hunks)
- pkg/connector/client/path.go (1 hunks)
- pkg/connector/client/request.go (5 hunks)
- pkg/connector/connector.go (3 hunks)
- pkg/connector/users.go (5 hunks)
- pkg/connector/users_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
pkg/connector/client/auth0.go (1)
- pkg/connector/client/models.go (2): Job (119-128), JobField (115-117)

pkg/connector/client/job.go (2)
- pkg/connector/client/auth0.go (2): Client (14-18), New (20-58)
- pkg/connector/client/models.go (2): Job (119-128), User (57-67)

pkg/connector/users.go (2)
- pkg/connector/client/auth0.go (2): Client (14-18), New (20-58)
- pkg/connector/connector.go (1): New (65-85)

pkg/connector/connector.go (1)
- pkg/connector/client/auth0.go (2): Client (14-18), New (20-58)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Cursor Bugbot
🔇 Additional comments (12)
pkg/connector/users_test.go (1)
32-32: LGTM: test explicitly disables job-sync path. Constructor update is correct and keeps this test on the paginated (non-job) code path.
pkg/connector/client/path.go (1)
20-21: LGTM: Auth0 job endpoints look correct. Paths match the Auth0 Management API for creating and fetching jobs.
pkg/connector/client/request.go (1)
117-141: Good addition: per-request cache control. The cache flag and WithNoCache option address stale job status responses without changing default behavior.
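For readers skimming, the general shape of such a per-request option looks roughly like this; reqOptions and doRequestOption are illustrative names, not necessarily the exact ones in request.go:

```go
// reqOptions carries per-request knobs; caching stays on by default.
type reqOptions struct {
	useCache bool
}

type doRequestOption func(*reqOptions)

// WithNoCache disables the HTTP cache for a single request, e.g. when
// polling job status, where a cached body would never read "completed".
func WithNoCache() doRequestOption {
	return func(o *reqOptions) { o.useCache = false }
}

func newReqOptions(opts ...doRequestOption) reqOptions {
	o := reqOptions{useCache: true}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}
```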
pkg/connector/client/models.go (1)
115-128: LGTM: Job and JobField models. Fields and JSON tags align with the Auth0 jobs schema for users export.
pkg/connector/client/auth0.go (1)
487-497: LGTM: no-cache on GetJob to avoid stale status. Using doRequest with cache=false directly resolves the prior cache issue when polling job status.
go.mod (1)
3-3: Confirm CI/build environments support Go 1.25
I didn't find any CI configs in the repo; please ensure any CI images, Dockerfiles, and release builders are updated to Go ≥ 1.25.

pkg/connector/client/job.go (2)
17-24: Good precondition checks on job status and location. Early validation prevents unnecessary external calls.
52-61: Resolved: previous newline/index parsing bug is gone. Switching to line-wise parsing eliminates the incorrect index handling raised in a prior review.
pkg/connector/users.go (2)
46-46: Nice: propagate CreatedAt to user trait. Helps downstream systems with temporal context.
76-105: Job creation flow looks solid. State bag setup and token handoff are correct.
pkg/connector/connector.go (2)
57-59: Good guard for syncUsersByJobLimit. Prevents misconfiguration early.
65-85: Constructor wiring looks correct. New fields are passed through and initialized as expected; the user builder receives the flags.
```go
func process(reader io.Reader) ([]User, error) {
	gzipReader, err := gzip.NewReader(reader)
	if err != nil {
		return nil, err
	}
	defer gzipReader.Close()

	var users []User

	scanner := bufio.NewScanner(gzipReader)

	for scanner.Scan() {
		line := scanner.Bytes()
		var user User
		err = json.Unmarshal(line, &user)
		if err != nil {
			return nil, err
		}
		users = append(users, user)
	}

	return users, nil
}
```
Fix NDJSON parsing: bufio.Scanner 64KB limit causes data loss; switch to json.Decoder or increase buffer and check errors.
Scanner’s default 64KB token limit will truncate large user objects; current code also ignores scanner.Err(), so truncation can be silent. Use json.Decoder to handle arbitrarily large objects and properly terminate on io.EOF.
Apply this diff:
```diff
-func process(reader io.Reader) ([]User, error) {
-	gzipReader, err := gzip.NewReader(reader)
-	if err != nil {
-		return nil, err
-	}
-	defer gzipReader.Close()
-
-	var users []User
-
-	scanner := bufio.NewScanner(gzipReader)
-
-	for scanner.Scan() {
-		line := scanner.Bytes()
-		var user User
-		err = json.Unmarshal(line, &user)
-		if err != nil {
-			return nil, err
-		}
-		users = append(users, user)
-	}
-
-	return users, nil
-}
+func process(reader io.Reader) ([]User, error) {
+	gr, err := gzip.NewReader(reader)
+	if err != nil {
+		return nil, err
+	}
+	defer gr.Close()
+
+	dec := json.NewDecoder(gr)
+	users := make([]User, 0, 1024)
+	for {
+		var u User
+		if err := dec.Decode(&u); err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, err
+		}
+		users = append(users, u)
+	}
+	return users, nil
+}
```

Alternative (if you prefer Scanner): set a large buffer and check errors:
```go
scanner := bufio.NewScanner(gr)
buf := make([]byte, 0, 1<<20) // 1MB initial
scanner.Buffer(buf, 100<<20)  // up to 100MB per line
for scanner.Scan() { ... }
if err := scanner.Err(); err != nil {
	return nil, err
}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func process(reader io.Reader) ([]User, error) {
	gr, err := gzip.NewReader(reader)
	if err != nil {
		return nil, err
	}
	defer gr.Close()

	dec := json.NewDecoder(gr)
	users := make([]User, 0, 1024)
	for {
		var u User
		if err := dec.Decode(&u); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, err
		}
		users = append(users, u)
	}
	return users, nil
}
```
🤖 Prompt for AI Agents
In pkg/connector/client/job.go around lines 52 to 74, the code uses
bufio.Scanner which has a 64KB token limit and also never checks scanner.Err(),
causing silent truncation of large NDJSON user objects; replace the Scanner loop
with a json.Decoder reading directly from the gzip.Reader and decode successive
User objects until io.EOF to correctly handle arbitrarily large objects and
termination, ensuring you propagate any decode errors, close the gzip reader in
a defer, and return the collected users; alternatively, if you must keep
Scanner, call scanner.Buffer with a sufficiently large max token size (e.g.
100MB) and after the loop check and return scanner.Err() if non-nil.
```go
	if job.Status != "completed" {
		var anno annotations.Annotations

		anno.WithRateLimiting(&v2.RateLimitDescription{
			Limit:     1,
			Remaining: 0,
			ResetAt:   timestamppb.New(time.Now().Add(time.Second * 10)),
		})
		bag.Push(syncJobPage{
			Id:      state.Id,
			Attempt: state.Attempt + 1,
		})

		nextToken, err := bag.Marshal()
		if err != nil {
			return nil, "", anno, err
		}

		return nil, nextToken, anno, status.Errorf(codes.Unavailable, "Sync job it's not completed: %s", job.Status)
	}
```
Handle failed job status and add backoff instead of fixed 10s.
Currently non-completed statuses all retry; a failed job will loop forever. Also, fixed 10s retry is suboptimal.
Apply this diff:
```diff
-	if job.Status != "completed" {
+	if job.Status == "failed" {
+		return nil, "", outputAnnotations, status.Errorf(codes.FailedPrecondition, "sync job failed (id=%s)", job.Id)
+	}
+	if job.Status != "completed" {
 		var anno annotations.Annotations
-
-		anno.WithRateLimiting(&v2.RateLimitDescription{
-			Limit:     1,
-			Remaining: 0,
-			ResetAt:   timestamppb.New(time.Now().Add(time.Second * 10)),
-		})
+		// Exponential backoff with cap.
+		delay := time.Second * time.Duration(1<<state.Attempt)
+		if delay > time.Minute {
+			delay = time.Minute
+		}
+		anno.WithRateLimiting(&v2.RateLimitDescription{
+			Limit:     1,
+			Remaining: 0,
+			ResetAt:   timestamppb.New(time.Now().Add(delay)),
+		})
 		bag.Push(syncJobPage{
 			Id:      state.Id,
 			Attempt: state.Attempt + 1,
 		})
 		nextToken, err := bag.Marshal()
 		if err != nil {
 			return nil, "", anno, err
 		}
 		return nil, nextToken, anno, status.Errorf(codes.Unavailable, "Sync job it's not completed: %s", job.Status)
 	}
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In pkg/connector/users.go around lines 117 to 136, non-completed job statuses
are always re-queued and use a fixed 10s retry which causes failed jobs to loop
forever and is suboptimal; change the logic so that if job.Status == "failed"
you do not requeue but return an immediate error (e.g.,
status.Errorf(codes.FailedPrecondition, "Sync job failed: %s", job.Status)) so
callers know it failed, and for other non-completed statuses implement
exponential backoff based on state.Attempt (e.g., backoff = min(maxBackoff,
base*(2^Attempt))) to compute ResetAt instead of a fixed 10s, increment Attempt
when pushing the job back into the bag, update the rate-limit annotation with
that computed ResetAt (and appropriate Limit/Remaining), marshal and return
nextToken as before, and preserve existing error handling for bag.Marshal.
```go
	usersJob, err := o.client.ProcessUserJob(ctx, job)
	if err != nil {
		return nil, "", nil, err
	}

	for _, user := range usersJob {
		userResource0, err := userResource(user, parentResourceID)
		if err != nil {
			return nil, "", nil, err
		}
		outputResources = append(outputResources, userResource0)
	}

	return outputResources, "", nil, nil
}
```
Return accumulated rate-limit annotations on success.
You populate outputAnnotations earlier but drop them on return.
Apply this diff:
```diff
-	return outputResources, "", nil, nil
+	return outputResources, "", outputAnnotations, nil
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	usersJob, err := o.client.ProcessUserJob(ctx, job)
	if err != nil {
		return nil, "", nil, err
	}

	for _, user := range usersJob {
		userResource0, err := userResource(user, parentResourceID)
		if err != nil {
			return nil, "", nil, err
		}
		outputResources = append(outputResources, userResource0)
	}

	return outputResources, "", outputAnnotations, nil
}
```
🤖 Prompt for AI Agents
In pkg/connector/users.go around lines 140 to 154, the function currently
returns outputResources, "", nil, nil and thus drops the accumulated
outputAnnotations collected earlier; modify the final return to include the
accumulated annotations (e.g., return outputResources, "", outputAnnotations,
nil) so that rate-limit annotations are returned on success, ensuring the
variable name matches the one used earlier in the function.
Auth0 has a limitation that prevents fetching more than 1000 users, even with pagination: https://auth0.com/docs/users/search/v3/view-search-results-by-page#limitation
This PR adds an option to enable job-based user sync with a configurable limit on the number of users.
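For context, a sketch of the export-job payload this option drives, reusing the Job/JobField shapes reviewed above; the field names follow Auth0's users-exports API, but treat the helper itself (exportJobPayload) as illustrative:

```go
import (
	"bytes"
	"encoding/json"
)

// JobField names one attribute to include in the export.
type JobField struct {
	Name string `json:"name"`
}

// createJobBody is the POST /api/v2/jobs/users-exports payload.
type createJobBody struct {
	Format string     `json:"format"` // "json" yields gzip-compressed NDJSON
	Limit  int        `json:"limit,omitempty"`
	Fields []JobField `json:"fields"`
}

func exportJobPayload(limit int) (*bytes.Reader, error) {
	body := createJobBody{
		Format: "json",
		Limit:  limit,
		Fields: []JobField{
			{Name: "user_id"}, {Name: "name"}, {Name: "email"},
			{Name: "nickname"}, {Name: "created_at"}, {Name: "updated_at"},
		},
	}
	raw, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	return bytes.NewReader(raw), nil
}
```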