diff --git a/go.mod b/go.mod index 717f2cd2..19c677be 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24 toolchain go1.24.1 require ( - github.com/conductorone/baton-sdk v0.3.31 + github.com/conductorone/baton-sdk v0.3.35 github.com/ennyjfrick/ruleguard-logfatal v0.0.2 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 diff --git a/go.sum b/go.sum index f8ee9ae3..a34f6c93 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/conductorone/baton-sdk v0.3.31 h1:SgHp43xCTD/ORcqvjMVLx/Ve2j6MOuLM2x0Zz5xPk6Q= -github.com/conductorone/baton-sdk v0.3.31/go.mod h1:L55WO3ERMx1mfpjDgwK3jWNRGRF2E76WrQHmW6ev8VY= +github.com/conductorone/baton-sdk v0.3.35 h1:8yzc1e/Dsmw5/a7zZcJklMeetjA7hRfD33zdD7WMHAs= +github.com/conductorone/baton-sdk v0.3.35/go.mod h1:L55WO3ERMx1mfpjDgwK3jWNRGRF2E76WrQHmW6ev8VY= github.com/conductorone/dpop v0.2.4 h1:PaiDOX1gAIXtOJPxXf08GsGkpCuT/iECEjSJzLpi0zU= github.com/conductorone/dpop v0.2.4/go.mod h1:gyo8TtzB9SCFCsjsICH4IaLZ7y64CcrDXMOPBwfq/3s= github.com/conductorone/dpop/integrations/dpop_grpc v0.2.4 h1:lYxYi9/WTSL9sE96CO0QF2BY3kehs8dTTApI134TGCA= diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/cli/commands.go b/vendor/github.com/conductorone/baton-sdk/pkg/cli/commands.go index a596cdcb..a530871e 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/cli/commands.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/cli/commands.go @@ -189,7 +189,14 @@ func MakeMainCommand[T field.Configurable]( profile, )) case v.GetString("invoke-action") != "": - invokeActionArgs := v.GetStringMap("invoke-action-args") + invokeActionArgsStr := v.GetString("invoke-action-args") + invokeActionArgs := map[string]any{} + if invokeActionArgsStr != "" { + err := json.Unmarshal([]byte(invokeActionArgsStr), &invokeActionArgs) + if err != nil { + return fmt.Errorf("failed to parse invoke-action-args: %w", err) + } + } invokeActionArgsStruct, err := structpb.NewStruct(invokeActionArgs) if err != nil { return fmt.Errorf("failed to parse invoke-action-args: %w", err) diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file.go b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file.go index 11bf0fc6..b89abccc 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file.go @@ -298,3 +298,27 @@ func (c *C1File) OutputFilepath() (string, error) { } return c.outputFilePath, nil } + +func (c *C1File) AttachFile(other *C1File, dbName string) (*C1FileAttached, error) { + _, err := c.db.Exec(`ATTACH DATABASE ? AS ?`, other.dbFilePath, dbName) + if err != nil { + return nil, err + } + + return &C1FileAttached{ + safe: true, + file: c, + }, nil +} + +func (c *C1FileAttached) DetachFile(dbName string) (*C1FileAttached, error) { + _, err := c.file.db.Exec(`DETACH DATABASE ?`, dbName) + if err != nil { + return nil, err + } + + return &C1FileAttached{ + safe: false, + file: c.file, + }, nil +} diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file_attached.go b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file_attached.go new file mode 100644 index 00000000..6cf34ff5 --- /dev/null +++ b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/c1file_attached.go @@ -0,0 +1,139 @@ +package dotc1z + +import ( + "context" + "errors" + "fmt" +) + +type C1FileAttached struct { + safe bool + file *C1File +} + +func (c *C1FileAttached) CompactTable(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string, tableName string) error { + if !c.safe { + return errors.New("database has been detached") + } + ctx, span := tracer.Start(ctx, "C1FileAttached.CompactTable") + defer span.End() + + // Get the column structure for this table by querying the schema + columns, err := c.getTableColumns(ctx, tableName) + if err != nil { + return fmt.Errorf("failed to get table columns: %w", err) + } + + // Build column lists for INSERT statements + columnList := "" + selectList := "" + for i, col := range columns { + if i > 0 { + columnList += ", " + selectList += ", " + } + columnList += col + if col == "sync_id" { + selectList += "? as sync_id" + } else { + selectList += col + } + } + + // Step 1: Insert ALL records from base sync + insertBaseQuery := fmt.Sprintf(` + INSERT INTO main.%s (%s) + SELECT %s + FROM base.%s + WHERE sync_id = ? + `, tableName, columnList, selectList, tableName) + + _, err = c.file.db.ExecContext(ctx, insertBaseQuery, destSyncID, baseSyncID) + if err != nil { + return fmt.Errorf("failed to copy base records: %w", err) + } + + // Step 2: Insert/replace records from applied sync where applied.discovered_at > main.discovered_at + insertOrReplaceAppliedQuery := fmt.Sprintf(` + INSERT OR REPLACE INTO main.%s (%s) + SELECT %s + FROM attached.%s AS a + WHERE a.sync_id = ? + AND ( + NOT EXISTS ( + SELECT 1 FROM main.%s AS m + WHERE m.external_id = a.external_id AND m.sync_id = ? + ) + OR EXISTS ( + SELECT 1 FROM main.%s AS m + WHERE m.external_id = a.external_id + AND m.sync_id = ? + AND a.discovered_at > m.discovered_at + ) + ) + `, tableName, columnList, selectList, tableName, tableName, tableName) + + _, err = c.file.db.ExecContext(ctx, insertOrReplaceAppliedQuery, destSyncID, appliedSyncID, destSyncID, destSyncID) + return err +} + +func (c *C1FileAttached) getTableColumns(ctx context.Context, tableName string) ([]string, error) { + if !c.safe { + return nil, errors.New("database has been detached") + } + // PRAGMA doesn't support parameter binding, so we format the table name directly + query := fmt.Sprintf("PRAGMA table_info(%s)", tableName) + rows, err := c.file.db.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + var columns []string + for rows.Next() { + var cid int + var name, dataType string + var notNull, pk int + var defaultValue interface{} + + err := rows.Scan(&cid, &name, &dataType, ¬Null, &defaultValue, &pk) + if err != nil { + return nil, err + } + + // Skip the 'id' column as it's auto-increment + if name != "id" { + columns = append(columns, name) + } + } + + return columns, nil +} + +func (c *C1FileAttached) CompactResourceTypes(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error { + if !c.safe { + return errors.New("database has been detached") + } + return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_resource_types") +} + +func (c *C1FileAttached) CompactResources(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error { + if !c.safe { + return errors.New("database has been detached") + } + return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_resources") +} + +func (c *C1FileAttached) CompactEntitlements(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error { + if !c.safe { + return errors.New("database has been detached") + } + return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_entitlements") +} + +func (c *C1FileAttached) CompactGrants(ctx context.Context, destSyncID string, baseSyncID string, appliedSyncID string) error { + if !c.safe { + return errors.New("database has been detached") + } + return c.CompactTable(ctx, destSyncID, baseSyncID, appliedSyncID, "v1_grants") +} diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/s3/s3.go b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/s3/s3.go index eed8b599..bca6b1ff 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/s3/s3.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/manager/s3/s3.go @@ -116,7 +116,7 @@ func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) { return nil, err } - return dotc1z.NewC1ZFile(ctx, s.tmpFile, dotc1z.WithTmpDir(s.tmpDir)) + return dotc1z.NewC1ZFile(ctx, s.tmpFile, dotc1z.WithTmpDir(s.tmpDir), dotc1z.WithPragma("journal_mode", "WAL")) } // SaveC1Z saves a file to the AWS S3 bucket. diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sql_helpers.go b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sql_helpers.go index 01e1e4bb..43781f6f 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sql_helpers.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sql_helpers.go @@ -2,6 +2,7 @@ package dotc1z import ( "context" + "errors" "fmt" "strconv" "time" @@ -314,6 +315,13 @@ func executeChunkedInsert( chunks++ } + tx, err := c.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + var txError error + for i := 0; i < chunks; i++ { start := i * chunkSize end := (i + 1) * chunkSize @@ -323,28 +331,39 @@ func executeChunkedInsert( chunkedRows := rows[start:end] // Create the base insert dataset - insertDs := c.db.Insert(tableName) + insertDs := tx.Insert(tableName) // Apply the custom query building function - insertDs, err := buildQueryFn(insertDs, chunkedRows) + insertDs, err = buildQueryFn(insertDs, chunkedRows) if err != nil { - return err + txError = err + break } // Generate the SQL query, args, err := insertDs.ToSQL() if err != nil { - return err + txError = err + break } // Execute the query - _, err = c.db.Exec(query, args...) + _, err = tx.ExecContext(ctx, query, args...) if err != nil { - return err + txError = err + break } } - return nil + if txError != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + return errors.Join(rollbackErr, txError) + } + + return fmt.Errorf("error executing chunked insert: %w", txError) + } + + return tx.Commit() } func bulkPutConnectorObject[T proto.Message]( diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sync_runs.go b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sync_runs.go index 96fbfc9a..364ccc05 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sync_runs.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/dotc1z/sync_runs.go @@ -301,6 +301,22 @@ func (c *C1File) LatestFinishedSync(ctx context.Context) (string, error) { return s.ID, nil } +func (c *C1File) LatestFinishedSyncAnyType(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.LatestFinishedSyncAnyType") + defer span.End() + + s, err := c.getFinishedSync(ctx, 0, SyncTypeAny) + if err != nil { + return "", err + } + + if s == nil { + return "", nil + } + + return s.ID, nil +} + func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { ctx, span := tracer.Start(ctx, "C1File.getSync") defer span.End() diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/field/defaults.go b/vendor/github.com/conductorone/baton-sdk/pkg/field/defaults.go index 373065ef..e480f0d8 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/field/defaults.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/field/defaults.go @@ -136,10 +136,10 @@ var ( WithPersistent(true), WithExportTarget(ExportTargetNone), ) - invokeActionArgsField = StringMapField("invoke-action-args", + invokeActionArgsField = StringField("invoke-action-args", WithHidden(true), WithDescription("JSON-formatted object of map keys and values like '{ 'key': 'value' }'"), - WithDefaultValue(map[string]any{}), + WithDefaultValue("{}"), WithPersistent(true), WithExportTarget(ExportTargetNone), ) diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/sdk/version.go b/vendor/github.com/conductorone/baton-sdk/pkg/sdk/version.go index 9c13f3ed..d11ea07f 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/sdk/version.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/sdk/version.go @@ -1,3 +1,3 @@ package sdk -const Version = "v0.3.30" +const Version = "v0.3.34" diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/cycle.go b/vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/cycle.go index 867cd6fb..292d0261 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/cycle.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/sync/expand/cycle.go @@ -4,6 +4,92 @@ import ( mapset "github.com/deckarep/golang-set/v2" ) +const ( + colorWhite uint8 = iota + colorGray + colorBlack +) + +// cycleDetector encapsulates coloring state for cycle detection on an +// EntitlementGraph. Node IDs are dense (1..NextNodeID), so slices are used for +// O(1) access and zero per-op allocations. +type cycleDetector struct { + g *EntitlementGraph + state []uint8 + parent []int +} + +func newCycleDetector(g *EntitlementGraph) *cycleDetector { + cd := &cycleDetector{ + g: g, + state: make([]uint8, g.NextNodeID+1), + parent: make([]int, g.NextNodeID+1), + } + for i := range cd.parent { + cd.parent[i] = -1 + } + return cd +} + +// dfs performs a coloring-based DFS from u, returning the first detected cycle +// as a slice of node IDs or nil if no cycle is reachable from u. +func (cd *cycleDetector) dfs(u int) ([]int, bool) { + // Self-loop fast path. + if nbrs, ok := cd.g.SourcesToDestinations[u]; ok { + if _, ok := nbrs[u]; ok { + return []int{u}, true + } + } + + cd.state[u] = colorGray + if nbrs, ok := cd.g.SourcesToDestinations[u]; ok { + for v := range nbrs { + switch cd.state[v] { + case colorWhite: + cd.parent[v] = u + if cyc, ok := cd.dfs(v); ok { + return cyc, true + } + case colorGray: + // Back-edge to a node on the current recursion stack. + // Reconstruct cycle by walking parents from u back to v (inclusive), then reverse. + cycle := make([]int, 0, 8) + for x := u; ; x = cd.parent[x] { + cycle = append(cycle, x) + if x == v || cd.parent[x] == -1 { + break + } + } + for i, j := 0, len(cycle)-1; i < j; i, j = i+1, j-1 { + cycle[i], cycle[j] = cycle[j], cycle[i] + } + return cycle, true + } + } + } + cd.state[u] = colorBlack + return nil, false +} + +// FindAny scans all nodes and returns the first detected cycle or nil if none exist. +func (cd *cycleDetector) FindAny() []int { + for nodeID := range cd.g.Nodes { + if cd.state[nodeID] != colorWhite { + continue + } + if cyc, ok := cd.dfs(nodeID); ok { + return cyc + } + } + return nil +} + +// FindFrom starts cycle detection from a specific node and returns the first +// cycle reachable from that node, or nil,false if none. +func (cd *cycleDetector) FindFrom(start int) ([]int, bool) { + return cd.dfs(start) +} + // GetFirstCycle given an entitlements graph, return a cycle by node ID if it // exists. Returns nil if no cycle exists. If there is a single // node pointing to itself, that will count as a cycle. @@ -11,46 +97,18 @@ func (g *EntitlementGraph) GetFirstCycle() []int { if g.HasNoCycles { return nil } - visited := mapset.NewSet[int]() - for nodeID := range g.Nodes { - cycle, hasCycle := g.cycleDetectionHelper(nodeID, visited, []int{}) - if hasCycle { - return cycle - } - } - - return nil + cd := newCycleDetector(g) + return cd.FindAny() } func (g *EntitlementGraph) cycleDetectionHelper( nodeID int, - visited mapset.Set[int], - currentCycle []int, ) ([]int, bool) { - visited.Add(nodeID) - if destinations, ok := g.SourcesToDestinations[nodeID]; ok { - for destinationID := range destinations { - nextCycle := make([]int, len(currentCycle)) - copy(nextCycle, currentCycle) - nextCycle = append(nextCycle, nodeID) - - if !visited.Contains(destinationID) { - if cycle, hasCycle := g.cycleDetectionHelper(destinationID, visited, nextCycle); hasCycle { - return cycle, true - } - } else { - // Make sure to not include part of the start before the cycle. - outputCycle := make([]int, 0) - for i := len(nextCycle) - 1; i >= 0; i-- { - outputCycle = append(outputCycle, nextCycle[i]) - if nextCycle[i] == destinationID { - return outputCycle, true - } - } - } - } - } - return nil, false + // Thin wrapper around the coloring-based DFS, starting from a specific node. + // The provided visited/currentCycle are ignored here; coloring provides the + // necessary state for correctness and performance. + cd := newCycleDetector(g) + return cd.FindFrom(nodeID) } // removeNode obliterates a node and all incoming/outgoing edges. diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/attached/attached.go b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/attached/attached.go new file mode 100644 index 00000000..09ae95c6 --- /dev/null +++ b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/attached/attached.go @@ -0,0 +1,87 @@ +package attached + +import ( + "context" + "fmt" + + "github.com/conductorone/baton-sdk/pkg/dotc1z" +) + +type Compactor struct { + base *dotc1z.C1File + applied *dotc1z.C1File + dest *dotc1z.C1File +} + +func NewAttachedCompactor(base *dotc1z.C1File, applied *dotc1z.C1File, dest *dotc1z.C1File) *Compactor { + return &Compactor{ + base: base, + applied: applied, + dest: dest, + } +} + +func (c *Compactor) CompactWithSyncID(ctx context.Context, destSyncID string) error { + // Get the latest finished full sync ID from base + baseSyncID, err := c.base.LatestFinishedSync(ctx) + if err != nil { + return fmt.Errorf("failed to get base sync ID: %w", err) + } + if baseSyncID == "" { + return fmt.Errorf("no finished full sync found in base") + } + + // Get the latest finished sync ID from applied (any type) + appliedSyncID, err := c.applied.LatestFinishedSyncAnyType(ctx) + if err != nil { + return fmt.Errorf("failed to get applied sync ID: %w", err) + } + if appliedSyncID == "" { + return fmt.Errorf("no finished sync found in applied") + } + + // Attach both the base and applied databases to the destination + base, err := c.dest.AttachFile(c.base, "base") + if err != nil { + return fmt.Errorf("failed to attach databases to destination: %w", err) + } + defer func() { + _, _ = base.DetachFile("base") + }() + + // Attach both the base and applied databases to the destination + attached, err := c.dest.AttachFile(c.applied, "attached") + if err != nil { + return fmt.Errorf("failed to attach databases to destination: %w", err) + } + defer func() { + _, _ = attached.DetachFile("attached") + }() + + if err := c.processRecords(ctx, attached, destSyncID, baseSyncID, appliedSyncID); err != nil { + return fmt.Errorf("failed to process records: %w", err) + } + + return nil +} + +func (c *Compactor) processRecords(ctx context.Context, attached *dotc1z.C1FileAttached, destSyncID string, baseSyncID string, appliedSyncID string) error { + // Compact all tables: copy base records and merge newer applied records using raw SQL + if err := attached.CompactResourceTypes(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil { + return fmt.Errorf("failed to compact resource types: %w", err) + } + + if err := attached.CompactResources(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil { + return fmt.Errorf("failed to compact resources: %w", err) + } + + if err := attached.CompactEntitlements(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil { + return fmt.Errorf("failed to compact entitlements: %w", err) + } + + if err := attached.CompactGrants(ctx, destSyncID, baseSyncID, appliedSyncID); err != nil { + return fmt.Errorf("failed to compact grants: %w", err) + } + + return nil +} diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/compactor.go b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/compactor.go index 535c8ffe..ab35c8cb 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/compactor.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/compactor.go @@ -14,11 +14,14 @@ import ( c1zmanager "github.com/conductorone/baton-sdk/pkg/dotc1z/manager" "github.com/conductorone/baton-sdk/pkg/sdk" "github.com/conductorone/baton-sdk/pkg/sync" - sync_compactor "github.com/conductorone/baton-sdk/pkg/synccompactor/naive" + "github.com/conductorone/baton-sdk/pkg/synccompactor/attached" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel" "go.uber.org/zap" ) +var tracer = otel.Tracer("baton-sdk/pkg.synccompactor") + type Compactor struct { entries []*CompactableSync @@ -74,20 +77,32 @@ func NewCompactor(ctx context.Context, outputDir string, compactableSyncs []*Com } func (c *Compactor) Compact(ctx context.Context) (*CompactableSync, error) { + ctx, span := tracer.Start(ctx, "Compactor.Compact") + defer span.End() if len(c.entries) < 2 { return nil, nil } base := c.entries[0] - for i := 1; i < len(c.entries); i++ { - applied := c.entries[i] - - compactable, err := c.doOneCompaction(ctx, base, applied) - if err != nil { - return nil, err + incrementals := c.entries[1:] + + // Lets compact all the incrementals together first. + compactedIncrementals := incrementals[0] + if len(incrementals) > 1 { + for i := 1; i < len(incrementals); i++ { + nextEntry := incrementals[i] + compacted, err := c.doOneCompaction(ctx, compactedIncrementals, nextEntry) + if err != nil { + return nil, err + } + compactedIncrementals = compacted } + } - base = compactable + // Then apply that onto our base. + base, err := c.doOneCompaction(ctx, base, compactedIncrementals) + if err != nil { + return nil, err } l := ctxzap.Extract(ctx) @@ -191,6 +206,8 @@ func getLatestObjects(ctx context.Context, info *CompactableSync) (*reader_v2.Sy } func (c *Compactor) doOneCompaction(ctx context.Context, base *CompactableSync, applied *CompactableSync) (*CompactableSync, error) { + ctx, span := tracer.Start(ctx, "Compactor.doOneCompaction") + defer span.End() l := ctxzap.Extract(ctx) l.Info( "running compaction", @@ -231,9 +248,9 @@ func (c *Compactor) doOneCompaction(ctx context.Context, base *CompactableSync, return nil, err } - runner := sync_compactor.NewNaiveCompactor(baseFile, appliedFile, newFile) - - if err := runner.Compact(ctx); err != nil { + // runner := naive.NewNaiveCompactor(baseFile, appliedFile, newFile) + runner := attached.NewAttachedCompactor(baseFile, appliedFile, newFile) + if err := runner.CompactWithSyncID(ctx, newSync); err != nil { l.Error("error running compaction", zap.Error(err)) return nil, err } diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive.go b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive.go deleted file mode 100644 index 7e4ae6e9..00000000 --- a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive.go +++ /dev/null @@ -1,88 +0,0 @@ -package naive - -import ( - "context" - - "github.com/conductorone/baton-sdk/pkg/dotc1z" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" -) - -func NewNaiveCompactor(base *dotc1z.C1File, applied *dotc1z.C1File, dest *dotc1z.C1File) *Compactor { - return &Compactor{ - base: base, - applied: applied, - dest: dest, - } -} - -type Compactor struct { - base *dotc1z.C1File - applied *dotc1z.C1File - dest *dotc1z.C1File -} - -func (n *Compactor) Compact(ctx context.Context) error { - if err := n.processResourceTypes(ctx); err != nil { - return err - } - if err := n.processResources(ctx); err != nil { - return err - } - if err := n.processEntitlements(ctx); err != nil { - return err - } - if err := n.processGrants(ctx); err != nil { - return err - } - return nil -} - -func naiveCompact[T proto.Message, REQ listRequest, RESP listResponse[T]]( - ctx context.Context, - base listFunc[T, REQ, RESP], - applied listFunc[T, REQ, RESP], - save func(context.Context, ...T) error, -) error { - var t T - l := ctxzap.Extract(ctx) - l.Info("naive compaction: compacting objects", zap.String("object_type", string(t.ProtoReflect().Descriptor().FullName()))) - // List all objects from the base file and save them in the destination file - if err := listAllObjects(ctx, base, func(items []T) (bool, error) { - if err := save(ctx, items...); err != nil { - return false, err - } - return true, nil - }); err != nil { - return err - } - - // Then list all objects from the applied file and save them in the destination file, overwriting ones with the same external_id - if err := listAllObjects(ctx, applied, func(items []T) (bool, error) { - if err := save(ctx, items...); err != nil { - return false, err - } - return true, nil - }); err != nil { - return err - } - - return nil -} - -func (n *Compactor) processResourceTypes(ctx context.Context) error { - return naiveCompact(ctx, n.base.ListResourceTypes, n.applied.ListResourceTypes, n.dest.PutResourceTypesIfNewer) -} - -func (n *Compactor) processResources(ctx context.Context) error { - return naiveCompact(ctx, n.base.ListResources, n.applied.ListResources, n.dest.PutResourcesIfNewer) -} - -func (n *Compactor) processGrants(ctx context.Context) error { - return naiveCompact(ctx, n.base.ListGrants, n.applied.ListGrants, n.dest.PutGrantsIfNewer) -} - -func (n *Compactor) processEntitlements(ctx context.Context) error { - return naiveCompact(ctx, n.base.ListEntitlements, n.applied.ListEntitlements, n.dest.PutEntitlementsIfNewer) -} diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive_unroll.go b/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive_unroll.go deleted file mode 100644 index cc4f8064..00000000 --- a/vendor/github.com/conductorone/baton-sdk/pkg/synccompactor/naive/naive_unroll.go +++ /dev/null @@ -1,98 +0,0 @@ -package naive - -import ( - "context" - "reflect" - - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" -) - -type listRequest interface { - proto.Message - GetPageSize() uint32 - GetPageToken() string - GetAnnotations() []*anypb.Any -} - -type listResponse[T proto.Message] interface { - GetNextPageToken() string - GetAnnotations() []*anypb.Any - GetList() []T -} - -// createRequest creates a new request object of type REQ using reflection. -func createRequest[REQ listRequest]() REQ { - var r REQ - baseType := reflect.TypeOf(r).Elem() - pointerToInitializedVal := reflect.New(baseType) - return pointerToInitializedVal.Interface().(REQ) -} - -// setFieldIfValid sets a field in a struct if it exists and can be set. -func setFieldIfValid(obj interface{}, fieldName string, setValue func(reflect.Value)) { - val := reflect.ValueOf(obj) - if val.Kind() != reflect.Ptr || val.IsNil() { - return - } - - field := val.Elem().FieldByName(fieldName) - if field.IsValid() && field.CanSet() { - setValue(field) - } -} - -// setPageSize sets the PageSize field in a request to the specified value. -func setPageSize(req listRequest, size uint64) { - setFieldIfValid(req, "PageSize", func(field reflect.Value) { - field.SetUint(size) - }) -} - -// setPageToken sets the PageToken field in a request to the specified token. -func setPageToken(req listRequest, token string) { - setFieldIfValid(req, "PageToken", func(field reflect.Value) { - field.SetString(token) - }) -} - -type listFunc[T proto.Message, REQ listRequest, RESP listResponse[T]] func(context.Context, REQ) (RESP, error) - -func listAllObjects[T proto.Message, REQ listRequest, RESP listResponse[T]](ctx context.Context, list listFunc[T, REQ, RESP], cb func(items []T) (bool, error)) error { - // Create a new request using reflection - req := createRequest[REQ]() - - // Set initial page size - setPageSize(req, 100) // Set a reasonable default page size - - var nextPageToken string - for { - // Set the page token for the current request if needed - if nextPageToken != "" { - setPageToken(req, nextPageToken) - } - - // Call the list function with the current request - resp, err := list(ctx, req) - if err != nil { - return err - } - - // Collect the results - shouldContinue, err := cb(resp.GetList()) - if err != nil { - return err - } - if !shouldContinue { - return nil - } - - // Check if there are more pages - nextPageToken = resp.GetNextPageToken() - if nextPageToken == "" || len(resp.GetList()) == 0 { - break // No more pages - } - } - - return nil -} diff --git a/vendor/github.com/conductorone/baton-sdk/pkg/uhttp/wrapper.go b/vendor/github.com/conductorone/baton-sdk/pkg/uhttp/wrapper.go index 1a368cac..3d3075e0 100644 --- a/vendor/github.com/conductorone/baton-sdk/pkg/uhttp/wrapper.go +++ b/vendor/github.com/conductorone/baton-sdk/pkg/uhttp/wrapper.go @@ -271,6 +271,11 @@ func WithGenericResponse(response *map[string]any) DoOption { if response == nil { return status.Error(codes.InvalidArgument, "response is nil") } + + if resp.StatusCode == http.StatusNoContent { + return nil + } + var v any var err error diff --git a/vendor/modules.txt b/vendor/modules.txt index a5d12645..126bc677 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -156,7 +156,7 @@ github.com/benbjohnson/clock # github.com/cenkalti/backoff/v4 v4.3.0 ## explicit; go 1.18 github.com/cenkalti/backoff/v4 -# github.com/conductorone/baton-sdk v0.3.31 +# github.com/conductorone/baton-sdk v0.3.35 ## explicit; go 1.23.4 github.com/conductorone/baton-sdk/internal/connector github.com/conductorone/baton-sdk/pb/c1/c1z/v1 @@ -197,7 +197,7 @@ github.com/conductorone/baton-sdk/pkg/sdk github.com/conductorone/baton-sdk/pkg/sync github.com/conductorone/baton-sdk/pkg/sync/expand github.com/conductorone/baton-sdk/pkg/synccompactor -github.com/conductorone/baton-sdk/pkg/synccompactor/naive +github.com/conductorone/baton-sdk/pkg/synccompactor/attached github.com/conductorone/baton-sdk/pkg/tasks github.com/conductorone/baton-sdk/pkg/tasks/c1api github.com/conductorone/baton-sdk/pkg/tasks/local