diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index 07ca396..40a72a3 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -18,7 +18,7 @@ jobs: services: postgres: - image: postgres:12 + image: postgres:14 env: POSTGRES_USER: sequent POSTGRES_PASSWORD: sequent diff --git a/building-a-web-application/Gemfile.lock b/building-a-web-application/Gemfile.lock index ad5c758..0f56d96 100644 --- a/building-a-web-application/Gemfile.lock +++ b/building-a-web-application/Gemfile.lock @@ -24,7 +24,8 @@ GEM bigdecimal (3.1.8) concurrent-ruby (1.3.4) connection_pool (2.4.1) - database_cleaner (2.0.2) + csv (3.3.0) + database_cleaner (2.1.0) database_cleaner-active_record (>= 2, < 3) database_cleaner-active_record (2.2.0) activerecord (>= 5.a) @@ -32,6 +33,7 @@ GEM database_cleaner-core (2.0.1) diff-lcs (1.5.1) drb (2.2.1) + gli (2.22.0) i18n (1.14.6) concurrent-ruby (~> 1.0) logger (1.6.1) @@ -47,6 +49,8 @@ GEM parser (3.3.5.0) ast (~> 2.4.1) racc + pastel (0.8.0) + tty-color (~> 0.5) pg (1.5.8) postgresql_cursor (0.6.9) activerecord (>= 6.0) @@ -57,9 +61,8 @@ GEM rack (>= 3.0.0, < 4) rack-session (2.0.0) rack (>= 3.0.0) - rackup (2.1.0) + rackup (2.2.1) rack (>= 3) - webrick (~> 1.8) rake (13.2.1) rspec (3.13.0) rspec-core (~> 3.13.0) @@ -76,17 +79,21 @@ GEM rspec-support (3.13.1) ruby2_keywords (0.0.5) securerandom (0.3.1) - sequent (7.1.1) - activemodel (>= 6.0) - activerecord (>= 6.0) + sequent (8.0.1) + activemodel (>= 7.1.3) + activerecord (>= 7.1.3) bcrypt (~> 3.1) + csv (~> 3.3) + gli (~> 2.22) i18n - oj (~> 3) + logger (~> 1.6) + oj (~> 3.3) parallel (~> 1.20) parser (>= 2.6.5, < 3.4) pg (~> 1.2) postgresql_cursor (~> 0.6) thread_safe (~> 0.3.6) + tty-prompt (~> 0.23.1) tzinfo (>= 1.1) sinatra (4.0.0) mustermann (~> 3.0) @@ -105,12 +112,24 @@ GEM thread_safe (0.3.6) tilt (2.4.0) timeout (0.4.1) + tty-color (0.6.0) + tty-cursor (0.7.1) + tty-prompt (0.23.1) + pastel (~> 0.8) + tty-reader (~> 0.8) + tty-reader (0.9.0) + 
tty-cursor (~> 0.7) + tty-screen (~> 0.8) + wisper (~> 2.0) + tty-screen (0.8.2) tzinfo (2.0.6) concurrent-ruby (~> 1.0) - webrick (1.8.2) + webrick (1.9.1) + wisper (2.0.1) PLATFORMS arm64-darwin-23 + arm64-darwin-24 x86_64-linux DEPENDENCIES diff --git a/building-a-web-application/db/sequent_pgsql.sql b/building-a-web-application/db/sequent_pgsql.sql new file mode 100644 index 0000000..0cfe399 --- /dev/null +++ b/building-a-web-application/db/sequent_pgsql.sql @@ -0,0 +1,416 @@ +DROP TYPE IF EXISTS aggregate_event_type CASCADE; +CREATE TYPE aggregate_event_type AS ( + aggregate_type text, + aggregate_id uuid, + events_partition_key text, + event_type text, + event_json jsonb +); + +CREATE OR REPLACE FUNCTION enrich_command_json(command commands) RETURNS jsonb +LANGUAGE plpgsql AS $$ +BEGIN + RETURN jsonb_build_object( + 'command_type', (SELECT type FROM command_types WHERE command_types.id = command.command_type_id), + 'created_at', command.created_at, + 'user_id', command.user_id, + 'aggregate_id', command.aggregate_id, + 'event_aggregate_id', command.event_aggregate_id, + 'event_sequence_number', command.event_sequence_number + ) + || command.command_json; +END +$$; + +CREATE OR REPLACE FUNCTION enrich_event_json(event events) RETURNS jsonb +LANGUAGE plpgsql AS $$ +BEGIN + RETURN jsonb_build_object( + 'aggregate_id', event.aggregate_id, + 'sequence_number', event.sequence_number, + 'created_at', event.created_at + ) + || event.event_json; +END +$$; + +CREATE OR REPLACE FUNCTION load_event( + _aggregate_id uuid, + _sequence_number integer +) RETURNS SETOF aggregate_event_type +LANGUAGE plpgsql AS $$ +BEGIN + RETURN QUERY SELECT aggregate_types.type, + a.aggregate_id, + a.events_partition_key, + event_types.type, + enrich_event_json(e) + FROM aggregates a + INNER JOIN events e ON (a.events_partition_key, a.aggregate_id) = (e.partition_key, e.aggregate_id) + INNER JOIN aggregate_types ON a.aggregate_type_id = aggregate_types.id + INNER JOIN event_types ON 
e.event_type_id = event_types.id + WHERE a.aggregate_id = _aggregate_id + AND e.sequence_number = _sequence_number; +END; +$$; + +CREATE OR REPLACE FUNCTION load_events( + _aggregate_ids jsonb, + _use_snapshots boolean DEFAULT TRUE, + _until timestamptz DEFAULT NULL +) RETURNS SETOF aggregate_event_type +LANGUAGE plpgsql AS $$ +DECLARE + _aggregate_id aggregates.aggregate_id%TYPE; +BEGIN + FOR _aggregate_id IN SELECT * FROM jsonb_array_elements_text(_aggregate_ids) LOOP + -- Use a single query to avoid race condition with UPDATEs to the events partition key + -- in case transaction isolation level is lower than repeatable read (the default of + -- PostgreSQL is read committed). + RETURN QUERY WITH + aggregate AS ( + SELECT aggregate_types.type, aggregate_id, events_partition_key + FROM aggregates + JOIN aggregate_types ON aggregate_type_id = aggregate_types.id + WHERE aggregate_id = _aggregate_id + ), + snapshot AS ( + SELECT * + FROM snapshot_records + WHERE _use_snapshots + AND aggregate_id = _aggregate_id + AND (_until IS NULL OR created_at < _until) + ORDER BY sequence_number DESC LIMIT 1 + ) + (SELECT a.*, s.snapshot_type, s.snapshot_json FROM aggregate a, snapshot s) + UNION ALL + (SELECT a.*, event_types.type, enrich_event_json(e) + FROM aggregate a + JOIN events e ON (a.events_partition_key, a.aggregate_id) = (e.partition_key, e.aggregate_id) + JOIN event_types ON e.event_type_id = event_types.id + WHERE e.sequence_number >= COALESCE((SELECT sequence_number FROM snapshot), 0) + AND (_until IS NULL OR e.created_at < _until) + ORDER BY e.sequence_number ASC); + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + _id commands.id%TYPE; + _command_json jsonb = _command->'command_json'; +BEGIN + IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN + -- Only try inserting if it doesn't exist to avoid exhausting the id sequence + INSERT INTO 
command_types (type) + VALUES (_command->>'command_type') + ON CONFLICT DO NOTHING; + END IF; + + INSERT INTO commands ( + created_at, user_id, aggregate_id, command_type_id, command_json, + event_aggregate_id, event_sequence_number + ) VALUES ( + (_command->>'created_at')::timestamptz, + (_command_json->>'user_id')::uuid, + (_command_json->>'aggregate_id')::uuid, + (SELECT id FROM command_types WHERE type = _command->>'command_type'), + (_command->'command_json') - '{command_type,created_at,organization_id,user_id,aggregate_id,event_aggregate_id,event_sequence_number}'::text[], + (_command_json->>'event_aggregate_id')::uuid, + NULLIF(_command_json->'event_sequence_number', 'null'::jsonb)::integer + ) RETURNING id INTO STRICT _id; + RETURN _id; +END; +$$; + +CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb) +LANGUAGE plpgsql AS $$ +DECLARE + _command_id commands.id%TYPE; + _aggregate jsonb; + _events jsonb; + _aggregate_id aggregates.aggregate_id%TYPE; + _aggregate_row aggregates%ROWTYPE; + _provided_events_partition_key aggregates.events_partition_key%TYPE; + _events_partition_key aggregates.events_partition_key%TYPE; + _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; +BEGIN + _command_id = store_command(_command); + + WITH types AS ( + SELECT DISTINCT row->0->>'aggregate_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + ) + INSERT INTO aggregate_types (type) + SELECT type FROM types + WHERE type NOT IN (SELECT type FROM aggregate_types) + ORDER BY 1 + ON CONFLICT DO NOTHING; + + WITH types AS ( + SELECT DISTINCT events->>'event_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events + ) + INSERT INTO event_types (type) + SELECT type FROM types + WHERE type NOT IN (SELECT type FROM event_types) + ORDER BY 1 + ON CONFLICT DO NOTHING; + + FOR _aggregate, _events IN SELECT row->0, row->1 FROM 
jsonb_array_elements(_aggregates_with_events) AS row + ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number' + LOOP + _aggregate_id = _aggregate->>'aggregate_id'; + _provided_events_partition_key = _aggregate->>'events_partition_key'; + _snapshot_outdated_at = _aggregate->>'snapshot_outdated_at'; + + SELECT * INTO _aggregate_row FROM aggregates WHERE aggregate_id = _aggregate_id; + _events_partition_key = COALESCE(_provided_events_partition_key, _aggregate_row.events_partition_key, ''); + + INSERT INTO aggregates (aggregate_id, created_at, aggregate_type_id, events_partition_key) + VALUES ( + _aggregate_id, + (_events->0->>'created_at')::timestamptz, + (SELECT id FROM aggregate_types WHERE type = _aggregate->>'aggregate_type'), + _events_partition_key + ) ON CONFLICT (aggregate_id) + DO UPDATE SET events_partition_key = EXCLUDED.events_partition_key + WHERE aggregates.events_partition_key IS DISTINCT FROM EXCLUDED.events_partition_key; + + INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json) + SELECT _events_partition_key, + _aggregate_id, + (event->'event_json'->'sequence_number')::integer, + (event->>'created_at')::timestamptz, + _command_id, + (SELECT id FROM event_types WHERE type = event->>'event_type'), + (event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[] + FROM jsonb_array_elements(_events) AS event; + + IF _snapshot_outdated_at IS NOT NULL THEN + INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at) + VALUES (_aggregate_id, _snapshot_outdated_at) + ON CONFLICT (aggregate_id) DO UPDATE + SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at) + WHERE row.snapshot_outdated_at IS DISTINCT FROM EXCLUDED.snapshot_outdated_at; + END IF; + END LOOP; +END; +$$; + +CREATE OR REPLACE PROCEDURE store_snapshots(_snapshots jsonb) +LANGUAGE plpgsql AS $$ +DECLARE + _aggregate_id uuid; 
+ _snapshot jsonb; + _sequence_number snapshot_records.sequence_number%TYPE; +BEGIN + FOR _snapshot IN SELECT * FROM jsonb_array_elements(_snapshots) LOOP + _aggregate_id = _snapshot->>'aggregate_id'; + _sequence_number = _snapshot->'sequence_number'; + + INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_sequence_number_high_water_mark) + VALUES (_aggregate_id, _sequence_number) + ON CONFLICT (aggregate_id) DO UPDATE + SET snapshot_sequence_number_high_water_mark = + GREATEST(row.snapshot_sequence_number_high_water_mark, EXCLUDED.snapshot_sequence_number_high_water_mark), + snapshot_outdated_at = NULL, + snapshot_scheduled_at = NULL; + + INSERT INTO snapshot_records (aggregate_id, sequence_number, created_at, snapshot_type, snapshot_json) + VALUES ( + _aggregate_id, + _sequence_number, + (_snapshot->>'created_at')::timestamptz, + _snapshot->>'snapshot_type', + _snapshot->'snapshot_json' + ); + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION load_latest_snapshot(_aggregate_id uuid) RETURNS aggregate_event_type +LANGUAGE SQL AS $$ + SELECT (SELECT type FROM aggregate_types WHERE id = a.aggregate_type_id), + a.aggregate_id, + a.events_partition_key, + s.snapshot_type, + s.snapshot_json + FROM aggregates a JOIN snapshot_records s ON a.aggregate_id = s.aggregate_id + WHERE a.aggregate_id = _aggregate_id + ORDER BY s.sequence_number DESC + LIMIT 1; +$$; + +CREATE OR REPLACE PROCEDURE delete_all_snapshots(_now timestamp with time zone DEFAULT NOW()) +LANGUAGE plpgsql AS $$ +BEGIN + UPDATE aggregates_that_need_snapshots + SET snapshot_outdated_at = _now + WHERE snapshot_outdated_at IS NULL; + DELETE FROM snapshot_records; +END; +$$; + +CREATE OR REPLACE PROCEDURE delete_snapshots_before(_aggregate_id uuid, _sequence_number integer, _now timestamp with time zone DEFAULT NOW()) +LANGUAGE plpgsql AS $$ +BEGIN + DELETE FROM snapshot_records + WHERE aggregate_id = _aggregate_id + AND sequence_number < _sequence_number; + + UPDATE 
aggregates_that_need_snapshots + SET snapshot_outdated_at = _now + WHERE aggregate_id = _aggregate_id + AND snapshot_outdated_at IS NULL + AND NOT EXISTS (SELECT 1 FROM snapshot_records WHERE aggregate_id = _aggregate_id); +END; +$$; + +CREATE OR REPLACE FUNCTION aggregates_that_need_snapshots(_last_aggregate_id uuid, _limit integer) + RETURNS TABLE (aggregate_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN QUERY SELECT a.aggregate_id + FROM aggregates_that_need_snapshots a + WHERE a.snapshot_outdated_at IS NOT NULL + AND (_last_aggregate_id IS NULL OR a.aggregate_id > _last_aggregate_id) + ORDER BY 1 + LIMIT _limit; +END; +$$; + +CREATE OR REPLACE FUNCTION select_aggregates_for_snapshotting(_limit integer, _reschedule_snapshot_scheduled_before timestamp with time zone, _now timestamp with time zone DEFAULT NOW()) + RETURNS TABLE (aggregate_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN QUERY WITH scheduled AS MATERIALIZED ( + SELECT a.aggregate_id + FROM aggregates_that_need_snapshots AS a + WHERE snapshot_outdated_at IS NOT NULL + ORDER BY snapshot_outdated_at ASC, snapshot_sequence_number_high_water_mark DESC, aggregate_id ASC + LIMIT _limit + FOR UPDATE + ) UPDATE aggregates_that_need_snapshots AS row + SET snapshot_scheduled_at = _now + FROM scheduled + WHERE row.aggregate_id = scheduled.aggregate_id + AND (row.snapshot_scheduled_at IS NULL OR row.snapshot_scheduled_at < _reschedule_snapshot_scheduled_before) + RETURNING row.aggregate_id; +END; +$$; + +CREATE OR REPLACE PROCEDURE permanently_delete_commands_without_events(_aggregate_id uuid, _organization_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + IF _aggregate_id IS NULL AND _organization_id IS NULL THEN + RAISE EXCEPTION 'aggregate_id or organization_id must be specified to delete commands'; + END IF; + + DELETE FROM commands + WHERE (_aggregate_id IS NULL OR aggregate_id = _aggregate_id) + AND NOT EXISTS (SELECT 1 FROM events WHERE command_id = commands.id); +END; +$$; + +CREATE OR REPLACE PROCEDURE 
permanently_delete_event_streams(_aggregate_ids jsonb) +LANGUAGE plpgsql AS $$ +BEGIN + DELETE FROM events + USING jsonb_array_elements_text(_aggregate_ids) AS ids (id) + JOIN aggregates ON ids.id::uuid = aggregates.aggregate_id + WHERE events.partition_key = aggregates.events_partition_key + AND events.aggregate_id = aggregates.aggregate_id; + DELETE FROM aggregates + USING jsonb_array_elements_text(_aggregate_ids) AS ids (id) + WHERE aggregates.aggregate_id = ids.id::uuid; +END; +$$; + +DROP VIEW IF EXISTS command_records; +CREATE VIEW command_records (id, user_id, aggregate_id, command_type, command_json, created_at, event_aggregate_id, event_sequence_number) AS + SELECT id, + user_id, + aggregate_id, + (SELECT type FROM command_types WHERE command_types.id = command.command_type_id), + enrich_command_json(command), + created_at, + event_aggregate_id, + event_sequence_number + FROM commands command; + +DROP VIEW IF EXISTS event_records; +CREATE VIEW event_records (aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_record_id, xact_id) AS + SELECT aggregate.aggregate_id, + event.partition_key, + event.sequence_number, + event.created_at, + type.type, + enrich_event_json(event) AS event_json, + command_id, + event.xact_id + FROM events event + JOIN aggregates aggregate ON aggregate.aggregate_id = event.aggregate_id AND aggregate.events_partition_key = event.partition_key + JOIN event_types type ON event.event_type_id = type.id; + +DROP VIEW IF EXISTS stream_records; +CREATE VIEW stream_records (aggregate_id, events_partition_key, aggregate_type, created_at) AS + SELECT aggregates.aggregate_id, + aggregates.events_partition_key, + aggregate_types.type, + aggregates.created_at + FROM aggregates JOIN aggregate_types ON aggregates.aggregate_type_id = aggregate_types.id; + +CREATE OR REPLACE FUNCTION save_events_on_delete_trigger() RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO saved_event_records (operation, timestamp, "user", 
aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_id, xact_id) + SELECT 'D', + statement_timestamp(), + user, + o.aggregate_id, + o.partition_key, + o.sequence_number, + o.created_at, + (SELECT type FROM event_types WHERE event_types.id = o.event_type_id), + o.event_json, + o.command_id, + o.xact_id + FROM old_table o; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION save_events_on_update_trigger() RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO saved_event_records (operation, timestamp, "user", aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_id, xact_id) + SELECT 'U', + statement_timestamp(), + user, + o.aggregate_id, + o.partition_key, + o.sequence_number, + o.created_at, + (SELECT type FROM event_types WHERE event_types.id = o.event_type_id), + o.event_json, + o.command_id, + o.xact_id + FROM old_table o LEFT JOIN new_table n ON o.aggregate_id = n.aggregate_id AND o.sequence_number = n.sequence_number + WHERE n IS NULL + -- Only save when event related information changes + OR o.created_at <> n.created_at + OR o.event_type_id <> n.event_type_id + OR o.event_json <> n.event_json; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER save_events_on_delete_trigger + AFTER DELETE ON events + REFERENCING OLD TABLE AS old_table + FOR EACH STATEMENT EXECUTE FUNCTION save_events_on_delete_trigger(); +CREATE OR REPLACE TRIGGER save_events_on_update_trigger + AFTER UPDATE ON events + REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table + FOR EACH STATEMENT EXECUTE FUNCTION save_events_on_update_trigger(); diff --git a/building-a-web-application/db/sequent_schema.rb b/building-a-web-application/db/sequent_schema.rb index 3982650..b4e63da 100644 --- a/building-a-web-application/db/sequent_schema.rb +++ b/building-a-web-application/db/sequent_schema.rb @@ -1,52 +1,14 @@ -ActiveRecord::Schema.define do - - create_table "event_records", :force => true do 
|t| - t.uuid "aggregate_id", :null => false - t.integer "sequence_number", :null => false - t.datetime "created_at", :null => false - t.string "event_type", :null => false - t.text "event_json", :null => false - t.integer "command_record_id", :null => false - t.integer "stream_record_id", :null => false - t.bigint "xact_id" - end - - execute %Q{ -CREATE UNIQUE INDEX unique_event_per_aggregate ON event_records ( - aggregate_id, - sequence_number, - (CASE event_type WHEN 'Sequent::Core::SnapshotEvent' THEN 0 ELSE 1 END) -) -} - execute %Q{ -CREATE INDEX snapshot_events ON event_records (aggregate_id, sequence_number DESC) WHERE event_type = 'Sequent::Core::SnapshotEvent' -} - - add_index "event_records", ["command_record_id"], :name => "index_event_records_on_command_record_id" - add_index "event_records", ["event_type"], :name => "index_event_records_on_event_type" - add_index "event_records", ["created_at"], :name => "index_event_records_on_created_at" +# frozen_string_literal: true - create_table "command_records", :force => true do |t| - t.string "user_id" - t.uuid "aggregate_id" - t.string "command_type", :null => false - t.text "command_json", :null => false - t.datetime "created_at", :null => false - end - - create_table "stream_records", :force => true do |t| - t.datetime "created_at", :null => false - t.string "aggregate_type", :null => false - t.uuid "aggregate_id", :null => false - t.integer "snapshot_threshold" +ActiveRecord::Schema.define do + say_with_time 'Installing Sequent schema' do + say 'Creating tables', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_tables.sql") } + say 'Creating table partitions', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_partitions.sql") } + say 'Creating constraints and indexes', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_indexes.sql") } + say 'Creating stored procedures and views', true + 
suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_pgsql.sql") } end - - add_index "stream_records", ["aggregate_id"], :name => "index_stream_records_on_aggregate_id", :unique => true - execute %q{ -ALTER TABLE event_records ADD CONSTRAINT command_fkey FOREIGN KEY (command_record_id) REFERENCES command_records (id) -} - execute %q{ -ALTER TABLE event_records ADD CONSTRAINT stream_fkey FOREIGN KEY (stream_record_id) REFERENCES stream_records (id) -} - end diff --git a/building-a-web-application/db/sequent_schema_indexes.sql b/building-a-web-application/db/sequent_schema_indexes.sql new file mode 100644 index 0000000..fe0a1e9 --- /dev/null +++ b/building-a-web-application/db/sequent_schema_indexes.sql @@ -0,0 +1,37 @@ +ALTER TABLE aggregates ADD PRIMARY KEY (aggregate_id); +ALTER TABLE aggregates ADD UNIQUE (events_partition_key, aggregate_id); +CREATE INDEX aggregates_aggregate_type_id_idx ON aggregates (aggregate_type_id); + +ALTER TABLE commands ADD PRIMARY KEY (id); +CREATE INDEX commands_command_type_id_idx ON commands (command_type_id); +CREATE INDEX commands_aggregate_id_idx ON commands (aggregate_id); +CREATE INDEX commands_event_idx ON commands (event_aggregate_id, event_sequence_number); + +ALTER TABLE events ADD PRIMARY KEY (partition_key, aggregate_id, sequence_number); +CREATE INDEX events_command_id_idx ON events (command_id); +CREATE INDEX events_event_type_id_idx ON events (event_type_id); + +ALTER TABLE aggregates + ADD FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types (id) ON UPDATE CASCADE; + +ALTER TABLE events + ADD FOREIGN KEY (partition_key, aggregate_id) REFERENCES aggregates (events_partition_key, aggregate_id) + ON UPDATE CASCADE ON DELETE RESTRICT; +ALTER TABLE events + ADD FOREIGN KEY (command_id) REFERENCES commands (id) ON UPDATE RESTRICT ON DELETE RESTRICT; +ALTER TABLE events + ADD FOREIGN KEY (event_type_id) REFERENCES event_types (id) ON UPDATE CASCADE; +ALTER TABLE events ALTER COLUMN xact_id SET 
DEFAULT pg_current_xact_id()::text::bigint; + +ALTER TABLE commands + ADD FOREIGN KEY (command_type_id) REFERENCES command_types (id) ON UPDATE CASCADE; + +ALTER TABLE aggregates_that_need_snapshots + ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE; + +CREATE INDEX aggregates_that_need_snapshots_outdated_idx + ON aggregates_that_need_snapshots (snapshot_outdated_at ASC, snapshot_sequence_number_high_water_mark DESC, aggregate_id ASC) + WHERE snapshot_outdated_at IS NOT NULL; + +ALTER TABLE snapshot_records + ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates_that_need_snapshots (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE; diff --git a/building-a-web-application/db/sequent_schema_partitions.sql b/building-a-web-application/db/sequent_schema_partitions.sql new file mode 100644 index 0000000..ea93241 --- /dev/null +++ b/building-a-web-application/db/sequent_schema_partitions.sql @@ -0,0 +1,34 @@ +-- ### Configure partitions as needed +CREATE TABLE commands_default PARTITION OF commands DEFAULT; +-- CREATE TABLE commands_0 PARTITION OF commands FOR VALUES FROM (1) TO (100e6); +-- CREATE TABLE commands_1 PARTITION OF commands FOR VALUES FROM (100e6) TO (200e6); +-- CREATE TABLE commands_2 PARTITION OF commands FOR VALUES FROM (200e6) TO (300e6); +-- CREATE TABLE commands_3 PARTITION OF commands FOR VALUES FROM (300e6) TO (400e6); + +-- ### Configure partitions as needed +CREATE TABLE aggregates_default PARTITION OF aggregates DEFAULT; +-- CREATE TABLE aggregates_0 PARTITION OF aggregates FOR VALUES FROM (MINVALUE) TO ('10000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_1 PARTITION OF aggregates FOR VALUES FROM ('10000000-0000-0000-0000-000000000000') TO ('20000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_2 PARTITION OF aggregates FOR VALUES FROM ('20000000-0000-0000-0000-000000000000') TO ('30000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_3 PARTITION OF 
aggregates FOR VALUES FROM ('30000000-0000-0000-0000-000000000000') TO ('40000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_4 PARTITION OF aggregates FOR VALUES FROM ('40000000-0000-0000-0000-000000000000') TO ('50000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_5 PARTITION OF aggregates FOR VALUES FROM ('50000000-0000-0000-0000-000000000000') TO ('60000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_6 PARTITION OF aggregates FOR VALUES FROM ('60000000-0000-0000-0000-000000000000') TO ('70000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_7 PARTITION OF aggregates FOR VALUES FROM ('70000000-0000-0000-0000-000000000000') TO ('80000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_8 PARTITION OF aggregates FOR VALUES FROM ('80000000-0000-0000-0000-000000000000') TO ('90000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_9 PARTITION OF aggregates FOR VALUES FROM ('90000000-0000-0000-0000-000000000000') TO ('a0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_a PARTITION OF aggregates FOR VALUES FROM ('a0000000-0000-0000-0000-000000000000') TO ('b0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_b PARTITION OF aggregates FOR VALUES FROM ('b0000000-0000-0000-0000-000000000000') TO ('c0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_c PARTITION OF aggregates FOR VALUES FROM ('c0000000-0000-0000-0000-000000000000') TO ('d0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_d PARTITION OF aggregates FOR VALUES FROM ('d0000000-0000-0000-0000-000000000000') TO ('e0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_e PARTITION OF aggregates FOR VALUES FROM ('e0000000-0000-0000-0000-000000000000') TO ('f0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_f PARTITION OF aggregates FOR VALUES FROM ('f0000000-0000-0000-0000-000000000000') TO (MAXVALUE); + +-- ### Configure partitions as needed 
+CREATE TABLE events_default PARTITION OF events DEFAULT; +-- CREATE TABLE events_2023_and_earlier PARTITION OF events FOR VALUES FROM ('Y00') TO ('Y24'); +-- CREATE TABLE events_2024 PARTITION OF events FOR VALUES FROM ('Y24') TO ('Y25'); +-- CREATE TABLE events_2025 PARTITION OF events FOR VALUES FROM ('Y25') TO ('Y26'); +-- CREATE TABLE events_2026 PARTITION OF events FOR VALUES FROM ('Y26') TO ('Y27'); +-- CREATE TABLE events_2027_and_later PARTITION OF events FOR VALUES FROM ('Y27') TO ('Y99'); +-- CREATE TABLE events_aggregate PARTITION OF events FOR VALUES FROM ('A') TO ('Ag'); diff --git a/building-a-web-application/db/sequent_schema_tables.sql b/building-a-web-application/db/sequent_schema_tables.sql new file mode 100644 index 0000000..4a498f4 --- /dev/null +++ b/building-a-web-application/db/sequent_schema_tables.sql @@ -0,0 +1,74 @@ +CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); + +CREATE SEQUENCE IF NOT EXISTS commands_id_seq; + +CREATE TABLE commands ( + id bigint NOT NULL DEFAULT nextval('commands_id_seq'), + created_at timestamp with time zone NOT NULL, + user_id uuid, + aggregate_id uuid, + command_type_id SMALLINT NOT NULL, + command_json jsonb NOT NULL, + event_aggregate_id uuid, + event_sequence_number integer +) PARTITION BY RANGE (id); + +ALTER SEQUENCE commands_id_seq OWNED BY commands.id; + +CREATE TABLE aggregates ( + aggregate_id uuid NOT NULL, + events_partition_key text NOT NULL DEFAULT '', + aggregate_type_id SMALLINT NOT NULL, + snapshot_threshold integer, + created_at timestamp with time zone NOT NULL DEFAULT NOW() +) PARTITION BY RANGE (aggregate_id); + +CREATE TABLE events ( + aggregate_id uuid NOT NULL, + partition_key text NOT NULL DEFAULT '', + 
sequence_number integer NOT NULL, + created_at timestamp with time zone NOT NULL, + command_id bigint NOT NULL, + event_type_id SMALLINT NOT NULL, + event_json jsonb NOT NULL, + xact_id bigint +) PARTITION BY RANGE (partition_key); + +CREATE TABLE aggregates_that_need_snapshots ( + aggregate_id uuid NOT NULL PRIMARY KEY, + snapshot_sequence_number_high_water_mark integer, + snapshot_outdated_at timestamp with time zone, + snapshot_scheduled_at timestamp with time zone +); + +COMMENT ON TABLE aggregates_that_need_snapshots IS 'Contains a row for every aggregate with more events than its snapshot threshold.'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_sequence_number_high_water_mark + IS 'The highest sequence number of the stored snapshot. Kept when snapshot are deleted to more easily query aggregates that need snapshotting the most'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_outdated_at IS 'Not NULL indicates a snapshot is needed since the stored timestamp'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_scheduled_at IS 'Not NULL indicates a snapshot is in the process of being taken'; + +CREATE TABLE snapshot_records ( + aggregate_id uuid NOT NULL, + sequence_number integer NOT NULL, + created_at timestamptz NOT NULL, + snapshot_type text NOT NULL, + snapshot_json jsonb NOT NULL, + PRIMARY KEY (aggregate_id, sequence_number) +); + +CREATE TABLE saved_event_records ( + operation varchar(1) NOT NULL CHECK (operation IN ('U', 'D')), + timestamp timestamptz NOT NULL, + "user" text NOT NULL, + aggregate_id uuid NOT NULL, + partition_key text DEFAULT '', + sequence_number integer NOT NULL, + created_at timestamp with time zone NOT NULL, + command_id bigint NOT NULL, + event_type text NOT NULL, + event_json jsonb NOT NULL, + xact_id bigint, + PRIMARY KEY (aggregate_id, sequence_number, timestamp) +); diff --git a/rails-blog/Gemfile.lock b/rails-blog/Gemfile.lock index 5d74a84..a9c489f 100644 --- a/rails-blog/Gemfile.lock +++ 
b/rails-blog/Gemfile.lock @@ -95,12 +95,14 @@ GEM concurrent-ruby (1.3.4) connection_pool (2.4.1) crass (1.0.6) + csv (3.3.0) date (3.3.4) debug (1.9.2) irb (~> 1.10) reline (>= 0.3.8) drb (2.2.1) erubi (1.13.0) + gli (2.22.0) globalid (1.2.1) activesupport (>= 6.1) i18n (1.14.6) @@ -146,14 +148,16 @@ GEM nokogiri (1.16.8) mini_portile2 (~> 2.8.2) racc (~> 1.4) - oj (3.16.6) + oj (3.16.7) bigdecimal (>= 3.0) ostruct (>= 0.2) - ostruct (0.6.0) + ostruct (0.6.1) parallel (1.26.3) parser (3.3.5.0) ast (~> 2.4.1) racc + pastel (0.8.0) + tty-color (~> 0.5) pg (1.5.8) postgresql_cursor (0.6.9) activerecord (>= 6.0) @@ -245,17 +249,21 @@ GEM rexml (~> 3.2, >= 3.2.5) rubyzip (>= 1.2.2, < 3.0) websocket (~> 1.0) - sequent (7.1.1) - activemodel (>= 6.0) - activerecord (>= 6.0) + sequent (8.0.1) + activemodel (>= 7.1.3) + activerecord (>= 7.1.3) bcrypt (~> 3.1) + csv (~> 3.3) + gli (~> 2.22) i18n - oj (~> 3) + logger (~> 1.6) + oj (~> 3.3) parallel (~> 1.20) parser (>= 2.6.5, < 3.4) pg (~> 1.2) postgresql_cursor (~> 0.6) thread_safe (~> 0.3.6) + tty-prompt (~> 0.23.1) tzinfo (>= 1.1) sprockets (4.2.1) concurrent-ruby (~> 1.0) @@ -270,6 +278,16 @@ GEM thor (1.3.2) thread_safe (0.3.6) timeout (0.4.1) + tty-color (0.6.0) + tty-cursor (0.7.1) + tty-prompt (0.23.1) + pastel (~> 0.8) + tty-reader (~> 0.8) + tty-reader (0.9.0) + tty-cursor (~> 0.7) + tty-screen (~> 0.8) + wisper (~> 2.0) + tty-screen (0.8.2) turbo-rails (2.0.11) actionpack (>= 6.0.0) railties (>= 6.0.0) @@ -287,6 +305,7 @@ GEM websocket-driver (0.7.6) websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) + wisper (2.0.1) xpath (3.2.0) nokogiri (~> 1.8) zeitwerk (2.7.0) diff --git a/rails-blog/config/database.yml b/rails-blog/config/database.yml index a437822..1b3a2e5 100644 --- a/rails-blog/config/database.yml +++ b/rails-blog/config/database.yml @@ -2,8 +2,6 @@ default: &default adapter: postgresql host: localhost port: 5432 - username: <%= ENV["POSTGRES_USER"] || "postgres" %> - password: <%= 
ENV["POSTGRES_PASSWORD"] %> pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %> timeout: 5000 schema_search_path: <%= ENV.fetch("SEQUENT_MIGRATION_SCHEMAS") { "public, sequent_schema, view_schema" } %> diff --git a/rails-blog/config/initializers/sequent.rb b/rails-blog/config/initializers/sequent.rb index b36acb4..9d152af 100644 --- a/rails-blog/config/initializers/sequent.rb +++ b/rails-blog/config/initializers/sequent.rb @@ -4,7 +4,6 @@ Sequent.configure do |config| config.migrations_class_name = 'SequentMigrations' config.enable_autoregistration = true - config.event_store_cache_event_types = !Rails.env.development? config.database_config_directory = 'config' diff --git a/rails-blog/db/sequent_pgsql.sql b/rails-blog/db/sequent_pgsql.sql new file mode 100644 index 0000000..0cfe399 --- /dev/null +++ b/rails-blog/db/sequent_pgsql.sql @@ -0,0 +1,416 @@ +DROP TYPE IF EXISTS aggregate_event_type CASCADE; +CREATE TYPE aggregate_event_type AS ( + aggregate_type text, + aggregate_id uuid, + events_partition_key text, + event_type text, + event_json jsonb +); + +CREATE OR REPLACE FUNCTION enrich_command_json(command commands) RETURNS jsonb +LANGUAGE plpgsql AS $$ +BEGIN + RETURN jsonb_build_object( + 'command_type', (SELECT type FROM command_types WHERE command_types.id = command.command_type_id), + 'created_at', command.created_at, + 'user_id', command.user_id, + 'aggregate_id', command.aggregate_id, + 'event_aggregate_id', command.event_aggregate_id, + 'event_sequence_number', command.event_sequence_number + ) + || command.command_json; +END +$$; + +CREATE OR REPLACE FUNCTION enrich_event_json(event events) RETURNS jsonb +LANGUAGE plpgsql AS $$ +BEGIN + RETURN jsonb_build_object( + 'aggregate_id', event.aggregate_id, + 'sequence_number', event.sequence_number, + 'created_at', event.created_at + ) + || event.event_json; +END +$$; + +CREATE OR REPLACE FUNCTION load_event( + _aggregate_id uuid, + _sequence_number integer +) RETURNS SETOF aggregate_event_type +LANGUAGE 
plpgsql AS $$ +BEGIN + RETURN QUERY SELECT aggregate_types.type, + a.aggregate_id, + a.events_partition_key, + event_types.type, + enrich_event_json(e) + FROM aggregates a + INNER JOIN events e ON (a.events_partition_key, a.aggregate_id) = (e.partition_key, e.aggregate_id) + INNER JOIN aggregate_types ON a.aggregate_type_id = aggregate_types.id + INNER JOIN event_types ON e.event_type_id = event_types.id + WHERE a.aggregate_id = _aggregate_id + AND e.sequence_number = _sequence_number; +END; +$$; +/* load_events: for every aggregate id in the JSON array _aggregate_ids, returns the snapshot with the highest sequence number (only when _use_snapshots is true) plus the events whose sequence_number is >= that snapshot's sequence number, or >= 0 when no snapshot was selected. When _until is not NULL, only snapshots and events created strictly before _until are considered. */ +CREATE OR REPLACE FUNCTION load_events( + _aggregate_ids jsonb, + _use_snapshots boolean DEFAULT TRUE, + _until timestamptz DEFAULT NULL +) RETURNS SETOF aggregate_event_type +LANGUAGE plpgsql AS $$ +DECLARE + _aggregate_id aggregates.aggregate_id%TYPE; +BEGIN + FOR _aggregate_id IN SELECT * FROM jsonb_array_elements_text(_aggregate_ids) LOOP + -- Use a single query to avoid race condition with UPDATEs to the events partition key + -- in case transaction isolation level is lower than repeatable read (the default of + -- PostgreSQL is read committed). 
+ RETURN QUERY WITH + aggregate AS ( + SELECT aggregate_types.type, aggregate_id, events_partition_key + FROM aggregates + JOIN aggregate_types ON aggregate_type_id = aggregate_types.id + WHERE aggregate_id = _aggregate_id + ), + snapshot AS ( + SELECT * + FROM snapshot_records + WHERE _use_snapshots + AND aggregate_id = _aggregate_id + AND (_until IS NULL OR created_at < _until) + ORDER BY sequence_number DESC LIMIT 1 + ) + (SELECT a.*, s.snapshot_type, s.snapshot_json FROM aggregate a, snapshot s) /* cross join of two CTEs with at most one row each: produces no row when no snapshot was selected */ + UNION ALL + (SELECT a.*, event_types.type, enrich_event_json(e) + FROM aggregate a + JOIN events e ON (a.events_partition_key, a.aggregate_id) = (e.partition_key, e.aggregate_id) + JOIN event_types ON e.event_type_id = event_types.id + WHERE e.sequence_number >= COALESCE((SELECT sequence_number FROM snapshot), 0) + AND (_until IS NULL OR e.created_at < _until) + ORDER BY e.sequence_number ASC); + END LOOP; +END; +$$; +/* store_command: inserts one row into commands built from the JSON document _command and returns the generated id. command_type and created_at are read from _command itself; user_id, aggregate_id, event_aggregate_id and event_sequence_number are read from _command->'command_json'. The command type is added to command_types first when it is missing. The keys listed in the text[] literal (including organization_id) are removed from the JSON that is stored in command_json. */ +CREATE OR REPLACE FUNCTION store_command(_command jsonb) RETURNS bigint +LANGUAGE plpgsql AS $$ +DECLARE + _id commands.id%TYPE; + _command_json jsonb = _command->'command_json'; +BEGIN + IF NOT EXISTS (SELECT 1 FROM command_types t WHERE t.type = _command->>'command_type') THEN + -- Only try inserting if it doesn't exist to avoid exhausting the id sequence + INSERT INTO command_types (type) + VALUES (_command->>'command_type') + ON CONFLICT DO NOTHING; + END IF; + + INSERT INTO commands ( + created_at, user_id, aggregate_id, command_type_id, command_json, + event_aggregate_id, event_sequence_number + ) VALUES ( + (_command->>'created_at')::timestamptz, + (_command_json->>'user_id')::uuid, + (_command_json->>'aggregate_id')::uuid, + (SELECT id FROM command_types WHERE type = _command->>'command_type'), + (_command->'command_json') - '{command_type,created_at,organization_id,user_id,aggregate_id,event_aggregate_id,event_sequence_number}'::text[], + (_command_json->>'event_aggregate_id')::uuid, + NULLIF(_command_json->'event_sequence_number', 
'null'::jsonb)::integer /* a JSON null is turned into SQL NULL before the integer cast */ + ) RETURNING id INTO STRICT _id; + RETURN _id; +END; +$$; +/* store_events: stores _command through store_command and then the events in _aggregates_with_events, a JSON array whose elements are [aggregate, events] pairs. It registers aggregate and event types that are not known yet, inserts or updates each aggregate row, inserts the aggregate's events linked to the stored command, and records snapshot_outdated_at in aggregates_that_need_snapshots when the aggregate element provides it. */ +CREATE OR REPLACE PROCEDURE store_events(_command jsonb, _aggregates_with_events jsonb) +LANGUAGE plpgsql AS $$ +DECLARE + _command_id commands.id%TYPE; + _aggregate jsonb; + _events jsonb; + _aggregate_id aggregates.aggregate_id%TYPE; + _aggregate_row aggregates%ROWTYPE; + _provided_events_partition_key aggregates.events_partition_key%TYPE; + _events_partition_key aggregates.events_partition_key%TYPE; + _snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE; +BEGIN + _command_id = store_command(_command); +/* Register aggregate types that are not in aggregate_types yet. */ + WITH types AS ( + SELECT DISTINCT row->0->>'aggregate_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + ) + INSERT INTO aggregate_types (type) + SELECT type FROM types + WHERE type NOT IN (SELECT type FROM aggregate_types) + ORDER BY 1 + ON CONFLICT DO NOTHING; +/* Register event types that are not in event_types yet. */ + WITH types AS ( + SELECT DISTINCT events->>'event_type' AS type + FROM jsonb_array_elements(_aggregates_with_events) AS row + CROSS JOIN LATERAL jsonb_array_elements(row->1) AS events + ) + INSERT INTO event_types (type) + SELECT type FROM types + WHERE type NOT IN (SELECT type FROM event_types) + ORDER BY 1 + ON CONFLICT DO NOTHING; +/* Handle each [aggregate, events] pair, ordered by aggregate_id and then by the sequence number of its first event. NOTE(review): presumably a deterministic order to limit lock-order conflicts between concurrent calls; confirm. */ + FOR _aggregate, _events IN SELECT row->0, row->1 FROM jsonb_array_elements(_aggregates_with_events) AS row + ORDER BY row->0->'aggregate_id', row->1->0->'event_json'->'sequence_number' + LOOP + _aggregate_id = _aggregate->>'aggregate_id'; + _provided_events_partition_key = _aggregate->>'events_partition_key'; + _snapshot_outdated_at = _aggregate->>'snapshot_outdated_at'; +/* Partition key precedence: the value provided by the caller, then the value of the existing aggregate row, then the empty string. */ + SELECT * INTO _aggregate_row FROM aggregates WHERE aggregate_id = _aggregate_id; + _events_partition_key = COALESCE(_provided_events_partition_key, _aggregate_row.events_partition_key, ''); +/* Insert the aggregate, or update its events_partition_key when the stored value differs. The aggregate's created_at is taken from its first event. */ + INSERT INTO aggregates (aggregate_id, created_at, aggregate_type_id, events_partition_key) + VALUES ( + _aggregate_id, + (_events->0->>'created_at')::timestamptz, + 
(SELECT id FROM aggregate_types WHERE type = _aggregate->>'aggregate_type'), + _events_partition_key + ) ON CONFLICT (aggregate_id) + DO UPDATE SET events_partition_key = EXCLUDED.events_partition_key + WHERE aggregates.events_partition_key IS DISTINCT FROM EXCLUDED.events_partition_key; +/* Insert the events of this aggregate. Keys that are stored in dedicated columns are removed from the JSON kept in event_json. */ + INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json) + SELECT _events_partition_key, + _aggregate_id, + (event->'event_json'->'sequence_number')::integer, + (event->>'created_at')::timestamptz, + _command_id, + (SELECT id FROM event_types WHERE type = event->>'event_type'), + (event->'event_json') - '{aggregate_id,created_at,event_type,sequence_number}'::text[] + FROM jsonb_array_elements(_events) AS event; +/* Record that the aggregate needs a snapshot. On conflict the earlier of the stored and the new timestamp is kept; LEAST ignores NULL arguments, so a stored NULL is replaced by the new value. */ + IF _snapshot_outdated_at IS NOT NULL THEN + INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_outdated_at) + VALUES (_aggregate_id, _snapshot_outdated_at) + ON CONFLICT (aggregate_id) DO UPDATE + SET snapshot_outdated_at = LEAST(row.snapshot_outdated_at, EXCLUDED.snapshot_outdated_at) + WHERE row.snapshot_outdated_at IS DISTINCT FROM EXCLUDED.snapshot_outdated_at; + END IF; + END LOOP; +END; +$$; +/* store_snapshots: for each element of the JSON array _snapshots, inserts or updates the aggregate's row in aggregates_that_need_snapshots and then inserts the snapshot into snapshot_records. Each element supplies aggregate_id, sequence_number, created_at, snapshot_type and snapshot_json. */ +CREATE OR REPLACE PROCEDURE store_snapshots(_snapshots jsonb) +LANGUAGE plpgsql AS $$ +DECLARE + _aggregate_id uuid; + _snapshot jsonb; + _sequence_number snapshot_records.sequence_number%TYPE; +BEGIN + FOR _snapshot IN SELECT * FROM jsonb_array_elements(_snapshots) LOOP + _aggregate_id = _snapshot->>'aggregate_id'; + _sequence_number = _snapshot->'sequence_number'; +/* On conflict the high-water mark is raised to the larger sequence number, and snapshot_outdated_at and snapshot_scheduled_at are cleared. */ + INSERT INTO aggregates_that_need_snapshots AS row (aggregate_id, snapshot_sequence_number_high_water_mark) + VALUES (_aggregate_id, _sequence_number) + ON CONFLICT (aggregate_id) DO UPDATE + SET snapshot_sequence_number_high_water_mark = + GREATEST(row.snapshot_sequence_number_high_water_mark, EXCLUDED.snapshot_sequence_number_high_water_mark), + snapshot_outdated_at = NULL, + snapshot_scheduled_at = NULL; + + INSERT INTO 
snapshot_records (aggregate_id, sequence_number, created_at, snapshot_type, snapshot_json) + VALUES ( + _aggregate_id, + _sequence_number, + (_snapshot->>'created_at')::timestamptz, + _snapshot->>'snapshot_type', + _snapshot->'snapshot_json' + ); + END LOOP; +END; +$$; +/* load_latest_snapshot: returns the snapshot with the highest sequence_number of aggregate _aggregate_id as a single aggregate_event_type value (aggregate type, aggregate id, events partition key, snapshot type, snapshot JSON). */ +CREATE OR REPLACE FUNCTION load_latest_snapshot(_aggregate_id uuid) RETURNS aggregate_event_type +LANGUAGE SQL AS $$ + SELECT (SELECT type FROM aggregate_types WHERE id = a.aggregate_type_id), + a.aggregate_id, + a.events_partition_key, + s.snapshot_type, + s.snapshot_json + FROM aggregates a JOIN snapshot_records s ON a.aggregate_id = s.aggregate_id + WHERE a.aggregate_id = _aggregate_id + ORDER BY s.sequence_number DESC + LIMIT 1; +$$; +/* delete_all_snapshots: sets snapshot_outdated_at to _now on every aggregates_that_need_snapshots row where it is NULL, then deletes all rows from snapshot_records (the DELETE is intentionally unfiltered). */ +CREATE OR REPLACE PROCEDURE delete_all_snapshots(_now timestamp with time zone DEFAULT NOW()) +LANGUAGE plpgsql AS $$ +BEGIN + UPDATE aggregates_that_need_snapshots + SET snapshot_outdated_at = _now + WHERE snapshot_outdated_at IS NULL; + DELETE FROM snapshot_records; +END; +$$; +/* delete_snapshots_before: deletes the snapshots of aggregate _aggregate_id whose sequence_number is lower than _sequence_number. When the aggregate has no snapshot left afterwards and its snapshot_outdated_at is NULL, snapshot_outdated_at is set to _now. */ +CREATE OR REPLACE PROCEDURE delete_snapshots_before(_aggregate_id uuid, _sequence_number integer, _now timestamp with time zone DEFAULT NOW()) +LANGUAGE plpgsql AS $$ +BEGIN + DELETE FROM snapshot_records + WHERE aggregate_id = _aggregate_id + AND sequence_number < _sequence_number; + + UPDATE aggregates_that_need_snapshots + SET snapshot_outdated_at = _now + WHERE aggregate_id = _aggregate_id + AND snapshot_outdated_at IS NULL + AND NOT EXISTS (SELECT 1 FROM snapshot_records WHERE aggregate_id = _aggregate_id); +END; +$$; +/* aggregates_that_need_snapshots: returns at most _limit aggregate ids that have snapshot_outdated_at set, in ascending aggregate_id order. When _last_aggregate_id is not NULL only ids greater than it are returned, so a caller can page through the result by passing the last id it received. */ +CREATE OR REPLACE FUNCTION aggregates_that_need_snapshots(_last_aggregate_id uuid, _limit integer) + RETURNS TABLE (aggregate_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN QUERY SELECT a.aggregate_id + FROM aggregates_that_need_snapshots a + WHERE a.snapshot_outdated_at IS NOT NULL + AND (_last_aggregate_id IS NULL OR a.aggregate_id > _last_aggregate_id) + ORDER BY 1 + LIMIT _limit; +END; +$$; +/* select_aggregates_for_snapshotting: locks (FOR UPDATE) at most _limit rows of aggregates_that_need_snapshots that have snapshot_outdated_at set, oldest snapshot_outdated_at first, then highest high-water mark. Of those rows it sets snapshot_scheduled_at = _now on the ones that are not scheduled yet or were scheduled before _reschedule_snapshot_scheduled_before, and returns their aggregate ids. The limit is applied before the scheduling filter, so fewer than _limit ids may be returned. */ +CREATE OR REPLACE FUNCTION 
select_aggregates_for_snapshotting(_limit integer, _reschedule_snapshot_scheduled_before timestamp with time zone, _now timestamp with time zone DEFAULT NOW()) + RETURNS TABLE (aggregate_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN QUERY WITH scheduled AS MATERIALIZED ( + SELECT a.aggregate_id + FROM aggregates_that_need_snapshots AS a + WHERE snapshot_outdated_at IS NOT NULL + ORDER BY snapshot_outdated_at ASC, snapshot_sequence_number_high_water_mark DESC, aggregate_id ASC + LIMIT _limit + FOR UPDATE + ) UPDATE aggregates_that_need_snapshots AS row + SET snapshot_scheduled_at = _now + FROM scheduled + WHERE row.aggregate_id = scheduled.aggregate_id + AND (row.snapshot_scheduled_at IS NULL OR row.snapshot_scheduled_at < _reschedule_snapshot_scheduled_before) + RETURNING row.aggregate_id; +END; +$$; +/* permanently_delete_commands_without_events: deletes the commands of aggregate _aggregate_id that are not referenced by any event. Raises an exception when both arguments are NULL. NOTE(review): _organization_id is only checked in the NULL guard and is never used in the DELETE filter. When _aggregate_id is NULL and _organization_id is set, every command without events is deleted, whatever organization it belongs to. The commands table has no organization column to filter on, so confirm the intended behaviour before calling this with only an organization id. */ +CREATE OR REPLACE PROCEDURE permanently_delete_commands_without_events(_aggregate_id uuid, _organization_id uuid) +LANGUAGE plpgsql AS $$ +BEGIN + IF _aggregate_id IS NULL AND _organization_id IS NULL THEN + RAISE EXCEPTION 'aggregate_id or organization_id must be specified to delete commands'; + END IF; + + DELETE FROM commands + WHERE (_aggregate_id IS NULL OR aggregate_id = _aggregate_id) + AND NOT EXISTS (SELECT 1 FROM events WHERE command_id = commands.id); +END; +$$; +/* permanently_delete_event_streams: for the aggregate ids in the JSON array _aggregate_ids, deletes all their events (matched on partition key and aggregate id) and then the aggregates rows themselves. Ids without a matching aggregates row are ignored. */ +CREATE OR REPLACE PROCEDURE permanently_delete_event_streams(_aggregate_ids jsonb) +LANGUAGE plpgsql AS $$ +BEGIN + DELETE FROM events + USING jsonb_array_elements_text(_aggregate_ids) AS ids (id) + JOIN aggregates ON ids.id::uuid = aggregates.aggregate_id + WHERE events.partition_key = aggregates.events_partition_key + AND events.aggregate_id = aggregates.aggregate_id; + DELETE FROM aggregates + USING jsonb_array_elements_text(_aggregate_ids) AS ids (id) + WHERE aggregates.aggregate_id = ids.id::uuid; +END; +$$; +/* command_records: view with one row per commands row, exposing the command type name and the JSON produced by enrich_command_json. */ +DROP VIEW IF EXISTS command_records; +CREATE VIEW command_records (id, user_id, aggregate_id, command_type, command_json, created_at, event_aggregate_id, event_sequence_number) 
AS + SELECT id, + user_id, + aggregate_id, + (SELECT type FROM command_types WHERE command_types.id = command.command_type_id), + enrich_command_json(command), + created_at, + event_aggregate_id, + event_sequence_number + FROM commands command; +/* event_records: view with one row per event, joined with its aggregate and its event type. event_json is the JSON produced by enrich_event_json, and the command_id column is exposed as command_record_id. */ +DROP VIEW IF EXISTS event_records; +CREATE VIEW event_records (aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_record_id, xact_id) AS + SELECT aggregate.aggregate_id, + event.partition_key, + event.sequence_number, + event.created_at, + type.type, + enrich_event_json(event) AS event_json, + command_id, + event.xact_id + FROM events event + JOIN aggregates aggregate ON aggregate.aggregate_id = event.aggregate_id AND aggregate.events_partition_key = event.partition_key + JOIN event_types type ON event.event_type_id = type.id; +/* stream_records: view with one row per aggregate, exposing the aggregate type name instead of aggregate_type_id. */ +DROP VIEW IF EXISTS stream_records; +CREATE VIEW stream_records (aggregate_id, events_partition_key, aggregate_type, created_at) AS + SELECT aggregates.aggregate_id, + aggregates.events_partition_key, + aggregate_types.type, + aggregates.created_at + FROM aggregates JOIN aggregate_types ON aggregates.aggregate_type_id = aggregate_types.id; +/* save_events_on_delete_trigger: trigger function that copies every row of old_table into saved_event_records with operation 'D', the statement timestamp and the current user. The event type id is resolved to its name and event_json is copied as stored. Returns NULL. */ +CREATE OR REPLACE FUNCTION save_events_on_delete_trigger() RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO saved_event_records (operation, timestamp, "user", aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_id, xact_id) + SELECT 'D', + statement_timestamp(), + user, + o.aggregate_id, + o.partition_key, + o.sequence_number, + o.created_at, + (SELECT type FROM event_types WHERE event_types.id = o.event_type_id), + o.event_json, + o.command_id, + o.xact_id + FROM old_table o; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +/* save_events_on_update_trigger: trigger function that copies the old version of updated rows into saved_event_records with operation 'U'. A row is saved only when no row with the same (aggregate_id, sequence_number) exists in new_table, or when created_at, event_type_id or event_json changed. Returns NULL. */ +CREATE OR REPLACE FUNCTION save_events_on_update_trigger() RETURNS TRIGGER AS $$ +BEGIN + INSERT INTO saved_event_records (operation, timestamp, "user", aggregate_id, partition_key, sequence_number, created_at, event_type, event_json, command_id, xact_id) + SELECT 'U', + 
statement_timestamp(), + user, + o.aggregate_id, + o.partition_key, + o.sequence_number, + o.created_at, + (SELECT type FROM event_types WHERE event_types.id = o.event_type_id), + o.event_json, + o.command_id, + o.xact_id + FROM old_table o LEFT JOIN new_table n ON o.aggregate_id = n.aggregate_id AND o.sequence_number = n.sequence_number + WHERE n IS NULL /* no row with the same (aggregate_id, sequence_number) exists in new_table */ + -- Only save when event related information changes + OR o.created_at <> n.created_at + OR o.event_type_id <> n.event_type_id + OR o.event_json <> n.event_json; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; +/* Statement-level AFTER triggers on events that pass the affected rows to the functions above through the transition tables old_table and new_table. CREATE OR REPLACE TRIGGER requires PostgreSQL 14 or later. */ +CREATE OR REPLACE TRIGGER save_events_on_delete_trigger + AFTER DELETE ON events + REFERENCING OLD TABLE AS old_table + FOR EACH STATEMENT EXECUTE FUNCTION save_events_on_delete_trigger(); +CREATE OR REPLACE TRIGGER save_events_on_update_trigger + AFTER UPDATE ON events + REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table + FOR EACH STATEMENT EXECUTE FUNCTION save_events_on_update_trigger(); diff --git a/rails-blog/db/sequent_schema.rb b/rails-blog/db/sequent_schema.rb index 38d82b3..b4e63da 100644 --- a/rails-blog/db/sequent_schema.rb +++ b/rails-blog/db/sequent_schema.rb @@ -1,60 +1,14 @@ -ActiveRecord::Schema.define do - - create_table "event_records", :force => true do |t| - t.uuid "aggregate_id", :null => false - t.integer "sequence_number", :null => false - t.datetime "created_at", :null => false - t.string "event_type", :null => false - t.text "event_json", :null => false - t.integer "command_record_id", :null => false - t.integer "stream_record_id", :null => false - t.bigint "xact_id" - end +# frozen_string_literal: true - execute %Q{ -ALTER TABLE event_records ALTER COLUMN xact_id SET DEFAULT pg_current_xact_id()::text::bigint -} - execute %Q{ -CREATE UNIQUE INDEX unique_event_per_aggregate ON event_records ( - aggregate_id, - sequence_number, - (CASE event_type WHEN 'Sequent::Core::SnapshotEvent' THEN 0 ELSE 1 END) -) -} - execute %Q{ -CREATE INDEX snapshot_events ON event_records 
(aggregate_id, sequence_number DESC) WHERE event_type = 'Sequent::Core::SnapshotEvent' -} - - add_index "event_records", ["command_record_id"], :name => "index_event_records_on_command_record_id" - add_index "event_records", ["event_type"], :name => "index_event_records_on_event_type" - add_index "event_records", ["created_at"], :name => "index_event_records_on_created_at" - add_index "event_records", ["xact_id"], :name => "index_event_records_on_xact_id" - - create_table "command_records", :force => true do |t| - t.string "user_id" - t.uuid "aggregate_id" - t.string "command_type", :null => false - t.string "event_aggregate_id" - t.integer "event_sequence_number" - t.text "command_json", :null => false - t.datetime "created_at", :null => false - end - - add_index "command_records", ["event_aggregate_id", 'event_sequence_number'], :name => "index_command_records_on_event" - - create_table "stream_records", :force => true do |t| - t.datetime "created_at", :null => false - t.string "aggregate_type", :null => false - t.uuid "aggregate_id", :null => false - t.integer "snapshot_threshold" +ActiveRecord::Schema.define do + say_with_time 'Installing Sequent schema' do + say 'Creating tables', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_tables.sql") } + say 'Creating table partitions', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_partitions.sql") } + say 'Creating constraints and indexes', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_schema_indexes.sql") } + say 'Creating stored procedures and views', true + suppress_messages { execute File.read("#{File.dirname(__FILE__)}/sequent_pgsql.sql") } end - - add_index "stream_records", ["aggregate_id"], :name => "index_stream_records_on_aggregate_id", :unique => true - execute %q{ -ALTER TABLE event_records ADD CONSTRAINT command_fkey FOREIGN KEY (command_record_id) REFERENCES command_records (id) -} - execute 
%q{ -ALTER TABLE event_records ADD CONSTRAINT stream_fkey FOREIGN KEY (stream_record_id) REFERENCES stream_records (id) -} - end diff --git a/rails-blog/db/sequent_schema_indexes.sql b/rails-blog/db/sequent_schema_indexes.sql new file mode 100644 index 0000000..fe0a1e9 --- /dev/null +++ b/rails-blog/db/sequent_schema_indexes.sql @@ -0,0 +1,37 @@ +ALTER TABLE aggregates ADD PRIMARY KEY (aggregate_id); +ALTER TABLE aggregates ADD UNIQUE (events_partition_key, aggregate_id); +CREATE INDEX aggregates_aggregate_type_id_idx ON aggregates (aggregate_type_id); + +ALTER TABLE commands ADD PRIMARY KEY (id); +CREATE INDEX commands_command_type_id_idx ON commands (command_type_id); +CREATE INDEX commands_aggregate_id_idx ON commands (aggregate_id); +CREATE INDEX commands_event_idx ON commands (event_aggregate_id, event_sequence_number); + +ALTER TABLE events ADD PRIMARY KEY (partition_key, aggregate_id, sequence_number); +CREATE INDEX events_command_id_idx ON events (command_id); +CREATE INDEX events_event_type_id_idx ON events (event_type_id); + +ALTER TABLE aggregates + ADD FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types (id) ON UPDATE CASCADE; + +ALTER TABLE events + ADD FOREIGN KEY (partition_key, aggregate_id) REFERENCES aggregates (events_partition_key, aggregate_id) + ON UPDATE CASCADE ON DELETE RESTRICT; +ALTER TABLE events + ADD FOREIGN KEY (command_id) REFERENCES commands (id) ON UPDATE RESTRICT ON DELETE RESTRICT; +ALTER TABLE events + ADD FOREIGN KEY (event_type_id) REFERENCES event_types (id) ON UPDATE CASCADE; +ALTER TABLE events ALTER COLUMN xact_id SET DEFAULT pg_current_xact_id()::text::bigint; + +ALTER TABLE commands + ADD FOREIGN KEY (command_type_id) REFERENCES command_types (id) ON UPDATE CASCADE; + +ALTER TABLE aggregates_that_need_snapshots + ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE; + +CREATE INDEX aggregates_that_need_snapshots_outdated_idx + ON aggregates_that_need_snapshots 
(snapshot_outdated_at ASC, snapshot_sequence_number_high_water_mark DESC, aggregate_id ASC) + WHERE snapshot_outdated_at IS NOT NULL; + +ALTER TABLE snapshot_records + ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates_that_need_snapshots (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE; diff --git a/rails-blog/db/sequent_schema_partitions.sql b/rails-blog/db/sequent_schema_partitions.sql new file mode 100644 index 0000000..ea93241 --- /dev/null +++ b/rails-blog/db/sequent_schema_partitions.sql @@ -0,0 +1,34 @@ +-- ### Configure partitions as needed +CREATE TABLE commands_default PARTITION OF commands DEFAULT; +-- CREATE TABLE commands_0 PARTITION OF commands FOR VALUES FROM (1) TO (100e6); +-- CREATE TABLE commands_1 PARTITION OF commands FOR VALUES FROM (100e6) TO (200e6); +-- CREATE TABLE commands_2 PARTITION OF commands FOR VALUES FROM (200e6) TO (300e6); +-- CREATE TABLE commands_3 PARTITION OF commands FOR VALUES FROM (300e6) TO (400e6); + +-- ### Configure partitions as needed +CREATE TABLE aggregates_default PARTITION OF aggregates DEFAULT; +-- CREATE TABLE aggregates_0 PARTITION OF aggregates FOR VALUES FROM (MINVALUE) TO ('10000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_1 PARTITION OF aggregates FOR VALUES FROM ('10000000-0000-0000-0000-000000000000') TO ('20000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_2 PARTITION OF aggregates FOR VALUES FROM ('20000000-0000-0000-0000-000000000000') TO ('30000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_3 PARTITION OF aggregates FOR VALUES FROM ('30000000-0000-0000-0000-000000000000') TO ('40000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_4 PARTITION OF aggregates FOR VALUES FROM ('40000000-0000-0000-0000-000000000000') TO ('50000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_5 PARTITION OF aggregates FOR VALUES FROM ('50000000-0000-0000-0000-000000000000') TO ('60000000-0000-0000-0000-000000000000'); +-- CREATE TABLE 
aggregates_6 PARTITION OF aggregates FOR VALUES FROM ('60000000-0000-0000-0000-000000000000') TO ('70000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_7 PARTITION OF aggregates FOR VALUES FROM ('70000000-0000-0000-0000-000000000000') TO ('80000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_8 PARTITION OF aggregates FOR VALUES FROM ('80000000-0000-0000-0000-000000000000') TO ('90000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_9 PARTITION OF aggregates FOR VALUES FROM ('90000000-0000-0000-0000-000000000000') TO ('a0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_a PARTITION OF aggregates FOR VALUES FROM ('a0000000-0000-0000-0000-000000000000') TO ('b0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_b PARTITION OF aggregates FOR VALUES FROM ('b0000000-0000-0000-0000-000000000000') TO ('c0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_c PARTITION OF aggregates FOR VALUES FROM ('c0000000-0000-0000-0000-000000000000') TO ('d0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_d PARTITION OF aggregates FOR VALUES FROM ('d0000000-0000-0000-0000-000000000000') TO ('e0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_e PARTITION OF aggregates FOR VALUES FROM ('e0000000-0000-0000-0000-000000000000') TO ('f0000000-0000-0000-0000-000000000000'); +-- CREATE TABLE aggregates_f PARTITION OF aggregates FOR VALUES FROM ('f0000000-0000-0000-0000-000000000000') TO (MAXVALUE); + +-- ### Configure partitions as needed +CREATE TABLE events_default PARTITION OF events DEFAULT; +-- CREATE TABLE events_2023_and_earlier PARTITION OF events FOR VALUES FROM ('Y00') TO ('Y24'); +-- CREATE TABLE events_2024 PARTITION OF events FOR VALUES FROM ('Y24') TO ('Y25'); +-- CREATE TABLE events_2025 PARTITION OF events FOR VALUES FROM ('Y25') TO ('Y26'); +-- CREATE TABLE events_2026 PARTITION OF events FOR VALUES FROM ('Y26') TO ('Y27'); +-- CREATE TABLE 
events_2027_and_later PARTITION OF events FOR VALUES FROM ('Y27') TO ('Y99'); +-- CREATE TABLE events_aggregate PARTITION OF events FOR VALUES FROM ('A') TO ('Ag'); diff --git a/rails-blog/db/sequent_schema_tables.sql b/rails-blog/db/sequent_schema_tables.sql new file mode 100644 index 0000000..4a498f4 --- /dev/null +++ b/rails-blog/db/sequent_schema_tables.sql @@ -0,0 +1,74 @@ +CREATE TABLE command_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE aggregate_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); +CREATE TABLE event_types (id SMALLINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, type text UNIQUE NOT NULL); + +CREATE SEQUENCE IF NOT EXISTS commands_id_seq; + +CREATE TABLE commands ( + id bigint NOT NULL DEFAULT nextval('commands_id_seq'), + created_at timestamp with time zone NOT NULL, + user_id uuid, + aggregate_id uuid, + command_type_id SMALLINT NOT NULL, + command_json jsonb NOT NULL, + event_aggregate_id uuid, + event_sequence_number integer +) PARTITION BY RANGE (id); + +ALTER SEQUENCE commands_id_seq OWNED BY commands.id; + +CREATE TABLE aggregates ( + aggregate_id uuid NOT NULL, + events_partition_key text NOT NULL DEFAULT '', + aggregate_type_id SMALLINT NOT NULL, + snapshot_threshold integer, + created_at timestamp with time zone NOT NULL DEFAULT NOW() +) PARTITION BY RANGE (aggregate_id); + +CREATE TABLE events ( + aggregate_id uuid NOT NULL, + partition_key text NOT NULL DEFAULT '', + sequence_number integer NOT NULL, + created_at timestamp with time zone NOT NULL, + command_id bigint NOT NULL, + event_type_id SMALLINT NOT NULL, + event_json jsonb NOT NULL, + xact_id bigint +) PARTITION BY RANGE (partition_key); + +CREATE TABLE aggregates_that_need_snapshots ( + aggregate_id uuid NOT NULL PRIMARY KEY, + snapshot_sequence_number_high_water_mark integer, + snapshot_outdated_at timestamp with time zone, + snapshot_scheduled_at timestamp with time zone +); + 
+COMMENT ON TABLE aggregates_that_need_snapshots IS 'Contains a row for every aggregate with more events than its snapshot threshold.'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_sequence_number_high_water_mark + IS 'The highest sequence number of the stored snapshot. Kept when snapshot are deleted to more easily query aggregates that need snapshotting the most'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_outdated_at IS 'Not NULL indicates a snapshot is needed since the stored timestamp'; +COMMENT ON COLUMN aggregates_that_need_snapshots.snapshot_scheduled_at IS 'Not NULL indicates a snapshot is in the process of being taken'; + +CREATE TABLE snapshot_records ( + aggregate_id uuid NOT NULL, + sequence_number integer NOT NULL, + created_at timestamptz NOT NULL, + snapshot_type text NOT NULL, + snapshot_json jsonb NOT NULL, + PRIMARY KEY (aggregate_id, sequence_number) +); + +CREATE TABLE saved_event_records ( + operation varchar(1) NOT NULL CHECK (operation IN ('U', 'D')), + timestamp timestamptz NOT NULL, + "user" text NOT NULL, + aggregate_id uuid NOT NULL, + partition_key text DEFAULT '', + sequence_number integer NOT NULL, + created_at timestamp with time zone NOT NULL, + command_id bigint NOT NULL, + event_type text NOT NULL, + event_json jsonb NOT NULL, + xact_id bigint, + PRIMARY KEY (aggregate_id, sequence_number, timestamp) +);