diff --git a/kurrentdb/append_options.go b/kurrentdb/append_options.go index d907a8ed..a83ca6d2 100644 --- a/kurrentdb/append_options.go +++ b/kurrentdb/append_options.go @@ -6,8 +6,8 @@ import ( // AppendToStreamOptions options of the append stream request. type AppendToStreamOptions struct { - // Asks the server to check that the stream receiving the event is at the given expected version. - ExpectedRevision ExpectedRevision + // Asks the server to check that the stream receiving the event is at the expected state. + StreamState StreamState // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. @@ -33,7 +33,7 @@ func (o *AppendToStreamOptions) requiresLeader() bool { } func (o *AppendToStreamOptions) setDefaults() { - if o.ExpectedRevision == nil { - o.ExpectedRevision = Any{} + if o.StreamState == nil { + o.StreamState = Any{} } } diff --git a/kurrentdb/append_test.go b/kurrentdb/append_test.go index c71d242f..cc7fc82d 100644 --- a/kurrentdb/append_test.go +++ b/kurrentdb/append_test.go @@ -64,7 +64,7 @@ func appendToStreamSingleEventNoStream(db *kurrentdb.Client) TestCall { defer cancel() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := db.AppendToStream(context, streamID.String(), opts, testEvent) @@ -103,7 +103,7 @@ func appendWithInvalidStreamRevision(db *kurrentdb.Client) TestCall { defer cancel() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.StreamExists{}, + StreamState: kurrentdb.StreamExists{}, } _, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent()) @@ -139,7 +139,7 @@ func appendToSystemStreamWithIncorrectCredentials(container *Container) TestCall defer cancel() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Any{}, + StreamState: kurrentdb.Any{}, } _, err = db.AppendToStream(context, streamID.String(), opts, createTestEvent()) @@ -156,7 +156,7 @@ func metadataOperation(db *kurrentdb.Client) TestCall { defer cancel() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Any{}, + StreamState: kurrentdb.Any{}, } _, err := db.AppendToStream(context, streamID.String(), opts, createTestEvent()) diff --git a/kurrentdb/client.go b/kurrentdb/client.go index 74e2cfca..ea764857 100644 --- a/kurrentdb/client.go +++ b/kurrentdb/client.go @@ -68,7 +68,7 @@ func (client *Client) AppendToStream( return nil, fmt.Errorf("could not construct append operation. Reason: %w", err) } - header := toAppendHeader(streamID, opts.ExpectedRevision) + header := toAppendHeader(streamID, opts.StreamState) if err := appendOperation.Send(header); err != nil { err = client.grpcClient.handleError(handle, trailers, err) @@ -248,7 +248,7 @@ func (client *Client) DeleteStream( callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)} callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials) defer cancel() - deleteRequest := toDeleteRequest(streamID, opts.ExpectedRevision) + deleteRequest := toDeleteRequest(streamID, opts.StreamState) deleteResponse, err := streamsClient.Delete(ctx, deleteRequest, callOptions...) if err != nil { err = client.grpcClient.handleError(handle, trailers, err) @@ -278,7 +278,7 @@ func (client *Client) TombstoneStream( callOptions := []grpc.CallOption{grpc.Header(&headers), grpc.Trailer(&trailers)} callOptions, ctx, cancel := configureGrpcCall(parent, client.config, &opts, callOptions, client.grpcClient.perRPCCredentials) defer cancel() - tombstoneRequest := toTombstoneRequest(streamID, opts.ExpectedRevision) + tombstoneRequest := toTombstoneRequest(streamID, opts.StreamState) tombstoneResponse, err := streamsClient.Tombstone(ctx, tombstoneRequest, callOptions...) if err != nil { diff --git a/kurrentdb/client_certificates_test.go b/kurrentdb/client_certificates_test.go index 18e9fbd1..175705db 100644 --- a/kurrentdb/client_certificates_test.go +++ b/kurrentdb/client_certificates_test.go @@ -144,7 +144,7 @@ func testInvalidUserCertificates(t *testing.T, endpoint string) { streamID := uuid.NewString() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Any{}, + StreamState: kurrentdb.Any{}, } result, err := c.AppendToStream(context.Background(), streamID, opts, testEvent) diff --git a/kurrentdb/connection_test.go b/kurrentdb/connection_test.go index 032a7c2b..452d3e49 100644 --- a/kurrentdb/connection_test.go +++ b/kurrentdb/connection_test.go @@ -27,7 +27,7 @@ func closeConnection(container *Container) TestCall { context, cancel := context.WithTimeout(context.Background(), time.Duration(5)*time.Second) defer cancel() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := db.AppendToStream(context, streamID.String(), opts, testEvent) @@ -36,7 +36,7 @@ func closeConnection(container *Container) TestCall { } db.Close() - opts.ExpectedRevision = kurrentdb.Any{} + opts.StreamState = kurrentdb.Any{} _, err = db.AppendToStream(context, streamID.String(), opts, testEvent) esdbErr, ok := kurrentdb.FromError(err) diff --git a/kurrentdb/delete_options.go b/kurrentdb/delete_options.go index bbc1c830..32d0da57 100644 --- a/kurrentdb/delete_options.go +++ b/kurrentdb/delete_options.go @@ -6,8 +6,8 @@ import ( // DeleteStreamOptions options of the delete stream request. type DeleteStreamOptions struct { - // Asks the server to check that the stream receiving the event is at the given expected version. - ExpectedRevision ExpectedRevision + // Asks the server to check that the stream receiving the event is at the expected state. + StreamState StreamState // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. @@ -33,7 +33,7 @@ func (o *DeleteStreamOptions) requiresLeader() bool { } func (o *DeleteStreamOptions) setDefaults() { - if o.ExpectedRevision == nil { - o.ExpectedRevision = Any{} + if o.StreamState == nil { + o.StreamState = Any{} } } diff --git a/kurrentdb/delete_test.go b/kurrentdb/delete_test.go index 1dfbf314..977c4b1a 100644 --- a/kurrentdb/delete_test.go +++ b/kurrentdb/delete_test.go @@ -19,7 +19,7 @@ func DeleteTests(t *testing.T, db *kurrentdb.Client) { func canDeleteStream(db *kurrentdb.Client) TestCall { return func(t *testing.T) { opts := kurrentdb.DeleteStreamOptions{ - ExpectedRevision: kurrentdb.Revision(0), + StreamState: kurrentdb.Revision(0), } streamID := NAME_GENERATOR.Generate() @@ -43,7 +43,7 @@ func canTombstoneStream(db *kurrentdb.Client) TestCall { _, err := db.AppendToStream(context.Background(), streamId, kurrentdb.AppendToStreamOptions{}, createTestEvent()) deleteResult, err := db.TombstoneStream(context.Background(), streamId, kurrentdb.TombstoneStreamOptions{ - ExpectedRevision: kurrentdb.Revision(0), + StreamState: kurrentdb.Revision(0), }) if err != nil { diff --git a/kurrentdb/persistent_subscription_read_test.go b/kurrentdb/persistent_subscription_read_test.go index 12bbf09a..9b42b89a 100644 --- a/kurrentdb/persistent_subscription_read_test.go +++ b/kurrentdb/persistent_subscription_read_test.go @@ -79,7 +79,7 @@ func persistentSubscription_ToExistingStream_StartFromBeginning_AndEventsInIt(cl streamID := NAME_GENERATOR.Generate() // append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...) @@ -131,7 +131,7 @@ func persistentSubscription_ToNonExistingStream_StartFromBeginning_AppendEventsA require.NoError(t, err) // append events to StreamsClient.AppendToStreamAsync(Stream, stream_revision.StreamRevisionNoStream, Events); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...) require.NoError(t, err) @@ -160,7 +160,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve streamID := NAME_GENERATOR.Generate() // append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...) require.NoError(t, err) @@ -177,7 +177,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInItAndAppendEve require.NoError(t, err) // append 1 event to StreamsClient.AppendToStreamAsync(Stream, new StreamRevision(9), event[10]) - opts.ExpectedRevision = kurrentdb.Revision(9) + opts.StreamState = kurrentdb.Revision(9) _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...) require.NoError(t, err) @@ -205,7 +205,7 @@ func persistentSubscription_ToExistingStream_StartFromEnd_EventsInIt(clientInsta streamID := NAME_GENERATOR.Generate() // append events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, Events); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...) @@ -274,7 +274,7 @@ func persistentSubscription_ToNonExistingStream_StartFromTwo_AppendEventsAfterwa require.NoError(t, err) // append 3 event to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events) opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events...) require.NoError(t, err) @@ -302,7 +302,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA // append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } streamID := NAME_GENERATOR.Generate() _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...) @@ -322,7 +322,7 @@ func persistentSubscription_ToExistingStream_StartFrom10_EventsInItAppendEventsA // append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events[10:) opts = kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Revision(9), + StreamState: kurrentdb.Revision(9), } _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...) require.NoError(t, err) @@ -351,7 +351,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc // append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]); opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } streamID := NAME_GENERATOR.Generate() _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:10]...) @@ -372,7 +372,7 @@ func persistentSubscription_ToExistingStream_StartFrom4_EventsInIt(clientInstanc // append 1 event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(9), events) opts = kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Revision(9), + StreamState: kurrentdb.Revision(9), } _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[10:]...) require.NoError(t, err) @@ -402,7 +402,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn // append 10 events to StreamsClient.AppendToStreamAsync(Stream, StreamState.NoStream, events[:10]); streamID := NAME_GENERATOR.Generate() opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events[:11]...) require.NoError(t, err) @@ -421,7 +421,7 @@ func persistentSubscription_ToExistingStream_StartFromHigherRevisionThenEventsIn // append event to StreamsClient.AppendToStreamAsync(Stream, StreamRevision(10), events[11:]) opts = kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Revision(10), + StreamState: kurrentdb.Revision(10), } _, err = clientInstance.AppendToStream(context.Background(), streamID, opts, events[11]) diff --git a/kurrentdb/persistent_subscriptions_test.go b/kurrentdb/persistent_subscriptions_test.go index 73e06470..5bc51599 100644 --- a/kurrentdb/persistent_subscriptions_test.go +++ b/kurrentdb/persistent_subscriptions_test.go @@ -247,7 +247,7 @@ func pushEventsToStream(t *testing.T, events []kurrentdb.EventData) { opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err := clientInstance.AppendToStream(context.Background(), streamID, opts, events...) diff --git a/kurrentdb/protobuf_utils.go b/kurrentdb/protobuf_utils.go index 0648152a..40499d08 100644 --- a/kurrentdb/protobuf_utils.go +++ b/kurrentdb/protobuf_utils.go @@ -29,7 +29,7 @@ const systemMetadataKeysContentType = "content-type" const systemMetadataKeysCreated = "created" // toAppendHeader ... -func toAppendHeader(streamID string, streamRevision ExpectedRevision) *api.AppendReq { +func toAppendHeader(streamID string, streamRevision StreamState) *api.AppendReq { appendReq := &api.AppendReq{ Content: &api.AppendReq_Options_{ Options: &api.AppendReq_Options{}, @@ -206,7 +206,7 @@ func toFilterOptions(options *SubscriptionFilterOptions) (*api.ReadReq_Options_F return &filterOptions, nil } -func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.DeleteReq { +func toDeleteRequest(streamID string, streamRevision StreamState) *api.DeleteReq { deleteReq := &api.DeleteReq{ Options: &api.DeleteReq_Options{ StreamIdentifier: &shared.StreamIdentifier{ @@ -237,7 +237,7 @@ func toDeleteRequest(streamID string, streamRevision ExpectedRevision) *api.Dele return deleteReq } -func toTombstoneRequest(streamID string, streamRevision ExpectedRevision) *api.TombstoneReq { +func toTombstoneRequest(streamID string, streamRevision StreamState) *api.TombstoneReq { tombstoneReq := &api.TombstoneReq{ Options: &api.TombstoneReq_Options{ StreamIdentifier: &shared.StreamIdentifier{ diff --git a/kurrentdb/read_stream_test.go b/kurrentdb/read_stream_test.go index 98ae2503..10f47e3a 100644 --- a/kurrentdb/read_stream_test.go +++ b/kurrentdb/read_stream_test.go @@ -199,7 +199,7 @@ func readStreamReturnsEOFAfterCompletion(db *kurrentdb.Client) TestCall { } opts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } streamID := NAME_GENERATOR.Generate() diff --git a/kurrentdb/revision.go b/kurrentdb/revision.go index af4d7699..16c27252 100644 --- a/kurrentdb/revision.go +++ b/kurrentdb/revision.go @@ -9,11 +9,11 @@ type StreamExists struct{} // NoStream means the stream being written to should not yet exist. type NoStream struct{} -// ExpectedRevision the use of expected revision can be a bit tricky especially when discussing guaranties given by +// StreamState the use of expected revision can be a bit tricky especially when discussing guaranties given by // KurrentDB server. The KurrentDB server will assure idempotency for all requests using any value in -// ExpectedRevision except Any. When using Any, the KurrentDB server will do its best to assure idempotency but +// StreamState except Any. When using Any, the KurrentDB server will do its best to assure idempotency but // will not guarantee it. -type ExpectedRevision interface { +type StreamState interface { isExpectedRevision() } diff --git a/kurrentdb/subscriptions_test.go b/kurrentdb/subscriptions_test.go index 9a985545..80066109 100644 --- a/kurrentdb/subscriptions_test.go +++ b/kurrentdb/subscriptions_test.go @@ -140,7 +140,7 @@ func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *kurre // Write a new event opts2 := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.Revision(5_999), + StreamState: kurrentdb.Revision(5_999), } writeResult, err := db.AppendToStream(context.Background(), streamID, opts2, testEvent) require.NoError(t, err) diff --git a/kurrentdb/tombstone_options.go b/kurrentdb/tombstone_options.go index ef0a5f28..da3a3e75 100644 --- a/kurrentdb/tombstone_options.go +++ b/kurrentdb/tombstone_options.go @@ -4,8 +4,8 @@ import "time" // TombstoneStreamOptions options of the tombstone stream request. type TombstoneStreamOptions struct { - // Asks the server to check that the stream receiving the event is at the given expected version. - ExpectedRevision ExpectedRevision + // Asks the server to check that the stream receiving the event is at the expected state. + StreamState StreamState // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. @@ -31,7 +31,7 @@ func (o *TombstoneStreamOptions) requiresLeader() bool { } func (o *TombstoneStreamOptions) setDefaults() { - if o.ExpectedRevision == nil { - o.ExpectedRevision = Any{} + if o.StreamState == nil { + o.StreamState = Any{} } } diff --git a/samples/appendingEvents.go b/samples/appendingEvents.go index 7b4502a7..bafb7cb6 100644 --- a/samples/appendingEvents.go +++ b/samples/appendingEvents.go @@ -27,7 +27,7 @@ func AppendToStream(db *kurrentdb.Client) { } options := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } result, err := db.AppendToStream(context.Background(), "some-stream", options, kurrentdb.EventData{ @@ -89,7 +89,7 @@ func AppendWithNoStream(db *kurrentdb.Client) { } options := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: kurrentdb.NoStream{}, + StreamState: kurrentdb.NoStream{}, } _, err = db.AppendToStream(context.Background(), "same-event-stream", options, kurrentdb.EventData{ @@ -151,7 +151,7 @@ func AppendWithConcurrencyCheck(db *kurrentdb.Client) { } aopts := kurrentdb.AppendToStreamOptions{ - ExpectedRevision: lastEvent.OriginalStreamRevision(), + StreamState: lastEvent.OriginalStreamRevision(), } _, err = db.AppendToStream(context.Background(), "concurrency-stream", aopts, kurrentdb.EventData{