
Conversation

@MarcusGoldschmidt
Contributor

@MarcusGoldschmidt MarcusGoldschmidt commented Sep 22, 2025

Auth0 has a limitation that prevents it from returning more than 1000 users, even when using pagination: https://auth0.com/docs/users/search/v3/view-search-results-by-page#limitation

This adds an option to enable job-based sync with a configurable user limit.

Summary by CodeRabbit

  • New Features
    • Added optional background job flow for user synchronization with a configurable record limit.
    • New configuration options to enable job-based user sync and set the sync limit.
    • User resources now include a Created At timestamp.
    • Added validation to require a positive sync limit when job-based sync is enabled.
  • Chores
    • Upgraded core SDK and supporting dependencies for improved compatibility and performance.

var target Job

// TODO(golds): we need a better way to handle cache invalidation in uhttp.
err := uhttp.ClearCaches(ctx)

we can't do it this way

Contributor Author

The issue was related to getting the job status: since the response was cached, the status never showed as completed.

uhttp configures its cache from the environment, so we aren't able to create a client without caching.

Maybe we could create an http.Client manually and handle the status the same way uhttp does.
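A minimal sketch of that idea, using a plain net/http request so job-status polls bypass any HTTP cache. The helper name and URL below are illustrative only, not part of the connector:

```go
package main

import (
	"fmt"
	"net/http"
)

// newJobStatusRequest builds a request that asks intermediaries not to
// serve a cached response, so each poll sees the job's current status.
// Using a plain http.Client instead of the uhttp wrapper avoids the
// env-configured cache entirely.
func newJobStatusRequest(url string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Cache-Control", "no-cache")
	return req, nil
}

func main() {
	req, err := newJobStatusRequest("https://example.auth0.com/api/v2/jobs/job_123")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("Cache-Control")) // no-cache
}
```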


var users []User

for len(allData) > index {

is this a json lines reader? i think bufio.NewScanner is more idiomatic and covers more edge cases?

Contributor Author

Yes, each line is a JSON object. I will take a look at bufio.NewScanner
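A minimal sketch of line-wise NDJSON decoding with bufio.NewScanner, with an enlarged buffer and an explicit scanner.Err() check. The user struct and readNDJSON helper here are illustrative, not the connector's real types:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
)

// user mirrors only one field for this sketch; the real User model
// lives in pkg/connector/client/models.go.
type user struct {
	Email string `json:"email"`
}

// readNDJSON decodes one JSON object per line.
func readNDJSON(s string) ([]user, error) {
	scanner := bufio.NewScanner(strings.NewReader(s))
	// Raise Scanner's default 64KB token limit so large records are not truncated.
	scanner.Buffer(make([]byte, 0, 64*1024), 10*1024*1024)
	var users []user
	for scanner.Scan() {
		line := bytes.TrimSpace(scanner.Bytes())
		if len(line) == 0 {
			continue // tolerate blank lines
		}
		var u user
		if err := json.Unmarshal(line, &u); err != nil {
			return nil, err
		}
		users = append(users, u)
	}
	// Scanner errors (e.g. a line over the buffer limit) would otherwise be silent.
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return users, nil
}

func main() {
	users, err := readNDJSON("{\"email\":\"a@example.com\"}\n{\"email\":\"b@example.com\"}\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(users)) // 2
}
```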


logger.Debug("Sync job completed", zap.String("job_id", state.Id))

usersJob, err := o.client.ProcessUserJob(ctx, job)

how large can this be? can this be large enough to break our 4mb gRPC responses?

Contributor Author

Yes, it looks like we need to split processing of the file

Contributor Author

@MarcusGoldschmidt MarcusGoldschmidt Sep 23, 2025

I was thinking of:

  • Processing the file each time and tracking an offset via the pagination token
  • Processing it once and returning a batch per request, stored in memory; but we won't be able to containerize this connector until Cache lands in the SDK

@coderabbitai

coderabbitai bot commented Sep 24, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Introduces job-based user synchronization. Adds config flags and connector plumbing to create, poll, and process Auth0 user export jobs. Implements job models, endpoints, client methods, and gzip NDJSON processing. Adjusts request handling for optional cache control. Updates dependencies and tests. Extends user resources with CreatedAt.

Changes

Cohort / File(s) Summary
Dependencies
go.mod
Upgrades baton-sdk to v0.4.2. Promotes grpc and protobuf to direct deps. Bumps klauspost/compress indirectly.
Config Schema
pkg/config/schema.go
Adds exported fields: SyncUsersByJob (BoolField) and SyncUsersByJobLimit (IntField). Updates ConfigurationFields.
Client API: Jobs
pkg/connector/client/auth0.go, pkg/connector/client/models.go, pkg/connector/client/path.go
Adds Job and JobField types; adds API paths for creating/getting jobs; implements CreateJob(limit) and GetJob(id).
Client: Job Processing
pkg/connector/client/job.go, pkg/connector/client/job_test.go
Implements ProcessUserJob to fetch job file URL, stream gzip NDJSON, and decode Users. Adds tests for processing gzipped assets.
Client: Request Caching
pkg/connector/client/request.go
Adds cache flag to internal doRequest; public get/post/delete default to cache=true; conditionally adds No-Cache header.
Connector Plumbing
pkg/connector/connector.go, pkg/connector/users.go, pkg/connector/users_test.go
Adds syncUsersByJob and syncUsersByJobLimit to constructor and struct; validates limit when enabled; wires into user builder; implements job-driven List flow and adds CreatedAt on resources; updates tests to new builder signature.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant S as Sync Loop
  participant Conn as Connector
  participant UB as userBuilder
  participant C as Client
  participant A as Auth0 API
  participant H as Job File Host

  S->>Conn: ResourceSyncers()
  Conn->>UB: newUserBuilder(client, byJob, limit)
  S->>UB: List(state)

  alt First page (no token)
    UB->>C: CreateJob(limit)
    C->>A: POST /api/v2/jobs/users-exports\nbody: {fields:[...], limit}
    A-->>C: 200 Job{id}
    C-->>UB: Job + rate limit
    UB-->>S: Pending token (jobId), retry later
  else Polling
    UB->>C: GetJob(jobId)
    C->>A: GET /api/v2/jobs/{id}
    A-->>C: 200 Job{status}
    C-->>UB: Job + rate limit
    alt status != completed
      UB-->>S: Pending token (jobId), retry later
    else completed with location
      UB->>C: ProcessUserJob(job)
      C->>H: GET job.location (gzip NDJSON)
      H-->>C: 200 stream
      C-->>UB: []User
      UB-->>S: Emit Resources (with CreatedAt)
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I tap my paws on keys so bright,
Spinning jobs to fetch the night—
Gzipped stars of users stream,
NDJSON in moonbeam gleam.
With limits set and cache in tow,
I hop through polls until they glow.
Thump! Another sync—let’s go! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: docstring coverage is 33.33%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed: the title succinctly captures the primary feature of the changeset (job-based user synchronization), aligns with the implemented functionality, and is clear and specific without extraneous detail.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch goldschmidt/export-users-job

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/connector/client/request.go (1)

207-209: Fix possible nil-pointer dereference on request error.

If response is nil, accessing response.Body will panic. Mirror the guard used in doRequest.

Apply:

-   if err != nil {
-       return nil, &ratelimitData, fmt.Errorf("error doing request: %w, body: %v", err, logBody(response.Body))
-   }
+   if err != nil {
+       if response != nil {
+           return nil, &ratelimitData, fmt.Errorf("error doing request: %w, body: %v", err, logBody(response.Body))
+       }
+       return nil, &ratelimitData, fmt.Errorf("error doing request: %w", err)
+   }
🧹 Nitpick comments (6)
go.mod (1)

13-14: Directly pinning grpc/protobuf: ensure generators are aligned.

Good to pin runtime versions. Make sure protoc-gen-go and protoc-gen-go-grpc are compatibly pinned in your toolchain to avoid mismatches with runtime libraries.

Suggested pins for local/CI generators:

  • protoc-gen-go: google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.5
  • protoc-gen-go-grpc: google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.4.0
pkg/connector/client/job_test.go (1)

10-40: Use os.Open for readability (mode is ignored for reads).

Minor: replace os.OpenFile(..., os.O_RDONLY, 0600) with os.Open.

Apply:

-           file, err := os.OpenFile(s.file, os.O_RDONLY, 0600)
+           file, err := os.Open(s.file)
pkg/connector/client/auth0.go (1)

425-477: DRY the exported fields list for CreateJob.

Extract the fields slice to a package-level var/const to avoid duplication and make future changes easier.

Add above functions:

var userExportFields = []JobField{
    {Name: "user_id"},
    {Name: "name"},
    {Name: "email"},
    {Name: "nickname"},
    {Name: "created_at"},
    {Name: "updated_at"},
}

Then update here:

-       Fields: []JobField{
-           { Name: "user_id" },
-           { Name: "name" },
-           { Name: "email" },
-           { Name: "nickname" },
-           { Name: "created_at" },
-           { Name: "updated_at" },
-       },
+       Fields: userExportFields,
pkg/config/schema.go (1)

27-34: Add defaults and minimum validation for job-sync fields (no upper bound on limit)

    SyncUsersByJob = field.BoolField(
        "sync-users-by-job",
        field.WithDescription("Sync users by job (only applicable for Auth0 tenants with more than 1000 users https://auth0.com/docs/users/search/v3/view-search-results-by-page#limitation)"),
+       field.WithDefault(false),
    )
    SyncUsersByJobLimit = field.IntField(
        "sync-users-by-job-limit",
        field.WithDescription("Number of users to fetch per job (only applicable if sync-users-by-job is true)"),
+       field.WithDefault(1000),
+       field.WithMinimum(1),
    )

Auth0 imposes no documented maximum on limit (omit it to export all users); valid export formats are "csv" and "json" (NDJSON).

pkg/connector/client/job.go (1)

49-50: Avoid accumulating entire export in memory; expose a streaming/page API.

Returning []User for large exports risks high memory use and downstream 4MB gRPC payloads. Consider a paged variant like ProcessUserJobPage(ctx, job, offset, pageSize) to decode and return a slice plus next offset, or a callback-based streaming API.

pkg/connector/users.go (1)

140-154: Split results to respect 4MB gRPC limits and reduce memory pressure.

Processing the entire export into one response can exceed gRPC message limits and spike memory. Page the results using the state bag (offset/pageSize) and a paged decode from the gz file each invocation. This avoids holding all users at once and fits responses under 4MB.

Example approach:

  • Extend syncJobPage with Offset and PageSize.
  • Add a client method ProcessUserJobPage(ctx, job, offset, pageSize) (or a callback) that decodes JSON objects from the gzip stream, skips offset users, and returns up to pageSize users plus next offset.

Pseudo-usage here:

type syncJobPage struct { Id string; Attempt int; Offset int; PageSize int }
if state.PageSize == 0 { state.PageSize = 400 } // tune to stay <4MB
users, nextOffset, done, err := o.client.ProcessUserJobPage(ctx, job, state.Offset, state.PageSize)
...
if !done {
  bag.Push(syncJobPage{ Id: state.Id, Attempt: 0, Offset: nextOffset, PageSize: state.PageSize })
  nextToken, _ := bag.Marshal()
  return outputResources, nextToken, outputAnnotations, nil
}

Note: Implement ProcessUserJobPage with json.Decoder to skip offset items and decode the next page efficiently.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9b2ca27 and f838059.

⛔ Files ignored due to path filters (100)
  • cmd/baton-auth0/main.go is excluded by none and included by none
  • go.sum is excluded by !**/*.sum and included by none
  • pkg/connector/client/asset/empty.json.gz is excluded by !**/*.gz and included by pkg/**
  • pkg/connector/client/asset/test.json.gz is excluded by !**/*.gz and included by pkg/**
  • pkg/connector/client/asset/test2.json.gz is excluded by !**/*.gz and included by pkg/**
  • vendor/github.com/conductorone/baton-sdk/internal/connector/connector.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/annotation_resource_tree.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/annotation_resource_tree.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/annotation_sync_id.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/annotation_sync_id.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/connector.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/event_feed.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/event_feed.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connector/v2/resource.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1/baton.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1/baton.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1/session.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1/session.pb.validate.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1/session_grpc.pb.go is excluded by !**/*.pb.go, !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/annotations/annotations.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/bid/bid.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/cli/cli.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/cli/commands.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/cli/lambda_server__added.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/config/config.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/connectorbuilder/connectorbuilder.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/connectorrunner/runner.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/connectorstore/connectorstore.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/crypto/client_secret.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/crypto/crypto.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/crypto/password.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/crypto/providers/jwk/jwk.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/crypto/providers/registry.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file_attached.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/clone_sync.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/decoder.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/diff.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/dotc1z.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/file.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/local/local.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/manager.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/s3/s3.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sql_helpers.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sync_runs.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/field/defaults.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/field/validation.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/lambda/grpc/config/config.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/lambda/grpc/transport.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/provisioner/provisioner.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/ratelimit/mem_ratelimiter.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sdk/version.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/session/README.md is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/session/grpc_session.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/session/json.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/session/memory.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/session/session.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/client_wrapper.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/cycle.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/graph.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/scc/bitset.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/scc/scc.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/scc/test_source.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/state.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/sync/syncer.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/attached/attached.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/compactor.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/tasks/c1api/delete_resource.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/tasks/c1api/full_sync.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/tasks/c1api/manager.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/tasks/c1api/service_client.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/tasks/local/syncer.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/types/session_cache.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/types/types.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/ugrpc/c1_credential_provider.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/ugrpc/interceptors.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/uhttp/dbcache.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/uhttp/wrapper.go is excluded by !vendor/** and included by none
  • vendor/github.com/conductorone/baton-sdk/pkg/us3/s3.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/README.md is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/huff0/bitreader.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/internal/le/le.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/internal/le/unsafe_disabled.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/internal/le/unsafe_enabled.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/s2sx.mod is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/README.md is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/bitreader.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/blockdec.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/blockenc.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/decoder.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/enc_base.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/matchlen_generic.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/seqdec.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/seqdec_amd64.s is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/seqdec_generic.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/seqenc.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/snappy.go is excluded by !vendor/** and included by none
  • vendor/github.com/klauspost/compress/zstd/zstd.go is excluded by !vendor/** and included by none
  • vendor/modules.txt is excluded by !vendor/** and included by none
📒 Files selected for processing (11)
  • go.mod (2 hunks)
  • pkg/config/schema.go (1 hunks)
  • pkg/connector/client/auth0.go (1 hunks)
  • pkg/connector/client/job.go (1 hunks)
  • pkg/connector/client/job_test.go (1 hunks)
  • pkg/connector/client/models.go (1 hunks)
  • pkg/connector/client/path.go (1 hunks)
  • pkg/connector/client/request.go (5 hunks)
  • pkg/connector/connector.go (3 hunks)
  • pkg/connector/users.go (5 hunks)
  • pkg/connector/users_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
pkg/connector/client/auth0.go (1)
pkg/connector/client/models.go (2)
  • Job (119-128)
  • JobField (115-117)
pkg/connector/client/job.go (2)
pkg/connector/client/auth0.go (2)
  • Client (14-18)
  • New (20-58)
pkg/connector/client/models.go (2)
  • Job (119-128)
  • User (57-67)
pkg/connector/users.go (2)
pkg/connector/client/auth0.go (2)
  • Client (14-18)
  • New (20-58)
pkg/connector/connector.go (1)
  • New (65-85)
pkg/connector/connector.go (1)
pkg/connector/client/auth0.go (2)
  • Client (14-18)
  • New (20-58)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor Bugbot
🔇 Additional comments (12)
pkg/connector/users_test.go (1)

32-32: LGTM: test explicitly disables job-sync path.

Constructor update is correct and keeps this test on the paginated (non-job) code path.

pkg/connector/client/path.go (1)

20-21: LGTM: Auth0 job endpoints look correct.

Paths match Auth0 Management API for creating and fetching jobs.

pkg/connector/client/request.go (1)

117-141: Good addition: per-request cache control.

The cache flag and WithNoCache option address stale job status responses without changing default behavior.

pkg/connector/client/models.go (1)

115-128: LGTM: Job and JobField models.

Fields and JSON tags align with Auth0 jobs schema for users export.

pkg/connector/client/auth0.go (1)

487-497: LGTM: no-cache on GetJob to avoid stale status.

Using doRequest with cache=false directly resolves the prior cache issue when polling job status.

go.mod (1)

3-3: Confirm CI/build environments support Go 1.25
I didn’t find any CI configs in the repo—please ensure any CI images, Dockerfiles, and release builders are updated to Go ≥ 1.25.

pkg/connector/client/job.go (2)

17-24: Good precondition checks on job status and location.

Early validation prevents unnecessary external calls.


52-61: Resolved: previous newline/index parsing bug is gone.

Switching to line-wise parsing eliminates the earlier incorrect index handling raised in prior review.

pkg/connector/users.go (2)

46-46: Nice: propagate CreatedAt to user trait.

Helps downstream systems with temporal context.


76-105: Job creation flow looks solid.

State bag setup and token handoff are correct.

pkg/connector/connector.go (2)

57-59: Good guard for syncUsersByJobLimit.

Prevents misconfiguration early.


65-85: Constructor wiring looks correct.

New fields are passed through and initialized as expected; user builder receives the flags.

Comment on lines +52 to +74
func process(reader io.Reader) ([]User, error) {
	gzipReader, err := gzip.NewReader(reader)
	if err != nil {
		return nil, err
	}
	defer gzipReader.Close()

	var users []User

	scanner := bufio.NewScanner(gzipReader)

	for scanner.Scan() {
		line := scanner.Bytes()
		var user User
		err = json.Unmarshal(line, &user)
		if err != nil {
			return nil, err
		}
		users = append(users, user)
	}

	return users, nil
}

⚠️ Potential issue

Fix NDJSON parsing: bufio.Scanner 64KB limit causes data loss; switch to json.Decoder or increase buffer and check errors.

Scanner’s default 64KB token limit will truncate large user objects; current code also ignores scanner.Err(), so truncation can be silent. Use json.Decoder to handle arbitrarily large objects and properly terminate on io.EOF.

Apply this diff:

-func process(reader io.Reader) ([]User, error) {
-	gzipReader, err := gzip.NewReader(reader)
-	if err != nil {
-		return nil, err
-	}
-	defer gzipReader.Close()
-
-	var users []User
-
-	scanner := bufio.NewScanner(gzipReader)
-
-	for scanner.Scan() {
-		line := scanner.Bytes()
-		var user User
-		err = json.Unmarshal(line, &user)
-		if err != nil {
-			return nil, err
-		}
-		users = append(users, user)
-	}
-
-	return users, nil
-}
+func process(reader io.Reader) ([]User, error) {
+	gr, err := gzip.NewReader(reader)
+	if err != nil {
+		return nil, err
+	}
+	defer gr.Close()
+
+	dec := json.NewDecoder(gr)
+	users := make([]User, 0, 1024)
+	for {
+		var u User
+		if err := dec.Decode(&u); err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, err
+		}
+		users = append(users, u)
+	}
+	return users, nil
+}

Alternative (if you prefer Scanner): set a large buffer and check errors:

scanner := bufio.NewScanner(gr)
buf := make([]byte, 0, 1<<20) // 1MB initial
scanner.Buffer(buf, 100<<20)  // up to 100MB per line
for scanner.Scan() { ... }
if err := scanner.Err(); err != nil { return nil, err }
🤖 Prompt for AI Agents
In pkg/connector/client/job.go around lines 52 to 74, the code uses
bufio.Scanner which has a 64KB token limit and also never checks scanner.Err(),
causing silent truncation of large NDJSON user objects; replace the Scanner loop
with a json.Decoder reading directly from the gzip.Reader and decode successive
User objects until io.EOF to correctly handle arbitrarily large objects and
termination, ensuring you propagate any decode errors, close the gzip reader in
a defer, and return the collected users; alternatively, if you must keep
Scanner, call scanner.Buffer with a sufficiently large max token size (e.g.
100MB) and after the loop check and return scanner.Err() if non-nil.

Comment on lines +117 to +136
if job.Status != "completed" {
	var anno annotations.Annotations

	anno.WithRateLimiting(&v2.RateLimitDescription{
		Limit:     1,
		Remaining: 0,
		ResetAt:   timestamppb.New(time.Now().Add(time.Second * 10)),
	})
	bag.Push(syncJobPage{
		Id:      state.Id,
		Attempt: state.Attempt + 1,
	})

	nextToken, err := bag.Marshal()
	if err != nil {
		return nil, "", anno, err
	}

	return nil, nextToken, anno, status.Errorf(codes.Unavailable, "Sync job it's not completed: %s", job.Status)
}

⚠️ Potential issue

Handle failed job status and add backoff instead of fixed 10s.

Currently non-completed statuses all retry; a failed job will loop forever. Also, fixed 10s retry is suboptimal.

Apply this diff:

-		if job.Status != "completed" {
+		if job.Status == "failed" {
+			return nil, "", outputAnnotations, status.Errorf(codes.FailedPrecondition, "sync job failed (id=%s)", job.Id)
+		}
+		if job.Status != "completed" {
 			var anno annotations.Annotations
-
-			anno.WithRateLimiting(&v2.RateLimitDescription{
-				Limit:     1,
-				Remaining: 0,
-				ResetAt:   timestamppb.New(time.Now().Add(time.Second * 10)),
-			})
+			// Exponential backoff with cap.
+			delay := time.Second * time.Duration(1<<state.Attempt)
+			if delay > time.Minute {
+				delay = time.Minute
+			}
+			anno.WithRateLimiting(&v2.RateLimitDescription{
+				Limit:     1,
+				Remaining: 0,
+				ResetAt:   timestamppb.New(time.Now().Add(delay)),
+			})
 			bag.Push(syncJobPage{
 				Id:      state.Id,
 				Attempt: state.Attempt + 1,
 			})
 
 			nextToken, err := bag.Marshal()
 			if err != nil {
 				return nil, "", anno, err
 			}
 
 			return nil, nextToken, anno, status.Errorf(codes.Unavailable, "Sync job it's not completed: %s", job.Status)
 		}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pkg/connector/users.go around lines 117 to 136, non-completed job statuses
are always re-queued and use a fixed 10s retry which causes failed jobs to loop
forever and is suboptimal; change the logic so that if job.Status == "failed"
you do not requeue but return an immediate error (e.g.,
status.Errorf(codes.FailedPrecondition, "Sync job failed: %s", job.Status)) so
callers know it failed, and for other non-completed statuses implement
exponential backoff based on state.Attempt (e.g., backoff = min(maxBackoff,
base*(2^Attempt))) to compute ResetAt instead of a fixed 10s, increment Attempt
when pushing the job back into the bag, update the rate-limit annotation with
that computed ResetAt (and appropriate Limit/Remaining), marshal and return
nextToken as before, and preserve existing error handling for bag.Marshal.

Comment on lines +140 to +154
usersJob, err := o.client.ProcessUserJob(ctx, job)
if err != nil {
return nil, "", nil, err
}

for _, user := range usersJob {
userResource0, err := userResource(user, parentResourceID)
if err != nil {
return nil, "", nil, err
}
outputResources = append(outputResources, userResource0)
}

return outputResources, "", nil, nil
}

⚠️ Potential issue

Return accumulated rate-limit annotations on success.

You populate outputAnnotations earlier but drop them on return.

Apply this diff:

-		return outputResources, "", nil, nil
+		return outputResources, "", outputAnnotations, nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

 		usersJob, err := o.client.ProcessUserJob(ctx, job)
 		if err != nil {
 			return nil, "", nil, err
 		}
 
 		for _, user := range usersJob {
 			userResource0, err := userResource(user, parentResourceID)
 			if err != nil {
 				return nil, "", nil, err
 			}
 			outputResources = append(outputResources, userResource0)
 		}
 
-		return outputResources, "", nil, nil
+		return outputResources, "", outputAnnotations, nil
 	}
🤖 Prompt for AI Agents
In pkg/connector/users.go around lines 140 to 154, the function currently
returns outputResources, "", nil, nil and thus drops the accumulated
outputAnnotations collected earlier; modify the final return to include the
accumulated annotations (e.g., return outputResources, "", outputAnnotations,
nil) so that rate-limit annotations are returned on success, ensuring the
variable name matches the one used earlier in the function.
