diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 34f4165e..105f8e33 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -229,17 +229,27 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, joinable: true + def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs + commit_options = kwargs.delete :commit_options + if !requires_new && current_transaction.joinable? return super end backoff = 0.2 begin - super + super do + # Once the transaction has been started by `super`, apply your custom options + # to the Spanner transaction object. + if commit_options && @connection.current_transaction + @connection.current_transaction.set_commit_options commit_options + end + + yield + end rescue ActiveRecord::StatementInvalid => err if err.cause.is_a? Google::Cloud::AbortedError - sleep(delay_from_aborted(err) || backoff *= 1.3) + sleep(delay_from_aborted(err) || (backoff *= 1.3)) retry end raise diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index e82e8e11..40100157 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -44,6 +44,8 @@ def self.spanners config # Call this method if you drop and recreate a database with the same name # to prevent the cached information to be used for the new database. def self.reset_information_schemas! + return unless @database + @information_schemas.each_value do |info_schema| info_schema.connection.disconnect! end diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 9a1fd3a8..7ca80d89 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -7,14 +7,18 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state + attr_reader :commit_options - def initialize connection, isolation + + + def initialize connection, isolation, commit_options = nil @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @state = :INITIALIZED @sequence_number = 0 @mutations = [] + @commit_options = commit_options end def active? @@ -95,14 +99,23 @@ def next_sequence_number @sequence_number += 1 if @committable end + # Sets the commit options for this transaction. + # This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay. + def set_commit_options options # rubocop:disable Naming/AccessorMethodName + @commit_options = options&.dup + end + def commit raise "This transaction is not active" unless active? begin # Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations. force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction - - @connection.session.commit_transaction @grpc_transaction, @mutations if @committable && @grpc_transaction + if @committable && @grpc_transaction + @connection.session.commit_transaction @grpc_transaction, + @mutations, + commit_options: commit_options + end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e if @connection.session_not_found? e diff --git a/lib/spanner_client_ext.rb b/lib/spanner_client_ext.rb index fcc90797..f70305b0 100644 --- a/lib/spanner_client_ext.rb +++ b/lib/spanner_client_ext.rb @@ -23,13 +23,14 @@ def create_session instance_id, database_id, labels: nil end class Session - def commit_transaction transaction, mutations = [] + def commit_transaction transaction, mutations = [], commit_options: nil ensure_service! resp = service.commit( path, mutations, - transaction_id: transaction.transaction_id + transaction_id: transaction.transaction_id, + commit_options: commit_options ) @last_updated_at = Time.now Convert.timestamp_to_time resp.commit_timestamp diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index 6996fecd..e87a0a42 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -32,6 +32,16 @@ def test_rollback assert_equal :ROLLED_BACK, transaction.state end + def test_commit_options + transaction.begin + transaction.set_commit_options return_commit_stats: true, max_commit_delay: 1000 + transaction.commit + assert_equal :COMMITTED, transaction.state + commit_options = transaction.commit_options + assert commit_options[:return_commit_stats] + assert_equal 1000, commit_options[:max_commit_delay] + end + def test_no_nested_transactions transaction.begin diff --git a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb index d4687fe1..e6b2ef39 100644 --- a/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb +++ b/test/activerecord_spanner_mock_server/inline_begin_transaction_test.rb @@ -35,6 +35,25 @@ def test_read_write_transaction_uses_inlined_begin } end + def test_read_write_transaction_with_commit_options + insert_sql = register_insert_singer_result + options_to_test = { return_commit_stats: true, max_commit_delay: 1000 } + # Start a transaction, passing the commit_options. + ActiveRecord::Base.transaction commit_options: options_to_test do + Singer.create(first_name: "Test", last_name: "User") + end + # Find the CommitRequest sent to the mock server. + commit_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::CommitRequest) } + assert_equal 1, commit_requests.length + commit_request = commit_requests.first + refute_nil commit_request + + # Assert that the commit_options are present and have the correct values. + assert_equal true, commit_request.return_commit_stats + refute_nil commit_request.max_commit_delay + assert_equal 1, commit_request.max_commit_delay.seconds + end + def test_read_write_transaction_aborted_dml_is_automatically_retried_with_inline_begin insert_sql = register_insert_singer_result