From be3427d23f74f339be5f2fe6de67775521a37de3 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Thu, 5 Jun 2025 15:21:32 +0000 Subject: [PATCH 1/5] feat: Adding support for max_commit_delay commit_optins --- .../transaction.rb | 24 ++++++++++++++++--- lib/spanner_client_ext.rb | 5 ++-- .../transaction_test.rb | 10 ++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 9a1fd3a8..2d92ac8c 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -8,13 +8,15 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state - def initialize connection, isolation + def initialize connection, isolation, return_commit_stats: nil, max_commit_delay: nil @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @state = :INITIALIZED @sequence_number = 0 @mutations = [] + @return_commit_stats = return_commit_stats + @max_commit_delay = max_commit_delay end def active? @@ -95,14 +97,30 @@ def next_sequence_number @sequence_number += 1 if @committable end + def set_commit_options return_commit_stats: nil, max_commit_delay: nil + @return_commit_stats = return_commit_stats unless return_commit_stats.nil? + @max_commit_delay = max_commit_delay unless max_commit_delay.nil? + end + + def get_commit_options # rubocop:disable Naming/AccessorMethodName + { + return_commit_stats: @return_commit_stats, + max_commit_delay: @max_commit_delay + }.compact + end + def commit raise "This transaction is not active" unless active? begin # Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations. force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction - - @connection.session.commit_transaction @grpc_transaction, @mutations if @committable && @grpc_transaction + commit_options = get_commit_options + if @committable && @grpc_transaction + @connection.session.commit_transaction @grpc_transaction, + @mutations, + commit_options: commit_options + end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e if @connection.session_not_found? e diff --git a/lib/spanner_client_ext.rb b/lib/spanner_client_ext.rb index fcc90797..f70305b0 100644 --- a/lib/spanner_client_ext.rb +++ b/lib/spanner_client_ext.rb @@ -23,13 +23,14 @@ def create_session instance_id, database_id, labels: nil end class Session - def commit_transaction transaction, mutations = [] + def commit_transaction transaction, mutations = [], commit_options: nil ensure_service! resp = service.commit( path, mutations, - transaction_id: transaction.transaction_id + transaction_id: transaction.transaction_id, + commit_options: commit_options ) @last_updated_at = Time.now Convert.timestamp_to_time resp.commit_timestamp diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index 6996fecd..9ba97e78 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -32,6 +32,16 @@ def test_rollback assert_equal :ROLLED_BACK, transaction.state end + def test_commit_options + transaction.begin + transaction.set_commit_options return_commit_stats: true, max_commit_delay: 1000 + transaction.commit + assert_equal :COMMITTED, transaction.state + commit_options = transaction.get_commit_options + assert commit_options[:return_commit_stats] + assert_equal 1000, commit_options[:max_commit_delay] + end + def test_no_nested_transactions transaction.begin From f5bdb708d771aab457b27ca2f37035ca118cfa9b Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Thu, 5 Jun 2025 15:35:15 +0000 Subject: [PATCH 2/5] feat: Adding support for commit_options while commiting transactions --- .../transaction.rb | 30 ++++++++++++------- .../transaction_test.rb | 2 +- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 2d92ac8c..a18ffd61 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -8,15 +8,15 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state - def initialize connection, isolation, return_commit_stats: nil, max_commit_delay: nil + def initialize connection, isolation, options = {} @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @state = :INITIALIZED @sequence_number = 0 @mutations = [] - @return_commit_stats = return_commit_stats - @max_commit_delay = max_commit_delay + @return_commit_stats = options.fetch :return_commit_stats, false + @max_commit_delay = options.fetch :max_commit_delay, 0 end def active? @@ -97,16 +97,26 @@ def next_sequence_number @sequence_number += 1 if @committable end - def set_commit_options return_commit_stats: nil, max_commit_delay: nil - @return_commit_stats = return_commit_stats unless return_commit_stats.nil? - @max_commit_delay = max_commit_delay unless max_commit_delay.nil? + # @param options [Hash] A hash containing the options to set. + # @option options [Boolean] :return_commit_stats Whether to return commit statistics. + # @option options [Integer] :max_commit_delay The maximum delay in seconds for the commit. + def set_commit_options options = {} + return if options.empty? + + if options.key? :return_commit_stats + @return_commit_stats = options[:return_commit_stats] + end + + return unless options.key? :max_commit_delay + @max_commit_delay = options[:max_commit_delay] end - def get_commit_options # rubocop:disable Naming/AccessorMethodName + # Returns the commit options that should be used for the commit. + def commit_options { return_commit_stats: @return_commit_stats, max_commit_delay: @max_commit_delay - }.compact + } end def commit @@ -115,11 +125,11 @@ def commit begin # Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations. force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction - commit_options = get_commit_options + com_options = commit_options if @committable && @grpc_transaction @connection.session.commit_transaction @grpc_transaction, @mutations, - commit_options: commit_options + commit_options: com_options end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index 9ba97e78..e87a0a42 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -37,7 +37,7 @@ def test_commit_options transaction.set_commit_options return_commit_stats: true, max_commit_delay: 1000 transaction.commit assert_equal :COMMITTED, transaction.state - commit_options = transaction.get_commit_options + commit_options = transaction.commit_options assert commit_options[:return_commit_stats] assert_equal 1000, commit_options[:max_commit_delay] end From fb3e408d5f7b091ad84db258fff12ce9c01efd9f Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Thu, 5 Jun 2025 15:38:28 +0000 Subject: [PATCH 3/5] feat: refactored set_commit_options method --- lib/activerecord_spanner_adapter/transaction.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index a18ffd61..3bf393e8 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -103,12 +103,14 @@ def next_sequence_number def set_commit_options options = {} return if options.empty? - if options.key? :return_commit_stats - @return_commit_stats = options[:return_commit_stats] + options.each do |key, value| + case key + when :return_commit_stats + @return_commit_stats = value + when :max_commit_delay + @max_commit_delay = value + end end - - return unless options.key? :max_commit_delay - @max_commit_delay = options[:max_commit_delay] end # Returns the commit options that should be used for the commit. From c65e55e0123ab9237e89e9a05d5509d5eb97acf5 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Fri, 6 Jun 2025 11:13:15 +0000 Subject: [PATCH 4/5] test: Added mock server tests for testing the CommitRequest proto has commit_options --- .../spanner/database_statements.rb | 26 +++++++++++-- .../connection.rb | 2 + .../transaction.rb | 38 +++++++------------ .../inline_begin_transaction_test.rb | 28 ++++++++++++++ 4 files changed, 65 insertions(+), 29 deletions(-) diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 34f4165e..003ea630 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -229,17 +229,35 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, joinable: true + def transaction requires_new: nil, isolation: nil, commit_options: nil, joinable: true + # Create a hash with only the standard options for ActiveRecord. + standard_options = { + requires_new: requires_new, + isolation: isolation, + joinable: joinable + } + if !requires_new && current_transaction.joinable? - return super + # When joining an existing transaction, we just call super with standard options. + return super(**standard_options) end backoff = 0.2 begin - super + # Start the transaction with only the standard options. + super(**standard_options) do + # Once the transaction has been started by `super`, apply your custom options + # to the Spanner transaction object. + if commit_options && @connection.current_transaction + @connection.current_transaction.set_commit_options commit_options + end + + # Now, yield to the user's code block. + yield + end rescue ActiveRecord::StatementInvalid => err if err.cause.is_a? Google::Cloud::AbortedError - sleep(delay_from_aborted(err) || backoff *= 1.3) + sleep(delay_from_aborted(err) || (backoff *= 1.3)) retry end raise diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index e82e8e11..40100157 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -44,6 +44,8 @@ def self.spanners config # Call this method if you drop and recreate a database with the same name # to prevent the cached information to be used for the new database. def self.reset_information_schemas! + return unless @database + @information_schemas.each_value do |info_schema| info_schema.connection.disconnect! end diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 3bf393e8..53cd9ad7 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -7,16 +7,22 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state + attr_reader :commit_options - def initialize connection, isolation, options = {} + DEFAULT_COMMIT_OPTIONS = { + return_commit_stats: false, + max_commit_delay: nil # default value is nil + }.freeze + + + def initialize connection, isolation, commit_options = DEFAULT_COMMIT_OPTIONS @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @state = :INITIALIZED @sequence_number = 0 @mutations = [] - @return_commit_stats = options.fetch :return_commit_stats, false - @max_commit_delay = options.fetch :max_commit_delay, 0 + @commit_options = DEFAULT_COMMIT_OPTIONS.merge commit_options end def active? @@ -97,28 +103,10 @@ def next_sequence_number @sequence_number += 1 if @committable end - # @param options [Hash] A hash containing the options to set. - # @option options [Boolean] :return_commit_stats Whether to return commit statistics. - # @option options [Integer] :max_commit_delay The maximum delay in seconds for the commit. - def set_commit_options options = {} - return if options.empty? - - options.each do |key, value| - case key - when :return_commit_stats - @return_commit_stats = value - when :max_commit_delay - @max_commit_delay = value - end - end - end - - # Returns the commit options that should be used for the commit. - def commit_options - { - return_commit_stats: @return_commit_stats, - max_commit_delay: @max_commit_delay - } + # Sets the commit options for this transaction. + # This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay. + def set_commit_options commit_options = {} + @commit_options.merge! commit_options end def commit diff --git a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb index d4687fe1..b1a59213 100644 --- a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb +++ b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb @@ -35,6 +35,34 @@ def test_read_write_transaction_uses_inlined_begin } end + def test_read_write_transaction_with_commit_options + insert_sql = register_insert_singer_result + options_to_test = { return_commit_stats: true, max_commit_delay: 1000 } + [:serializable, :repeatable_read, nil].each do |isolation| + # Start a transaction, passing both the isolation level and the commit_options. + ActiveRecord::Base.transaction isolation: isolation, commit_options: options_to_test do + Singer.create(first_name: "Test", last_name: "User") + end + # Find the CommitRequest sent to the mock server. + commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) } + assert_equal 1, commit_requests.length + commit_request = commit_requests.first + refute_nil commit_request + + # Assert that the commit_options are present and have the correct values. + assert_equal true, commit_request.return_commit_stats + refute_nil commit_request.max_commit_delay + assert_equal 1, commit_request.max_commit_delay.seconds + + sql_request = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == insert_sql }.first + assert sql_request.transaction&.begin&.read_write + assert_equal _transaction_isolation_level_to_grpc(isolation), + sql_request.transaction&.begin&.isolation_level + + @mock.requests.clear + end + end + def test_read_write_transaction_aborted_dml_is_automatically_retried_with_inline_begin insert_sql = register_insert_singer_result From f0a9f317e494bd7004b253bf310f596bd1f6600a Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Fri, 6 Jun 2025 14:19:45 +0000 Subject: [PATCH 5/5] test: fixed the mock server test to use one isolation type --- .../spanner/database_statements.rb | 16 +++------ .../transaction.rb | 15 +++----- .../inline_begin_transaction_test.rb | 35 +++++++------------ 3 files changed, 22 insertions(+), 44 deletions(-) diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 003ea630..105f8e33 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -229,30 +229,22 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, commit_options: nil, joinable: true - # Create a hash with only the standard options for ActiveRecord. - standard_options = { - requires_new: requires_new, - isolation: isolation, - joinable: joinable - } + def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs + commit_options = kwargs.delete :commit_options if !requires_new && current_transaction.joinable? - # When joining an existing transaction, we just call super with standard options. - return super(**standard_options) + return super end backoff = 0.2 begin - # Start the transaction with only the standard options. - super(**standard_options) do + super do # Once the transaction has been started by `super`, apply your custom options # to the Spanner transaction object. if commit_options && @connection.current_transaction @connection.current_transaction.set_commit_options commit_options end - # Now, yield to the user's code block. yield end rescue ActiveRecord::StatementInvalid => err diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 53cd9ad7..7ca80d89 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -9,20 +9,16 @@ class Transaction attr_reader :state attr_reader :commit_options - DEFAULT_COMMIT_OPTIONS = { - return_commit_stats: false, - max_commit_delay: nil # default value is nil - }.freeze - def initialize connection, isolation, commit_options = DEFAULT_COMMIT_OPTIONS + def initialize connection, isolation, commit_options = nil @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @state = :INITIALIZED @sequence_number = 0 @mutations = [] - @commit_options = DEFAULT_COMMIT_OPTIONS.merge commit_options + @commit_options = commit_options end def active? @@ -105,8 +101,8 @@ def next_sequence_number # Sets the commit options for this transaction. # This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay. - def set_commit_options commit_options = {} - @commit_options.merge! commit_options + def set_commit_options options # rubocop:disable Naming/AccessorMethodName + @commit_options = options&.dup end def commit @@ -115,11 +111,10 @@ def commit begin # Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations. force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction - com_options = commit_options if @committable && @grpc_transaction @connection.session.commit_transaction @grpc_transaction, @mutations, - commit_options: com_options + commit_options: commit_options end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e diff --git a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb index b1a59213..e6b2ef39 100644 --- a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb +++ b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb @@ -38,29 +38,20 @@ def test_read_write_transaction_uses_inlined_begin def test_read_write_transaction_with_commit_options insert_sql = register_insert_singer_result options_to_test = { return_commit_stats: true, max_commit_delay: 1000 } - [:serializable, :repeatable_read, nil].each do |isolation| - # Start a transaction, passing both the isolation level and the commit_options. - ActiveRecord::Base.transaction isolation: isolation, commit_options: options_to_test do - Singer.create(first_name: "Test", last_name: "User") - end - # Find the CommitRequest sent to the mock server. - commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) } - assert_equal 1, commit_requests.length - commit_request = commit_requests.first - refute_nil commit_request - - # Assert that the commit_options are present and have the correct values. - assert_equal true, commit_request.return_commit_stats - refute_nil commit_request.max_commit_delay - assert_equal 1, commit_request.max_commit_delay.seconds - - sql_request = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == insert_sql }.first - assert sql_request.transaction&.begin&.read_write - assert_equal _transaction_isolation_level_to_grpc(isolation), - sql_request.transaction&.begin&.isolation_level - - @mock.requests.clear + # Start a transaction, passing the commit_options. + ActiveRecord::Base.transaction commit_options: options_to_test do + Singer.create(first_name: "Test", last_name: "User") end + # Find the CommitRequest sent to the mock server. + commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) } + assert_equal 1, commit_requests.length + commit_request = commit_requests.first + refute_nil commit_request + + # Assert that the commit_options are present and have the correct values. + assert_equal true, commit_request.return_commit_stats + refute_nil commit_request.max_commit_delay + assert_equal 1, commit_request.max_commit_delay.seconds end def test_read_write_transaction_aborted_dml_is_automatically_retried_with_inline_begin