diff --git a/db/migrate/20260226121600_sequent_fix_sequence_number_check.rb b/db/migrate/20260226121600_sequent_fix_sequence_number_check.rb new file mode 100644 index 00000000..b463d0d1 --- /dev/null +++ b/db/migrate/20260226121600_sequent_fix_sequence_number_check.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class SequentFixSequenceNumberCheck < ActiveRecord::Migration[7.2] + def up + Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do + execute_sql_file 'store_events', version: 5 + end + end + + def down + Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do + execute_sql_file 'store_events', version: 4 + end + end + + private + + def execute_sql_file(filename, version:) + say "Applying '#{filename}' version #{version}", true + suppress_messages do + execute File.read( + File.join( + File.dirname(__FILE__), + format('sequent/%s_v%02d.sql', filename, version), + ), + ) + end + end +end diff --git a/db/migrate/sequent/store_events_v05.sql b/db/migrate/sequent/store_events_v05.sql new file mode 100644 index 00000000..f7687abc --- /dev/null +++ b/db/migrate/sequent/store_events_v05.sql @@ -0,0 +1,64 @@ +CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb) +LANGUAGE plpgsql SET search_path FROM CURRENT AS $$ +DECLARE + _command_id commands.id%TYPE; + _aggregates jsonb; + _aggregate jsonb; + _events jsonb; + _aggregate_id aggregates.aggregate_id%TYPE; + _events_partition_key aggregates.events_partition_key%TYPE; + _last_sequence_number events.sequence_number%TYPE; + _next_sequence_number events.sequence_number%TYPE; +BEGIN + CALL update_types(_command, _aggregates_with_events); + + _command_id = store_command(_command); + + CALL store_aggregates(_aggregates_with_events); + + FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row + ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number' + LOOP + _aggregate_id = _aggregate->>'aggregate_id'; + SELECT events_partition_key INTO STRICT _events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id FOR NO KEY UPDATE; + + SELECT sequence_number + INTO _last_sequence_number + FROM events + WHERE partition_key = _events_partition_key + AND aggregate_id = _aggregate_id + ORDER BY 1 DESC + LIMIT 1; + + SELECT MIN((event->'event_json'->'sequence_number')::integer) + INTO _next_sequence_number + FROM jsonb_array_elements(_events) AS event; + + -- Check sequence number of first new event to ensure optimistic locking works correctly + -- (otherwise two concurrent transactions could insert events with different first/next + -- sequence number and no constraint violation would be raised). + IF _last_sequence_number IS NULL AND _next_sequence_number <> 1 THEN + RAISE EXCEPTION 'sequence_number of first event must be 1, but was % (aggregate %)', _next_sequence_number, _aggregate_id + USING ERRCODE = 'integrity_constraint_violation'; + ELSIF _last_sequence_number IS NOT NULL AND _next_sequence_number > _last_sequence_number + 1 THEN + RAISE EXCEPTION 'sequence_number must be consecutive, but last sequence number was % and next is % (aggregate %)', + _last_sequence_number, _next_sequence_number, _aggregate_id + USING ERRCODE = 'integrity_constraint_violation'; + END IF; + + INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json) + SELECT _events_partition_key, + _aggregate_id, + (event->'event_json'->'sequence_number')::integer, + (event->>'created_at')::timestamptz, + _command_id, + (SELECT id FROM event_types WHERE type = event->>'event_type'), + (event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[] + FROM jsonb_array_elements(_events) AS event + ORDER BY 1, 2, 3; + END LOOP; + + _aggregates = (SELECT jsonb_agg(row->0) FROM jsonb_array_elements(_aggregates_with_events) AS row); + CALL update_unique_keys(_aggregates); +END; +$$; diff --git a/db/structure.sql b/db/structure.sql index 4127bb91..d32a55ab 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -569,7 +569,7 @@ BEGIN ORDER BY 1 DESC LIMIT 1; - SELECT MIN(event->'event_json'->>'sequence_number') + SELECT MIN((event->'event_json'->'sequence_number')::integer) INTO _next_sequence_number FROM jsonb_array_elements(_events) AS event; @@ -1588,6 +1588,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records SET search_path TO public,view_schema,sequent_schema; INSERT INTO "schema_migrations" (version) VALUES +('20260226121600'), ('20260129130000'), ('20250815103000'), ('20250630113000'), diff --git a/spec/lib/sequent/core/event_store_spec.rb b/spec/lib/sequent/core/event_store_spec.rb index 72e58989..b1beb7e3 100644 --- a/spec/lib/sequent/core/event_store_spec.rb +++ b/spec/lib/sequent/core/event_store_spec.rb @@ -478,6 +478,36 @@ class MyAggregate < Sequent::Core::AggregateRoot ) end.to raise_error(ActiveRecord::StatementInvalid) end + + it 'https://github.com/zilverline/sequent/issues/514 is fixed' do + stream = Sequent::Core::EventStream.new( + aggregate_type: 'MyAggregate', + aggregate_id: Sequent.new_uuid, + ) + events = (1..8).map { |i| MyEvent.new(aggregate_id:, sequence_number: i) } + + event_store.commit_events( + Sequent::Core::Command.new(aggregate_id:), + [ + [stream, events], + ], + ) + + expect do + event_store.commit_events( + Sequent::Core::Command.new(aggregate_id:), + [ + [ + stream, + [ + MyEvent.new(aggregate_id:, sequence_number: 9), + MyEvent.new(aggregate_id:, sequence_number: 10), + ], + ], + ], + ) + end.to change(Sequent::Core::EventRecord, :count).by(2) + end end describe '#permanently_delete_events' do