From c335dcaa2408468ab9d1b8bfc68d92208a2183e1 Mon Sep 17 00:00:00 2001 From: Aaron Jensen Date: Sun, 15 Jan 2023 17:15:49 -0500 Subject: [PATCH 1/4] MessageStore, rather than MessageStore::Postgres --- init.rb | 2 +- lib/message_store.rb | 40 ++++ lib/message_store/controls.rb | 15 ++ lib/message_store/controls/category.rb | 25 +++ lib/message_store/controls/id.rb | 11 ++ lib/message_store/controls/message_data.rb | 21 +++ .../controls/message_data/metadata.rb | 24 +++ .../controls/message_data/write.rb | 74 ++++++++ lib/message_store/controls/position.rb | 13 ++ lib/message_store/controls/put.rb | 26 +++ lib/message_store/controls/random_value.rb | 9 + lib/message_store/controls/stream_name.rb | 35 ++++ lib/message_store/controls/time.rb | 62 ++++++ lib/message_store/expected_version.rb | 11 ++ lib/message_store/get.rb | 167 +++++++++++++++++ lib/message_store/get/category.rb | 79 ++++++++ .../get/category/consumer_group.rb | 18 ++ lib/message_store/get/category/correlation.rb | 15 ++ lib/message_store/get/condition.rb | 13 ++ lib/message_store/get/stream.rb | 65 +++++++ lib/message_store/get/stream/last.rb | 108 +++++++++++ lib/message_store/id.rb | 41 ++++ lib/message_store/log.rb | 7 + lib/message_store/message_data.rb | 18 ++ .../message_data/hash/transform.rb | 31 +++ lib/message_store/message_data/read.rb | 12 ++ lib/message_store/message_data/write.rb | 7 + lib/message_store/no_stream.rb | 11 ++ lib/message_store/postgres.rb | 24 --- lib/message_store/postgres/controls.rb | 8 - .../postgres/controls/category.rb | 7 - .../postgres/controls/message_data.rb | 31 --- .../postgres/controls/position.rb | 15 -- lib/message_store/postgres/controls/put.rb | 28 --- .../postgres/controls/stream_name.rb | 7 - lib/message_store/postgres/controls/time.rb | 7 - lib/message_store/postgres/get.rb | 165 ---------------- lib/message_store/postgres/get/category.rb | 81 -------- .../postgres/get/category/consumer_group.rb | 20 -- .../postgres/get/category/correlation.rb | 17 -- lib/message_store/postgres/get/condition.rb | 15 -- lib/message_store/postgres/get/stream.rb | 67 ------- lib/message_store/postgres/get/stream/last.rb | 89 --------- lib/message_store/postgres/log.rb | 9 - lib/message_store/postgres/put.rb | 146 --------------- lib/message_store/postgres/read.rb | 17 -- lib/message_store/postgres/session.rb | 176 ------------------ lib/message_store/postgres/settings.rb | 41 ---- lib/message_store/postgres/write.rb | 52 ------ lib/message_store/put.rb | 144 ++++++++++++++ lib/message_store/read.rb | 75 ++++++++ lib/message_store/read/iterator.rb | 166 +++++++++++++++++ lib/message_store/session.rb | 174 +++++++++++++++++ lib/message_store/settings.rb | 39 ++++ lib/message_store/stream_name.rb | 114 ++++++++++++ lib/message_store/write.rb | 101 ++++++++++ message_store-postgres.gemspec | 7 +- .../specialized/condition/condition.rb | 2 +- .../condition/not_activated_error.rb | 2 +- .../specialized/consumer_groups/error.rb | 10 +- .../category/specialized/correlation/error.rb | 2 +- .../stream/specialized/condition/condition.rb | 2 +- .../specialized/condition/not_activated.rb | 2 +- test/automated/read/condition.rb | 2 +- test/automated/read/default_batch_size.rb | 4 +- test/automated/session/build.rb | 4 +- test/automated/session/configure.rb | 2 +- test/automated/session/settings.rb | 4 +- test/automated/settings.rb | 4 +- test/automated/settings/default_path.rb | 2 +- .../concurrency/write_to_single_stream.rb | 6 +- test/test_init.rb | 3 +- tools/write_message.rb | 3 +- 73 files changed, 1804 insertions(+), 1052 deletions(-) create mode 100644 lib/message_store.rb create mode 100644 lib/message_store/controls.rb create mode 100644 lib/message_store/controls/category.rb create mode 100644 lib/message_store/controls/id.rb create mode 100644 lib/message_store/controls/message_data.rb create mode 100644 lib/message_store/controls/message_data/metadata.rb create mode 100644 lib/message_store/controls/message_data/write.rb create mode 100644 lib/message_store/controls/position.rb create mode 100644 lib/message_store/controls/put.rb create mode 100644 lib/message_store/controls/random_value.rb create mode 100644 lib/message_store/controls/stream_name.rb create mode 100644 lib/message_store/controls/time.rb create mode 100644 lib/message_store/expected_version.rb create mode 100644 lib/message_store/get.rb create mode 100644 lib/message_store/get/category.rb create mode 100644 lib/message_store/get/category/consumer_group.rb create mode 100644 lib/message_store/get/category/correlation.rb create mode 100644 lib/message_store/get/condition.rb create mode 100644 lib/message_store/get/stream.rb create mode 100644 lib/message_store/get/stream/last.rb create mode 100644 lib/message_store/id.rb create mode 100644 lib/message_store/log.rb create mode 100644 lib/message_store/message_data.rb create mode 100644 lib/message_store/message_data/hash/transform.rb create mode 100644 lib/message_store/message_data/read.rb create mode 100644 lib/message_store/message_data/write.rb create mode 100644 lib/message_store/no_stream.rb delete mode 100644 lib/message_store/postgres.rb delete mode 100644 lib/message_store/postgres/controls.rb delete mode 100644 lib/message_store/postgres/controls/category.rb delete mode 100644 lib/message_store/postgres/controls/message_data.rb delete mode 100644 lib/message_store/postgres/controls/position.rb delete mode 100644 lib/message_store/postgres/controls/put.rb delete mode 100644 lib/message_store/postgres/controls/stream_name.rb delete mode 100644 lib/message_store/postgres/controls/time.rb delete mode 100644 lib/message_store/postgres/get.rb delete mode 100644 lib/message_store/postgres/get/category.rb delete mode 100644 lib/message_store/postgres/get/category/consumer_group.rb delete mode 100644 lib/message_store/postgres/get/category/correlation.rb delete mode 100644 lib/message_store/postgres/get/condition.rb delete mode 100644 lib/message_store/postgres/get/stream.rb delete mode 100644 lib/message_store/postgres/get/stream/last.rb delete mode 100644 lib/message_store/postgres/log.rb delete mode 100644 lib/message_store/postgres/put.rb delete mode 100644 lib/message_store/postgres/read.rb delete mode 100644 lib/message_store/postgres/session.rb delete mode 100644 lib/message_store/postgres/settings.rb delete mode 100644 lib/message_store/postgres/write.rb create mode 100644 lib/message_store/put.rb create mode 100644 lib/message_store/read.rb create mode 100644 lib/message_store/read/iterator.rb create mode 100644 lib/message_store/session.rb create mode 100644 lib/message_store/settings.rb create mode 100644 lib/message_store/stream_name.rb create mode 100644 lib/message_store/write.rb diff --git a/init.rb b/init.rb index 9fbcbef..4813b6d 100644 --- a/init.rb +++ b/init.rb @@ -1,3 +1,3 @@ require_relative 'load_path' -require 'message_store/postgres' +require 'message_store' diff --git a/lib/message_store.rb b/lib/message_store.rb new file mode 100644 index 0000000..3ad9e5d --- /dev/null +++ b/lib/message_store.rb @@ -0,0 +1,40 @@ +require 'pg' + +require 'casing' +require 'identifier/uuid' +require 'schema' +require 'initializer' +require 'transform' +require 'virtual' +require 'async_invocation' + +require 'log' +require 'settings' + +require 'message_store/expected_version' +require 'message_store/id' +require 'message_store/no_stream' +require 'message_store/stream_name' + +require 'message_store/message_data' +require 'message_store/message_data/hash/transform' +require 'message_store/message_data/write' +require 'message_store/message_data/read' + +require 'message_store/log' + +require 'message_store/settings' +require 'message_store/session' + +require 'message_store/put' +require 'message_store/write' + +require 'message_store/get' +require 'message_store/get/condition' +require 'message_store/get/stream' +require 'message_store/get/stream/last' +require 'message_store/get/category' +require 'message_store/get/category/correlation' +require 'message_store/get/category/consumer_group' +require 'message_store/read/iterator' +require 'message_store/read' diff --git a/lib/message_store/controls.rb b/lib/message_store/controls.rb new file mode 100644 index 0000000..feed7e1 --- /dev/null +++ b/lib/message_store/controls.rb @@ -0,0 +1,15 @@ +require 'securerandom' + +require 'clock/controls' +require 'identifier/uuid/controls' + +require 'message_store/controls/random_value' +require 'message_store/controls/time' +require 'message_store/controls/id' +require 'message_store/controls/category' +require 'message_store/controls/stream_name' +require 'message_store/controls/position' +require 'message_store/controls/message_data' +require 'message_store/controls/message_data/metadata' +require 'message_store/controls/message_data/write' +require 'message_store/controls/put' diff --git a/lib/message_store/controls/category.rb b/lib/message_store/controls/category.rb new file mode 100644 index 0000000..d673878 --- /dev/null +++ b/lib/message_store/controls/category.rb @@ -0,0 +1,25 @@ +module MessageStore + module Controls + module Category + def self.example(category: nil, type: nil, types: nil, randomize_category: nil) + if randomize_category.nil? + if !category.nil? + randomize_category = false + end + end + + randomize_category = true if randomize_category.nil? + + category ||= 'test' + + if randomize_category + category = "#{category}#{SecureRandom.hex(16)}XX" + end + + category = Controls::StreamName.stream_name(category, type: type, types: types) + + category + end + end + end +end diff --git a/lib/message_store/controls/id.rb b/lib/message_store/controls/id.rb new file mode 100644 index 0000000..255bd3d --- /dev/null +++ b/lib/message_store/controls/id.rb @@ -0,0 +1,11 @@ +module MessageStore + module Controls + module ID + def self.example(i=nil, increment: nil, sample: nil) + Identifier::UUID::Controls::Incrementing.example(i=nil, increment: nil, sample: nil) + end + + Random = Identifier::UUID::Controls::Random + end + end +end diff --git a/lib/message_store/controls/message_data.rb b/lib/message_store/controls/message_data.rb new file mode 100644 index 0000000..3d81fa4 --- /dev/null +++ b/lib/message_store/controls/message_data.rb @@ -0,0 +1,21 @@ +module MessageStore + module Controls + module MessageData + def self.id + ID::Random.example + end + + def self.type + 'SomeType' + end + + def self.other_type + 'SomeOtherType' + end + + def self.data + { :attribute => RandomValue.example } + end + end + end +end diff --git a/lib/message_store/controls/message_data/metadata.rb b/lib/message_store/controls/message_data/metadata.rb new file mode 100644 index 0000000..18c3c5f --- /dev/null +++ b/lib/message_store/controls/message_data/metadata.rb @@ -0,0 +1,24 @@ +module MessageStore + module Controls + module MessageData + module Metadata + def self.data + { + meta_attribute: RandomValue.example + } + end + + module JSON + def self.data(id=nil) + data = Metadata.data + Casing::Camel.(data, symbol_to_string: true) + end + + def self.text + data.to_json + end + end + end + end + end +end diff --git a/lib/message_store/controls/message_data/write.rb b/lib/message_store/controls/message_data/write.rb new file mode 100644 index 0000000..5bb3c86 --- /dev/null +++ b/lib/message_store/controls/message_data/write.rb @@ -0,0 +1,74 @@ +module MessageStore + module Controls + module MessageData + module Write + def self.example(id: nil, type: nil, data: nil, metadata: nil) + if id == :none + id = nil + else + id ||= self.id + end + + type ||= self.type + + if data == :none + data = nil + else + data ||= self.data + end + + if metadata == :none + metadata = nil + else + metadata ||= self.metadata + end + + message_data = MessageStore::MessageData::Write.build + + message_data.id = id + message_data.type = type + message_data.data = data + message_data.metadata = metadata + + message_data + end + + def self.id + MessageData.id + end + + def self.type + MessageData.type + end + + def self.data + MessageData.data + end + + def self.metadata + MessageData::Metadata.data + end + + module List + Entry = Struct.new(:stream_name, :category, :message_data) + + def self.get(instances: nil, stream_name: nil, category: nil) + instances ||= 1 + + list = [] + instances.times do + instance_stream_name = stream_name || StreamName.example(category: category) + instance_category = MessageStore::StreamName.get_category(instance_stream_name) + + write_message = Controls::MessageData::Write.example + + list << Entry.new(instance_stream_name, instance_category, write_message) + end + + list + end + end + end + end + end +end diff --git a/lib/message_store/controls/position.rb b/lib/message_store/controls/position.rb new file mode 100644 index 0000000..9ddde03 --- /dev/null +++ b/lib/message_store/controls/position.rb @@ -0,0 +1,13 @@ +module MessageStore + module Controls + module Position + def self.example + 1 + end + + def self.max + (2 ** 63) - 1 + end + end + end +end diff --git a/lib/message_store/controls/put.rb b/lib/message_store/controls/put.rb new file mode 100644 index 0000000..348f9a7 --- /dev/null +++ b/lib/message_store/controls/put.rb @@ -0,0 +1,26 @@ +module MessageStore + module Controls + module Put + def self.call(instances: nil, stream_name: nil, message_data: nil, message: nil, category: nil, type: nil) + instances ||= 1 + stream_name ||= StreamName.example(category: category) + message_data ||= message + + message_specified = !message_data.nil? + + message_data ||= MessageData::Write.example(type: type) + + position = nil + instances.times do + position = MessageStore::Put.(message_data, stream_name) + + unless message_specified + message_data.id = MessageData::Write.id + end + end + + [stream_name, position] + end + end + end +end diff --git a/lib/message_store/controls/random_value.rb b/lib/message_store/controls/random_value.rb new file mode 100644 index 0000000..b1d2f2e --- /dev/null +++ b/lib/message_store/controls/random_value.rb @@ -0,0 +1,9 @@ +module MessageStore + module Controls + module RandomValue + def self.example + SecureRandom.hex + end + end + end +end diff --git a/lib/message_store/controls/stream_name.rb b/lib/message_store/controls/stream_name.rb new file mode 100644 index 0000000..e2ba910 --- /dev/null +++ b/lib/message_store/controls/stream_name.rb @@ -0,0 +1,35 @@ +module MessageStore + module Controls + module StreamName + def self.example(category: nil, id: nil, type: nil, types: nil, randomize_category: nil) + if id == :none + id = nil + else + id ||= Identifier::UUID.random + end + + category = Category.example(category: category, randomize_category: randomize_category) + + stream_name(category, id, type: type, types: types) + end + + def self.stream_name(category, id=nil, type: nil, types: nil) + types = Array(types) + types.unshift(type) unless type.nil? + + type_list = nil + type_list = types.join('+') unless types.empty? + + stream_name = category + stream_name = "#{stream_name}:#{type_list}" unless type_list.nil? + + if not id.nil? + composed_id = MessageStore::ID.id(id) + stream_name = "#{stream_name}-#{composed_id}" + end + + stream_name + end + end + end +end diff --git a/lib/message_store/controls/time.rb b/lib/message_store/controls/time.rb new file mode 100644 index 0000000..385b585 --- /dev/null +++ b/lib/message_store/controls/time.rb @@ -0,0 +1,62 @@ +module MessageStore + module Controls + module Time + def self.example(time=nil) + time ||= Raw.example + ISO8601.example(time) + end + + module Raw + def self.example + Clock::Controls::Time::Raw.example + end + end + + module ISO8601 + def self.example(time=nil) + Clock::Controls::Time::ISO8601.example(time) + end + + def self.precision + 3 + end + end + + module Processed + def self.example(time=nil, offset_milliseconds: nil) + offset_milliseconds ||= self.offset_milliseconds + Clock::Controls::Time::Offset.example(offset_milliseconds, time: time, precision: ISO8601.precision) + end + + module Raw + def self.example(time=nil, offset_milliseconds: nil) + offset_milliseconds ||= Processed.offset_milliseconds + Clock::Controls::Time::Offset::Raw.example(offset_milliseconds, time: time, precision: ISO8601.precision) + end + end + + def self.offset_milliseconds + 11 + end + end + + module Effective + def self.example(time=nil, offset_milliseconds: nil) + offset_milliseconds ||= self.offset_milliseconds + Clock::Controls::Time::Offset.example(offset_milliseconds, time: time, precision: ISO8601.precision) + end + + module Raw + def self.example(time=nil, offset_milliseconds: nil) + offset_milliseconds ||= Effective.offset_milliseconds + Clock::Controls::Time::Offset::Raw.example(offset_milliseconds, time: time, precision: ISO8601.precision) + end + end + + def self.offset_milliseconds + 1 + end + end + end + end +end diff --git a/lib/message_store/expected_version.rb b/lib/message_store/expected_version.rb new file mode 100644 index 0000000..c23be1b --- /dev/null +++ b/lib/message_store/expected_version.rb @@ -0,0 +1,11 @@ +module MessageStore + module ExpectedVersion + Error = Class.new(RuntimeError) + + def self.canonize(expected_version) + return nil if expected_version.nil? + return expected_version unless expected_version == NoStream.name + NoStream.version + end + end +end diff --git a/lib/message_store/get.rb b/lib/message_store/get.rb new file mode 100644 index 0000000..b061e10 --- /dev/null +++ b/lib/message_store/get.rb @@ -0,0 +1,167 @@ +module MessageStore + module Get + def self.included(cls) + cls.class_exec do + include Dependency + include Initializer + include Virtual + include Log::Dependency + + prepend Call + prepend BatchSize + + dependency :session, Session + + abstract :call + abstract :stream_name + abstract :sql_command + abstract :parameters + abstract :parameter_values + abstract :last_position + abstract :log_text + + virtual :specialize_error + virtual :assure + end + end + + module BatchSize + def batch_size + @batch_size ||= Defaults.batch_size + end + end + + def self.build(stream_name, **args) + cls = specialization(stream_name) + cls.build(stream_name, **args) + end + + def self.configure(receiver, stream_name, **args) + attr_name = args.delete(:attr_name) + attr_name ||= :get + + instance = build(stream_name, **args) + receiver.public_send("#{attr_name}=", instance) + end + + def configure(session: nil) + Session.configure(self, session: session) + end + + def self.call(stream_name, **args) + position = args.delete(:position) + instance = build(stream_name, **args) + instance.(position) + end + + module Call + def call(position=nil, stream_name: nil) + position ||= self.class::Defaults.position + + stream_name ||= self.stream_name + + assure + + logger.trace(tag: :get) { "Getting message data (#{log_text(stream_name, position)})" } + + result = get_result(stream_name, position) + + message_data = convert(result) + + logger.info(tag: :get) { "Finished getting message data (Count: #{message_data.length}, #{log_text(stream_name, position)})" } + logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect } + + message_data + end + end + + def get_result(stream_name, position) + logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" } + + parameter_values = parameter_values(stream_name, position) + + begin + result = session.execute(sql_command, parameter_values) + rescue PG::RaiseException => e + raise_error(e) + end + + logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" } + + result + end + + def convert(result) + logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" } + + message_data = result.map do |record| + Get.message_data(record) + end + + logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" } + + message_data + end + + def self.message_data(record) + record['data'] = Get::Deserialize.data(record['data']) + record['metadata'] = Get::Deserialize.metadata(record['metadata']) + record['time'] = Get::Time.utc_coerced(record['time']) + + MessageData::Read.build(record) + end + + def raise_error(pg_error) + error_message = Get.error_message(pg_error) + + error = Condition.error(error_message) + + if error.nil? + error = specialize_error(error_message) + end + + if not error.nil? + logger.error { error_message } + raise error + end + + raise pg_error + end + + def self.error_message(pg_error) + pg_error.message.gsub('ERROR:', '').strip + end + + def self.specialization(stream_name) + if StreamName.category?(stream_name) + Category + else + Stream + end + end + + module Deserialize + def self.data(serialized_data) + return nil if serialized_data.nil? + Transform::Read.(serialized_data, :json, MessageData::Hash) + end + + def self.metadata(serialized_metadata) + return nil if serialized_metadata.nil? + Transform::Read.(serialized_metadata, :json, MessageData::Hash) + end + end + + module Time + def self.utc_coerced(local_time) + Clock::UTC.coerce(local_time) + end + end + + module Defaults + def self.batch_size + 1000 + end + end + end +end diff --git a/lib/message_store/get/category.rb b/lib/message_store/get/category.rb new file mode 100644 index 0000000..7581bfe --- /dev/null +++ b/lib/message_store/get/category.rb @@ -0,0 +1,79 @@ +module MessageStore + module Get + class Category + Error = Class.new(RuntimeError) + + include Get + + initializer :category, na(:batch_size), :correlation, :consumer_group_member, :consumer_group_size, :condition + alias :stream_name :category + + def self.call(category, position: nil, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) + instance = build(category, batch_size: batch_size, correlation: correlation, consumer_group_member: consumer_group_member, consumer_group_size: consumer_group_size, condition: condition, session: session) + instance.(position) + end + + def self.build(category, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) + instance = new(category, batch_size, correlation, consumer_group_member, consumer_group_size, condition) + instance.configure(session: session) + instance + end + + def self.configure(receiver, category, attr_name: nil, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) + attr_name ||= :get + instance = build(category, batch_size: batch_size, correlation: correlation, consumer_group_member: consumer_group_member, consumer_group_size: consumer_group_size, condition: condition, session: session) + receiver.public_send("#{attr_name}=", instance) + end + + def sql_command + "SELECT * FROM get_category_messages(#{parameters});" + end + + def parameters + '$1::varchar, $2::bigint, $3::bigint, $4::varchar, $5::bigint, $6::bigint, $7::varchar' + end + + def parameter_values(category, position) + [ + category, + position, + batch_size, + correlation, + consumer_group_member, + consumer_group_size, + condition + ] + end + + def last_position(batch) + batch.last.global_position + end + + def specialize_error(error_message) + error = Correlation.error(error_message) + + if error.nil? + error = ConsumerGroup.error(error_message) + end + + error + end + + def log_text(category, position) + "Category: #{category}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Correlation: #{correlation.inspect}, Consumer Group Member: #{consumer_group_member.inspect}, Consumer Group Size: #{consumer_group_size.inspect}, Condition: #{condition.inspect})" + end + + def assure + if not MessageStore::StreamName.category?(category) + raise Error, "Must be a category (Stream Name: #{category})" + end + end + + module Defaults + def self.position + 1 + end + end + end + end +end diff --git a/lib/message_store/get/category/consumer_group.rb b/lib/message_store/get/category/consumer_group.rb new file mode 100644 index 0000000..6b2c4e3 --- /dev/null +++ b/lib/message_store/get/category/consumer_group.rb @@ -0,0 +1,18 @@ +module MessageStore + module Get + class Category + module ConsumerGroup + Error = Class.new(RuntimeError) + + def self.error(error_message) + if error_message.start_with?('Consumer group size must not be less than 1') || + error_message.start_with?('Consumer group member must be less than the group size') || + error_message.start_with?('Consumer group member must not be less than 0') || + error_message.start_with?('Consumer group member and size must be specified') + Error.new(error_message) + end + end + end + end + end +end diff --git a/lib/message_store/get/category/correlation.rb b/lib/message_store/get/category/correlation.rb new file mode 100644 index 0000000..5211b3a --- /dev/null +++ b/lib/message_store/get/category/correlation.rb @@ -0,0 +1,15 @@ +module MessageStore + module Get + class Category + module Correlation + Error = Class.new(RuntimeError) + + def self.error(error_message) + if error_message.start_with?('Correlation must be a category') + Error.new(error_message) + end + end + end + end + end +end diff --git a/lib/message_store/get/condition.rb b/lib/message_store/get/condition.rb new file mode 100644 index 0000000..466a5fe --- /dev/null +++ b/lib/message_store/get/condition.rb @@ -0,0 +1,13 @@ +module MessageStore + module Get + module Condition + Error = Class.new(RuntimeError) + + def self.error(error_message) + if error_message.start_with?('Retrieval with SQL condition is not activated') + Get::Condition::Error.new(error_message) + end + end + end + end +end diff --git a/lib/message_store/get/stream.rb b/lib/message_store/get/stream.rb new file mode 100644 index 0000000..a84f762 --- /dev/null +++ b/lib/message_store/get/stream.rb @@ -0,0 +1,65 @@ +module MessageStore + module Get + class Stream + Error = Class.new(RuntimeError) + + include Get + + initializer :stream_name, na(:batch_size), :condition + + def self.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) + instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) + instance.(position) + end + + def self.build(stream_name, batch_size: nil, condition: nil, session: nil) + instance = new(stream_name, batch_size, condition) + instance.configure(session: session) + instance + end + + def self.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) + attr_name ||= :get + instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) + receiver.public_send("#{attr_name}=", instance) + end + + def sql_command + "SELECT * FROM get_stream_messages(#{parameters});" + end + + def parameters + '$1::varchar, $2::bigint, $3::bigint, $4::varchar' + end + + def parameter_values(stream_name, position) + [ + stream_name, + position, + batch_size, + condition + ] + end + + def last_position(batch) + batch.last.position + end + + def log_text(stream_name, position) + "Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition.inspect})" + end + + def assure + if MessageStore::StreamName.category?(stream_name) + raise Error, "Must be a stream name (Category: #{stream_name})" + end + end + + module Defaults + def self.position + 0 + end + end + end + end +end diff --git a/lib/message_store/get/stream/last.rb b/lib/message_store/get/stream/last.rb new file mode 100644 index 0000000..cf7b953 --- /dev/null +++ b/lib/message_store/get/stream/last.rb @@ -0,0 +1,108 @@ +module MessageStore + module Get + class Stream + class Last + include Dependency + include Virtual + include Log::Dependency + + dependency :session, Session + + def self.build(session: nil) + instance = new + instance.configure(session: session) + instance + end + + def self.call(stream_name, type=nil, session: nil) + instance = build(session: session) + instance.(stream_name, type) + end + + def self.configure(receiver, session: nil, attr_name: nil) + attr_name ||= :get_last + + instance = build(session: session) + receiver.public_send("#{attr_name}=", instance) + instance + end + + def configure(session: nil) + Session.configure(self, session: session) + end + + def call(stream_name, type=nil) + logger.trace(tag: :get) { "Getting last message data (Stream Name: #{stream_name})" } + + result = get_result(stream_name, type) + + return nil if result.nil? + + message_data = convert(result[0]) + + logger.info(tag: :get) { "Finished getting message data (Stream Name: #{stream_name})" } + logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect } + + message_data + end + + def get_result(stream_name, type) + logger.trace(tag: :get) { "Getting last record (Stream: #{stream_name})" } + + parameter_values = parameter_values(stream_name, type) + sql_command = sql_command(type) + + result = session.execute(sql_command, parameter_values) + + logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream: #{stream_name}" } + + return nil if result.ntuples == 0 + + result + end + + def sql_command(type) + parameters = parameters(type) + + "SELECT * FROM get_last_stream_message(#{parameters});" + end + + def parameters(type) + parameters = "$1::varchar" + + # Backwards compatibility with versions of message-db that do not + # support the type parameter - Aaron, Scott, Tue Jul 12 2022 + if not type.nil? + parameters << ", $2::varchar" + end + + parameters + end + + def parameter_values(stream_name, type) + parameter_values = [ + stream_name + ] + + # Backwards compatibility with versions of message-db that do not + # support the type parameter - Aaron, Scott, Tue Jul 12 2022 + if not type.nil? + parameter_values << type + end + + parameter_values + end + + def convert(record) + logger.trace(tag: :get) { "Converting record to message data" } + + message_data = Get.message_data(record) + + logger.debug(tag: :get) { "Converted record to message data" } + + message_data + end + end + end + end +end diff --git a/lib/message_store/id.rb b/lib/message_store/id.rb new file mode 100644 index 0000000..8ba0e9e --- /dev/null +++ b/lib/message_store/id.rb @@ -0,0 +1,41 @@ +module MessageStore + module ID + Error = Class.new(RuntimeError) + + def self.compound_id_separator + '+' + end + + def self.id(id) + if id.is_a?(Array) + id = compound_id(id) + else + if id.nil? + raise Error, "ID must not be omitted" + end + end + + id + end + + def self.compound_id(ids) + if ids.empty? + raise Error, "IDs must not be omitted" + end + + ids.join(compound_id_separator) + end + + def self.get_cardinal_id(id) + parse(id).first + end + + def self.parse(id) + if id.nil? + raise Error, "ID must not be omitted" + end + + id.split(compound_id_separator) + end + end +end diff --git a/lib/message_store/log.rb b/lib/message_store/log.rb new file mode 100644 index 0000000..1e8e075 --- /dev/null +++ b/lib/message_store/log.rb @@ -0,0 +1,7 @@ +module MessageStore + class Log < ::Log + def tag!(tags) + tags << :message_store + end + end +end diff --git a/lib/message_store/message_data.rb b/lib/message_store/message_data.rb new file mode 100644 index 0000000..3838d14 --- /dev/null +++ b/lib/message_store/message_data.rb @@ -0,0 +1,18 @@ +module MessageStore + module MessageData + def self.included(cls) + cls.class_exec do + include Schema::DataStructure + + attribute :id, String + attribute :type, String + attribute :data, ::Hash + attribute :metadata, ::Hash + + def ===(other) + type == other + end + end + end + end +end diff --git a/lib/message_store/message_data/hash/transform.rb b/lib/message_store/message_data/hash/transform.rb new file mode 100644 index 0000000..622d4d6 --- /dev/null +++ b/lib/message_store/message_data/hash/transform.rb @@ -0,0 +1,31 @@ +module MessageStore + module MessageData + class Hash < ::Hash + module Transform + def self.json + JSON + end + + def self.instance(raw_data) + Hash[raw_data] + end + + def self.raw_data(instance) + Hash[instance] + end + + module JSON + def self.write(raw_hash_data) + json_formatted_data = Casing::Camel.(raw_hash_data) + ::JSON.generate(json_formatted_data) + end + + def self.read(text) + json_formatted_data = ::JSON.parse(text, :symbolize_names => true) + Casing::Underscore.(json_formatted_data) + end + end + end + end + end +end diff --git a/lib/message_store/message_data/read.rb b/lib/message_store/message_data/read.rb new file mode 100644 index 0000000..90e4a7a --- /dev/null +++ b/lib/message_store/message_data/read.rb @@ -0,0 +1,12 @@ +module MessageStore + module MessageData + class Read + include MessageData + + attribute :stream_name, String + attribute :position, Integer + attribute :global_position, Integer + attribute :time, Time + end + end +end diff --git a/lib/message_store/message_data/write.rb b/lib/message_store/message_data/write.rb new file mode 100644 index 0000000..61059d9 --- /dev/null +++ b/lib/message_store/message_data/write.rb @@ -0,0 +1,7 @@ +module MessageStore + module MessageData + class Write + include MessageData + end + end +end diff --git a/lib/message_store/no_stream.rb b/lib/message_store/no_stream.rb new file mode 100644 index 0000000..1b92375 --- /dev/null +++ b/lib/message_store/no_stream.rb @@ -0,0 +1,11 @@ +module MessageStore + module NoStream + def self.name + :no_stream + end + + def self.version + -1 + end + end +end diff --git a/lib/message_store/postgres.rb b/lib/message_store/postgres.rb deleted file mode 100644 index fef1e16..0000000 --- a/lib/message_store/postgres.rb +++ /dev/null @@ -1,24 +0,0 @@ -require 'pg' - -require 'message_store' - -require 'log' -require 'telemetry' -require 'settings' - -require 'message_store/postgres/log' - -require 'message_store/postgres/settings' -require 'message_store/postgres/session' - -require 'message_store/postgres/put' -require 'message_store/postgres/write' - -require 'message_store/postgres/get' -require 'message_store/postgres/get/condition' -require 'message_store/postgres/get/stream' -require 'message_store/postgres/get/stream/last' -require 'message_store/postgres/get/category' -require 'message_store/postgres/get/category/correlation' -require 'message_store/postgres/get/category/consumer_group' -require 'message_store/postgres/read' diff --git a/lib/message_store/postgres/controls.rb b/lib/message_store/postgres/controls.rb deleted file mode 100644 index 3560cdc..0000000 --- a/lib/message_store/postgres/controls.rb +++ /dev/null @@ -1,8 +0,0 @@ -require 'message_store/controls' - -require 'message_store/postgres/controls/time' -require 'message_store/postgres/controls/position' -require 'message_store/postgres/controls/category' -require 'message_store/postgres/controls/stream_name' -require 'message_store/postgres/controls/message_data' -require 'message_store/postgres/controls/put' diff --git a/lib/message_store/postgres/controls/category.rb b/lib/message_store/postgres/controls/category.rb deleted file mode 100644 index 15f1667..0000000 --- a/lib/message_store/postgres/controls/category.rb +++ /dev/null @@ -1,7 +0,0 @@ -module MessageStore - module Postgres - module Controls - Category = MessageStore::Controls::Category - end - end -end diff --git a/lib/message_store/postgres/controls/message_data.rb b/lib/message_store/postgres/controls/message_data.rb deleted file mode 100644 index addfb6a..0000000 --- a/lib/message_store/postgres/controls/message_data.rb +++ /dev/null @@ -1,31 +0,0 @@ -module MessageStore - module Postgres - module Controls - MessageData = MessageStore::Controls::MessageData - - module MessageData - module Write - module List - Entry = Struct.new(:stream_name, :category, :message_data) - - def self.get(instances: nil, stream_name: nil, category: nil) - instances ||= 1 - - list = [] - instances.times do - instance_stream_name = stream_name || StreamName.example(category: category) - instance_category = MessageStore::StreamName.get_category(instance_stream_name) - - write_message = Controls::MessageData::Write.example - - list << Entry.new(instance_stream_name, instance_category, write_message) - end - - list - end - end - end - end - end - end -end diff --git a/lib/message_store/postgres/controls/position.rb b/lib/message_store/postgres/controls/position.rb deleted file mode 100644 index 5b5c5d0..0000000 --- a/lib/message_store/postgres/controls/position.rb +++ /dev/null @@ -1,15 +0,0 @@ -module MessageStore - module Postgres - module Controls - module Position - def self.example - 1 - end - - def self.max - (2 ** 63) - 1 - end - end - end - end -end diff --git a/lib/message_store/postgres/controls/put.rb b/lib/message_store/postgres/controls/put.rb deleted file mode 100644 index 0a55b10..0000000 --- a/lib/message_store/postgres/controls/put.rb +++ /dev/null @@ -1,28 +0,0 @@ -module MessageStore - module Postgres - module Controls - module Put - def self.call(instances: nil, stream_name: nil, message_data: nil, message: nil, category: nil, type: nil) - instances ||= 1 - stream_name ||= StreamName.example(category: category) - message_data ||= message - - message_specified = !message_data.nil? - - message_data ||= MessageData::Write.example(type: type) - - position = nil - instances.times do - position = MessageStore::Postgres::Put.(message_data, stream_name) - - unless message_specified - message_data.id = MessageData::Write.id - end - end - - [stream_name, position] - end - end - end - end -end diff --git a/lib/message_store/postgres/controls/stream_name.rb b/lib/message_store/postgres/controls/stream_name.rb deleted file mode 100644 index b35f009..0000000 --- a/lib/message_store/postgres/controls/stream_name.rb +++ /dev/null @@ -1,7 +0,0 @@ -module MessageStore - module Postgres - module Controls - StreamName = MessageStore::Controls::StreamName - end - end -end diff --git a/lib/message_store/postgres/controls/time.rb b/lib/message_store/postgres/controls/time.rb deleted file mode 100644 index 4d99aad..0000000 --- a/lib/message_store/postgres/controls/time.rb +++ /dev/null @@ -1,7 +0,0 @@ -module MessageStore - module Postgres - module Controls - Time = MessageStore::Controls::Time - end - end -end diff --git a/lib/message_store/postgres/get.rb b/lib/message_store/postgres/get.rb deleted file mode 100644 index ce440d8..0000000 --- a/lib/message_store/postgres/get.rb +++ /dev/null @@ -1,165 +0,0 @@ -module MessageStore - module Postgres - module Get - def self.included(cls) - cls.class_exec do - include MessageStore::Get - - prepend Call - prepend BatchSize - - dependency :session, Session - - abstract :stream_name - abstract :sql_command - abstract :parameters - abstract :parameter_values - abstract :last_position - abstract :log_text - - virtual :specialize_error - virtual :assure - end - end - - module BatchSize - def batch_size - @batch_size ||= Defaults.batch_size - end - end - - def self.build(stream_name, **args) - cls = specialization(stream_name) - cls.build(stream_name, **args) - end - - def self.configure(receiver, stream_name, **args) - attr_name = args.delete(:attr_name) - attr_name ||= :get - - instance = build(stream_name, **args) - receiver.public_send("#{attr_name}=", instance) - end - - def configure(session: nil) - Session.configure(self, session: session) - end - - def self.call(stream_name, **args) - position = args.delete(:position) - instance = build(stream_name, **args) - instance.(position) - end - - module Call - def call(position=nil, stream_name: nil) - position ||= self.class::Defaults.position - - stream_name ||= self.stream_name - - assure - - logger.trace(tag: :get) { "Getting message data (#{log_text(stream_name, position)})" } - - result = get_result(stream_name, position) - - message_data = convert(result) - - logger.info(tag: :get) { "Finished getting message data (Count: #{message_data.length}, #{log_text(stream_name, position)})" } - logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect } - - message_data - end - end - - def get_result(stream_name, position) - logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" } - - parameter_values = parameter_values(stream_name, position) - - begin - result = session.execute(sql_command, parameter_values) - rescue PG::RaiseException => e - raise_error(e) - end - - logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" } - - result - end - - def convert(result) - logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" } - - message_data = result.map do |record| - Get.message_data(record) - end - - logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" } - - message_data - end - - def self.message_data(record) - record['data'] = Get::Deserialize.data(record['data']) - record['metadata'] = Get::Deserialize.metadata(record['metadata']) - record['time'] = Get::Time.utc_coerced(record['time']) - - MessageData::Read.build(record) - end - - def raise_error(pg_error) - error_message = Get.error_message(pg_error) - - error = Condition.error(error_message) - - if error.nil? - error = specialize_error(error_message) - end - - if not error.nil? - logger.error { error_message } - raise error - end - - raise pg_error - end - - def self.error_message(pg_error) - pg_error.message.gsub('ERROR:', '').strip - end - - def self.specialization(stream_name) - if StreamName.category?(stream_name) - Category - else - Stream - end - end - - module Deserialize - def self.data(serialized_data) - return nil if serialized_data.nil? - Transform::Read.(serialized_data, :json, MessageData::Hash) - end - - def self.metadata(serialized_metadata) - return nil if serialized_metadata.nil? - Transform::Read.(serialized_metadata, :json, MessageData::Hash) - end - end - - module Time - def self.utc_coerced(local_time) - Clock::UTC.coerce(local_time) - end - end - - module Defaults - def self.batch_size - 1000 - end - end - end - end -end diff --git a/lib/message_store/postgres/get/category.rb b/lib/message_store/postgres/get/category.rb deleted file mode 100644 index d226735..0000000 --- a/lib/message_store/postgres/get/category.rb +++ /dev/null @@ -1,81 +0,0 @@ -module MessageStore - module Postgres - module Get - class Category - Error = Class.new(RuntimeError) - - include Get - - initializer :category, na(:batch_size), :correlation, :consumer_group_member, :consumer_group_size, :condition - alias :stream_name :category - - def self.call(category, position: nil, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) - instance = build(category, batch_size: batch_size, correlation: correlation, consumer_group_member: consumer_group_member, consumer_group_size: consumer_group_size, condition: condition, session: session) - instance.(position) - end - - def self.build(category, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) - instance = new(category, batch_size, correlation, consumer_group_member, consumer_group_size, condition) - instance.configure(session: session) - instance - end - - def self.configure(receiver, category, attr_name: nil, batch_size: nil, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil) - attr_name ||= :get - instance = build(category, batch_size: batch_size, correlation: correlation, consumer_group_member: consumer_group_member, consumer_group_size: consumer_group_size, condition: condition, session: session) - receiver.public_send("#{attr_name}=", instance) - end - - def sql_command - "SELECT * FROM get_category_messages(#{parameters});" - end - - def parameters - '$1::varchar, $2::bigint, $3::bigint, $4::varchar, $5::bigint, $6::bigint, $7::varchar' - end - - def parameter_values(category, position) - [ - category, - position, - batch_size, - correlation, - consumer_group_member, - consumer_group_size, - condition - ] - end - - def last_position(batch) - batch.last.global_position - end - - def specialize_error(error_message) - error = Correlation.error(error_message) - - if error.nil? - error = ConsumerGroup.error(error_message) - end - - error - end - - def log_text(category, position) - "Category: #{category}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Correlation: #{correlation.inspect}, Consumer Group Member: #{consumer_group_member.inspect}, Consumer Group Size: #{consumer_group_size.inspect}, Condition: #{condition.inspect})" - end - - def assure - if not MessageStore::StreamName.category?(category) - raise Error, "Must be a category (Stream Name: #{category})" - end - end - - module Defaults - def self.position - 1 - end - end - end - end - end -end diff --git a/lib/message_store/postgres/get/category/consumer_group.rb b/lib/message_store/postgres/get/category/consumer_group.rb deleted file mode 100644 index fc5bd0b..0000000 --- a/lib/message_store/postgres/get/category/consumer_group.rb +++ /dev/null @@ -1,20 +0,0 @@ -module MessageStore - module Postgres - module Get - class Category - module ConsumerGroup - Error = Class.new(RuntimeError) - - def self.error(error_message) - if error_message.start_with?('Consumer group size must not be less than 1') || - error_message.start_with?('Consumer group member must be less than the group size') || - error_message.start_with?('Consumer group member must not be less than 0') || - error_message.start_with?('Consumer group member and size must be specified') - Error.new(error_message) - end - end - end - end - end - end -end diff --git a/lib/message_store/postgres/get/category/correlation.rb b/lib/message_store/postgres/get/category/correlation.rb deleted file mode 100644 index c857077..0000000 --- a/lib/message_store/postgres/get/category/correlation.rb +++ /dev/null @@ -1,17 +0,0 @@ -module MessageStore - module Postgres - module Get - class Category - module Correlation - Error = Class.new(RuntimeError) - - def self.error(error_message) - if error_message.start_with?('Correlation must be a category') - Error.new(error_message) - end - end - end - end - end - end -end diff --git a/lib/message_store/postgres/get/condition.rb b/lib/message_store/postgres/get/condition.rb deleted file mode 100644 index 7e0cb99..0000000 --- a/lib/message_store/postgres/get/condition.rb +++ /dev/null @@ -1,15 +0,0 @@ -module MessageStore - module Postgres - module Get - module Condition - Error = Class.new(RuntimeError) - - def self.error(error_message) - if error_message.start_with?('Retrieval with SQL condition is not activated') - Get::Condition::Error.new(error_message) - end - end - end - end - end -end diff --git a/lib/message_store/postgres/get/stream.rb b/lib/message_store/postgres/get/stream.rb deleted file mode 100644 index f769e9b..0000000 --- a/lib/message_store/postgres/get/stream.rb +++ /dev/null @@ -1,67 +0,0 @@ -module MessageStore - module Postgres - module Get - class Stream - Error = Class.new(RuntimeError) - - include Get - - initializer :stream_name, na(:batch_size), :condition - - def self.call(stream_name, position: nil, batch_size: nil, condition: nil, session: nil) - instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) - instance.(position) - end - - def self.build(stream_name, batch_size: nil, condition: nil, session: nil) - instance = new(stream_name, batch_size, condition) - instance.configure(session: session) - instance - end - - def self.configure(receiver, stream_name, attr_name: nil, batch_size: nil, condition: nil, session: nil) - attr_name ||= :get - instance = build(stream_name, batch_size: batch_size, condition: condition, session: session) - receiver.public_send("#{attr_name}=", instance) - end - - def sql_command - "SELECT * FROM get_stream_messages(#{parameters});" - end - - def parameters - '$1::varchar, $2::bigint, $3::bigint, $4::varchar' - end - - def parameter_values(stream_name, position) - [ - stream_name, - position, - batch_size, - condition - ] - end - - def last_position(batch) - batch.last.position - end - - def log_text(stream_name, position) - "Stream Name: #{stream_name}, Position: #{position.inspect}, Batch Size: #{batch_size.inspect}, Condition: #{condition.inspect})" - end - - def assure - if MessageStore::StreamName.category?(stream_name) - raise Error, "Must be a stream name (Category: #{stream_name})" - end - end - - module Defaults - def self.position - 0 - end - end - end - end - end -end diff --git a/lib/message_store/postgres/get/stream/last.rb b/lib/message_store/postgres/get/stream/last.rb deleted file mode 100644 index 44414e0..0000000 --- a/lib/message_store/postgres/get/stream/last.rb +++ /dev/null @@ -1,89 +0,0 @@ -module MessageStore - module Postgres - module Get - class Stream - class Last - include MessageStore::Get::Stream::Last - - dependency :session, Session - - def configure(session: nil) - Session.configure(self, session: session) - end - - def call(stream_name, type=nil) - logger.trace(tag: :get) { "Getting last message data (Stream Name: #{stream_name})" } - - result = get_result(stream_name, type) - - return nil if result.nil? - - message_data = convert(result[0]) - - logger.info(tag: :get) { "Finished getting message data (Stream Name: #{stream_name})" } - logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect } - - message_data - end - - def get_result(stream_name, type) - logger.trace(tag: :get) { "Getting last record (Stream: #{stream_name})" } - - parameter_values = parameter_values(stream_name, type) - sql_command = sql_command(type) - - result = session.execute(sql_command, parameter_values) - - logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, Stream: #{stream_name}" } - - return nil if result.ntuples == 0 - - result - end - - def sql_command(type) - parameters = parameters(type) - - "SELECT * FROM get_last_stream_message(#{parameters});" - end - - def parameters(type) - parameters = "$1::varchar" - - # Backwards compatibility with versions of message-db that do not - # support the type parameter - Aaron, Scott, Tue Jul 12 2022 - if not type.nil? - parameters << ", $2::varchar" - end - - parameters - end - - def parameter_values(stream_name, type) - parameter_values = [ - stream_name - ] - - # Backwards compatibility with versions of message-db that do not - # support the type parameter - Aaron, Scott, Tue Jul 12 2022 - if not type.nil? - parameter_values << type - end - - parameter_values - end - - def convert(record) - logger.trace(tag: :get) { "Converting record to message data" } - - message_data = Get.message_data(record) - - logger.debug(tag: :get) { "Converted record to message data" } - - message_data - end - end - end - end - end -end diff --git a/lib/message_store/postgres/log.rb b/lib/message_store/postgres/log.rb deleted file mode 100644 index 5f40928..0000000 --- a/lib/message_store/postgres/log.rb +++ /dev/null @@ -1,9 +0,0 @@ -module MessageStore - module Postgres - class Log < ::Log - def tag!(tags) - tags << :message_store - end - end - end -end diff --git a/lib/message_store/postgres/put.rb b/lib/message_store/postgres/put.rb deleted file mode 100644 index 55005f6..0000000 --- a/lib/message_store/postgres/put.rb +++ /dev/null @@ -1,146 +0,0 @@ -module MessageStore - module Postgres - class Put - include Dependency - include Log::Dependency - - dependency :session, Session - dependency :identifier, Identifier::UUID::Random - - def self.build(session: nil) - new.tap do |instance| - instance.configure(session: session) - end - end - - def configure(session: nil) - Session.configure(self, session: session) - Identifier::UUID::Random.configure(self) - end - - def self.configure(receiver, session: nil, attr_name: nil) - attr_name ||= :put - instance = build(session: session) - receiver.public_send "#{attr_name}=", instance - end - - def self.call(write_message, stream_name, expected_version: nil, session: nil) - instance = build(session: session) - instance.(write_message, stream_name, expected_version: expected_version) - end - - def call(write_message, stream_name, expected_version: nil) - logger.trace(tag: :put) { "Putting message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect})" } - logger.trace(tags: [:data, :message_data]) { write_message.pretty_inspect } - - write_message.id ||= identifier.get - - id, type, data, metadata = destructure_message(write_message) - expected_version = ExpectedVersion.canonize(expected_version) - - insert_message(id, stream_name, type, data, metadata, expected_version).tap do |position| - logger.info(tag: :put) { "Put message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect}, Position: #{position})" } - logger.info(tags: [:data, :message_data]) { write_message.pretty_inspect } - end - end - - def destructure_message(write_message) - id = write_message.id - type = write_message.type - data = write_message.data - metadata = write_message.metadata - - logger.debug(tags: [:data, :message_data]) { "ID: #{id.pretty_inspect}" } - logger.debug(tags: [:data, :message_data]) { "Type: #{type.pretty_inspect}" } - logger.debug(tags: [:data, :message_data]) { "Data: #{data.pretty_inspect}" } - logger.debug(tags: [:data, :message_data]) { "Metadata: #{metadata.pretty_inspect}" } - - return id, type, data, metadata - end - - def insert_message(id, stream_name, type, data, metadata, expected_version) - transformed_data = transformed_data(data) - transformed_metadata = transformed_metadata(metadata) - records = execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) - position(records) - end - - def execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) - logger.trace(tag: :put) { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } - - params = [ - id, - stream_name, - type, - transformed_data, - transformed_metadata, - expected_version - ] - - begin - records = session.execute(self.class.statement, params) - rescue PG::RaiseException => e - raise_error e - end - - logger.debug(tag: :put) { "Executed insert (Type: #{type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } - - records - end - - def self.statement - @statement ||= "SELECT write_message($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::bigint);" - end - - def transformed_data(data) - transformed_data = nil - - if data.is_a?(Hash) && data.empty? - data = nil - end - - unless data.nil? - transformable_data = MessageData::Hash[data] - transformed_data = Transform::Write.(transformable_data, :json) - end - - logger.debug(tags: [:data, :serialize]) { "Transformed Data: #{transformed_data.inspect}" } - transformed_data - end - - def transformed_metadata(metadata) - transformed_metadata = nil - - if metadata.is_a?(Hash) && metadata.empty? - metadata = nil - end - - unless metadata.nil? - transformable_metadata = MessageData::Hash[metadata] - transformed_metadata = Transform::Write.(transformable_metadata, :json) - end - - logger.debug(tags: [:data, :serialize]) { "Transformed Metadata: #{transformed_metadata.inspect}" } - transformed_metadata - end - - def position(records) - position = nil - unless records[0].nil? - position = records[0].values[0] - end - position - end - - def raise_error(pg_error) - error_message = pg_error.message - if error_message.include? 'Wrong expected version' - error_message.gsub!('ERROR:', '').strip! - logger.error { error_message } - raise ExpectedVersion::Error, error_message - end - raise pg_error - end - end - end -end diff --git a/lib/message_store/postgres/read.rb b/lib/message_store/postgres/read.rb deleted file mode 100644 index 1c61e54..0000000 --- a/lib/message_store/postgres/read.rb +++ /dev/null @@ -1,17 +0,0 @@ -module MessageStore - module Postgres - class Read - include MessageStore::Read - - def configure(session: nil, condition: nil) - Get.configure(self.iterator, self.stream_name, batch_size: batch_size, condition: condition, session: session) - end - - module Defaults - def self.batch_size - Postgres::Get::Defaults.batch_size - end - end - end - end -end diff --git a/lib/message_store/postgres/session.rb b/lib/message_store/postgres/session.rb deleted file mode 100644 index 8d89d6b..0000000 --- a/lib/message_store/postgres/session.rb +++ /dev/null @@ -1,176 +0,0 @@ -module MessageStore - module Postgres - class Session - Error = Class.new(RuntimeError) - - include Dependency - include Settings::Setting - - include Log::Dependency - - dependency :clock, Clock::UTC - - def self.settings - Settings.names - end - - settings.each do |s| - setting s - end - - attr_accessor :connection - attr_accessor :executed_time - - def self.build(settings: nil) - instance = new - - settings ||= Settings.instance - settings.set(instance) - - Clock::UTC.configure(instance) - - instance - end - - def self.configure(receiver, session: nil, settings: nil, attr_name: nil) - attr_name ||= :session - - if session != nil && settings != nil - error_msg = "Session configured with both settings and session arguments. Use one or the other, but not both." - logger.error(tag: :session) { error_msg } - raise Error, error_msg - end - - instance = session || build(settings: settings) - receiver.public_send "#{attr_name}=", instance - instance - end - - def open - logger.trace(tag: :session) { "Connecting to database" } - - if connected? - logger.debug(tag: :session) { "Already connected. A new connection will not be built." } - return connection - end - - logger.debug(tag: :session) { "Not connected. A new connection will be built." } - connection = self.class.build_connection(self) - self.connection = connection - - logger.debug(tag: :session) { "Connected to database" } - - connection - end - alias :connect :open - - def self.build_connection(instance) - settings = instance.settings - logger.trace(tag: :session) { "Building new connection to database (Settings: #{LogText.settings(settings).inspect})" } - - connection = PG::Connection.open(settings) - connection.type_map_for_results = PG::BasicTypeMapForResults.new(connection) - - logger.debug(tag: :session) { "Built new connection to database (Settings: #{LogText.settings(settings).inspect})" } - - connection - end - - def connected? - return false if connection.nil? - - status = PG::CONNECTION_OK - begin - status = connection.status - rescue PG::ConnectionBad - status = nil - end - - status == PG::CONNECTION_OK - end - alias :open? :connected? - - def close - if connection.nil? - return - end - - connection.close - self.connection = nil - end - - def reset - connection.reset - end - - def execute(sql_command, params=nil) - logger.trace(tag: :session) { "Executing SQL command" } - logger.trace(tag: :sql) { sql_command } - logger.trace(tag: :data) { params.pretty_inspect } - - unless connected? - connect - end - - if params.nil? - connection.exec(sql_command).tap do - self.executed_time = clock.now - logger.debug(tag: :session) { "Executed SQL command (no params)" } - end - else - connection.exec_params(sql_command, params).tap do - self.executed_time = clock.now - logger.debug(tag: :session) { "Executed SQL command with params" } - end - end - end - - def executed_time_elapsed_milliseconds - return nil if executed_time.nil? - - (clock.now - executed_time) * 1000 - end - - def transaction(&blk) - unless connected? - connect - end - - connection.transaction(&blk) - end - - def escape(data) - connection = connect - - escaped_data = connection.escape(data) - - escaped_data - end - - def settings - settings = {} - self.class.settings.each do |s| - val = public_send(s) - settings[s] = val unless val.nil? - end - settings - end - - def self.logger - @logger ||= Log.get self - end - - module LogText - def self.settings(settings) - s = settings.dup - - if s.has_key?(:password) - s[:password] = '*' * 8 - end - - s - end - end - end - end -end diff --git a/lib/message_store/postgres/settings.rb b/lib/message_store/postgres/settings.rb deleted file mode 100644 index e45b6ea..0000000 --- a/lib/message_store/postgres/settings.rb +++ /dev/null @@ -1,41 +0,0 @@ -module MessageStore - module Postgres - class Settings < ::Settings - def self.instance - @instance ||= build - end - - def self.data_source - Defaults.data_source - end - - def self.names - [ - :dbname, - :host, - :hostaddr, - :port, - :user, - :password, - :connect_timeout, - :options, - :sslmode, - :krbsrvname, - :gsslib, - :service, - :keepalives, - :keepalives_idle, - :keepalives_interval, - :keepalives_count, - :tcp_user_timeout - ] - end - - class Defaults - def self.data_source - ENV['MESSAGE_STORE_SETTINGS_PATH'] || 'settings/message_store_postgres.json' - end - end - end - end -end diff --git a/lib/message_store/postgres/write.rb b/lib/message_store/postgres/write.rb deleted file mode 100644 index 7b707f6..0000000 --- a/lib/message_store/postgres/write.rb +++ /dev/null @@ -1,52 +0,0 @@ -module MessageStore - module Postgres - class Write - include MessageStore::Write - - dependency :put - - def configure(session: nil) - Put.configure(self, session: session) - end - - def write(batch, stream_name, expected_version: nil) - logger.trace(tag: :write) do - message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') - "Writing batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})" - end - - unless expected_version.nil? - expected_version = ExpectedVersion.canonize(expected_version) - end - - last_position = nil - put.session.transaction do - batch.each do |message_data| - last_position = write_message_data(message_data, stream_name, expected_version: expected_version) - - unless expected_version.nil? - expected_version += 1 - end - end - end - - logger.debug(tag: :write) do - message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') - "Wrote batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})" - end - - last_position - end - - def write_message_data(message_data, stream_name, expected_version: nil) - logger.trace(tag: :write) { "Writing message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" } - logger.trace(tags: [:data, :message_data]) { message_data.pretty_inspect } - - put.(message_data, stream_name, expected_version: expected_version).tap do - logger.debug(tag: :write) { "Wrote message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" } - logger.debug(tags: [:data, :message_data]) { message_data.pretty_inspect } - end - end - end - end -end diff --git a/lib/message_store/put.rb b/lib/message_store/put.rb new file mode 100644 index 0000000..50072fd --- /dev/null +++ b/lib/message_store/put.rb @@ -0,0 +1,144 @@ +module MessageStore + class Put + include Dependency + include Log::Dependency + + dependency :session, Session + dependency :identifier, Identifier::UUID::Random + + def self.build(session: nil) + new.tap do |instance| + instance.configure(session: session) + end + end + + def configure(session: nil) + Session.configure(self, session: session) + Identifier::UUID::Random.configure(self) + end + + def self.configure(receiver, session: nil, attr_name: nil) + attr_name ||= :put + instance = build(session: session) + receiver.public_send "#{attr_name}=", instance + end + + def self.call(write_message, stream_name, expected_version: nil, session: nil) + instance = build(session: session) + instance.(write_message, stream_name, expected_version: expected_version) + end + + def call(write_message, stream_name, expected_version: nil) + logger.trace(tag: :put) { "Putting message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect})" } + logger.trace(tags: [:data, :message_data]) { write_message.pretty_inspect } + + write_message.id ||= identifier.get + + id, type, data, metadata = destructure_message(write_message) + expected_version = ExpectedVersion.canonize(expected_version) + + insert_message(id, stream_name, type, data, metadata, expected_version).tap do |position| + logger.info(tag: :put) { "Put message data (Type: #{write_message.type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect}, Position: #{position})" } + logger.info(tags: [:data, :message_data]) { write_message.pretty_inspect } + end + end + + def destructure_message(write_message) + id = write_message.id + type = write_message.type + data = write_message.data + metadata = write_message.metadata + + logger.debug(tags: [:data, :message_data]) { "ID: #{id.pretty_inspect}" } + logger.debug(tags: [:data, :message_data]) { "Type: #{type.pretty_inspect}" } + logger.debug(tags: [:data, :message_data]) { "Data: #{data.pretty_inspect}" } + logger.debug(tags: [:data, :message_data]) { "Metadata: #{metadata.pretty_inspect}" } + + return id, type, data, metadata + end + + def insert_message(id, stream_name, type, data, metadata, expected_version) + transformed_data = transformed_data(data) + transformed_metadata = transformed_metadata(metadata) + records = execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) + position(records) + end + + def execute_query(id, stream_name, type, transformed_data, transformed_metadata, expected_version) + logger.trace(tag: :put) { "Executing insert (Stream Name: #{stream_name}, Type: #{type}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } + + params = [ + id, + stream_name, + type, + transformed_data, + transformed_metadata, + expected_version + ] + + begin + records = session.execute(self.class.statement, params) + rescue PG::RaiseException => e + raise_error e + end + + logger.debug(tag: :put) { "Executed insert (Type: #{type}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, ID: #{id.inspect})" } + + records + end + + def self.statement + @statement ||= "SELECT write_message($1::varchar, $2::varchar, $3::varchar, $4::jsonb, $5::jsonb, $6::bigint);" + end + + def transformed_data(data) + transformed_data = nil + + if data.is_a?(Hash) && data.empty? + data = nil + end + + unless data.nil? + transformable_data = MessageData::Hash[data] + transformed_data = Transform::Write.(transformable_data, :json) + end + + logger.debug(tags: [:data, :serialize]) { "Transformed Data: #{transformed_data.inspect}" } + transformed_data + end + + def transformed_metadata(metadata) + transformed_metadata = nil + + if metadata.is_a?(Hash) && metadata.empty? + metadata = nil + end + + unless metadata.nil? + transformable_metadata = MessageData::Hash[metadata] + transformed_metadata = Transform::Write.(transformable_metadata, :json) + end + + logger.debug(tags: [:data, :serialize]) { "Transformed Metadata: #{transformed_metadata.inspect}" } + transformed_metadata + end + + def position(records) + position = nil + unless records[0].nil? + position = records[0].values[0] + end + position + end + + def raise_error(pg_error) + error_message = pg_error.message + if error_message.include? 'Wrong expected version' + error_message.gsub!('ERROR:', '').strip! + logger.error { error_message } + raise ExpectedVersion::Error, error_message + end + raise pg_error + end + end +end diff --git a/lib/message_store/read.rb b/lib/message_store/read.rb new file mode 100644 index 0000000..a229e46 --- /dev/null +++ b/lib/message_store/read.rb @@ -0,0 +1,75 @@ +module MessageStore + class Read + include Dependency + include Initializer + include Virtual + include Log::Dependency + + Error = Class.new(RuntimeError) + + dependency :iterator, Iterator + + initializer :stream_name, :position, :batch_size + + def self.build(stream_name, position: nil, batch_size: nil, session: nil, **arguments) + new(stream_name, position, batch_size).tap do |instance| + Iterator.configure(instance, position) + instance.configure(session: session, **arguments) + end + end + + def self.call(stream_name, position: nil, batch_size: nil, session: nil, **arguments, &action) + instance = build(stream_name, position: position, batch_size: batch_size, session: session, **arguments) + instance.(&action) + end + + def self.configure(receiver, stream_name, attr_name: nil, position: nil, batch_size: nil, session: nil, **arguments) + attr_name ||= :read + instance = build(stream_name, position: position, batch_size: batch_size, session: session, **arguments) + receiver.public_send "#{attr_name}=", instance + end + + def configure(session: nil, condition: nil) + Get.configure(self.iterator, self.stream_name, batch_size: batch_size, condition: condition, session: session) + end + + def call(&action) + logger.trace(tag: :read) { "Reading (Stream Name: #{stream_name})" } + + if action.nil? + error_message = "Reader must be actuated with a block" + logger.error(tag: :read) { error_message } + raise Error, error_message + end + + enumerate_message_data(&action) + + logger.info(tag: :read) { "Reading completed (Stream Name: #{stream_name})" } + + return AsyncInvocation::Incorrect + end + + def enumerate_message_data(&action) + logger.trace(tag: :read) { "Enumerating (Stream Name: #{stream_name})" } + + message_data = nil + + loop do + message_data = iterator.next + + break if message_data.nil? + logger.debug(tags: [:data, :message_data]) { message_data.pretty_inspect } + + action.(message_data) + end + + logger.debug(tag: :read) { "Enumerated (Stream Name: #{stream_name})" } + end + + module Defaults + def self.batch_size + Get::Defaults.batch_size + end + end + end +end diff --git a/lib/message_store/read/iterator.rb b/lib/message_store/read/iterator.rb new file mode 100644 index 0000000..275d0ed --- /dev/null +++ b/lib/message_store/read/iterator.rb @@ -0,0 +1,166 @@ +## Add tests - Aaron, Sun Jan 15 2023 +module MessageStore + class Read + class Iterator + include Dependency + include Initializer + include Virtual + include Log::Dependency + + dependency :get, Get + + attr_accessor :batch + + def starting_position + @starting_position ||= 0 + end + attr_writer :starting_position + + def batch_index + @batch_index ||= 0 + end + attr_writer :batch_index + + def batch_size + get.batch_size + end + + def self.build(position=nil) + new.tap do |instance| + instance.starting_position = position + Log.get(self).debug { "Built Iterator (Starting Position: #{position.inspect})" } + end + end + + def self.configure(receiver, position=nil, attr_name: nil) + attr_name ||= :iterator + instance = build(position) + receiver.public_send "#{attr_name}=", instance + end + + def next + logger.trace { "Getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" } + + if batch_depleted? + resupply + end + + message_data = batch[batch_index] + + logger.debug(tags: [:data, :message_data]) { "Next message data: #{message_data.pretty_inspect}" } + logger.debug { "Done getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" } + + advance_batch_index + + message_data + end + + def resupply + logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" } + + batch = [] + unless stream_depleted? + batch = get_batch + end + + reset(batch) + + logger.debug { "Batch resupplied (Next Batch Length: #{(batch &.length).inspect})" } + end + + def get_batch + position = next_batch_starting_position + + logger.trace "Getting batch (Position: #{position.inspect})" + + batch = get.(position) + + logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" } + + batch + end + + def next_batch_starting_position + if not batch_initialized? + logger.debug { "Batch is not initialized (Next batch starting position: #{starting_position.inspect})" } + return starting_position + end + + previous_position = last_position + next_position = previous_position + 1 + logger.debug { "End of batch (Next starting position: #{next_position}, Previous Position: #{previous_position})" } + + next_position + end + + def last_position + get.last_position(batch) + end + + def reset(batch) + logger.trace { "Resetting batch" } + + self.batch = batch + self.batch_index = 0 + + logger.debug(tags: [:data, :batch]) { "Batch set to: \n#{batch.pretty_inspect}" } + logger.debug(tags: [:data, :batch]) { "Batch position set to: #{batch_index.inspect}" } + logger.debug { "Done resetting batch" } + end + + def advance_batch_index + logger.trace { "Advancing batch index (Batch Index: #{batch_index})" } + self.batch_index += 1 + logger.debug { "Advanced batch index (Batch Index: #{batch_index})" } + end + + def batch_initialized? + not batch.nil? + end + + def batch_depleted? + if not batch_initialized? + logger.debug { "Batch is depleted (Batch is not initialized)" } + return true + end + + if batch.empty? + logger.debug { "Batch is depleted (Batch is empty)" } + return true + end + + if batch_index == batch.length + logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" } + return true + end + + logger.debug { "Batch is not depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" } + false + end + + def stream_depleted? + if not batch_initialized? + logger.debug { "Stream is not depleted (Batch Length: (batch is nil), Batch Size: #{batch_size})" } + return false + end + + if batch.length < batch_size + logger.debug { "Stream is depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" } + return true + end + + logger.debug { "Stream is not depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" } + false + end + +## Need not exist? + class Substitute < Iterator +## include Read::Iterator + + def self.build() + new + end + end + end + end +end diff --git a/lib/message_store/session.rb b/lib/message_store/session.rb new file mode 100644 index 0000000..c58fbff --- /dev/null +++ b/lib/message_store/session.rb @@ -0,0 +1,174 @@ +module MessageStore + class Session + Error = Class.new(RuntimeError) + + include Dependency + include Settings::Setting + + include Log::Dependency + + dependency :clock, Clock::UTC + + def self.settings + Settings.names + end + + settings.each do |s| + setting s + end + + attr_accessor :connection + attr_accessor :executed_time + + def self.build(settings: nil) + instance = new + + settings ||= Settings.instance + settings.set(instance) + + Clock::UTC.configure(instance) + + instance + end + + def self.configure(receiver, session: nil, settings: nil, attr_name: nil) + attr_name ||= :session + + if session != nil && settings != nil + error_msg = "Session configured with both settings and session arguments. Use one or the other, but not both." + logger.error(tag: :session) { error_msg } + raise Error, error_msg + end + + instance = session || build(settings: settings) + receiver.public_send "#{attr_name}=", instance + instance + end + + def open + logger.trace(tag: :session) { "Connecting to database" } + + if connected? + logger.debug(tag: :session) { "Already connected. A new connection will not be built." } + return connection + end + + logger.debug(tag: :session) { "Not connected. A new connection will be built." } + connection = self.class.build_connection(self) + self.connection = connection + + logger.debug(tag: :session) { "Connected to database" } + + connection + end + alias :connect :open + + def self.build_connection(instance) + settings = instance.settings + logger.trace(tag: :session) { "Building new connection to database (Settings: #{LogText.settings(settings).inspect})" } + + connection = PG::Connection.open(settings) + connection.type_map_for_results = PG::BasicTypeMapForResults.new(connection) + + logger.debug(tag: :session) { "Built new connection to database (Settings: #{LogText.settings(settings).inspect})" } + + connection + end + + def connected? + return false if connection.nil? + + status = PG::CONNECTION_OK + begin + status = connection.status + rescue PG::ConnectionBad + status = nil + end + + status == PG::CONNECTION_OK + end + alias :open? :connected? + + def close + if connection.nil? + return + end + + connection.close + self.connection = nil + end + + def reset + connection.reset + end + + def execute(sql_command, params=nil) + logger.trace(tag: :session) { "Executing SQL command" } + logger.trace(tag: :sql) { sql_command } + logger.trace(tag: :data) { params.pretty_inspect } + + unless connected? + connect + end + + if params.nil? + connection.exec(sql_command).tap do + self.executed_time = clock.now + logger.debug(tag: :session) { "Executed SQL command (no params)" } + end + else + connection.exec_params(sql_command, params).tap do + self.executed_time = clock.now + logger.debug(tag: :session) { "Executed SQL command with params" } + end + end + end + + def executed_time_elapsed_milliseconds + return nil if executed_time.nil? + + (clock.now - executed_time) * 1000 + end + + def transaction(&blk) + unless connected? + connect + end + + connection.transaction(&blk) + end + + def escape(data) + connection = connect + + escaped_data = connection.escape(data) + + escaped_data + end + + def settings + settings = {} + self.class.settings.each do |s| + val = public_send(s) + settings[s] = val unless val.nil? + end + settings + end + + def self.logger + @logger ||= Log.get self + end + + module LogText + def self.settings(settings) + s = settings.dup + + if s.has_key?(:password) + s[:password] = '*' * 8 + end + + s + end + end + end +end diff --git a/lib/message_store/settings.rb b/lib/message_store/settings.rb new file mode 100644 index 0000000..095ef45 --- /dev/null +++ b/lib/message_store/settings.rb @@ -0,0 +1,39 @@ +module MessageStore + class Settings < ::Settings + def self.instance + @instance ||= build + end + + def self.data_source + Defaults.data_source + end + + def self.names + [ + :dbname, + :host, + :hostaddr, + :port, + :user, + :password, + :connect_timeout, + :options, + :sslmode, + :krbsrvname, + :gsslib, + :service, + :keepalives, + :keepalives_idle, + :keepalives_interval, + :keepalives_count, + :tcp_user_timeout + ] + end + + class Defaults + def self.data_source + ENV['MESSAGE_STORE_SETTINGS_PATH'] || 'settings/message_store_postgres.json' + end + end + end +end diff --git a/lib/message_store/stream_name.rb b/lib/message_store/stream_name.rb new file mode 100644 index 0000000..a736d60 --- /dev/null +++ b/lib/message_store/stream_name.rb @@ -0,0 +1,114 @@ +module MessageStore + module StreamName + Error = Class.new(RuntimeError) + + def self.id_separator + '-' + end + + def self.compound_id_separator + ID.compound_id_separator + end + + def self.category_type_separator + ':' + end + + def self.compound_type_separator + '+' + end + + def self.stream_name(category, stream_id=nil, cardinal_id: nil, id: nil, ids: nil, type: nil, types: nil) + if category == nil + raise Error, "Category must not be omitted from stream name" + end + + stream_name = category + + type_list = [] + type_list.concat(Array(type)) + type_list.concat(Array(types)) + + type_part = type_list.join(compound_type_separator) + + if not type_part.empty? + stream_name = "#{stream_name}#{category_type_separator}#{type_part}" + end + + id_list = [] + id_list << cardinal_id if not cardinal_id.nil? + + id_list.concat(Array(stream_id)) + id_list.concat(Array(id)) + id_list.concat(Array(ids)) + + id_part = nil + if not id_list.empty? + id_part = ID.compound_id(id_list) + stream_name = "#{stream_name}#{id_separator}#{id_part}" + end + + stream_name + end + + def self.split(stream_name) + stream_name.split(id_separator, 2) + end + + def self.get_id(stream_name) + split(stream_name)[1] + end + + def self.get_ids(stream_name) + ids = get_id(stream_name) + + return [] if ids.nil? + + ID.parse(ids) + end + + def self.get_cardinal_id(stream_name) + id = get_id(stream_name) + + return nil if id.nil? + + ID.get_cardinal_id(id) + end + + def self.get_category(stream_name) + split(stream_name)[0] + end + + def self.category?(stream_name) + !stream_name.include?(id_separator) + end + + def self.get_category_type(stream_name) + return nil unless stream_name.include?(category_type_separator) + + category = get_category(stream_name) + + category.split(category_type_separator)[1] + end + + def self.get_type(*args) + get_category_type(*args) + end + + def self.get_category_types(stream_name) + type_list = get_type(stream_name) + + return [] if type_list.nil? + + type_list.split(compound_type_separator) + end + + def self.get_types(*args) + get_category_types(*args) + end + + def self.get_entity_name(stream_name) + get_category(stream_name).split(category_type_separator)[0] + end + end +end diff --git a/lib/message_store/write.rb b/lib/message_store/write.rb new file mode 100644 index 0000000..96ca655 --- /dev/null +++ b/lib/message_store/write.rb @@ -0,0 +1,101 @@ +module MessageStore + class Write + include Dependency + include Virtual + include Log::Dependency + + dependency :identifier, Identifier::UUID::Random + dependency :put + + def self.build(session: nil) + instance = new + Identifier::UUID::Random.configure(instance) + instance.configure(session: session) + instance + end + + def self.configure(receiver, session: nil, attr_name: nil) + attr_name ||= :write + instance = build(session: session) + receiver.public_send "#{attr_name}=", instance + end + + def self.call(message_data, stream_name, expected_version: nil, session: nil) + instance = build(session: session) + instance.(message_data, stream_name, expected_version: expected_version) + end + + def configure(session: nil) + Put.configure(self, session: session) + end + + def call(message_data, stream_name, expected_version: nil) + batch = Array(message_data) + + logger.trace(tag: :write) do + message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') + "Writing message data (Types: #{message_types}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Number of Messages: #{batch.length})" + end + logger.trace(tags: [:data, :message_data]) { batch.pretty_inspect } + + set_ids(batch) + + position = write(batch, stream_name, expected_version: expected_version) + + logger.info(tag: :write) do + message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') + "Wrote message data (Types: #{message_types}, Stream Name: #{stream_name}, Expected Version: #{expected_version.inspect}, Number of Messages: #{batch.length})" + end + logger.info(tags: [:data, :message_data]) { batch.pretty_inspect } + + position + end + + def write(batch, stream_name, expected_version: nil) + logger.trace(tag: :write) do + message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') + "Writing batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})" + end + + unless expected_version.nil? + expected_version = ExpectedVersion.canonize(expected_version) + end + + last_position = nil + put.session.transaction do + batch.each do |message_data| + last_position = write_message_data(message_data, stream_name, expected_version: expected_version) + + unless expected_version.nil? + expected_version += 1 + end + end + end + + logger.debug(tag: :write) do + message_types = batch.map {|message_data| message_data.type }.uniq.join(', ') + "Wrote batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})" + end + + last_position + end + + def write_message_data(message_data, stream_name, expected_version: nil) + logger.trace(tag: :write) { "Writing message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" } + logger.trace(tags: [:data, :message_data]) { message_data.pretty_inspect } + + put.(message_data, stream_name, expected_version: expected_version).tap do + logger.debug(tag: :write) { "Wrote message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" } + logger.debug(tags: [:data, :message_data]) { message_data.pretty_inspect } + end + end + + def set_ids(batch) + batch.each do |message_data| + if message_data.id.nil? + message_data.id = identifier.get + end + end + end + end +end diff --git a/message_store-postgres.gemspec b/message_store-postgres.gemspec index 6c6376a..1bf00e4 100644 --- a/message_store-postgres.gemspec +++ b/message_store-postgres.gemspec @@ -18,9 +18,14 @@ Gem::Specification.new do |s| s.executables = Dir.glob('scripts/evt-*').map(&File.method(:basename)) s.bindir = 'scripts' - s.add_runtime_dependency 'evt-message_store' s.add_runtime_dependency 'evt-log' s.add_runtime_dependency 'evt-settings' + s.add_runtime_dependency 'evt-initializer' + s.add_runtime_dependency 'evt-identifier-uuid' + s.add_runtime_dependency 'evt-transform' + s.add_runtime_dependency 'evt-virtual' + s.add_runtime_dependency 'evt-casing' + s.add_runtime_dependency 'evt-async_invocation' s.add_runtime_dependency 'message-db' s.add_runtime_dependency 'pg' diff --git a/test/automated/get/category/specialized/condition/condition.rb b/test/automated/get/category/specialized/condition/condition.rb index b6664bc..7335174 100644 --- a/test/automated/get/category/specialized/condition/condition.rb +++ b/test/automated/get/category/specialized/condition/condition.rb @@ -10,7 +10,7 @@ condition = 'position = 0 OR position = 2' - settings = Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.new settings.set(session) session.options = '-c message_store.sql_condition=on' diff --git a/test/automated/get/category/specialized/condition/not_activated_error.rb b/test/automated/get/category/specialized/condition/not_activated_error.rb index 41dd97b..1e6127a 100644 --- a/test/automated/get/category/specialized/condition/not_activated_error.rb +++ b/test/automated/get/category/specialized/condition/not_activated_error.rb @@ -9,7 +9,7 @@ condition = 'some condition' - settings = Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.new settings.set(session) session.options = nil diff --git a/test/automated/get/category/specialized/consumer_groups/error.rb b/test/automated/get/category/specialized/consumer_groups/error.rb index 46ae2b2..5e15252 100644 --- a/test/automated/get/category/specialized/consumer_groups/error.rb +++ b/test/automated/get/category/specialized/consumer_groups/error.rb @@ -7,7 +7,7 @@ context "Error" do context "Consumer Group Size Is Less than 1" do test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::ConsumerGroup::Error) do + assert_raises(MessageStore::Get::Category::ConsumerGroup::Error) do Get::Category.('someCategory', consumer_group_member: 0, consumer_group_size: 0) end end @@ -15,7 +15,7 @@ context "Consumer Group Member Is Greater than the Consumer Group Size" do test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::ConsumerGroup::Error) do + assert_raises(MessageStore::Get::Category::ConsumerGroup::Error) do Get::Category.('someCategory', consumer_group_member: 2, consumer_group_size: 1) end end @@ -23,7 +23,7 @@ context "Consumer Group Member Is Less than 0" do test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::ConsumerGroup::Error) do + assert_raises(MessageStore::Get::Category::ConsumerGroup::Error) do Get::Category.('someCategory', consumer_group_member: -1, consumer_group_size: 1) end end @@ -31,7 +31,7 @@ context "Consumer Group Size is Missing" do test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::ConsumerGroup::Error) do + assert_raises(MessageStore::Get::Category::ConsumerGroup::Error) do Get::Category.('someCategory', consumer_group_member: 0) end end @@ -39,7 +39,7 @@ context "Consumer Group Member is Missing" do test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::ConsumerGroup::Error) do + assert_raises(MessageStore::Get::Category::ConsumerGroup::Error) do Get::Category.('someCategory', consumer_group_size: 1) end end diff --git a/test/automated/get/category/specialized/correlation/error.rb b/test/automated/get/category/specialized/correlation/error.rb index 178adff..200f764 100644 --- a/test/automated/get/category/specialized/correlation/error.rb +++ b/test/automated/get/category/specialized/correlation/error.rb @@ -10,7 +10,7 @@ category = Controls::Category.example test "Is an error" do - assert_raises(MessageStore::Postgres::Get::Category::Correlation::Error) do + assert_raises(MessageStore::Get::Category::Correlation::Error) do Get::Category.(category, correlation: correlation) end end diff --git a/test/automated/get/stream/specialized/condition/condition.rb b/test/automated/get/stream/specialized/condition/condition.rb index e0f6be7..17630cb 100644 --- a/test/automated/get/stream/specialized/condition/condition.rb +++ b/test/automated/get/stream/specialized/condition/condition.rb @@ -8,7 +8,7 @@ condition = 'position = 0 OR position = 2' - settings = Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.new settings.set(session) session.options = '-c message_store.sql_condition=on' diff --git a/test/automated/get/stream/specialized/condition/not_activated.rb b/test/automated/get/stream/specialized/condition/not_activated.rb index 0a34e4f..35f876b 100644 --- a/test/automated/get/stream/specialized/condition/not_activated.rb +++ b/test/automated/get/stream/specialized/condition/not_activated.rb @@ -9,7 +9,7 @@ condition = 'some condition' - settings = Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.new settings.set(session) session.options = nil diff --git a/test/automated/read/condition.rb b/test/automated/read/condition.rb index 9113e67..a3ecf75 100644 --- a/test/automated/read/condition.rb +++ b/test/automated/read/condition.rb @@ -8,7 +8,7 @@ message_count = 0 - settings = Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.new settings.set(session) session.options = '-c message_store.sql_condition=on' diff --git a/test/automated/read/default_batch_size.rb b/test/automated/read/default_batch_size.rb index 821299f..507372a 100644 --- a/test/automated/read/default_batch_size.rb +++ b/test/automated/read/default_batch_size.rb @@ -2,10 +2,10 @@ context "Read" do context "Default batch Size" do - default_batch_size = MessageStore::Postgres::Read::Defaults.batch_size + default_batch_size = MessageStore::Read::Defaults.batch_size test "Is the Get implementation's default batch size" do - assert(default_batch_size == MessageStore::Postgres::Get::Defaults.batch_size) + assert(default_batch_size == MessageStore::Get::Defaults.batch_size) end end end diff --git a/test/automated/session/build.rb b/test/automated/session/build.rb index b9e837e..6f3e3b8 100644 --- a/test/automated/session/build.rb +++ b/test/automated/session/build.rb @@ -3,7 +3,7 @@ context "Session" do context "Build" do context "Settings is specified" do - settings = MessageStore::Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.build(settings: settings) @@ -13,7 +13,7 @@ end context "Settings is not specified" do - settings = MessageStore::Postgres::Settings.build + settings = MessageStore::Settings.build session = Session.build diff --git a/test/automated/session/configure.rb b/test/automated/session/configure.rb index 1e748cf..12927b2 100644 --- a/test/automated/session/configure.rb +++ b/test/automated/session/configure.rb @@ -23,7 +23,7 @@ user: user } - settings = MessageStore::Postgres::Settings.build(settings_data) + settings = MessageStore::Settings.build(settings_data) Session.configure(receiver, settings: settings) diff --git a/test/automated/session/settings.rb b/test/automated/session/settings.rb index deb2fde..39eba74 100644 --- a/test/automated/session/settings.rb +++ b/test/automated/session/settings.rb @@ -4,10 +4,10 @@ context "Settings" do session = Session.build - settings = Postgres::Settings.build + settings = MessageStore::Settings.build settings_hash = settings.get.to_h - names = Postgres::Settings.names + names = MessageStore::Settings.names names.each do |name| test "#{name}" do diff --git a/test/automated/settings.rb b/test/automated/settings.rb index fa0a521..b7824df 100644 --- a/test/automated/settings.rb +++ b/test/automated/settings.rb @@ -1,12 +1,12 @@ require_relative 'automated_init' context "Settings" do - settings = Postgres::Settings.build + settings = MessageStore::Settings.build context "Names" do settings_hash = settings.get.to_h - names = Postgres::Settings.names + names = MessageStore::Settings.names names.each do |name| test "#{name}" do diff --git a/test/automated/settings/default_path.rb b/test/automated/settings/default_path.rb index 46ada36..9d282b1 100644 --- a/test/automated/settings/default_path.rb +++ b/test/automated/settings/default_path.rb @@ -7,7 +7,7 @@ overridden_path = 'some_path' ENV['MESSAGE_STORE_SETTINGS_PATH'] = overridden_path - settings_path = Postgres::Settings.data_source + settings_path = MessageStore::Settings.data_source test "Overridden by MESSAGE_STORE_SETTINGS_PATH environment variable" do assert(settings_path == overridden_path) diff --git a/test/interactive/concurrency/write_to_single_stream.rb b/test/interactive/concurrency/write_to_single_stream.rb index 5f91f08..4ec3514 100644 --- a/test/interactive/concurrency/write_to_single_stream.rb +++ b/test/interactive/concurrency/write_to_single_stream.rb @@ -20,11 +20,11 @@ def initialize(stream_name) end handle :write_message do - message_data_1 = MessageStore::Postgres::Controls::MessageData::Write.example(data: { actor: object_id }) - message_data_2 = MessageStore::Postgres::Controls::MessageData::Write.example(data: { actor: object_id }) + message_data_1 = MessageStore::Controls::MessageData::Write.example(data: { actor: object_id }) + message_data_2 = MessageStore::Controls::MessageData::Write.example(data: { actor: object_id }) batch = [message_data_1, message_data_2] - position = MessageStore::Postgres::Write.(batch, stream_name) + position = MessageStore::Write.(batch, stream_name) logger.info(tag: :actor) { "Wrote message data (Object ID: #{object_id}, Position: #{position}, Message Type: #{message_data_1.type.inspect}, Stream Name: #{stream_name.inspect})" } diff --git a/test/test_init.rb b/test/test_init.rb index 1cefd49..55634d1 100644 --- a/test/test_init.rb +++ b/test/test_init.rb @@ -4,9 +4,8 @@ puts RUBY_DESCRIPTION require_relative '../init' -require 'message_store/postgres/controls' +require 'message_store/controls' require 'test_bench'; TestBench.activate include MessageStore -include MessageStore::Postgres diff --git a/tools/write_message.rb b/tools/write_message.rb index 4e3ce48..0c7d6ca 100755 --- a/tools/write_message.rb +++ b/tools/write_message.rb @@ -7,10 +7,9 @@ require_relative '../init' -require 'message_store/postgres/controls' +require 'message_store/controls' include MessageStore -include MessageStore::Postgres instances = Integer(ENV['INSTANCES'] || 1) stream_name = ENV['STREAM_NAME'] From 4cf597aa5ce088176a9e5179ecaba5b9ce5dcf04 Mon Sep 17 00:00:00 2001 From: Aaron Jensen Date: Sun, 15 Jan 2023 18:35:24 -0500 Subject: [PATCH 2/4] Get and get last substitutes --- lib/message_store.rb | 2 + lib/message_store/controls.rb | 1 + .../controls/message_data/read.rb | 74 +++++++++++++++++++ .../get/stream/last/substitute.rb | 27 +++++++ lib/message_store/get/substitute.rb | 74 +++++++++++++++++++ test/automated/get_last/substitute.rb | 31 ++++++++ 6 files changed, 209 insertions(+) create mode 100644 lib/message_store/controls/message_data/read.rb create mode 100644 lib/message_store/get/stream/last/substitute.rb create mode 100644 lib/message_store/get/substitute.rb create mode 100644 test/automated/get_last/substitute.rb diff --git a/lib/message_store.rb b/lib/message_store.rb index 3ad9e5d..73ec386 100644 --- a/lib/message_store.rb +++ b/lib/message_store.rb @@ -30,9 +30,11 @@ require 'message_store/write' require 'message_store/get' +require 'message_store/get/substitute' require 'message_store/get/condition' require 'message_store/get/stream' require 'message_store/get/stream/last' +require 'message_store/get/stream/last/substitute' require 'message_store/get/category' require 'message_store/get/category/correlation' require 'message_store/get/category/consumer_group' diff --git a/lib/message_store/controls.rb b/lib/message_store/controls.rb index feed7e1..80b84d2 100644 --- a/lib/message_store/controls.rb +++ b/lib/message_store/controls.rb @@ -11,5 +11,6 @@ require 'message_store/controls/position' require 'message_store/controls/message_data' require 'message_store/controls/message_data/metadata' +require 'message_store/controls/message_data/read' require 'message_store/controls/message_data/write' require 'message_store/controls/put' diff --git a/lib/message_store/controls/message_data/read.rb b/lib/message_store/controls/message_data/read.rb new file mode 100644 index 0000000..16b0cd4 --- /dev/null +++ b/lib/message_store/controls/message_data/read.rb @@ -0,0 +1,74 @@ +module MessageStore + module Controls + module MessageData + module Read + def self.example(id: nil, type: nil, data: nil, metadata: nil) + if id == :none + id = nil + else + id ||= self.id + end + + type ||= self.type + + if data == :none + data = nil + else + data ||= self.data + end + + if metadata == :none + metadata = nil + else + metadata ||= self.metadata + end + + message_data = MessageStore::MessageData::Read.build + + message_data.id = id + message_data.type = type + message_data.data = data + message_data.metadata = metadata + message_data.position = position + message_data.global_position = global_position + message_data.time = time + message_data.stream_name = stream_name + + message_data + end + + def self.id + MessageData.id + end + + def self.type + MessageData.type + end + + def self.data + MessageData.data + end + + def self.metadata + MessageData::Metadata.data + end + + def self.position + 1 + end + + def self.global_position + 111 + end + + def self.time + Time::Raw.example + end + + def self.stream_name + StreamName.example + end + end + end + end +end diff --git a/lib/message_store/get/stream/last/substitute.rb b/lib/message_store/get/stream/last/substitute.rb new file mode 100644 index 0000000..2627dfd --- /dev/null +++ b/lib/message_store/get/stream/last/substitute.rb @@ -0,0 +1,27 @@ +module MessageStore + module Get + class Stream + class Last + module Substitute + def self.build + GetLast.new + end + + class GetLast < Stream::Last + def call(stream_name, type=nil) + streams[stream_name] + end + + def set(stream_name, message_data) + streams[stream_name] = message_data + end + + def streams + @streams ||= {} + end + end + end + end + end + end +end diff --git a/lib/message_store/get/substitute.rb b/lib/message_store/get/substitute.rb new file mode 100644 index 0000000..34d9f99 --- /dev/null +++ b/lib/message_store/get/substitute.rb @@ -0,0 +1,74 @@ +## Add tests, this did not have tests in message-store either - Aaron, Sun Jan 15 2023 +module MessageStore + module Get + class Substitute + include Initializer + include Virtual + + include Get + + attr_accessor :stream_name + alias :category :stream_name + + def batch_size + @batch_size ||= 1 + end + attr_writer :batch_size + + def items + @items ||= [] + end + + def self.build + new + end + + def call(position) + position ||= 0 + + logger.trace(tag: :get) { "Getting (Position: #{position}, Stream Name: #{stream_name.inspect}, Batch Size: #{batch_size})" } + + logger.debug(tag: :data) { "Items: \n#{items.pretty_inspect}" } + logger.debug(tag: :data) { "Position: #{position.inspect}" } + logger.debug(tag: :data) { "Batch Size: #{batch_size.inspect}" } + + # No specialized Gets for substitute + # Complexity has to be inline for the control + # Scott, Tue Oct 1 2019 + unless self.class.category_stream?(stream_name) + index = (items.index { |i| i.position >= position }) + else + index = (items.index { |i| i.global_position >= position }) + end + + logger.debug(tag: :data) { "Index: #{index.inspect}" } + + if index.nil? + items = [] + else + range = index..(index + batch_size - 1) + logger.debug(tag: :data) { "Range: #{range.pretty_inspect}" } + + items = self.items[range] + end + + logger.info(tag: :data) { "Got: \n#{items.pretty_inspect}" } + logger.info(tag: :get) { "Finished getting (Position: #{position}, Stream Name: #{stream_name.inspect})" } + + items + end + + def last_position(batch) + if self.class.category_stream?(stream_name) + batch.last.global_position + else + batch.last.position + end + end + + def self.category_stream?(stream_name) + StreamName.category?(stream_name) + end + end + end +end diff --git a/test/automated/get_last/substitute.rb b/test/automated/get_last/substitute.rb new file mode 100644 index 0000000..1eb598f --- /dev/null +++ b/test/automated/get_last/substitute.rb @@ -0,0 +1,31 @@ +require_relative '../automated_init' + +context "Get Last" do + context "Substitute" do + stream_name = Controls::StreamName.example + + control_message_data = Controls::MessageData::Read.example + + context "Message Is Set" do + substitute = SubstAttr::Substitute.build(Get::Stream::Last) + + substitute.set(stream_name, control_message_data) + + message_data = substitute.(stream_name) + + test "Returns the message that was set" do + assert(message_data == control_message_data) + end + end + + context "Message Not Set" do + substitute = SubstAttr::Substitute.build(Get::Stream::Last) + + message_data = substitute.(stream_name) + + test "Returns nil" do + assert(message_data.nil?) + end + end + end +end From b39392aa7bbe679e81f29ccb43ee4777e7d797f4 Mon Sep 17 00:00:00 2001 From: Aaron Jensen Date: Sun, 15 Jan 2023 18:45:26 -0500 Subject: [PATCH 3/4] Configure instance methods are removed --- lib/message_store/get.rb | 1 + lib/message_store/get/stream/last.rb | 6 +----- lib/message_store/put.rb | 12 ++++-------- lib/message_store/read.rb | 18 +++++++++--------- lib/message_store/read/iterator.rb | 8 ++++---- lib/message_store/write.rb | 6 +----- 6 files changed, 20 insertions(+), 31 deletions(-) diff --git a/lib/message_store/get.rb b/lib/message_store/get.rb index b061e10..da288ce 100644 --- a/lib/message_store/get.rb +++ b/lib/message_store/get.rb @@ -44,6 +44,7 @@ def self.configure(receiver, stream_name, **args) receiver.public_send("#{attr_name}=", instance) end + ## Is there a path to removing this? - Aaron, Sun Jan 15 2023 def configure(session: nil) Session.configure(self, session: session) end diff --git a/lib/message_store/get/stream/last.rb b/lib/message_store/get/stream/last.rb index cf7b953..6717b0a 100644 --- a/lib/message_store/get/stream/last.rb +++ b/lib/message_store/get/stream/last.rb @@ -10,7 +10,7 @@ class Last def self.build(session: nil) instance = new - instance.configure(session: session) + Session.configure(instance, session: session) instance end @@ -27,10 +27,6 @@ def self.configure(receiver, session: nil, attr_name: nil) instance end - def configure(session: nil) - Session.configure(self, session: session) - end - def call(stream_name, type=nil) logger.trace(tag: :get) { "Getting last message data (Stream Name: #{stream_name})" } diff --git a/lib/message_store/put.rb b/lib/message_store/put.rb index 50072fd..e42acf9 100644 --- a/lib/message_store/put.rb +++ b/lib/message_store/put.rb @@ -7,14 +7,10 @@ class Put dependency :identifier, Identifier::UUID::Random def self.build(session: nil) - new.tap do |instance| - instance.configure(session: session) - end - end - - def configure(session: nil) - Session.configure(self, session: session) - Identifier::UUID::Random.configure(self) + instance = new + Session.configure(instance, session: session) + Identifier::UUID::Random.configure(instance) + instance end def self.configure(receiver, session: nil, attr_name: nil) diff --git a/lib/message_store/read.rb b/lib/message_store/read.rb index a229e46..e32b633 100644 --- a/lib/message_store/read.rb +++ b/lib/message_store/read.rb @@ -11,11 +11,15 @@ class Read initializer :stream_name, :position, :batch_size - def self.build(stream_name, position: nil, batch_size: nil, session: nil, **arguments) - new(stream_name, position, batch_size).tap do |instance| - Iterator.configure(instance, position) - instance.configure(session: session, **arguments) - end + def self.build(stream_name, position: nil, batch_size: nil, session: nil, condition: nil, **arguments) + instance = new(stream_name, position, batch_size) + + Iterator.configure(instance, position) + + iterator = instance.iterator + Get.configure(iterator, stream_name, batch_size: batch_size, condition: condition, session: session) + + instance end def self.call(stream_name, position: nil, batch_size: nil, session: nil, **arguments, &action) @@ -29,10 +33,6 @@ def self.configure(receiver, stream_name, attr_name: nil, position: nil, batch_s receiver.public_send "#{attr_name}=", instance end - def configure(session: nil, condition: nil) - Get.configure(self.iterator, self.stream_name, batch_size: batch_size, condition: condition, session: session) - end - def call(&action) logger.trace(tag: :read) { "Reading (Stream Name: #{stream_name})" } diff --git a/lib/message_store/read/iterator.rb b/lib/message_store/read/iterator.rb index 275d0ed..1774250 100644 --- a/lib/message_store/read/iterator.rb +++ b/lib/message_store/read/iterator.rb @@ -26,10 +26,10 @@ def batch_size end def self.build(position=nil) - new.tap do |instance| - instance.starting_position = position - Log.get(self).debug { "Built Iterator (Starting Position: #{position.inspect})" } - end + instance = new + instance.starting_position = position + Log.get(self).debug { "Built Iterator (Starting Position: #{position.inspect})" } + instance end def self.configure(receiver, position=nil, attr_name: nil) diff --git a/lib/message_store/write.rb b/lib/message_store/write.rb index 96ca655..f407ed3 100644 --- a/lib/message_store/write.rb +++ b/lib/message_store/write.rb @@ -10,7 +10,7 @@ class Write def self.build(session: nil) instance = new Identifier::UUID::Random.configure(instance) - instance.configure(session: session) + Put.configure(instance, session: session) instance end @@ -25,10 +25,6 @@ def self.call(message_data, stream_name, expected_version: nil, session: nil) instance.(message_data, stream_name, expected_version: expected_version) end - def configure(session: nil) - Put.configure(self, session: session) - end - def call(message_data, stream_name, expected_version: nil) batch = Array(message_data) From 250e9991bef0b5264bb58fc8f3979c6ab9b29383 Mon Sep 17 00:00:00 2001 From: Aaron Jensen Date: Mon, 16 Jan 2023 11:19:21 -0500 Subject: [PATCH 4/4] Symlink lib script symlinks message_store --- symlink-lib.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/symlink-lib.sh b/symlink-lib.sh index 5a5f9bb..e1697ad 100755 --- a/symlink-lib.sh +++ b/symlink-lib.sh @@ -1,3 +1,3 @@ source ./library-symlinks.sh -symlink_lib 'postgres' 'message_store' +symlink_lib 'message_store'