Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions db/migrate/20260226121600_sequent_fix_sequence_number_check.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

class SequentFixSequenceNumberCheck < ActiveRecord::Migration[7.2]
def up
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
execute_sql_file 'store_events', version: 5
end
end

def down
Sequent::Support::Database.with_search_path(Sequent.configuration.event_store_schema_name) do
execute_sql_file 'store_events', version: 4
end
end

private

def execute_sql_file(filename, version:)
say "Applying '#{filename}' version #{version}", true
suppress_messages do
execute File.read(
File.join(
File.dirname(__FILE__),
format('sequent/%s_v%02d.sql', filename, version),
),
)
end
end
end
64 changes: 64 additions & 0 deletions db/migrate/sequent/store_events_v05.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb)
LANGUAGE plpgsql SET search_path FROM CURRENT AS $$
DECLARE
_command_id commands.id%TYPE;
_aggregates jsonb;
_aggregate jsonb;
_events jsonb;
_aggregate_id aggregates.aggregate_id%TYPE;
_events_partition_key aggregates.events_partition_key%TYPE;
_last_sequence_number events.sequence_number%TYPE;
_next_sequence_number events.sequence_number%TYPE;
BEGIN
CALL update_types(_command, _aggregates_with_events);

_command_id = store_command(_command);

CALL store_aggregates(_aggregates_with_events);

FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row
ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number'
LOOP
_aggregate_id = _aggregate->>'aggregate_id';
SELECT events_partition_key INTO STRICT _events_partition_key FROM aggregates WHERE aggregate_id = _aggregate_id FOR NO KEY UPDATE;

SELECT sequence_number
INTO _last_sequence_number
FROM events
WHERE partition_key = _events_partition_key
AND aggregate_id = _aggregate_id
ORDER BY 1 DESC
LIMIT 1;

SELECT MIN((event->'event_json'->'sequence_number')::integer)
INTO _next_sequence_number
FROM jsonb_array_elements(_events) AS event;

-- Check sequence number of first new event to ensure optimistic locking works correctly
-- (otherwise two concurrent transactions could insert events with different first/next
-- sequence number and no constraint violation would be raised).
IF _last_sequence_number IS NULL AND _next_sequence_number <> 1 THEN
RAISE EXCEPTION 'sequence_number of first event must be 1, but was % (aggregate %)', _next_sequence_number, _aggregate_id
USING ERRCODE = 'integrity_constraint_violation';
ELSIF _last_sequence_number IS NOT NULL AND _next_sequence_number > _last_sequence_number + 1 THEN
RAISE EXCEPTION 'sequence_number must be consecutive, but last sequence number was % and next is % (aggregate %)',
_last_sequence_number, _next_sequence_number, _aggregate_id
USING ERRCODE = 'integrity_constraint_violation';
END IF;

INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json)
SELECT _events_partition_key,
_aggregate_id,
(event->'event_json'->'sequence_number')::integer,
(event->>'created_at')::timestamptz,
_command_id,
(SELECT id FROM event_types WHERE type = event->>'event_type'),
(event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[]
FROM jsonb_array_elements(_events) AS event
ORDER BY 1, 2, 3;
END LOOP;

_aggregates = (SELECT jsonb_agg(row->0) FROM jsonb_array_elements(_aggregates_with_events) AS row);
CALL update_unique_keys(_aggregates);
END;
$$;
3 changes: 2 additions & 1 deletion db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ BEGIN
ORDER BY 1 DESC
LIMIT 1;

SELECT MIN(event->'event_json'->>'sequence_number')
SELECT MIN((event->'event_json'->'sequence_number')::integer)
INTO _next_sequence_number
FROM jsonb_array_elements(_events) AS event;

Expand Down Expand Up @@ -1588,6 +1588,7 @@ ALTER TABLE ONLY sequent_schema.snapshot_records
SET search_path TO public,view_schema,sequent_schema;

INSERT INTO "schema_migrations" (version) VALUES
('20260226121600'),
('20260129130000'),
('20250815103000'),
('20250630113000'),
Expand Down
30 changes: 30 additions & 0 deletions spec/lib/sequent/core/event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,36 @@ class MyAggregate < Sequent::Core::AggregateRoot
)
end.to raise_error(ActiveRecord::StatementInvalid)
end

it 'https://github.com/zilverline/sequent/issues/514 is fixed' do
stream = Sequent::Core::EventStream.new(
aggregate_type: 'MyAggregate',
aggregate_id: Sequent.new_uuid,
)
events = (1..8).map { |i| MyEvent.new(aggregate_id:, sequence_number: i) }

event_store.commit_events(
Sequent::Core::Command.new(aggregate_id:),
[
[stream, events],
],
)

expect do
event_store.commit_events(
Sequent::Core::Command.new(aggregate_id:),
[
[
stream,
[
MyEvent.new(aggregate_id:, sequence_number: 9),
MyEvent.new(aggregate_id:, sequence_number: 10),
],
],
],
)
end.to change(Sequent::Core::EventRecord, :count).by(2)
end
end

describe '#permanently_delete_events' do
Expand Down