From e2d484a92b7caae5f2ec361aee21966e2261fbfe Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Mon, 2 Mar 2026 16:41:02 -0800 Subject: [PATCH 1/4] feat(query): add QueryDatastoreReader to act as an implementation shim for datastores --- .../query_plan_consistency_test.go | 2 +- internal/services/v1/permissions_queryplan.go | 2 +- pkg/query/alias.go | 31 +- pkg/query/arrow_reversal_test.go | 8 +- .../check_deep_arrow_benchmark_test.go | 167 +++++-- .../check_wide_arrow_benchmark_test.go | 137 +++++- pkg/query/build_tree_test.go | 14 +- pkg/query/caveat.go | 28 +- pkg/query/caveat_test.go | 4 +- pkg/query/context.go | 17 +- pkg/query/datastore.go | 438 +++++------------- pkg/query/exclusion_test.go | 10 +- pkg/query/intersection_arrow_test.go | 16 +- pkg/query/observer_analyze_test.go | 2 +- pkg/query/observer_count_test.go | 2 +- pkg/query/quick_e2e_test.go | 6 +- pkg/query/reader.go | 300 ++++++++++++ pkg/query/reader_timing.go | 93 ++++ pkg/query/recursive.go | 2 +- pkg/query/recursive_benchmark_test.go | 14 +- pkg/query/recursive_coverage_test.go | 10 +- pkg/query/recursive_strategies_test.go | 6 +- pkg/query/recursive_test.go | 16 +- pkg/query/simplify_caveat_test.go | 44 +- pkg/query/tracing_test.go | 2 +- pkg/query/wildcard_multirelation_test.go | 4 +- pkg/query/wildcard_subjects_test.go | 10 +- 27 files changed, 864 insertions(+), 521 deletions(-) create mode 100644 pkg/query/reader.go create mode 100644 pkg/query/reader_timing.go diff --git a/internal/services/integrationtesting/query_plan_consistency_test.go b/internal/services/integrationtesting/query_plan_consistency_test.go index c73a23427..e0f04a8a6 100644 --- a/internal/services/integrationtesting/query_plan_consistency_test.go +++ b/internal/services/integrationtesting/query_plan_consistency_test.go @@ -48,7 +48,7 @@ type queryPlanConsistencyHandle struct { func (q *queryPlanConsistencyHandle) buildContext(t *testing.T) *query.Context { return query.NewLocalContext(t.Context(), - 
query.WithReader(datalayer.NewDataLayer(q.ds).SnapshotReader(q.revision)), + query.WithRevisionedReader(datalayer.NewDataLayer(q.ds).SnapshotReader(q.revision)), query.WithCaveatRunner(caveats.NewCaveatRunner(caveattypes.Default.TypeSet)), query.WithTraceLogger(query.NewTraceLogger())) // Enable tracing for debugging } diff --git a/internal/services/v1/permissions_queryplan.go b/internal/services/v1/permissions_queryplan.go index e5a4104f9..077dd67fe 100644 --- a/internal/services/v1/permissions_queryplan.go +++ b/internal/services/v1/permissions_queryplan.go @@ -73,7 +73,7 @@ func (ps *permissionServer) checkPermissionWithQueryPlan(ctx context.Context, re qctx := &query.Context{ Context: ctx, Executor: query.LocalExecutor{}, - Reader: reader, + Reader: query.NewQueryDatastoreReader(reader), CaveatContext: caveatContext, CaveatRunner: caveatsimpl.NewCaveatRunner(ps.config.CaveatTypeSet), } diff --git a/pkg/query/alias.go b/pkg/query/alias.go index 61c6e3995..b62ffa202 100644 --- a/pkg/query/alias.go +++ b/pkg/query/alias.go @@ -1,10 +1,5 @@ package query -import ( - "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/options" -) - // AliasIterator is an iterator that rewrites the Resource's Relation field of all paths // streamed from the sub-iterator to a specified alias relation. type AliasIterator struct { @@ -144,31 +139,7 @@ func (a *AliasIterator) shouldIncludeSelfEdge(ctx *Context, resource Object, fil // resourceExistsAsSubject queries the datastore to check if the given resource appears // as a subject in any relationship, including expired relationships. 
func (a *AliasIterator) resourceExistsAsSubject(ctx *Context, resource Object) (bool, error) { - filter := datastore.RelationshipsFilter{ - OptionalSubjectsSelectors: []datastore.SubjectsSelector{{ - OptionalSubjectType: resource.ObjectType, - OptionalSubjectIds: []string{resource.ObjectID}, - RelationFilter: datastore.SubjectRelationFilter{}.WithNonEllipsisRelation(a.relation), - }}, - OptionalExpirationOption: datastore.ExpirationFilterOptionNone, - } - - iter, err := ctx.Reader.QueryRelationships(ctx, filter, - options.WithLimit(options.LimitOne), - options.WithSkipExpiration(true)) // Include expired relationships - if err != nil { - return false, err - } - - // Check if any relationship exists - for _, err := range iter { - if err != nil { - return false, err - } - return true, nil - } - - return false, nil + return ctx.Reader.SubjectExistsAsRelationship(ctx, resource, a.relation) } func (a *AliasIterator) IterResourcesImpl(ctx *Context, subject ObjectAndRelation, filterResourceType ObjectType) (PathSeq, error) { diff --git a/pkg/query/arrow_reversal_test.go b/pkg/query/arrow_reversal_test.go index 29a6886f5..a4480fd7d 100644 --- a/pkg/query/arrow_reversal_test.go +++ b/pkg/query/arrow_reversal_test.go @@ -112,7 +112,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) { resources := NewObjects("file", "file0") subject := NewObject("user", "user42").WithEllipses() - reader := datalayer.NewDataLayer(rawDS).SnapshotReader(revision) + readerOpt := WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)) // ---- plain (LTR) ---- @@ -120,7 +120,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) { plainIt, err := canonicalOutline.Compile() require.NoError(t, err) - plainSeq, err := NewLocalContext(ctx, WithReader(reader), WithTraceLogger(plainTrace)). + plainSeq, err := NewLocalContext(ctx, readerOpt, WithTraceLogger(plainTrace)). 
Check(plainIt, resources, subject) require.NoError(t, err) plainPaths, err := CollectAll(plainSeq) @@ -132,7 +132,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) { obs := NewCountObserver() warmIt, err := canonicalOutline.Compile() require.NoError(t, err) - warmSeq, err := NewLocalContext(ctx, WithReader(reader), WithObserver(obs)). + warmSeq, err := NewLocalContext(ctx, readerOpt, WithObserver(obs)). Check(warmIt, resources, subject) require.NoError(t, err) _, err = CollectAll(warmSeq) @@ -144,7 +144,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) { require.NoError(t, err) advisedTrace := NewTraceLogger() - advisedSeq, err := NewLocalContext(ctx, WithReader(reader), WithTraceLogger(advisedTrace)). + advisedSeq, err := NewLocalContext(ctx, readerOpt, WithTraceLogger(advisedTrace)). Check(advisedIt, resources, subject) require.NoError(t, err) advisedPaths, err := CollectAll(advisedSeq) diff --git a/pkg/query/benchmarks/check_deep_arrow_benchmark_test.go b/pkg/query/benchmarks/check_deep_arrow_benchmark_test.go index 1ec0c733e..85bf334bc 100644 --- a/pkg/query/benchmarks/check_deep_arrow_benchmark_test.go +++ b/pkg/query/benchmarks/check_deep_arrow_benchmark_test.go @@ -20,14 +20,22 @@ import ( // BenchmarkCheckDeepArrow benchmarks permission checking through a deep recursive chain. // This recreates the testharness scenario with: -// - A 30+ level deep parent chain: document:target -> document:1 -> ... -> document:29 +// - A 30+ level deep parent chain: document:target -> document:1 -> ... -> document:30 // - document:29#view@user:slow // - Checking if user:slow has viewer permission on document:target // // The permission viewer = view + parent->viewer creates a recursive traversal through // all 30+ levels to find the view relationship at the end of the chain. 
+// +// Four sub-benchmarks are run: +// - plain: compile the outline directly and run Check each iteration +// - advised: seed a CountAdvisor from a single warm-up run, apply it to the +// canonical outline, compile once, then run Check each iteration +// - plain_delay: same as plain, but with a delay reader simulating network latency +// - advised_delay: same as advised, but with a delay reader simulating network latency func BenchmarkCheckDeepArrow(b *testing.B) { - // Create an in-memory datastore + // ---- shared setup ---- + rawDS, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC) require.NoError(b, err) @@ -43,76 +51,159 @@ func BenchmarkCheckDeepArrow(b *testing.B) { } ` - // Compile the schema compiled, err := compiler.Compile(compiler.InputSchema{ Source: input.Source("benchmark"), SchemaString: schemaText, }, compiler.AllowUnprefixedObjectType()) require.NoError(b, err) - // Write the schema _, err = rawDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.LegacyWriteNamespaces(ctx, compiled.ObjectDefinitions...) }) require.NoError(b, err) - // Build relationships for the deep arrow scenario - // Create a chain: document:target -> document:1 -> document:2 -> ... -> document:30 -> document:31 + // Build relationships for the deep arrow scenario. + // Chain: document:target -> document:1 -> document:2 -> ... 
-> document:30 // Plus: document:29#view@user:slow relationships := make([]tuple.Relationship, 0, 33) - - // document:target#parent@document:1 relationships = append(relationships, tuple.MustParse("document:target#parent@document:1")) - - // Chain: document:1 through document:30 for i := 1; i <= 30; i++ { rel := fmt.Sprintf("document:%d#parent@document:%d", i, i+1) relationships = append(relationships, tuple.MustParse(rel)) } - - // The view relationship at the end of the chain relationships = append(relationships, tuple.MustParse("document:29#view@user:slow")) - // Write all relationships to the datastore revision, err := common.WriteRelationships(ctx, rawDS, tuple.UpdateOperationCreate, relationships...) require.NoError(b, err) - // Build schema for querying dsSchema, err := schema.BuildSchemaFromDefinitions(compiled.ObjectDefinitions, nil) require.NoError(b, err) - // Create the iterator tree for the viewer permission using BuildIteratorFromSchema - viewerIterator, err := query.BuildIteratorFromSchema(dsSchema, "document", "viewer") + // Build the canonical outline once; all sub-benchmarks derive from it. + canonicalOutline, err := query.BuildOutlineFromSchema(dsSchema, "document", "viewer") require.NoError(b, err) - // Create query context - queryCtx := query.NewLocalContext(ctx, - query.WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), - query.WithMaxRecursionDepth(50), - ) - - // The resource we're checking: document:target + // The resource and subject are the same for all sub-benchmarks. resources := query.NewObjects("document", "target") - - // The subject we're checking: user:slow subject := query.NewObject("user", "slow").WithEllipses() - // Reset the timer - everything before this is setup - b.ResetTimer() + // Base reader (no simulated latency). 
+ reader := query.NewQueryDatastoreReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)) - // Run the benchmark - for b.Loop() { - // Check if user:slow can view document:target - // This will traverse the entire 30+ level chain - seq, err := queryCtx.Check(viewerIterator, resources, subject) - require.NoError(b, err) + // Delay reader wrapping the base reader with simulated network latency. + delayReader := query.NewDelayReader(networkDelay, reader) - // Collect all results (should find user:slow at the end of the chain) - paths, err := query.CollectAll(seq) + // buildAdvisedIterator seeds a CountAdvisor from a single warm-up run using the + // provided reader and returns the compiled advised iterator. + buildAdvisedIterator := func(b *testing.B, r query.QueryDatastoreReader) query.Iterator { + b.Helper() + obs := query.NewCountObserver() + warmIt, err := canonicalOutline.Compile() + require.NoError(b, err) + warmCtx := query.NewLocalContext(ctx, + query.WithReader(r), + query.WithObserver(obs), + query.WithMaxRecursionDepth(50), + ) + seq, err := warmCtx.Check(warmIt, resources, subject) + require.NoError(b, err) + _, err = query.CollectAll(seq) require.NoError(b, err) - // Verify we found the expected result - require.Len(b, paths, 1) - require.Equal(b, "slow", paths[0].Subject.ObjectID) + advisor := query.NewCountAdvisor(obs.GetStats()) + advisedCO, err := query.ApplyAdvisor(canonicalOutline, advisor) + require.NoError(b, err) + advisedIt, err := advisedCO.Compile() + require.NoError(b, err) + return advisedIt } + + // ---- plain sub-benchmark ---- + + b.Run("plain", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + b.Log("plain explain:\n", it.Explain()) + + queryCtx := query.NewLocalContext(ctx, + query.WithReader(reader), + query.WithMaxRecursionDepth(50), + ) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := 
query.CollectAll(seq) + require.NoError(b, err) + require.Len(b, paths, 1) + require.Equal(b, "slow", paths[0].Subject.ObjectID) + } + }) + + // ---- advised sub-benchmark ---- + + b.Run("advised", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, reader) + + b.Log("advised explain:\n", advisedIt.Explain()) + + queryCtx := query.NewLocalContext(ctx, + query.WithReader(reader), + query.WithMaxRecursionDepth(50), + ) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.Len(b, paths, 1) + require.Equal(b, "slow", paths[0].Subject.ObjectID) + } + }) + + // ---- plain_delay sub-benchmark ---- + + b.Run("plain_delay", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + queryCtx := query.NewLocalContext(ctx, + query.WithReader(delayReader), + query.WithMaxRecursionDepth(50), + ) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.Len(b, paths, 1) + require.Equal(b, "slow", paths[0].Subject.ObjectID) + } + }) + + // ---- advised_delay sub-benchmark ---- + + b.Run("advised_delay", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, delayReader) + queryCtx := query.NewLocalContext(ctx, + query.WithReader(delayReader), + query.WithMaxRecursionDepth(50), + ) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.Len(b, paths, 1) + require.Equal(b, "slow", paths[0].Subject.ObjectID) + } + }) } diff --git a/pkg/query/benchmarks/check_wide_arrow_benchmark_test.go b/pkg/query/benchmarks/check_wide_arrow_benchmark_test.go index 17011ef28..2dab7353d 100644 --- 
a/pkg/query/benchmarks/check_wide_arrow_benchmark_test.go +++ b/pkg/query/benchmarks/check_wide_arrow_benchmark_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/require" @@ -18,6 +19,10 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +// networkDelay is the simulated per-call round-trip latency used by the delay +// sub-benchmarks. Adjust this to model different network environments. +const networkDelay = 100 * time.Microsecond + // BenchmarkCheckWideArrow benchmarks permission checking through a wide arrow relationship. // This creates a scenario with: // - 10 files @@ -28,6 +33,13 @@ import ( // // The permission viewer = view + group->member creates a wide arrow traversal where // checking if a user has viewer permission on a file requires checking many group memberships. +// +// Four sub-benchmarks are run: +// - plain: compile the outline directly and run Check each iteration +// - advised: seed a CountAdvisor from a single warm-up run, apply it to the +// canonical outline, compile once, then run Check each iteration +// - plain_delay: same as plain, but with a delay reader simulating network latency +// - advised_delay: same as advised, but with a delay reader simulating network latency func BenchmarkCheckWideArrow(b *testing.B) { const ( numFiles = 10 @@ -111,15 +123,10 @@ func BenchmarkCheckWideArrow(b *testing.B) { dsSchema, err := schema.BuildSchemaFromDefinitions(compiled.ObjectDefinitions, nil) require.NoError(b, err) - // Create the iterator tree for the viewer permission using BuildIteratorFromSchema - viewerIterator, err := query.BuildIteratorFromSchema(dsSchema, "file", "viewer") + // Build the canonical outline once; all sub-benchmarks derive from it. 
+ canonicalOutline, err := query.BuildOutlineFromSchema(dsSchema, "file", "viewer") require.NoError(b, err) - // Create query context - queryCtx := query.NewLocalContext(ctx, - query.WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), - ) - // The resource we're checking: file:file0 resources := query.NewObjects("file", "file0") @@ -127,22 +134,112 @@ func BenchmarkCheckWideArrow(b *testing.B) { // This user should have access through multiple groups subject := query.NewObject("user", "user15").WithEllipses() - // Reset the timer - everything before this is setup - b.ResetTimer() + // Base reader (no simulated latency). + reader := query.NewQueryDatastoreReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)) - // Run the benchmark - for b.Loop() { - // Check if user:user15 can view file:file0 - // This will traverse through many group memberships - seq, err := queryCtx.Check(viewerIterator, resources, subject) - require.NoError(b, err) + // Delay reader wrapping the base reader with simulated network latency. + delayReader := query.NewDelayReader(networkDelay, reader) - // Collect all results - paths, err := query.CollectAll(seq) + // buildAdvisedIterator seeds a CountAdvisor from a single warm-up run using the + // provided reader and returns the compiled advised iterator. 
+ buildAdvisedIterator := func(b *testing.B, r query.QueryDatastoreReader) query.Iterator { + b.Helper() + obs := query.NewCountObserver() + warmIt, err := canonicalOutline.Compile() + require.NoError(b, err) + warmCtx := query.NewLocalContext(ctx, + query.WithReader(r), + query.WithObserver(obs), + ) + seq, err := warmCtx.Check(warmIt, resources, subject) + require.NoError(b, err) + _, err = query.CollectAll(seq) require.NoError(b, err) - // Verify we found at least one path - // user15 should have access through multiple groups - require.NotEmpty(b, paths) + advisor := query.NewCountAdvisor(obs.GetStats()) + advisedCO, err := query.ApplyAdvisor(canonicalOutline, advisor) + require.NoError(b, err) + advisedIt, err := advisedCO.Compile() + require.NoError(b, err) + return advisedIt } + + // ---- plain sub-benchmark ---- + // Compile the outline directly and run Check each iteration. No advisement. + + b.Run("plain", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + b.Log("plain explain:\n", it.Explain()) + + queryCtx := query.NewLocalContext(ctx, query.WithReader(reader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- advised sub-benchmark ---- + // Seed a CountAdvisor from a warm-up run, compile the advised iterator once, + // then run Check each iteration. 
+ + b.Run("advised", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, reader) + + b.Log("advised explain:\n", advisedIt.Explain()) + + queryCtx := query.NewLocalContext(ctx, query.WithReader(reader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- plain_delay sub-benchmark ---- + // Same as plain but with networkDelay latency injected per datastore call. + + b.Run("plain_delay", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + queryCtx := query.NewLocalContext(ctx, query.WithReader(delayReader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- advised_delay sub-benchmark ---- + // Same as advised but with networkDelay latency injected per datastore call. + // The warm-up run also uses the delay reader so advisor stats reflect realistic + // call patterns. 
+ + b.Run("advised_delay", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, delayReader) + queryCtx := query.NewLocalContext(ctx, query.WithReader(delayReader)) + + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) } diff --git a/pkg/query/build_tree_test.go b/pkg/query/build_tree_test.go index 7c357b8cf..e7b616258 100644 --- a/pkg/query/build_tree_test.go +++ b/pkg/query/build_tree_test.go @@ -32,7 +32,7 @@ func TestBuildTree(t *testing.T) { require.NoError(err) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) relSeq, err := ctx.Check(it, NewObjects("document", "specialplan"), NewObject("user", "multiroleguy").WithEllipses()) require.NoError(err) @@ -62,7 +62,7 @@ func TestBuildTreeMultipleRelations(t *testing.T) { require.Contains(explain.String(), "Union", "edit permission should create a union iterator") ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) relSeq, err := ctx.Check(it, NewObjects("document", "specialplan"), NewObject("user", "multiroleguy").WithEllipses()) require.NoError(err) @@ -113,7 +113,7 @@ func TestBuildTreeSubRelations(t *testing.T) { require.NotEmpty(explain.String()) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Just test that the iterator can be executed without error relSeq, err := ctx.Check(it, NewObjects("document", "companyplan"), NewObject("user", "legal").WithEllipses()) @@ -212,7 +212,7 @@ func TestBuildTreeIntersectionOperation(t *testing.T) { require.Contains(explain.String(), 
"Intersection", "should create intersection iterator") ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test execution relSeq, err := ctx.Check(it, NewObjects("document", "specialplan"), NewObject("user", "multiroleguy").WithEllipses()) @@ -275,7 +275,7 @@ func TestBuildTreeExclusionEdgeCases(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) userDef := testfixtures.UserNS.CloneVT() @@ -535,7 +535,7 @@ func TestBuildTreeSingleRelationOptimization(t *testing.T) { require.Contains(explain.String(), "Datastore", "should create datastore iterator") ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test execution relSeq, err := ctx.Check(it, NewObjects("document", "companyplan"), NewObject("user", "legal").WithEllipses()) @@ -555,7 +555,7 @@ func TestBuildTreeSubrelationHandling(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) userDef := testfixtures.UserNS.CloneVT() diff --git a/pkg/query/caveat.go b/pkg/query/caveat.go index ef0dc8024..8c20c64e5 100644 --- a/pkg/query/caveat.go +++ b/pkg/query/caveat.go @@ -1,9 +1,11 @@ package query import ( + "context" "errors" "fmt" + "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -127,16 +129,12 @@ func (c *CaveatIterator) simplifyCaveat(ctx *Context, path Path) (*core.CaveatEx } // Use the 
SimplifyCaveatExpression function to properly handle AND/OR logic - sr, err := ctx.Reader.ReadSchema(ctx) - if err != nil { - return nil, false, fmt.Errorf("failed to get schema reader: %w", err) - } simplified, passes, err := SimplifyCaveatExpression( ctx, ctx.CaveatRunner, path.Caveat, ctx.CaveatContext, - sr, + caveatDefinitionLookupAdapter{ctx.Reader}, ) if err != nil { return nil, false, fmt.Errorf("failed to simplify caveat: %w", err) @@ -219,6 +217,26 @@ func (c *CaveatIterator) SubjectTypes() ([]ObjectType, error) { return c.subiterator.SubjectTypes() } +// caveatDefinitionLookupAdapter wraps a QueryDatastoreReader to satisfy +// caveats.CaveatDefinitionLookup (which takes a bulk name slice) by calling +// LookupCaveatDefinition individually for each name. +type caveatDefinitionLookupAdapter struct{ r QueryDatastoreReader } + +func (a caveatDefinitionLookupAdapter) LookupCaveatDefinitionsByNames( + ctx context.Context, + names []string, +) (map[string]datastore.CaveatDefinition, error) { + out := make(map[string]datastore.CaveatDefinition, len(names)) + for _, name := range names { + def, err := a.r.LookupCaveatDefinition(ctx, name) + if err != nil { + return nil, err + } + out[name] = def + } + return out, nil +} + // buildExplainInfo creates detailed explanation information for the caveat iterator func (c *CaveatIterator) buildExplainInfo() string { if c.caveat == nil { diff --git a/pkg/query/caveat_test.go b/pkg/query/caveat_test.go index 222b19f84..645a10b61 100644 --- a/pkg/query/caveat_test.go +++ b/pkg/query/caveat_test.go @@ -110,7 +110,7 @@ func TestCaveatIteratorNoCaveat(t *testing.T) { require.NoError(t, err) queryCtx := NewLocalContext(context.Background(), - WithReader(dl.SnapshotReader(rev)), + WithRevisionedReader(dl.SnapshotReader(rev)), WithCaveatContext(tc.caveatContext), WithCaveatRunner(caveats.NewCaveatRunner(types.NewTypeSet()))) @@ -203,7 +203,7 @@ func TestCaveatIteratorWithCaveat(t *testing.T) { require.NoError(t, err) queryCtx := 
NewLocalContext(context.Background(), - WithReader(dl.SnapshotReader(rev)), + WithRevisionedReader(dl.SnapshotReader(rev)), WithCaveatContext(tc.caveatContext), WithCaveatRunner(caveats.NewCaveatRunner(types.NewTypeSet()))) diff --git a/pkg/query/context.go b/pkg/query/context.go index 803d28e80..cc46e48b7 100644 --- a/pkg/query/context.go +++ b/pkg/query/context.go @@ -5,7 +5,6 @@ import ( "github.com/authzed/spicedb/internal/caveats" "github.com/authzed/spicedb/pkg/datalayer" - "github.com/authzed/spicedb/pkg/datastore/options" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" ) @@ -17,7 +16,7 @@ import ( type Context struct { context.Context Executor Executor - Reader datalayer.RevisionedReader // Datastore reader for this query at a specific revision + Reader QueryDatastoreReader // Datastore reader for this query at a specific revision CaveatContext map[string]any CaveatRunner *caveats.CaveatRunner TraceLogger *TraceLogger // For debugging iterator execution (used by TraceStep calls inside iterators) @@ -26,7 +25,6 @@ type Context struct { // Pagination options for IterSubjects and IterResources PaginationCursors map[string]*tuple.Relationship // Cursors for pagination, keyed by iterator ID PaginationLimit *uint64 // Limit for pagination (max number of results to return) - PaginationSort options.SortOrder // Sort order for pagination // observers holds the list of Observer implementations to notify during query execution. observers []Observer @@ -55,10 +53,16 @@ func NewLocalContext(stdContext context.Context, opts ...ContextOption) *Context type ContextOption func(*Context) // WithReader sets the datastore reader for the context. 
-func WithReader(reader datalayer.RevisionedReader) ContextOption { +func WithReader(reader QueryDatastoreReader) ContextOption { return func(ctx *Context) { ctx.Reader = reader } } +// WithRevisionedReader wraps a datalayer.RevisionedReader as a QueryDatastoreReader +// and sets it as the datastore reader for the context. +func WithRevisionedReader(reader datalayer.RevisionedReader) ContextOption { + return func(ctx *Context) { ctx.Reader = NewQueryDatastoreReader(reader) } +} + // WithObserver adds an Observer to the context. func WithObserver(o Observer) ContextOption { return func(ctx *Context) { ctx.observers = append(ctx.observers, o) } @@ -89,11 +93,6 @@ func WithPaginationLimit(limit uint64) ContextOption { return func(ctx *Context) { ctx.PaginationLimit = &limit } } -// WithPaginationSort sets the pagination sort order for the context. -func WithPaginationSort(sort options.SortOrder) ContextOption { - return func(ctx *Context) { ctx.PaginationSort = sort } -} - // GetPaginationCursor retrieves the cursor for a specific iterator ID. func (ctx *Context) GetPaginationCursor(iteratorID string) *tuple.Relationship { if ctx.PaginationCursors == nil { diff --git a/pkg/query/datastore.go b/pkg/query/datastore.go index 530c147d2..96ed6d716 100644 --- a/pkg/query/datastore.go +++ b/pkg/query/datastore.go @@ -2,36 +2,12 @@ package query import ( "fmt" - "iter" - "github.com/authzed/spicedb/pkg/datastore" - "github.com/authzed/spicedb/pkg/datastore/options" - "github.com/authzed/spicedb/pkg/datastore/queryshape" "github.com/authzed/spicedb/pkg/schema/v2" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" ) -// convertRelationSeqToPathSeq converts an iter.Seq2[tuple.Relationship, error] from the datastore -// into a PathSeq by transforming each tuple.Relationship into a Path using FromRelationship. 
-func convertRelationSeqToPathSeq(relSeq iter.Seq2[tuple.Relationship, error]) PathSeq { - return func(yield func(Path, error) bool) { - for rel, err := range relSeq { - if err != nil { - if !yield(Path{}, err) { - return - } - continue - } - - path := FromRelationship(rel) - if !yield(path, nil) { - return - } - } - } -} - // DatastoreIterator is a common leaf iterator. It represents the set of all // relationships of the given schema.BaseRelation, ie, relations that have a // known resource and subject type and may contain caveats or expiration. @@ -50,13 +26,6 @@ func NewDatastoreIterator(base *schema.BaseRelation) *DatastoreIterator { } } -func (r *DatastoreIterator) buildSubjectRelationFilter() datastore.SubjectRelationFilter { - if r.base.Subrelation() == tuple.Ellipsis { - return datastore.SubjectRelationFilter{}.WithEllipsisRelation() - } - return datastore.SubjectRelationFilter{}.WithNonEllipsisRelation(r.base.Subrelation()) -} - func (r *DatastoreIterator) CheckImpl(ctx *Context, resources []Object, subject ObjectAndRelation) (PathSeq, error) { // For subrelations, we need to allow type mismatches because the subrelation might bridge different types // For example, group:member -> group:member should find group:everyone#member@group:engineering#member @@ -81,87 +50,52 @@ func (r *DatastoreIterator) CheckImpl(ctx *Context, resources []Object, subject } func (r *DatastoreIterator) checkNormalImpl(ctx *Context, resources []Object, subject ObjectAndRelation) (PathSeq, error) { - resourceIDs := make([]string, len(resources)) - for i, res := range resources { - resourceIDs[i] = res.ObjectID - } - - filter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceIds: resourceIDs, - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: r.base.Type(), - OptionalSubjectIds: []string{subject.ObjectID}, - RelationFilter: 
r.buildSubjectRelationFilter(), - }, - }, - } + ctx.TraceStep(r, "querying datastore for %s:%s with resources=%v", r.base.Type(), r.base.RelationName(), resourceIDs(resources)) - ctx.TraceStep(r, "querying datastore for %s:%s with resources=%v", r.base.Type(), r.base.RelationName(), resourceIDs) - - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.CheckPermissionSelectDirectSubjects), + pathSeq, err := ctx.Reader.CheckRelationships(ctx, + resources, + r.base.RelationName(), + subject, + r.base.Caveat() != "", r.base.Expiration(), ) if err != nil { return nil, err } - // Convert to PathSeq - pathSeq := convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)) - // Eagerly collect all results to terminate the database query immediately paths, err := CollectAll(pathSeq) if err != nil { return nil, err } - - // Return iterator over collected slice return PathSeqFromSlice(paths), nil } func (r *DatastoreIterator) checkWildcardImpl(ctx *Context, resources []Object, subject ObjectAndRelation) (PathSeq, error) { // Query the datastore for wildcard relationships (subject ObjectID = "*") - resourceIDs := make([]string, len(resources)) - for i, res := range resources { - resourceIDs[i] = res.ObjectID - } - - filter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceIds: resourceIDs, - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: r.base.Type(), - OptionalSubjectIds: []string{tuple.PublicWildcard}, // Look for "*" subjects - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, + wildcardSubject := ObjectAndRelation{ + ObjectType: subject.ObjectType, + ObjectID: WildcardObjectID, + Relation: subject.Relation, } - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, - 
options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.CheckPermissionSelectDirectSubjects), + pathSeq, err := ctx.Reader.CheckRelationships(ctx, + resources, + r.base.RelationName(), + wildcardSubject, + r.base.Caveat() != "", r.base.Expiration(), ) if err != nil { return nil, err } - // Convert to PathSeq and rewrite subjects - pathSeq := RewriteSubject(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), subject) + // Rewrite subjects from wildcard back to the actual subject + pathSeq = RewriteSubject(pathSeq, subject) // Eagerly collect all results to terminate the database query immediately paths, err := CollectAll(pathSeq) if err != nil { return nil, err } - - // Return iterator over collected slice return PathSeqFromSlice(paths), nil } @@ -173,103 +107,72 @@ func (r *DatastoreIterator) IterSubjectsImpl(ctx *Context, resource Object, filt } func (r *DatastoreIterator) iterSubjectsNormalImpl(ctx *Context, resource Object) (PathSeq, error) { - filter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceIds: []string{resource.ObjectID}, - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: r.base.Type(), - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, + subjectType := ObjectType{ + Type: r.base.Type(), + Subrelation: r.base.Subrelation(), } // If pagination is not configured, do the simple eager collection if ctx.PaginationLimit == nil { - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), + pathSeq, err := ctx.Reader.QuerySubjects(ctx, + resource, + r.base.RelationName(), + subjectType, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{}, ) 
if err != nil { return nil, err } - // Convert to PathSeq and filter out wildcard subjects - pathSeq := FilterWildcardSubjects(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter))) - - // Eagerly collect all results to terminate the database query immediately - paths, err := CollectAll(pathSeq) + // Filter out wildcard subjects and eagerly collect + paths, err := CollectAll(FilterWildcardSubjects(pathSeq)) if err != nil { return nil, err } - - // Return iterator over collected slice return PathSeqFromSlice(paths), nil } // Pagination is configured - return a PathSeq that fetches pages as needed return func(yield func(Path, error) bool) { - var cursor *tuple.Relationship iteratorID := fmt.Sprintf("%016x:iter_subjects", r.CanonicalKey().Hash()) - - // Check if we have a starting cursor from previous iteration - cursor = ctx.GetPaginationCursor(iteratorID) + cursor := ctx.GetPaginationCursor(iteratorID) for { - // Build query options for this page - queryOpts := []options.QueryOptionsOption{ - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), - options.WithLimit(ctx.PaginationLimit), - } - - if ctx.PaginationSort != options.Unsorted { - queryOpts = append(queryOpts, options.WithSort(ctx.PaginationSort)) - } - if cursor != nil { - queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*cursor))) - } - - // Fetch this page - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, queryOpts...) 
+ pathSeq, err := ctx.Reader.QuerySubjects(ctx, + resource, + r.base.RelationName(), + subjectType, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{Limit: ctx.PaginationLimit, Cursor: cursor}, + ) if err != nil { yield(Path{}, err) return } - // Convert to PathSeq and filter out wildcard subjects - pathSeq := FilterWildcardSubjects(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter))) - - // Materialize this page into memory - paths, err := CollectAll(pathSeq) + paths, err := CollectAll(FilterWildcardSubjects(pathSeq)) if err != nil { yield(Path{}, err) return } - // If no results, we're done if len(paths) == 0 { return } - // Update cursor for next page lastPath := paths[len(paths)-1] if rel, err := lastPath.ToRelationship(); err == nil { cursor = &rel ctx.SetPaginationCursor(iteratorID, cursor) } - // Yield all paths from this page for _, path := range paths { if !yield(path, nil) { return } } - // If we got fewer results than the limit, we're done if uint64(len(paths)) < *ctx.PaginationLimit { return } @@ -284,36 +187,32 @@ func (r *DatastoreIterator) iterSubjectsWildcardImpl(ctx *Context, resource Obje // 2. If yes, querying for all concrete subjects with relationships to this resource // // This avoids doing a full subject enumeration when no wildcard exists (the common case). - // When wildcards do exist, we do 2 queries in this branch, but that's the correct semantic - // behavior - we only enumerate when there's actually a wildcard to expand. 
// First, check if there's actually a wildcard relationship for this resource - wildcardFilter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceIds: []string{resource.ObjectID}, - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: r.base.Type(), - OptionalSubjectIds: []string{tuple.PublicWildcard}, // Look for "*" subjects - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, + // by doing a CheckRelationships probe with subject ObjectID = "*". + subjectType := ObjectType{ + Type: r.base.Type(), + Subrelation: r.base.Subrelation(), + } + + wildcardSubject := ObjectAndRelation{ + ObjectType: r.base.Type(), + ObjectID: WildcardObjectID, + Relation: r.base.Subrelation(), } - wildcardIter, err := ctx.Reader.QueryRelationships(ctx, wildcardFilter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), - options.WithLimit(options.LimitOne), // We only need to know if one exists + wildcardPathSeq, err := ctx.Reader.CheckRelationships(ctx, + []Object{resource}, + r.base.RelationName(), + wildcardSubject, + r.base.Caveat() != "", r.base.Expiration(), ) if err != nil { return nil, err } - // Check if any wildcard relationship exists hasWildcard := false - for _, err := range wildcardIter { + for _, err := range wildcardPathSeq { if err != nil { return nil, err } @@ -321,116 +220,74 @@ func (r *DatastoreIterator) iterSubjectsWildcardImpl(ctx *Context, resource Obje break } - // If no wildcard relationship exists, return empty - nothing to enumerate if !hasWildcard { return EmptyPathSeq(), nil } - // Wildcard exists, so enumerate all concrete subjects of the appropriate type. - // A wildcard (e.g., user:*) means "all subjects of that type", so we need to enumerate - // all defined subjects of that type in the datastore. 
This may return some of the same - // subjects as the non-wildcard branch (when both wildcard and concrete relationships exist), - // but the Union will deduplicate them. - // - // Note: We query for all subjects of the appropriate type, not just those with a relationship - // to this specific resource. This matches the semantics of wildcards, which grant access to - // ALL subjects of the type, regardless of whether they have other relationships. - allSubjectsFilter := datastore.RelationshipsFilter{ - // Note: We intentionally omit OptionalResourceType and OptionalResourceIds to find - // all subjects of the appropriate type across all resources - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: r.base.Type(), - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, - } + // Wildcard exists — enumerate all concrete subjects of the appropriate type. + // Empty Object{} and empty resourceRelation means no resource constraints at all. + allSubjectsResource := Object{} + const noResourceRelation = "" - // If pagination is not configured, do the simple eager collection if ctx.PaginationLimit == nil { - relIter, err := ctx.Reader.QueryRelationships(ctx, allSubjectsFilter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), + pathSeq, err := ctx.Reader.QuerySubjects(ctx, + allSubjectsResource, + noResourceRelation, + subjectType, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{}, ) if err != nil { return nil, err } - // Convert to PathSeq and filter out wildcard subjects - pathSeq := FilterWildcardSubjects(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter))) - - // Eagerly collect all results to terminate the database query immediately - paths, err := CollectAll(pathSeq) + paths, err := CollectAll(FilterWildcardSubjects(pathSeq)) if err != nil { return nil, err } - - // Return iterator over 
collected slice return PathSeqFromSlice(paths), nil } - // Pagination is configured - return a PathSeq that fetches pages as needed + // Pagination is configured return func(yield func(Path, error) bool) { - var cursor *tuple.Relationship iteratorID := fmt.Sprintf("%016x:iter_subjects_wildcard", r.CanonicalKey().Hash()) - - // Check if we have a starting cursor from previous iteration - cursor = ctx.GetPaginationCursor(iteratorID) + cursor := ctx.GetPaginationCursor(iteratorID) for { - // Build query options for this page - queryOpts := []options.QueryOptionsOption{ - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), - options.WithLimit(ctx.PaginationLimit), - } - - if ctx.PaginationSort != options.Unsorted { - queryOpts = append(queryOpts, options.WithSort(ctx.PaginationSort)) - } - if cursor != nil { - queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*cursor))) - } - - // Fetch this page - relIter, err := ctx.Reader.QueryRelationships(ctx, allSubjectsFilter, queryOpts...) 
+ pathSeq, err := ctx.Reader.QuerySubjects(ctx, + allSubjectsResource, + noResourceRelation, + subjectType, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{Limit: ctx.PaginationLimit, Cursor: cursor}, + ) if err != nil { yield(Path{}, err) return } - // Convert to PathSeq and filter out wildcard subjects - pathSeq := FilterWildcardSubjects(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter))) - - // Materialize this page into memory - paths, err := CollectAll(pathSeq) + paths, err := CollectAll(FilterWildcardSubjects(pathSeq)) if err != nil { yield(Path{}, err) return } - // If no results, we're done if len(paths) == 0 { return } - // Update cursor for next page lastPath := paths[len(paths)-1] if rel, err := lastPath.ToRelationship(); err == nil { cursor = &rel ctx.SetPaginationCursor(iteratorID, cursor) } - // Yield all paths from this page for _, path := range paths { if !yield(path, nil) { return } } - // If we got fewer results than the limit, we're done if uint64(len(paths)) < *ctx.PaginationLimit { return } @@ -462,103 +319,64 @@ func (r *DatastoreIterator) IterResourcesImpl(ctx *Context, subject ObjectAndRel return EmptyPathSeq(), nil } - filter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: subject.ObjectType, - OptionalSubjectIds: []string{subject.ObjectID}, - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, - } - - // If pagination is not configured, do the simple eager collection if ctx.PaginationLimit == nil { - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.MatchingResourcesForSubject), + pathSeq, err := ctx.Reader.QueryResources(ctx, + r.base.DefinitionName(), + r.base.RelationName(), + 
subject, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{}, ) if err != nil { return nil, err } - // Convert to PathSeq - pathSeq := convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)) - - // Eagerly collect all results to terminate the database query immediately paths, err := CollectAll(pathSeq) if err != nil { return nil, err } - - // Return iterator over collected slice return PathSeqFromSlice(paths), nil } - // Pagination is configured - return a PathSeq that fetches pages as needed return func(yield func(Path, error) bool) { - var cursor *tuple.Relationship iteratorID := fmt.Sprintf("%016x:iter_resources", r.CanonicalKey().Hash()) - - // Check if we have a starting cursor from previous iteration - cursor = ctx.GetPaginationCursor(iteratorID) + cursor := ctx.GetPaginationCursor(iteratorID) for { - // Build query options for this page - queryOpts := []options.QueryOptionsOption{ - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.MatchingResourcesForSubject), - options.WithLimit(ctx.PaginationLimit), - } - - if ctx.PaginationSort != options.Unsorted { - queryOpts = append(queryOpts, options.WithSort(ctx.PaginationSort)) - } - if cursor != nil { - queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*cursor))) - } - - // Fetch this page - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, queryOpts...) 
+ pathSeq, err := ctx.Reader.QueryResources(ctx, + r.base.DefinitionName(), + r.base.RelationName(), + subject, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{Limit: ctx.PaginationLimit, Cursor: cursor}, + ) if err != nil { yield(Path{}, err) return } - // Convert to PathSeq - pathSeq := convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)) - - // Materialize this page into memory paths, err := CollectAll(pathSeq) if err != nil { yield(Path{}, err) return } - // If no results, we're done if len(paths) == 0 { return } - // Update cursor for next page lastPath := paths[len(paths)-1] if rel, err := lastPath.ToRelationship(); err == nil { cursor = &rel ctx.SetPaginationCursor(iteratorID, cursor) } - // Yield all paths from this page for _, path := range paths { if !yield(path, nil) { return } } - // If we got fewer results than the limit, we're done if uint64(len(paths)) < *ctx.PaginationLimit { return } @@ -567,103 +385,72 @@ func (r *DatastoreIterator) IterResourcesImpl(ctx *Context, subject ObjectAndRel } func (r *DatastoreIterator) iterResourcesWildcardImpl(ctx *Context, subject ObjectAndRelation) (PathSeq, error) { - filter := datastore.RelationshipsFilter{ - OptionalResourceType: r.base.DefinitionName(), - OptionalResourceRelation: r.base.RelationName(), - OptionalSubjectsSelectors: []datastore.SubjectsSelector{ - { - OptionalSubjectType: subject.ObjectType, - OptionalSubjectIds: []string{tuple.PublicWildcard}, // Look for "*" subjects - RelationFilter: r.buildSubjectRelationFilter(), - }, - }, + wildcardSubject := ObjectAndRelation{ + ObjectType: subject.ObjectType, + ObjectID: WildcardObjectID, + Relation: subject.Relation, } - // If pagination is not configured, do the simple eager collection if ctx.PaginationLimit == nil { - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - 
options.WithQueryShape(queryshape.AllSubjectsForResources), + pathSeq, err := ctx.Reader.QueryResources(ctx, + r.base.DefinitionName(), + r.base.RelationName(), + wildcardSubject, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{}, ) if err != nil { return nil, err } - // Convert to PathSeq and rewrite subjects - pathSeq := RewriteSubject(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), subject) - - // Eagerly collect all results to terminate the database query immediately + pathSeq = RewriteSubject(pathSeq, subject) paths, err := CollectAll(pathSeq) if err != nil { return nil, err } - - // Return iterator over collected slice return PathSeqFromSlice(paths), nil } - // Pagination is configured - return a PathSeq that fetches pages as needed return func(yield func(Path, error) bool) { - var cursor *tuple.Relationship iteratorID := fmt.Sprintf("%016x:iter_resources_wildcard", r.CanonicalKey().Hash()) - - // Check if we have a starting cursor from previous iteration - cursor = ctx.GetPaginationCursor(iteratorID) + cursor := ctx.GetPaginationCursor(iteratorID) for { - // Build query options for this page - queryOpts := []options.QueryOptionsOption{ - options.WithSkipCaveats(r.base.Caveat() == ""), - options.WithSkipExpiration(!r.base.Expiration()), - options.WithQueryShape(queryshape.AllSubjectsForResources), - options.WithLimit(ctx.PaginationLimit), - } - - if ctx.PaginationSort != options.Unsorted { - queryOpts = append(queryOpts, options.WithSort(ctx.PaginationSort)) - } - if cursor != nil { - queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*cursor))) - } - - // Fetch this page - relIter, err := ctx.Reader.QueryRelationships(ctx, filter, queryOpts...) 
+ pathSeq, err := ctx.Reader.QueryResources(ctx, + r.base.DefinitionName(), + r.base.RelationName(), + wildcardSubject, + r.base.Caveat() != "", r.base.Expiration(), + QueryPage{Limit: ctx.PaginationLimit, Cursor: cursor}, + ) if err != nil { yield(Path{}, err) return } - // Convert to PathSeq and rewrite subjects - pathSeq := RewriteSubject(convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), subject) - - // Materialize this page into memory + pathSeq = RewriteSubject(pathSeq, subject) paths, err := CollectAll(pathSeq) if err != nil { yield(Path{}, err) return } - // If no results, we're done if len(paths) == 0 { return } - // Update cursor for next page lastPath := paths[len(paths)-1] if rel, err := lastPath.ToRelationship(); err == nil { cursor = &rel ctx.SetPaginationCursor(iteratorID, cursor) } - // Yield all paths from this page for _, path := range paths { if !yield(path, nil) { return } } - // If we got fewer results than the limit, we're done if uint64(len(paths)) < *ctx.PaginationLimit { return } @@ -733,3 +520,12 @@ func (r *DatastoreIterator) SubjectTypes() ([]ObjectType, error) { Subrelation: r.base.Subrelation(), }}, nil } + +// resourceIDs extracts the ObjectID strings from a slice of Objects. 
+func resourceIDs(resources []Object) []string { + ids := make([]string, len(resources)) + for i, r := range resources { + ids[i] = r.ObjectID + } + return ids +} diff --git a/pkg/query/exclusion_test.go b/pkg/query/exclusion_test.go index 828545455..6b3134661 100644 --- a/pkg/query/exclusion_test.go +++ b/pkg/query/exclusion_test.go @@ -23,7 +23,7 @@ func TestExclusionIterator(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Create test paths path1 := MustPathFromString("document:doc1#viewer@user:alice") @@ -259,7 +259,7 @@ func TestExclusionWithEmptyIterator(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) path1 := MustPathFromString("document:doc1#viewer@user:alice") @@ -305,7 +305,7 @@ func TestExclusionErrorHandling(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) path1 := MustPathFromString("document:doc1#viewer@user:alice") @@ -386,7 +386,7 @@ func TestExclusionWithComplexIteratorTypes(t *testing.T) { ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Create test relations path1 := MustPathFromString("document:doc1#viewer@user:alice") @@ -574,7 +574,7 @@ func TestExclusion_CombinedCaveatLogic(t *testing.T) { ds, 
revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Helper to create paths with caveats createPathWithCaveat := func(relation, caveatName string) Path { diff --git a/pkg/query/intersection_arrow_test.go b/pkg/query/intersection_arrow_test.go index 411618bb7..00c00bfa7 100644 --- a/pkg/query/intersection_arrow_test.go +++ b/pkg/query/intersection_arrow_test.go @@ -41,7 +41,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test: alice should have access because she's a member of ALL teams (team1 and team2) resources := []Object{NewObject("document", "doc1")} @@ -90,7 +90,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test: alice should NOT have access because she's not a member of ALL teams resources := []Object{NewObject("document", "doc1")} @@ -130,7 +130,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test: alice should have access because she's a member of the only team resources := []Object{NewObject("document", "doc1")} @@ -175,7 +175,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - 
WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) resources := []Object{NewObject("document", "doc1")} subject := ObjectAndRelation{ObjectType: "user", ObjectID: "alice"} @@ -218,7 +218,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) resources := []Object{NewObject("document", "doc1")} subject := ObjectAndRelation{ObjectType: "user", ObjectID: "alice"} @@ -257,7 +257,7 @@ func TestIntersectionArrowIterator(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) resources := []Object{} subject := ObjectAndRelation{ObjectType: "user", ObjectID: "alice"} @@ -286,7 +286,7 @@ func TestIntersectionArrowIteratorCaveatCombination(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) t.Run("CombineTwoCaveats_AND_Logic", func(t *testing.T) { t.Parallel() @@ -513,7 +513,7 @@ func TestIntersectionArrowIteratorClone(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) // Test that both iterators produce the same results resources := []Object{NewObject("document", "doc1")} diff --git a/pkg/query/observer_analyze_test.go b/pkg/query/observer_analyze_test.go index 39eaf514b..27827283b 100644 --- a/pkg/query/observer_analyze_test.go +++ b/pkg/query/observer_analyze_test.go 
@@ -149,7 +149,7 @@ func TestAnalysisIntegration(t *testing.T) { // Create a context with analysis enabled analyze := NewAnalyzeObserver() ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithObserver(analyze)) // Execute a Check operation diff --git a/pkg/query/observer_count_test.go b/pkg/query/observer_count_test.go index fe6649c01..91d2b9358 100644 --- a/pkg/query/observer_count_test.go +++ b/pkg/query/observer_count_test.go @@ -154,7 +154,7 @@ func TestCountObserverIntegration(t *testing.T) { // Create a context with count observer enabled countObs := NewCountObserver() ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithObserver(countObs)) // Execute a Check operation diff --git a/pkg/query/quick_e2e_test.go b/pkg/query/quick_e2e_test.go index e722ce9c8..02a6349f8 100644 --- a/pkg/query/quick_e2e_test.go +++ b/pkg/query/quick_e2e_test.go @@ -39,7 +39,7 @@ func TestCheck(t *testing.T) { it := NewIntersectionIterator(vande, edit) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) relSeq, err := ctx.Check(it, NewObjects("document", "specialplan"), NewObject("user", "multiroleguy").WithEllipses()) require.NoError(err) @@ -67,7 +67,7 @@ func TestBaseIterSubjects(t *testing.T) { vande := NewDatastoreIterator(vandeRel.BaseRelations()[0]) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) relSeq, err := ctx.IterSubjects(vande, NewObject("document", "specialplan"), NoObjectFilter()) 
require.NoError(err) @@ -100,7 +100,7 @@ func TestCheckArrow(t *testing.T) { it := NewArrowIterator(folders, view) ctx := NewLocalContext(t.Context(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))) relSeq, err := ctx.Check(it, NewObjects("document", "companyplan"), NewObject("user", "legal").WithEllipses()) require.NoError(err) diff --git a/pkg/query/reader.go b/pkg/query/reader.go new file mode 100644 index 000000000..cd0a3daf0 --- /dev/null +++ b/pkg/query/reader.go @@ -0,0 +1,300 @@ +package query + +import ( + "context" + "iter" + + "github.com/authzed/spicedb/pkg/datalayer" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" + "github.com/authzed/spicedb/pkg/datastore/queryshape" + "github.com/authzed/spicedb/pkg/tuple" +) + +// WildcardObjectID is the subject ID representing a public wildcard ("*"). +const WildcardObjectID = tuple.PublicWildcard + +// limitOne is used for existence-probe queries that only need to know if +// at least one row exists. +var limitOne uint64 = 1 + +// QueryPage bundles pagination parameters for QuerySubjects and QueryResources. +type QueryPage struct { + Limit *uint64 + Cursor *tuple.Relationship +} + +// QueryDatastoreReader is the minimal datastore interface used by pkg/query. +// It exposes only the four logical operations actually performed by this package, +// returning PathSeq values directly so callers never touch raw relationship iterators. +type QueryDatastoreReader interface { + // CheckRelationships finds paths for specific resource objects matched against + // a subject. subject.ObjectID may be WildcardObjectID for wildcard checks. + CheckRelationships( + ctx context.Context, + resources []Object, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, + ) (PathSeq, error) + + // QuerySubjects finds all subject paths for a resource. 
+ // If resource.ObjectID is empty, no resource ID filter is applied (wildcard expansion). + // subjectType.Subrelation drives the ellipsis-vs-non-ellipsis filter. + QuerySubjects( + ctx context.Context, + resource Object, + resourceRelation string, + subjectType ObjectType, + withCaveats, withExpiration bool, + page QueryPage, + ) (PathSeq, error) + + // QueryResources finds all resource paths for a subject. + // subject.ObjectID may be WildcardObjectID for wildcard resource queries. + QueryResources( + ctx context.Context, + resourceType string, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, + page QueryPage, + ) (PathSeq, error) + + // SubjectExistsAsRelationship is an existence probe used by AliasIterator. + // It includes expired relationships and returns true if any relationship + // has the given subject with the specified non-ellipsis relation. + SubjectExistsAsRelationship( + ctx context.Context, + subject Object, + nonEllipsisRelation string, + ) (bool, error) + + // LookupCaveatDefinition fetches a single caveat definition by name. + // Implementations are expected to cache results. + LookupCaveatDefinition( + ctx context.Context, + name string, + ) (datastore.CaveatDefinition, error) +} + +// NewQueryDatastoreReader wraps a datalayer.RevisionedReader as a QueryDatastoreReader. +func NewQueryDatastoreReader(r datalayer.RevisionedReader) QueryDatastoreReader { + return &datalayerQueryDatastoreReader{inner: r} +} + +type datalayerQueryDatastoreReader struct { + inner datalayer.RevisionedReader +} + +// convertRelationSeqToPathSeq converts an iter.Seq2[tuple.Relationship, error] from +// the datastore into a PathSeq by transforming each Relationship into a Path. 
+func convertRelationSeqToPathSeq(relSeq iter.Seq2[tuple.Relationship, error]) PathSeq { + return func(yield func(Path, error) bool) { + for rel, err := range relSeq { + if err != nil { + if !yield(Path{}, err) { + return + } + continue + } + if !yield(FromRelationship(rel), nil) { + return + } + } + } +} + +// buildSubjectRelationFilter returns the appropriate SubjectRelationFilter for a +// given subrelation string: ellipsis → WithEllipsisRelation, otherwise → WithNonEllipsisRelation. +func buildSubjectRelationFilter(subrelation string) datastore.SubjectRelationFilter { + if subrelation == tuple.Ellipsis { + return datastore.SubjectRelationFilter{}.WithEllipsisRelation() + } + return datastore.SubjectRelationFilter{}.WithNonEllipsisRelation(subrelation) +} + +func (r *datalayerQueryDatastoreReader) CheckRelationships( + ctx context.Context, + resources []Object, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, +) (PathSeq, error) { + resourceIDs := make([]string, len(resources)) + for i, res := range resources { + resourceIDs[i] = res.ObjectID + } + + // All resources in a DatastoreIterator share the same type. 
+ resourceType := "" + if len(resources) > 0 { + resourceType = resources[0].ObjectType + } + + filter := datastore.RelationshipsFilter{ + OptionalResourceType: resourceType, + OptionalResourceIds: resourceIDs, + OptionalResourceRelation: resourceRelation, + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + OptionalSubjectType: subject.ObjectType, + OptionalSubjectIds: []string{subject.ObjectID}, + RelationFilter: buildSubjectRelationFilter(subject.Relation), + }, + }, + } + + relIter, err := r.inner.QueryRelationships(ctx, filter, + options.WithSkipCaveats(!withCaveats), + options.WithSkipExpiration(!withExpiration), + options.WithQueryShape(queryshape.CheckPermissionSelectDirectSubjects), + ) + if err != nil { + return nil, err + } + return convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), nil +} + +func (r *datalayerQueryDatastoreReader) QuerySubjects( + ctx context.Context, + resource Object, + resourceRelation string, + subjectType ObjectType, + withCaveats, withExpiration bool, + page QueryPage, +) (PathSeq, error) { + filter := datastore.RelationshipsFilter{ + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + OptionalSubjectType: subjectType.Type, + RelationFilter: buildSubjectRelationFilter(subjectType.Subrelation), + }, + }, + } + // Non-empty fields constrain the query; empty means no constraint on that axis. 
+ if resource.ObjectType != "" { + filter.OptionalResourceType = resource.ObjectType + } + if resource.ObjectID != "" { + filter.OptionalResourceIds = []string{resource.ObjectID} + } + if resourceRelation != "" { + filter.OptionalResourceRelation = resourceRelation + } + + queryOpts := []options.QueryOptionsOption{ + options.WithSkipCaveats(!withCaveats), + options.WithSkipExpiration(!withExpiration), + options.WithQueryShape(queryshape.AllSubjectsForResources), + } + if page.Limit != nil { + queryOpts = append(queryOpts, + options.WithLimit(page.Limit), + options.WithSort(options.ChooseEfficient), + ) + } + if page.Cursor != nil { + queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*page.Cursor))) + } + + relIter, err := r.inner.QueryRelationships(ctx, filter, queryOpts...) + if err != nil { + return nil, err + } + return convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), nil +} + +func (r *datalayerQueryDatastoreReader) QueryResources( + ctx context.Context, + resourceType string, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, + page QueryPage, +) (PathSeq, error) { + filter := datastore.RelationshipsFilter{ + OptionalResourceType: resourceType, + OptionalResourceRelation: resourceRelation, + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + OptionalSubjectType: subject.ObjectType, + OptionalSubjectIds: []string{subject.ObjectID}, + RelationFilter: buildSubjectRelationFilter(subject.Relation), + }, + }, + } + + queryOpts := []options.QueryOptionsOption{ + options.WithSkipCaveats(!withCaveats), + options.WithSkipExpiration(!withExpiration), + options.WithQueryShape(queryshape.MatchingResourcesForSubject), + } + if page.Limit != nil { + queryOpts = append(queryOpts, + options.WithLimit(page.Limit), + options.WithSort(options.ChooseEfficient), + ) + } + if page.Cursor != nil { + queryOpts = append(queryOpts, options.WithAfter(options.ToCursor(*page.Cursor))) + } + + 
relIter, err := r.inner.QueryRelationships(ctx, filter, queryOpts...) + if err != nil { + return nil, err + } + return convertRelationSeqToPathSeq(iter.Seq2[tuple.Relationship, error](relIter)), nil +} + +func (r *datalayerQueryDatastoreReader) SubjectExistsAsRelationship( + ctx context.Context, + subject Object, + nonEllipsisRelation string, +) (bool, error) { + filter := datastore.RelationshipsFilter{ + OptionalSubjectsSelectors: []datastore.SubjectsSelector{ + { + OptionalSubjectType: subject.ObjectType, + OptionalSubjectIds: []string{subject.ObjectID}, + RelationFilter: datastore.SubjectRelationFilter{}.WithNonEllipsisRelation(nonEllipsisRelation), + }, + }, + OptionalExpirationOption: datastore.ExpirationFilterOptionNone, + } + + relIter, err := r.inner.QueryRelationships(ctx, filter, + options.WithLimit(&limitOne), + options.WithSkipExpiration(true), + ) + if err != nil { + return false, err + } + + for _, err := range relIter { + if err != nil { + return false, err + } + return true, nil + } + return false, nil +} + +func (r *datalayerQueryDatastoreReader) LookupCaveatDefinition( + ctx context.Context, + name string, +) (datastore.CaveatDefinition, error) { + sr, err := r.inner.ReadSchema(ctx) + if err != nil { + return nil, err + } + defs, err := sr.LookupCaveatDefinitionsByNames(ctx, []string{name}) + if err != nil { + return nil, err + } + def, ok := defs[name] + if !ok { + return nil, datastore.NewCaveatNameNotFoundErr(name) + } + return def, nil +} diff --git a/pkg/query/reader_timing.go b/pkg/query/reader_timing.go new file mode 100644 index 000000000..09b41a13e --- /dev/null +++ b/pkg/query/reader_timing.go @@ -0,0 +1,93 @@ +package query + +import ( + "context" + "time" + + "github.com/authzed/spicedb/pkg/datastore" +) + +// delayReader is a QueryDatastoreReader shim that sleeps for a fixed duration +// before each call to simulate network latency. It delegates all logic to an +// inner reader. 
+type delayReader struct { + delay time.Duration + inner QueryDatastoreReader +} + +// NewDelayReader wraps inner with a shim that sleeps for delay before every +// call. Use this in benchmarks to model realistic network round-trip costs. +func NewDelayReader(delay time.Duration, inner QueryDatastoreReader) QueryDatastoreReader { + return &delayReader{delay: delay, inner: inner} +} + +func (r *delayReader) sleep(ctx context.Context) error { + select { + case <-time.After(r.delay): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (r *delayReader) CheckRelationships( + ctx context.Context, + resources []Object, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, +) (PathSeq, error) { + if err := r.sleep(ctx); err != nil { + return nil, err + } + return r.inner.CheckRelationships(ctx, resources, resourceRelation, subject, withCaveats, withExpiration) +} + +func (r *delayReader) QuerySubjects( + ctx context.Context, + resource Object, + resourceRelation string, + subjectType ObjectType, + withCaveats, withExpiration bool, + page QueryPage, +) (PathSeq, error) { + if err := r.sleep(ctx); err != nil { + return nil, err + } + return r.inner.QuerySubjects(ctx, resource, resourceRelation, subjectType, withCaveats, withExpiration, page) +} + +func (r *delayReader) QueryResources( + ctx context.Context, + resourceType string, + resourceRelation string, + subject ObjectAndRelation, + withCaveats, withExpiration bool, + page QueryPage, +) (PathSeq, error) { + if err := r.sleep(ctx); err != nil { + return nil, err + } + return r.inner.QueryResources(ctx, resourceType, resourceRelation, subject, withCaveats, withExpiration, page) +} + +func (r *delayReader) SubjectExistsAsRelationship( + ctx context.Context, + subject Object, + nonEllipsisRelation string, +) (bool, error) { + if err := r.sleep(ctx); err != nil { + return false, err + } + return r.inner.SubjectExistsAsRelationship(ctx, subject, nonEllipsisRelation) +} + 
+func (r *delayReader) LookupCaveatDefinition( + ctx context.Context, + name string, +) (datastore.CaveatDefinition, error) { + if err := r.sleep(ctx); err != nil { + return nil, err + } + return r.inner.LookupCaveatDefinition(ctx, name) +} diff --git a/pkg/query/recursive.go b/pkg/query/recursive.go index d6109bcac..94c0fe662 100644 --- a/pkg/query/recursive.go +++ b/pkg/query/recursive.go @@ -17,7 +17,7 @@ const ( recursiveCheckIterSubjects recursiveCheckStrategy = iota // recursiveCheckIterResources calls IterResources with subject, filters by resources recursiveCheckIterResources - // recursiveCheckDeepening uses iterative deepening (current implementation) + // recursiveCheckDeepening uses iterative deepening recursiveCheckDeepening ) diff --git a/pkg/query/recursive_benchmark_test.go b/pkg/query/recursive_benchmark_test.go index c3ea66b65..faf344f73 100644 --- a/pkg/query/recursive_benchmark_test.go +++ b/pkg/query/recursive_benchmark_test.go @@ -44,7 +44,7 @@ func BenchmarkRecursiveShallowGraph(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -97,7 +97,7 @@ func BenchmarkRecursiveWideGraph(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -145,7 +145,7 @@ func BenchmarkRecursiveDeepGraph(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -175,7 +175,7 @@ func BenchmarkRecursiveEmptyGraph(b 
*testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -234,7 +234,7 @@ func BenchmarkRecursiveSparseGraph(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -282,7 +282,7 @@ func BenchmarkRecursiveCyclicGraph(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() @@ -330,7 +330,7 @@ func BenchmarkRecursiveIterResources(b *testing.B) { } ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) b.ResetTimer() diff --git a/pkg/query/recursive_coverage_test.go b/pkg/query/recursive_coverage_test.go index 52fc66307..68423781f 100644 --- a/pkg/query/recursive_coverage_test.go +++ b/pkg/query/recursive_coverage_test.go @@ -111,7 +111,7 @@ func TestBreadthFirstIterResources_MaxDepth(t *testing.T) { // Set a low max depth ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(3)) seq, err := recursive.IterResourcesImpl(ctx, ObjectAndRelation{ObjectType: "user", ObjectID: "alice", Relation: "..."}, NoObjectFilter()) @@ -143,7 +143,7 @@ func 
TestBreadthFirstIterResources_ErrorHandling(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) seq, err := recursive.IterResourcesImpl(ctx, ObjectAndRelation{ObjectType: "user", ObjectID: "alice", Relation: "..."}, NoObjectFilter()) require.NoError(err) @@ -168,7 +168,7 @@ func TestBreadthFirstIterResources_ErrorHandling(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) seq, err := recursive.IterResourcesImpl(ctx, ObjectAndRelation{ObjectType: "user", ObjectID: "alice", Relation: "..."}, NoObjectFilter()) require.NoError(err) @@ -202,7 +202,7 @@ func TestBreadthFirstIterResources_MergeOrSemantics(t *testing.T) { require.NoError(err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(5)) seq, err := recursive.IterResourcesImpl(ctx, ObjectAndRelation{ObjectType: "user", ObjectID: "alice", Relation: "..."}, NoObjectFilter()) @@ -230,7 +230,7 @@ func TestIterativeDeepening_MaxDepth(t *testing.T) { maxDepth := 5 ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(maxDepth)) seq, err := recursive.CheckImpl(ctx, []Object{{ObjectType: "folder", ObjectID: "folder1"}}, ObjectAndRelation{ObjectType: "user", ObjectID: "alice", Relation: "..."}) diff --git a/pkg/query/recursive_strategies_test.go 
b/pkg/query/recursive_strategies_test.go index b0bbef906..4382b4159 100644 --- a/pkg/query/recursive_strategies_test.go +++ b/pkg/query/recursive_strategies_test.go @@ -60,7 +60,7 @@ func TestRecursiveCheckStrategies(t *testing.T) { // Contexts contain mutable state (e.g., recursiveFrontierCollectors) // that must not be shared across concurrent goroutines. queryCtx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) // Create recursive iterator with the specific strategy recursive := NewRecursiveIterator(union, "folder", "view") @@ -110,7 +110,7 @@ func TestRecursiveCheckStrategiesEmpty(t *testing.T) { recursive := NewRecursiveIterator(emptyFixed, "folder", "view") queryCtx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) strategies := []recursiveCheckStrategy{ recursiveCheckIterSubjects, @@ -164,7 +164,7 @@ func TestRecursiveCheckStrategiesMultipleResources(t *testing.T) { require.NoError(t, err) queryCtx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) strategies := []recursiveCheckStrategy{ recursiveCheckIterSubjects, diff --git a/pkg/query/recursive_test.go b/pkg/query/recursive_test.go index 7932b8258..1b0ddf4d4 100644 --- a/pkg/query/recursive_test.go +++ b/pkg/query/recursive_test.go @@ -27,7 +27,7 @@ func TestRecursiveSentinel(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) // CheckImpl 
should return empty seq, err := sentinel.CheckImpl(ctx, []Object{{ObjectType: "folder", ObjectID: "folder1"}}, ObjectAndRelation{ObjectType: "user", ObjectID: "tom", Relation: "..."}) @@ -63,7 +63,7 @@ func TestRecursiveIteratorEmptyBaseCase(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) // Execute - should terminate immediately with empty result seq, err := recursive.CheckImpl(ctx, []Object{{ObjectType: "folder", ObjectID: "folder1"}}, ObjectAndRelation{ObjectType: "user", ObjectID: "tom", Relation: "..."}) @@ -167,7 +167,7 @@ func TestRecursiveIteratorExecutionError(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) // Test CheckImpl with a faulty iterator seq, err := recursive.CheckImpl(ctx, []Object{{ObjectType: "folder", ObjectID: "folder1"}}, ObjectAndRelation{ObjectType: "user", ObjectID: "tom", Relation: "..."}) @@ -198,7 +198,7 @@ func TestRecursiveIteratorCollectionError(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision))) // Test CheckImpl with a faulty iterator that fails on collection seq, err := recursive.CheckImpl(ctx, []Object{{ObjectType: "folder", ObjectID: "folder1"}}, ObjectAndRelation{ObjectType: "user", ObjectID: "tom", Relation: "..."}) @@ -223,7 +223,7 @@ func TestBFSEarlyTermination(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), 
+ WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(50)) // High max depth // IterSubjects on a node with no children (sentinel returns empty) @@ -271,7 +271,7 @@ func TestBFSCycleDetection(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(10)) seq, err := recursive.IterSubjectsImpl(ctx, Object{ObjectType: "folder", ObjectID: "folder1"}, NoObjectFilter()) @@ -304,7 +304,7 @@ func TestBFSSelfReferential(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(10)) seq, err := recursive.IterSubjectsImpl(ctx, Object{ObjectType: "folder", ObjectID: "folder1"}, NoObjectFilter()) @@ -346,7 +346,7 @@ func TestBFSResourcesWithEllipses(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(datastore.NoRevision)), WithMaxRecursionDepth(5)) // Query IterResources - should find folder2 diff --git a/pkg/query/simplify_caveat_test.go b/pkg/query/simplify_caveat_test.go index 15267c890..4de367cdd 100644 --- a/pkg/query/simplify_caveat_test.go +++ b/pkg/query/simplify_caveat_test.go @@ -47,9 +47,7 @@ func TestSimplifyLeafCaveat(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := 
internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create caveat expression without context @@ -136,9 +134,7 @@ func TestSimplifyAndOperation(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create AND expression: caveat1 AND caveat2 @@ -242,9 +238,7 @@ func TestSimplifyOrOperation(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create OR expression: caveat1 OR caveat2 @@ -360,9 +354,7 @@ func TestSimplifyNestedOperations(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create nested expression: (caveat1 OR caveat2) AND caveat3 @@ -447,9 +439,7 @@ func TestSimplifyOrWithSameCaveatDifferentContexts(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create OR expression: write_limit(limit=2) OR write_limit(limit=4) @@ -535,9 +525,7 @@ func 
TestSimplifyAndWithSameCaveatDifferentContexts(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create AND expression: write_limit(limit=2) AND write_limit(limit=4) @@ -633,9 +621,7 @@ func TestSimplifyNotWithSameCaveatDifferentContexts(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create NOT expression: NOT write_limit(limit=4) @@ -717,9 +703,7 @@ func TestSimplifyComplexNestedExpressions(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) t.Run("OrOfAnds_ComplexNesting", func(t *testing.T) { @@ -1186,9 +1170,7 @@ func TestSimplifyWithEmptyContext(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create nested expression: (caveat1 OR caveat2) AND caveat3 @@ -1270,9 +1252,7 @@ func TestSimplifyNotConditional(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, 
srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Create NOT expression: NOT limit_check(limit=10) @@ -1358,9 +1338,7 @@ func TestSimplifyDeeplyNestedCaveats(t *testing.T) { }) require.NoError(err) - dl := datalayer.NewDataLayer(ds) - sr, srErr := dl.SnapshotReader(revision).ReadSchema(ctx) - require.NoError(srErr) + sr := caveatDefinitionLookupAdapter{NewQueryDatastoreReader(datalayer.NewDataLayer(ds).SnapshotReader(revision))} runner := internalcaveats.NewCaveatRunner(caveattypes.Default.TypeSet) // Helper to create caveat expressions diff --git a/pkg/query/tracing_test.go b/pkg/query/tracing_test.go index 3ab7e832b..dedc3b121 100644 --- a/pkg/query/tracing_test.go +++ b/pkg/query/tracing_test.go @@ -24,7 +24,7 @@ func TestIteratorTracing(t *testing.T) { require.NoError(t, err) ctx := NewLocalContext(context.Background(), - WithReader(datalayer.NewDataLayer(ds).SnapshotReader(revision)), + WithRevisionedReader(datalayer.NewDataLayer(ds).SnapshotReader(revision)), WithTraceLogger(traceLogger), ) diff --git a/pkg/query/wildcard_multirelation_test.go b/pkg/query/wildcard_multirelation_test.go index d67baaaa0..4ee41ada4 100644 --- a/pkg/query/wildcard_multirelation_test.go +++ b/pkg/query/wildcard_multirelation_test.go @@ -82,7 +82,7 @@ func TestIterSubjectsWildcardWithMultipleRelations(t *testing.T) { wildcardBranch := NewDatastoreIterator(viewerRel.BaseRelations()[1]) // user:* (wildcard) queryCtx := NewLocalContext(ctx, - WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), + WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), WithTraceLogger(NewTraceLogger())) // Enable tracing for debugging subjects, err := queryCtx.IterSubjects(wildcardBranch, NewObject("document", "publicdoc"), NoObjectFilter()) 
require.NoError(err) @@ -114,7 +114,7 @@ func TestIterSubjectsWildcardWithMultipleRelations(t *testing.T) { ) queryCtx := NewLocalContext(ctx, - WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), + WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)), WithTraceLogger(NewTraceLogger())) // Enable tracing for debugging subjects, err := queryCtx.IterSubjects(union, NewObject("document", "publicdoc"), NoObjectFilter()) require.NoError(err) diff --git a/pkg/query/wildcard_subjects_test.go b/pkg/query/wildcard_subjects_test.go index e0affd25e..39eeb276b 100644 --- a/pkg/query/wildcard_subjects_test.go +++ b/pkg/query/wildcard_subjects_test.go @@ -72,7 +72,7 @@ func TestIterSubjectsWithWildcard(t *testing.T) { // The non-wildcard branch should only return concrete subjects, filtering out wildcards nonWildcardBranch := NewDatastoreIterator(viewerRel.BaseRelations()[0]) // user (non-wildcard) - queryCtx := NewLocalContext(ctx, WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) + queryCtx := NewLocalContext(ctx, WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) subjects, err := queryCtx.IterSubjects(nonWildcardBranch, NewObject("resource", "first"), NoObjectFilter()) require.NoError(err) @@ -91,7 +91,7 @@ func TestIterSubjectsWithWildcard(t *testing.T) { // The wildcard branch should enumerate concrete subjects when a wildcard exists wildcardBranch := NewDatastoreIterator(viewerRel.BaseRelations()[1]) // user:* (wildcard) - queryCtx := NewLocalContext(ctx, WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) + queryCtx := NewLocalContext(ctx, WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) subjects, err := queryCtx.IterSubjects(wildcardBranch, NewObject("resource", "first"), NoObjectFilter()) require.NoError(err) @@ -113,7 +113,7 @@ func TestIterSubjectsWithWildcard(t *testing.T) { NewDatastoreIterator(viewerRel.BaseRelations()[1]), // user:* ) - 
queryCtx := NewLocalContext(ctx, WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) + queryCtx := NewLocalContext(ctx, WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) subjects, err := queryCtx.IterSubjects(union, NewObject("resource", "first"), NoObjectFilter()) require.NoError(err) @@ -180,7 +180,7 @@ func TestIterSubjectsWildcardWithoutWildcardRelationship(t *testing.T) { // The wildcard branch should return empty because there's no wildcard relationship wildcardBranch := NewDatastoreIterator(viewerRel.BaseRelations()[1]) // user:* (wildcard) - queryCtx := NewLocalContext(ctx, WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) + queryCtx := NewLocalContext(ctx, WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) subjects, err := queryCtx.IterSubjects(wildcardBranch, NewObject("resource", "second"), NoObjectFilter()) require.NoError(err) @@ -196,7 +196,7 @@ func TestIterSubjectsWildcardWithoutWildcardRelationship(t *testing.T) { t.Parallel() nonWildcardBranch := NewDatastoreIterator(viewerRel.BaseRelations()[0]) // user (non-wildcard) - queryCtx := NewLocalContext(ctx, WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) + queryCtx := NewLocalContext(ctx, WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))) subjects, err := queryCtx.IterSubjects(nonWildcardBranch, NewObject("resource", "second"), NoObjectFilter()) require.NoError(err) From fe622359c360525bcdc498bc7462764dea4b52b7 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Tue, 3 Mar 2026 08:48:11 -0800 Subject: [PATCH 2/4] test: add double-length arrow benchmark test --- .../check_double_wide_arrow_benchmark_test.go | 245 ++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 pkg/query/benchmarks/check_double_wide_arrow_benchmark_test.go diff --git a/pkg/query/benchmarks/check_double_wide_arrow_benchmark_test.go 
b/pkg/query/benchmarks/check_double_wide_arrow_benchmark_test.go
new file mode 100644
index 000000000..62cda3025
--- /dev/null
+++ b/pkg/query/benchmarks/check_double_wide_arrow_benchmark_test.go
@@ -0,0 +1,245 @@
+package benchmarks
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/authzed/spicedb/internal/datastore/common"
+	"github.com/authzed/spicedb/internal/datastore/memdb"
+	"github.com/authzed/spicedb/pkg/datalayer"
+	"github.com/authzed/spicedb/pkg/datastore"
+	"github.com/authzed/spicedb/pkg/query"
+	"github.com/authzed/spicedb/pkg/schema/v2"
+	"github.com/authzed/spicedb/pkg/schemadsl/compiler"
+	"github.com/authzed/spicedb/pkg/schemadsl/input"
+	"github.com/authzed/spicedb/pkg/tuple"
+)
+
+// BenchmarkCheckDoubleWideArrow benchmarks permission checking through two consecutive
+// arrow hops with wide fan-out at each level.
+//
+// The hierarchy is: file -> org -> group -> user
+//
+// - 5 files, each belonging to 20 orgs
+// - 97 orgs (prime), each containing 10 groups
+// - 299 groups (13 x 23), each with 20 members
+// - 499 users (prime)
+//
+// The schema:
+//
+//	definition group { relation member: user }
+//	definition org { relation group: group; permission member = group->member }
+//	definition file { relation org: org; relation view: user;
+//	                  permission viewer = view + org->member }
+//
+// Checking viewer on a file requires two arrow traversals:
+//  1. file->org (fanout: orgs per file)
+//  2. 
org->member which resolves to org->group->member (double fan-out)
+//
+// Four sub-benchmarks are run:
+//   - plain: compile the outline directly and run Check each iteration
+//   - advised: seed a CountAdvisor from a single warm-up run, apply it to the
+//     canonical outline, compile once, then run Check each iteration
+//   - plain_delay: same as plain, but with networkDelay latency per datastore call
+//   - advised_delay: same as advised, but with networkDelay latency per datastore call
+func BenchmarkCheckDoubleWideArrow(b *testing.B) {
+	const (
+		numFiles      = 5
+		numOrgs       = 97  // prime
+		numGroups     = 299 // 13 x 23, not prime
+		numUsers      = 499 // prime
+		orgsPerFile   = 20
+		groupsPerOrg  = 10
+		usersPerGroup = 20
+	)
+
+	// ---- shared setup ----
+
+	rawDS, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
+	require.NoError(b, err)
+
+	ctx := context.Background()
+
+	schemaText := `
+		definition user {}
+
+		definition group {
+			relation member: user
+		}
+
+		definition org {
+			relation group: group
+			permission member = group->member
+		}
+
+		definition file {
+			relation org: org
+			relation view: user
+			permission viewer = view + org->member
+		}
+	`
+
+	compiled, err := compiler.Compile(compiler.InputSchema{
+		Source:       input.Source("benchmark"),
+		SchemaString: schemaText,
+	}, compiler.AllowUnprefixedObjectType())
+	require.NoError(b, err)
+
+	_, err = rawDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
+		return rwt.LegacyWriteNamespaces(ctx, compiled.ObjectDefinitions...) 
+ }) + require.NoError(b, err) + + relationships := make([]tuple.Relationship, 0, + numFiles*orgsPerFile+numOrgs*groupsPerOrg+numGroups*usersPerGroup) + + // file:fileN#org@org:orgM + for fileID := 0; fileID < numFiles; fileID++ { + step := fileID + 1 + for i := 0; i < orgsPerFile; i++ { + orgID := (i * step) % numOrgs + rel := fmt.Sprintf("file:file%d#org@org:org%d", fileID, orgID) + relationships = append(relationships, tuple.MustParse(rel)) + } + } + + // org:orgN#group@group:groupM + for orgID := 0; orgID < numOrgs; orgID++ { + step := orgID + 1 + for i := 0; i < groupsPerOrg; i++ { + groupID := (i * step) % numGroups + rel := fmt.Sprintf("org:org%d#group@group:group%d", orgID, groupID) + relationships = append(relationships, tuple.MustParse(rel)) + } + } + + // group:groupN#member@user:userM + for groupID := 0; groupID < numGroups; groupID++ { + step := groupID + 1 + for i := 0; i < usersPerGroup; i++ { + userID := (i * step) % numUsers + rel := fmt.Sprintf("group:group%d#member@user:user%d", groupID, userID) + relationships = append(relationships, tuple.MustParse(rel)) + } + } + + revision, err := common.WriteRelationships(ctx, rawDS, tuple.UpdateOperationCreate, relationships...) + require.NoError(b, err) + + dsSchema, err := schema.BuildSchemaFromDefinitions(compiled.ObjectDefinitions, nil) + require.NoError(b, err) + + // Build the canonical outline once; all sub-benchmarks derive from it. + canonicalOutline, err := query.BuildOutlineFromSchema(dsSchema, "file", "viewer") + require.NoError(b, err) + + // The resource and subject are the same for all sub-benchmarks. + resources := query.NewObjects("file", "file0") + subject := query.NewObject("user", "user181").WithEllipses() + + // Base reader (no simulated latency). + reader := query.NewQueryDatastoreReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)) + + // Delay reader wrapping the base reader with simulated network latency. 
+ delayReader := query.NewDelayReader(networkDelay, reader) + + // buildAdvisedIterator seeds a CountAdvisor from a single warm-up run using the + // provided reader and returns the compiled advised iterator. + buildAdvisedIterator := func(b *testing.B, r query.QueryDatastoreReader) query.Iterator { + b.Helper() + obs := query.NewCountObserver() + warmIt, err := canonicalOutline.Compile() + require.NoError(b, err) + warmCtx := query.NewLocalContext(ctx, + query.WithReader(r), + query.WithObserver(obs), + ) + seq, err := warmCtx.Check(warmIt, resources, subject) + require.NoError(b, err) + _, err = query.CollectAll(seq) + require.NoError(b, err) + + advisor := query.NewCountAdvisor(obs.GetStats()) + advisedCO, err := query.ApplyAdvisor(canonicalOutline, advisor) + require.NoError(b, err) + advisedIt, err := advisedCO.Compile() + require.NoError(b, err) + return advisedIt + } + + // ---- plain sub-benchmark ---- + + b.Run("plain", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + b.Log("plain explain:\n", it.Explain()) + + queryCtx := query.NewLocalContext(ctx, query.WithReader(reader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- advised sub-benchmark ---- + + b.Run("advised", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, reader) + + b.Log("advised explain:\n", advisedIt.Explain()) + + queryCtx := query.NewLocalContext(ctx, query.WithReader(reader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- plain_delay sub-benchmark ---- + + b.Run("plain_delay", func(b *testing.B) { + it, err := canonicalOutline.Compile() + require.NoError(b, err) + + 
queryCtx := query.NewLocalContext(ctx, query.WithReader(delayReader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(it, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) + + // ---- advised_delay sub-benchmark ---- + + b.Run("advised_delay", func(b *testing.B) { + advisedIt := buildAdvisedIterator(b, delayReader) + queryCtx := query.NewLocalContext(ctx, query.WithReader(delayReader)) + + b.ResetTimer() + for b.Loop() { + seq, err := queryCtx.Check(advisedIt, resources, subject) + require.NoError(b, err) + paths, err := query.CollectAll(seq) + require.NoError(b, err) + require.NotEmpty(b, paths) + } + }) +} From 0b6c0fec0ec58577a2950e6e1b4f7eb0720e580d Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 11 Mar 2026 14:21:06 -0700 Subject: [PATCH 3/4] fix: change interface to ensure fetching of a single resource objecttype --- pkg/query/datastore.go | 15 +++++++++++---- pkg/query/reader.go | 20 ++++++-------------- pkg/query/reader_timing.go | 5 +++-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/query/datastore.go b/pkg/query/datastore.go index 96ed6d716..63cd9953c 100644 --- a/pkg/query/datastore.go +++ b/pkg/query/datastore.go @@ -50,10 +50,13 @@ func (r *DatastoreIterator) CheckImpl(ctx *Context, resources []Object, subject } func (r *DatastoreIterator) checkNormalImpl(ctx *Context, resources []Object, subject ObjectAndRelation) (PathSeq, error) { - ctx.TraceStep(r, "querying datastore for %s:%s with resources=%v", r.base.Type(), r.base.RelationName(), resourceIDs(resources)) + ids := resourceIDs(resources) + ctx.TraceStep(r, "querying datastore for %s:%s with resources=%v", r.base.Type(), r.base.RelationName(), ids) + resourceType := ObjectType{Type: r.base.DefinitionName()} pathSeq, err := ctx.Reader.CheckRelationships(ctx, - resources, + resourceType, + ids, r.base.RelationName(), subject, r.base.Caveat() != "", 
r.base.Expiration(), @@ -78,8 +81,10 @@ func (r *DatastoreIterator) checkWildcardImpl(ctx *Context, resources []Object, Relation: subject.Relation, } + resourceType := ObjectType{Type: r.base.DefinitionName()} pathSeq, err := ctx.Reader.CheckRelationships(ctx, - resources, + resourceType, + resourceIDs(resources), r.base.RelationName(), wildcardSubject, r.base.Caveat() != "", r.base.Expiration(), @@ -201,8 +206,10 @@ func (r *DatastoreIterator) iterSubjectsWildcardImpl(ctx *Context, resource Obje Relation: r.base.Subrelation(), } + resourceType := ObjectType{Type: r.base.DefinitionName()} wildcardPathSeq, err := ctx.Reader.CheckRelationships(ctx, - []Object{resource}, + resourceType, + []string{resource.ObjectID}, r.base.RelationName(), wildcardSubject, r.base.Caveat() != "", r.base.Expiration(), diff --git a/pkg/query/reader.go b/pkg/query/reader.go index cd0a3daf0..43e82e95d 100644 --- a/pkg/query/reader.go +++ b/pkg/query/reader.go @@ -30,9 +30,11 @@ type QueryPage struct { type QueryDatastoreReader interface { // CheckRelationships finds paths for specific resource objects matched against // a subject. subject.ObjectID may be WildcardObjectID for wildcard checks. + // All resource IDs must be of the same resourceType. CheckRelationships( ctx context.Context, - resources []Object, + resourceType ObjectType, + resourceIDs []string, resourceRelation string, subject ObjectAndRelation, withCaveats, withExpiration bool, @@ -116,24 +118,14 @@ func buildSubjectRelationFilter(subrelation string) datastore.SubjectRelationFil func (r *datalayerQueryDatastoreReader) CheckRelationships( ctx context.Context, - resources []Object, + resourceType ObjectType, + resourceIDs []string, resourceRelation string, subject ObjectAndRelation, withCaveats, withExpiration bool, ) (PathSeq, error) { - resourceIDs := make([]string, len(resources)) - for i, res := range resources { - resourceIDs[i] = res.ObjectID - } - - // All resources in a DatastoreIterator share the same type. 
- resourceType := "" - if len(resources) > 0 { - resourceType = resources[0].ObjectType - } - filter := datastore.RelationshipsFilter{ - OptionalResourceType: resourceType, + OptionalResourceType: resourceType.Type, OptionalResourceIds: resourceIDs, OptionalResourceRelation: resourceRelation, OptionalSubjectsSelectors: []datastore.SubjectsSelector{ diff --git a/pkg/query/reader_timing.go b/pkg/query/reader_timing.go index 09b41a13e..d1380f2e5 100644 --- a/pkg/query/reader_timing.go +++ b/pkg/query/reader_timing.go @@ -32,7 +32,8 @@ func (r *delayReader) sleep(ctx context.Context) error { func (r *delayReader) CheckRelationships( ctx context.Context, - resources []Object, + resourceType ObjectType, + resourceIDs []string, resourceRelation string, subject ObjectAndRelation, withCaveats, withExpiration bool, @@ -40,7 +41,7 @@ func (r *delayReader) CheckRelationships( if err := r.sleep(ctx); err != nil { return nil, err } - return r.inner.CheckRelationships(ctx, resources, resourceRelation, subject, withCaveats, withExpiration) + return r.inner.CheckRelationships(ctx, resourceType, resourceIDs, resourceRelation, subject, withCaveats, withExpiration) } func (r *delayReader) QuerySubjects( From a5fa99177a78c9666c157f3db9ada31da2efbed1 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 11 Mar 2026 14:47:49 -0700 Subject: [PATCH 4/4] fix: add wildcard invariant checks for subrelations (they must be ellipsis) --- pkg/query/datastore.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/query/datastore.go b/pkg/query/datastore.go index 63cd9953c..211ede72c 100644 --- a/pkg/query/datastore.go +++ b/pkg/query/datastore.go @@ -74,11 +74,19 @@ func (r *DatastoreIterator) checkNormalImpl(ctx *Context, resources []Object, su } func (r *DatastoreIterator) checkWildcardImpl(ctx *Context, resources []Object, subject ObjectAndRelation) (PathSeq, error) { + // Invariant: wildcard subjects in the datastore are always stored with the 
ellipsis + // relation. The "*" is only ever an ObjectID; "type:*#relation" is syntactically + // invalid and cannot be written. Any caller passing a non-ellipsis relation here + // would cause us to query with the wrong relation filter and return a false negative. + if subject.Relation != tuple.Ellipsis { + return nil, spiceerrors.MustBugf("checkWildcardImpl called with non-ellipsis subject relation %q for subject %s:%s; wildcard subjects are always stored with ellipsis relation", subject.Relation, subject.ObjectType, subject.ObjectID) + } + // Query the datastore for wildcard relationships (subject ObjectID = "*") wildcardSubject := ObjectAndRelation{ ObjectType: subject.ObjectType, ObjectID: WildcardObjectID, - Relation: subject.Relation, + Relation: tuple.Ellipsis, } resourceType := ObjectType{Type: r.base.DefinitionName()} @@ -392,10 +400,18 @@ func (r *DatastoreIterator) IterResourcesImpl(ctx *Context, subject ObjectAndRel } func (r *DatastoreIterator) iterResourcesWildcardImpl(ctx *Context, subject ObjectAndRelation) (PathSeq, error) { + // Invariant: wildcard subjects in the datastore are always stored with the ellipsis + // relation. The "*" is only ever an ObjectID; "type:*#relation" is syntactically + // invalid and cannot be written. Any caller passing a non-ellipsis relation here + // would cause us to query with the wrong relation filter and return a false negative. + if subject.Relation != tuple.Ellipsis { + return nil, spiceerrors.MustBugf("iterResourcesWildcardImpl called with non-ellipsis subject relation %q for subject %s:%s; wildcard subjects are always stored with ellipsis relation", subject.Relation, subject.ObjectType, subject.ObjectID) + } + wildcardSubject := ObjectAndRelation{ ObjectType: subject.ObjectType, ObjectID: WildcardObjectID, - Relation: subject.Relation, + Relation: tuple.Ellipsis, } if ctx.PaginationLimit == nil {