From 7cbab20a280fb578a198e0deb851c0cfb34c66c1 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Fri, 20 Jun 2025 18:42:03 +0000 Subject: [PATCH 1/6] feat: Adding PDML fallback support for DML operations --- .../spanner/database_statements.rb | 9 +++- ...ansaction_mutation_limit_exceeded_error.rb | 20 +++++++++ .../connection.rb | 33 ++++++++++++-- .../transaction.rb | 9 ++++ ...ner_active_record_with_mock_server_test.rb | 44 +++++++++++++++++++ 5 files changed, 110 insertions(+), 5 deletions(-) create mode 100644 lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 94e07d6c..99107f12 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -72,10 +72,11 @@ def execute_query_or_dml statement_type, sql, name, binds ActiveSupport::Dependencies.interlock.permit_concurrent_loads do if transaction_required transaction do - @connection.execute_query sql, params: params, types: types, request_options: request_options + @connection.execute_query sql, statement_type: statement_type, params: params, types: types, +request_options: request_options end else - @connection.execute_query sql, params: params, types: types, single_use_selector: selector, + @connection.execute_query sql, statement_type: statement_type, params: params, types: types, single_use_selector: selector, request_options: request_options end end @@ -232,6 +233,7 @@ def execute_ddl statements def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs commit_options = kwargs.delete :commit_options exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams + fallback_to_pdml_enabled = kwargs.delete :fallback_to_pdml_enabled || false @_spanner_begin_transaction_options = { exclude_txn_from_change_streams: exclude_from_streams } @@ -247,6 +249,9 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs if commit_options && @connection.current_transaction @connection.current_transaction.set_commit_options commit_options end + if fallback_to_pdml_enabled && @connection.current_transaction + @connection.current_transaction.fallback_to_pdml_enabled = fallback_to_pdml_enabled + end yield end diff --git a/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb new file mode 100644 index 00000000..899d0683 --- /dev/null +++ b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb @@ -0,0 +1,20 @@ +module Google + module Cloud + module Spanner + module Errors + # Custom exception raised when a transaction exceeds the mutation limit in Google Cloud Spanner. + # This provides a specific error class for a common, recoverable scenario. + class TransactionMutationLimitExceededError < Google::Cloud::Error + ERROR_MESSAGE = "The transaction contains too many mutations".freeze + + def self.is_mutation_limit_error? exception + return false if exception.message.nil? + cause = exception.cause + return false unless cause.is_a? GRPC::InvalidArgument + cause.details.include?(ERROR_MESSAGE) || exception.message.include?(ERROR_MESSAGE) + end + end + end + end + end +end diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index dc72c3f5..a354f0a6 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -7,8 +7,11 @@ require "google/cloud/spanner" require "spanner_client_ext" require "activerecord_spanner_adapter/information_schema" +require_relative "../active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error" module ActiveRecordSpannerAdapter + TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError + class Connection attr_reader :instance_id attr_reader :database_id @@ -209,7 +212,7 @@ def run_batch # DQL, DML Statements - def execute_query sql, params: nil, types: nil, single_use_selector: nil, request_options: nil + def execute_query sql, statement_type: nil, params: nil, types: nil, single_use_selector: nil, request_options: nil if params converted_params, types = Google::Cloud::Spanner::Convert.to_input_params_and_types( @@ -223,11 +226,12 @@ def execute_query sql, params: nil, types: nil, single_use_selector: nil, reques end selector = transaction_selector || single_use_selector - execute_sql_request sql, converted_params, types, selector, request_options + execute_sql_request sql, statement_type, converted_params, types, selector, request_options end # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - def execute_sql_request sql, converted_params, types, selector, request_options = nil + def execute_sql_request sql, statement_type, converted_params, types, selector, request_options = nil + current_transaction&.increment_dml_counter if statement_type == :dml && current_transaction&.active? res = session.execute_query \ sql, params: converted_params, @@ -252,6 +256,29 @@ def execute_sql_request sql, converted_params, types, selector, request_options end raise rescue Google::Cloud::Error => e + # If the error is a TransactionMutationLimitExceededError, check if it is safe to fallback to a PDML transaction. + # If it is safe, create a PDML transaction and retry the request. + if TransactionMutationLimitExceededError.is_mutation_limit_error? e + is_safe_to_fallback = + current_transaction&.dml_statement_count == 1 && current_transaction&.mutations&.empty? + + if current_transaction&.fallback_to_pdml_enabled && is_safe_to_fallback + pdml_transaction = ActiveRecordSpannerAdapter::Transaction.new self, :pdml + pdml_transaction.begin + pdml_selector = pdml_transaction.transaction_selector + + result = session.execute_query( + sql, + params: converted_params, + types: types, + transaction: pdml_selector, + request_options: request_options + ) + return result + end + # If it is not safe to fallback, raise a TransactionMutationLimitExceededError. + raise ::ActiveRecordSpannerAdapter::TransactionMutationLimitExceededError + end # Check if it was the first statement in a transaction that included a BeginTransaction # option in the request. If so, execute an explicit BeginTransaction and then retry the # request without the BeginTransaction option. diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index c2763c60..644991e9 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -10,6 +10,9 @@ class Transaction attr_reader :commit_options attr_reader :begin_transaction_selector attr_accessor :exclude_txn_from_change_streams + attr_accessor :fallback_to_pdml_enabled + attr_reader :dml_statement_count + attr_reader :mutations @@ -22,6 +25,8 @@ def initialize connection, isolation, commit_options = nil, exclude_txn_from_cha @mutations = [] @commit_options = commit_options @exclude_txn_from_change_streams = exclude_txn_from_change_streams + @fallback_to_pdml_enabled = false + @dml_statement_count = 0 end def active? @@ -37,6 +42,10 @@ def buffer mutation @mutations << mutation end + def increment_dml_counter + @dml_statement_count += 1 + end + # Begins the transaction. # # Read-only and PDML transactions are started by executing a BeginTransaction RPC. diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 80d88a06..c3ee4f53 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -13,6 +13,7 @@ module MockServerTests CommitRequest = Google::Cloud::Spanner::V1::CommitRequest ExecuteSqlRequest = Google::Cloud::Spanner::V1::ExecuteSqlRequest + BeginTransactionRequest = Google::Cloud::Spanner::V1::BeginTransactionRequest class SpannerActiveRecordMockServerTest < BaseSpannerMockServerTest VERSION_7_1_0 = Gem::Version.create('7.1.0') @@ -192,6 +193,49 @@ def test_selects_one_singer_without_transaction end end + def test_update_all_on_table_with_sequence_falls_back_to_pdml + update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" + + mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") + @mock.push_error(update_sql, mutation_limit_error) + @mock.put_statement_result update_sql, StatementResult.new(1) + + TableWithSequence.transaction(fallback_to_pdml_enabled: true) do + TableWithSequence.where(id: 1).update_all(name: "New Foo Name") + end + + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 2, update_requests.length, "Should have been two attempts for the UPDATE DML" + + pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + refute_nil pdml_begin_request, "A BeginTransactionRequest for PDML should have been sent" + + fallback_request = update_requests[1] + assert fallback_request.transaction&.id, "Fallback DML should run within a transaction and have an ID" + assert_nil fallback_request.transaction&.begin, "Fallback DML should use the existing PDML transaction, not begin a new one" + end + + def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled + update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" + + mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") + @mock.push_error(update_sql, mutation_limit_error) + + err = assert_raises ActiveRecord::StatementInvalid do + TableWithSequence.transaction do + TableWithSequence.where(id: 1).update_all(name: "This name will not be updated") + end + end + + assert_kind_of Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError, err.cause + + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 1, update_requests.length, "Should only have been one attempt for the UPDATE DML" + + pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + assert_nil pdml_begin_request, "No PDML transaction should have been started" + end + def test_selects_singers_with_condition # This query does not use query parameters because the where clause is specified as a string. # ActiveRecord sees that as a SQL fragment that will disable the usage of prepared statements. From a2108a3962554983f9e50c381bbee998ebb2a0d3 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Fri, 20 Jun 2025 19:01:35 +0000 Subject: [PATCH 2/6] fix: Fixing robocop build failures --- .../spanner/database_statements.rb | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 99107f12..d56689e0 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -72,11 +72,18 @@ def execute_query_or_dml statement_type, sql, name, binds ActiveSupport::Dependencies.interlock.permit_concurrent_loads do if transaction_required transaction do - @connection.execute_query sql, statement_type: statement_type, params: params, types: types, -request_options: request_options + @connection.execute_query sql, + statement_type: statement_type, + params: params, + types: types, + request_options: request_options end else - @connection.execute_query sql, statement_type: statement_type, params: params, types: types, single_use_selector: selector, + @connection.execute_query sql, + statement_type: statement_type, + params: params, + types: types, + single_use_selector: selector, request_options: request_options end end @@ -230,7 +237,7 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs + def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity commit_options = kwargs.delete :commit_options exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams fallback_to_pdml_enabled = kwargs.delete :fallback_to_pdml_enabled || false From 1e4d1153899333742f02852c3ab45dbe05bcab93 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Mon, 23 Jun 2025 17:41:02 +0000 Subject: [PATCH 3/6] fix: updated pdml fallback implementation --- .../read_write_transactions_test.rb | 28 +++++++++ .../snippets/partitioned-dml/application.rb | 60 ++++++++++++++++++- .../spanner/database_statements.rb | 50 ++++++++++++---- ...ansaction_mutation_limit_exceeded_error.rb | 17 ++++-- .../connection.rb | 29 ++------- .../transaction.rb | 13 ++-- ...ner_active_record_with_mock_server_test.rb | 24 +++++++- 7 files changed, 167 insertions(+), 54 deletions(-) diff --git a/acceptance/cases/transactions/read_write_transactions_test.rb b/acceptance/cases/transactions/read_write_transactions_test.rb index 16fec677..530587d2 100644 --- a/acceptance/cases/transactions/read_write_transactions_test.rb +++ b/acceptance/cases/transactions/read_write_transactions_test.rb @@ -273,6 +273,34 @@ def test_create_record_with_sequence_using_mutations TableWithSequence.connection.use_client_side_id_for_mutations = reset_value end end + def test_single_dml_succeeds_with_fallback_to_pdml_enabled + # This test verifies that a normal, successful DML statement works as + # expected when the fallback isolation is enabled. Because no mutation + # limit error occurs, the fallback to PDML should NOT be triggered. + create_test_records + assert_equal 1, Author.count + + Author.transaction isolation: :fallback_to_pdml do + Author.where(name: "David").delete_all + end + + assert_equal 0, Author.count, "The record should have been deleted" + end + + def test_other_errors_do_not_trigger_fallback + # This test ensures that if a transaction with fallback enabled fails + # for a reason OTHER than the mutation limit, it fails normally and + # does not attempt to fall back to PDML. + create_test_records + initial_author_count = Author.count + + assert_raises ActiveRecord::StatementInvalid do + Author.transaction isolation: :fallback_to_pdml do + Author.create! name: nil + end + end + assert_equal initial_author_count, Author.count, "Transaction should have rolled back" + end end end end diff --git a/examples/snippets/partitioned-dml/application.rb b/examples/snippets/partitioned-dml/application.rb index 83a4f792..03219e86 100644 --- a/examples/snippets/partitioned-dml/application.rb +++ b/examples/snippets/partitioned-dml/application.rb @@ -19,8 +19,6 @@ def self.run puts "" puts "Deleting all albums in the database using Partitioned DML" - # Note that a Partitioned DML transaction can contain ONLY ONE DML statement. - # If we want to delete all data in two different tables, we need to do so in two different PDML transactions. Album.transaction isolation: :pdml do count = Album.delete_all puts "Deleted #{count} albums" @@ -39,6 +37,64 @@ def self.run puts "Singers in the database: #{singer_count}" puts "Albums in the database: #{album_count}" end + + def self.run_two_dmls_in_pdml_transaction_test + begin + Singer.transaction isolation: :pdml do + Album.delete_all + Singer.delete_all + end + rescue ActiveRecord::StatementInvalid + puts " SUCCESS: As expected, the transaction failed because PDML only supports one DML statement." + ensure + Album.delete_all + Singer.delete_all + end + end + + def self.demonstrate_successful_fallback + begin + singers_to_create = (1..10).map { |i| { first_name: "Test", last_name: "Singer #{i}" } } + Singer.create singers_to_create + puts " #{Singer.count} singers now in database." + + puts "\n Running a large delete operation with 'isolation: :fallback_to_pdml'..." + puts " NOTE: A real operation of this type on millions of rows could fail with a mutation limit error." + puts " The adapter would catch this error and automatically retry with a PDML transaction." + + Singer.transaction isolation: :fallback_to_pdml do + Singer.where("last_name LIKE 'Singer %'").delete_all + end + + puts "\n SUCCESS: The transaction completed successfully thanks to the PDML fallback." + puts " Remaining singers: #{Singer.count}" + rescue StandardError => e + puts "\n FAILED: The transaction unexpectedly failed with error: #{e.message}" + ensure + Singer.delete_all + end + end + + def self.demonstrate_no_fallback_when_disabled + begin + puts " Running a transaction that will fail, without enabling the fallback..." + + Singer.transaction do + # To demonstrate this, we simulate what Active Record would do if Spanner + # returned a mutation limit error. It would raise a generic StatementInvalid error. + puts " Simulating a DML operation that exceeds the mutation limit..." + raise ActiveRecordSpannerAdapter::TransactionMutationLimitExceededError, + "Simulated: The transaction contains too many mutations" + end + rescue ActiveRecord::StatementInvalid + puts " SUCCESS: As expected, the transaction failed with a generic ActiveRecord::StatementInvalid." + puts " No fallback was attempted." + end + end end Application.run +Application.run_two_dmls_in_pdml_transaction_test + +Application.demonstrate_successful_fallback +Application.demonstrate_no_fallback_when_disabled diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index d56689e0..1fc45db0 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -7,6 +7,7 @@ # frozen_string_literal: true require "active_record/gem_version" +require "active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error" module ActiveRecord module ConnectionAdapters @@ -14,6 +15,7 @@ module Spanner module DatabaseStatements VERSION_7_1_0 = Gem::Version.create "7.1.0" RequestOptions = Google::Cloud::Spanner::V1::RequestOptions + TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError # DDL, DML and DQL Statements @@ -73,14 +75,12 @@ def execute_query_or_dml statement_type, sql, name, binds if transaction_required transaction do @connection.execute_query sql, - statement_type: statement_type, params: params, types: types, request_options: request_options end else @connection.execute_query sql, - statement_type: statement_type, params: params, types: types, single_use_selector: selector, @@ -237,10 +237,9 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity + def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs, &block # rubocop:disable Metrics/PerceivedComplexity commit_options = kwargs.delete :commit_options exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams - fallback_to_pdml_enabled = kwargs.delete :fallback_to_pdml_enabled || false @_spanner_begin_transaction_options = { exclude_txn_from_change_streams: exclude_from_streams } @@ -256,9 +255,6 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs # ru if commit_options && @connection.current_transaction @connection.current_transaction.set_commit_options commit_options end - if fallback_to_pdml_enabled && @connection.current_transaction - @connection.current_transaction.fallback_to_pdml_enabled = fallback_to_pdml_enabled - end yield end @@ -266,8 +262,17 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs # ru if err.cause.is_a? Google::Cloud::AbortedError sleep(delay_from_aborted(err) || (backoff *= 1.3)) retry + elsif TransactionMutationLimitExceededError.is_mutation_limit_error? err.cause + raise err unless isolation == :fallback_to_pdml + transaction( + requires_new: true, + isolation: :pdml, + joinable: false, + **kwargs, &block + ) + else + raise err end - raise ensure # Clean up the instance variable to avoid leaking options. @_spanner_begin_transaction_options = nil @@ -284,7 +289,8 @@ def transaction_isolation_levels # These are not really isolation levels, but it is the only (best) way to pass in additional # transaction options to the connection. read_only: "READ_ONLY", - buffered_mutations: "BUFFERED_MUTATIONS" + buffered_mutations: "BUFFERED_MUTATIONS", + fallback_to_pdml: "FALLBACK_TO_PDML" } end @@ -321,7 +327,8 @@ def begin_isolated_db_transaction isolation if isolation.count != 1 else raise "Unsupported isolation level: #{isolation}" unless - [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml].include? isolation + [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml, + :fallback_to_pdml].include? isolation end log "BEGIN #{isolation}" do @@ -330,6 +337,29 @@ def begin_isolated_db_transaction isolation end end + def create_savepoint name = "active_record_1" + if @connection.current_transaction&.is_pdml? + # PDML transactions do not support savepoints. Silently ignore the + # request. This allows methods like `update_all` to proceed + # without crashing when they are run in a PDML fallback context. + return + end + super + end + + def release_savepoint name = "active_record_1" + # We must also override release_savepoint to be consistent. If we + # ignore the creation of a savepoint, we must also ignore its release. + return if @connection.current_transaction&.is_pdml? + super + end + + def rollback_to_savepoint name = "active_record_1" + # And we must override rollback_to_savepoint for consistency. + return if @connection.current_transaction&.is_pdml? + super + end + def commit_db_transaction log "COMMIT" do @connection.commit_transaction diff --git a/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb index 899d0683..d1b9a8b0 100644 --- a/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb +++ b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb @@ -1,17 +1,22 @@ +# Copyright 2025 Google LLC +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. + module Google module Cloud module Spanner module Errors # Custom exception raised when a transaction exceeds the mutation limit in Google Cloud Spanner. # This provides a specific error class for a common, recoverable scenario. - class TransactionMutationLimitExceededError < Google::Cloud::Error + class TransactionMutationLimitExceededError < ActiveRecord::StatementInvalid ERROR_MESSAGE = "The transaction contains too many mutations".freeze - def self.is_mutation_limit_error? exception - return false if exception.message.nil? - cause = exception.cause - return false unless cause.is_a? GRPC::InvalidArgument - cause.details.include?(ERROR_MESSAGE) || exception.message.include?(ERROR_MESSAGE) + def self.is_mutation_limit_error? error + return false if error.nil? + error.is_a?(Google::Cloud::InvalidArgumentError) && + error.message&.include?(ERROR_MESSAGE) end end end diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index a354f0a6..f6e9fa56 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -212,7 +212,7 @@ def run_batch # DQL, DML Statements - def execute_query sql, statement_type: nil, params: nil, types: nil, single_use_selector: nil, request_options: nil + def execute_query sql, params: nil, types: nil, single_use_selector: nil, request_options: nil if params converted_params, types = Google::Cloud::Spanner::Convert.to_input_params_and_types( @@ -226,12 +226,11 @@ def execute_query sql, statement_type: nil, params: nil, types: nil, single_use_ end selector = transaction_selector || single_use_selector - execute_sql_request sql, statement_type, converted_params, types, selector, request_options + execute_sql_request sql, converted_params, types, selector, request_options end # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - def execute_sql_request sql, statement_type, converted_params, types, selector, request_options = nil - current_transaction&.increment_dml_counter if statement_type == :dml && current_transaction&.active? + def execute_sql_request sql, converted_params, types, selector, request_options = nil res = session.execute_query \ sql, params: converted_params, @@ -256,28 +255,8 @@ def execute_sql_request sql, statement_type, converted_params, types, selector, end raise rescue Google::Cloud::Error => e - # If the error is a TransactionMutationLimitExceededError, check if it is safe to fallback to a PDML transaction. - # If it is safe, create a PDML transaction and retry the request. if TransactionMutationLimitExceededError.is_mutation_limit_error? e - is_safe_to_fallback = - current_transaction&.dml_statement_count == 1 && current_transaction&.mutations&.empty? - - if current_transaction&.fallback_to_pdml_enabled && is_safe_to_fallback - pdml_transaction = ActiveRecordSpannerAdapter::Transaction.new self, :pdml - pdml_transaction.begin - pdml_selector = pdml_transaction.transaction_selector - - result = session.execute_query( - sql, - params: converted_params, - types: types, - transaction: pdml_selector, - request_options: request_options - ) - return result - end - # If it is not safe to fallback, raise a TransactionMutationLimitExceededError. - raise ::ActiveRecordSpannerAdapter::TransactionMutationLimitExceededError + raise end # Check if it was the first statement in a transaction that included a BeginTransaction # option in the request. If so, execute an explicit BeginTransaction and then retry the diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 644991e9..d9bf9fd4 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -10,9 +10,6 @@ class Transaction attr_reader :commit_options attr_reader :begin_transaction_selector attr_accessor :exclude_txn_from_change_streams - attr_accessor :fallback_to_pdml_enabled - attr_reader :dml_statement_count - attr_reader :mutations @@ -25,8 +22,6 @@ def initialize connection, isolation, commit_options = nil, exclude_txn_from_cha @mutations = [] @commit_options = commit_options @exclude_txn_from_change_streams = exclude_txn_from_change_streams - @fallback_to_pdml_enabled = false - @dml_statement_count = 0 end def active? @@ -42,10 +37,6 @@ def buffer mutation @mutations << mutation end - def increment_dml_counter - @dml_statement_count += 1 - end - # Begins the transaction. # # Read-only and PDML transactions are started by executing a BeginTransaction RPC. @@ -167,6 +158,10 @@ def mark_aborted @state = :ABORTED end + def is_pdml? + @isolation == :pdml + end + # Sets the underlying gRPC transaction to use for this Transaction. # This is used for queries/DML statements that inlined the BeginTransaction option and returned # a transaction in the metadata. diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index c3ee4f53..2fb57f55 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -200,7 +200,7 @@ def test_update_all_on_table_with_sequence_falls_back_to_pdml @mock.push_error(update_sql, mutation_limit_error) @mock.put_statement_result update_sql, StatementResult.new(1) - TableWithSequence.transaction(fallback_to_pdml_enabled: true) do + TableWithSequence.transaction isolation: :fallback_to_pdml do TableWithSequence.where(id: 1).update_all(name: "New Foo Name") end @@ -227,7 +227,7 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled end end - assert_kind_of Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError, err.cause + assert_kind_of Google::Cloud::InvalidArgumentError, err.cause update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } assert_equal 1, update_requests.length, "Should only have been one attempt for the UPDATE DML" @@ -236,6 +236,26 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled assert_nil pdml_begin_request, "No PDML transaction should have been started" end + def test_no_fallback_to_pdml_on_table_with_sequence_when_error_is_not_valid + + update_sql_regex = /UPDATE `table_with_sequence`/ + other_error = GRPC::AlreadyExists.new("This is some other database error") + @mock.push_error(update_sql_regex, other_error) + + err = assert_raises ActiveRecord::StatementInvalid do + TableWithSequence.transaction isolation: :fallback_to_pdml do + TableWithSequence.where(id: 1).update_all(name: "This name will not be updated") + end + end + + assert_kind_of Google::Cloud::InvalidArgumentError, err.cause + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql.match(update_sql_regex) } + assert_equal 2, update_requests.length, "Should only have been one attempt for the UPDATE DML" + + pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + assert_nil pdml_begin_request, "No PDML transaction should have been started" + end + def test_selects_singers_with_condition # This query does not use query parameters because the where clause is specified as a string. # ActiveRecord sees that as a SQL fragment that will disable the usage of prepared statements. From cc18301667b32107ac8583822a04cc203b56784e Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Tue, 24 Jun 2025 16:08:54 +0000 Subject: [PATCH 4/6] fix: removing recursive call from pdml fallback implementation --- .../read_write_transactions_test.rb | 1 + .../snippets/partitioned-dml/application.rb | 115 +++++++++--------- .../spanner/database_statements.rb | 56 +++------ 3 files changed, 73 insertions(+), 99 deletions(-) diff --git a/acceptance/cases/transactions/read_write_transactions_test.rb b/acceptance/cases/transactions/read_write_transactions_test.rb index 530587d2..ca362027 100644 --- a/acceptance/cases/transactions/read_write_transactions_test.rb +++ b/acceptance/cases/transactions/read_write_transactions_test.rb @@ -273,6 +273,7 @@ def test_create_record_with_sequence_using_mutations TableWithSequence.connection.use_client_side_id_for_mutations = reset_value end end + def test_single_dml_succeeds_with_fallback_to_pdml_enabled # This test verifies that a normal, successful DML statement works as # expected when the fallback isolation is enabled. Because no mutation diff --git a/examples/snippets/partitioned-dml/application.rb b/examples/snippets/partitioned-dml/application.rb index 03219e86..87a2fa1e 100644 --- a/examples/snippets/partitioned-dml/application.rb +++ b/examples/snippets/partitioned-dml/application.rb @@ -10,7 +10,7 @@ require_relative "models/album" class Application - def self.run + def self.run # rubocop:disable Metrics/AbcSize singer_count = Singer.all.count album_count = Album.all.count puts "" @@ -19,10 +19,65 @@ def self.run puts "" puts "Deleting all albums in the database using Partitioned DML" + # Note that a Partitioned DML transaction can contain ONLY ONE DML statement. + # If we want to delete all data in two different tables, we need to do so in two different PDML transactions. Album.transaction isolation: :pdml do count = Album.delete_all puts "Deleted #{count} albums" end + puts "" + puts "Deleting all singers in the database using normal Read-Write transaction with PDML fallback" + # + # This example demonstrates using `isolation: :fallback_to_pdml`. + # + # --- HOW IT WORKS --- + # 1. Initial Attempt: The transaction starts as a normal, atomic, read-write transaction. + # + # 2. The Trigger: If that transaction fails with a `TransactionMutationLimitExceededError`, + # the adapter automatically catches the error. + # + # 3. The Fallback: The adapter then retries the ENTIRE code block in a new, + # non-atomic Partitioned DML (PDML) transaction. + # + # --- WARNING: CRITICAL USAGE REQUIREMENTS --- + # This implementation retries the whole transaction block without checking its contents. + # The user of this feature is responsible for ensuring the following: + # + # 1. SINGLE DML STATEMENT: The block SHOULD contain only ONE DML statement. + # If it contains more, the PDML retry will fail with a low-level `seqno` error. + # + # 2. IDEMPOTENCY: The block MUST be "idempotent" (safe to run multiple times), + # as the code may be executed more than once. + # + # 3. NON-ATOMIC: The retried PDML transaction is NOT atomic. Do not use this + # for multi-step operations that must all succeed or fail together. + # + Singer.transaction isolation: :fallback_to_pdml do + count = Singer.delete_all + puts "Deleted #{count} singers" + end + Singer.transaction isolation: :fallback_to_pdml do + begin + singers_to_create = (1..10).map { |i| { first_name: "Test", last_name: "Singer #{i}" } } + Singer.create singers_to_create + puts " #{Singer.count} singers now in database." + + puts "\n Running a large delete operation with 'isolation: :fallback_to_pdml'..." + puts " NOTE: A real operation of this type on millions of rows could fail with a mutation limit error." + puts " The adapter would catch this error and automatically retry with a PDML transaction." + + Singer.transaction isolation: :fallback_to_pdml do + Singer.where("last_name LIKE 'Singer %'").delete_all + end + + puts "\n SUCCESS: The transaction completed successfully thanks to the PDML fallback." + puts " Remaining singers: #{Singer.count}" + rescue StandardError => e + puts "\n FAILED: The transaction unexpectedly failed with error: #{e.message}" + ensure + Singer.delete_all + end + end puts "" puts "Deleting all singers in the database using Partitioned DML" @@ -37,64 +92,6 @@ def self.run puts "Singers in the database: #{singer_count}" puts "Albums in the database: #{album_count}" end - - def self.run_two_dmls_in_pdml_transaction_test - begin - Singer.transaction isolation: :pdml do - Album.delete_all - Singer.delete_all - end - rescue ActiveRecord::StatementInvalid - puts " SUCCESS: As expected, the transaction failed because PDML only supports one DML statement." - ensure - Album.delete_all - Singer.delete_all - end - end - - def self.demonstrate_successful_fallback - begin - singers_to_create = (1..10).map { |i| { first_name: "Test", last_name: "Singer #{i}" } } - Singer.create singers_to_create - puts " #{Singer.count} singers now in database." - - puts "\n Running a large delete operation with 'isolation: :fallback_to_pdml'..." - puts " NOTE: A real operation of this type on millions of rows could fail with a mutation limit error." - puts " The adapter would catch this error and automatically retry with a PDML transaction." - - Singer.transaction isolation: :fallback_to_pdml do - Singer.where("last_name LIKE 'Singer %'").delete_all - end - - puts "\n SUCCESS: The transaction completed successfully thanks to the PDML fallback." - puts " Remaining singers: #{Singer.count}" - rescue StandardError => e - puts "\n FAILED: The transaction unexpectedly failed with error: #{e.message}" - ensure - Singer.delete_all - end - end - - def self.demonstrate_no_fallback_when_disabled - begin - puts " Running a transaction that will fail, without enabling the fallback..." - - Singer.transaction do - # To demonstrate this, we simulate what Active Record would do if Spanner - # returned a mutation limit error. It would raise a generic StatementInvalid error. - puts " Simulating a DML operation that exceeds the mutation limit..." - raise ActiveRecordSpannerAdapter::TransactionMutationLimitExceededError, - "Simulated: The transaction contains too many mutations" - end - rescue ActiveRecord::StatementInvalid - puts " SUCCESS: As expected, the transaction failed with a generic ActiveRecord::StatementInvalid." - puts " No fallback was attempted." - end - end end Application.run -Application.run_two_dmls_in_pdml_transaction_test - -Application.demonstrate_successful_fallback -Application.demonstrate_no_fallback_when_disabled diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 1fc45db0..42cf4165 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -263,15 +263,12 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs, &bl sleep(delay_from_aborted(err) || (backoff *= 1.3)) retry elsif TransactionMutationLimitExceededError.is_mutation_limit_error? err.cause - raise err unless isolation == :fallback_to_pdml - transaction( - requires_new: true, - isolation: :pdml, - joinable: false, - **kwargs, &block - ) + is_fallback_enabled = isolation == :fallback_to_pdml + raise unless is_fallback_enabled + @_spanner_begin_transaction_options[:isolation] = :pdml + retry else - raise err + raise end ensure # Clean up the instance variable to avoid leaking options. @@ -320,44 +317,23 @@ def begin_db_transaction # (this is the same as :read_only) # def begin_isolated_db_transaction isolation - if isolation.is_a? Hash - raise "Unsupported isolation level: #{isolation}" unless - isolation[:timestamp] || isolation[:staleness] || isolation[:strong] + opts = @_spanner_begin_transaction_options || {} + # If isolation level is specified in the options, use that instead of the default isolation level. + isolation_option = opts[:isolation] || isolation + if isolation_option.is_a? Hash + raise "Unsupported isolation level: #{isolation_option}" unless + isolation_option[:timestamp] || isolation_option[:staleness] || isolation_option[:strong] raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`." \ - if isolation.count != 1 + if isolation_option.count != 1 else - raise "Unsupported isolation level: #{isolation}" unless + raise "Unsupported isolation level: #{isolation_option}" unless [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml, - :fallback_to_pdml].include? isolation + :fallback_to_pdml].include? isolation_option end - log "BEGIN #{isolation}" do - opts = @_spanner_begin_transaction_options || {} - @connection.begin_transaction isolation, **opts - end - end - - def create_savepoint name = "active_record_1" - if @connection.current_transaction&.is_pdml? - # PDML transactions do not support savepoints. Silently ignore the - # request. This allows methods like `update_all` to proceed - # without crashing when they are run in a PDML fallback context. - return + log "BEGIN #{isolation_option}" do + @connection.begin_transaction isolation_option, **opts.except(:isolation) end - super - end - - def release_savepoint name = "active_record_1" - # We must also override release_savepoint to be consistent. If we - # ignore the creation of a savepoint, we must also ignore its release. - return if @connection.current_transaction&.is_pdml? - super - end - - def rollback_to_savepoint name = "active_record_1" - # And we must override rollback_to_savepoint for consistency. - return if @connection.current_transaction&.is_pdml? - super end def commit_db_transaction From f9ba2cdfe0e441a243926497ae3195fb192cdc38 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Wed, 25 Jun 2025 12:09:05 +0000 Subject: [PATCH 5/6] fix: Updating mock server tests to capture all mock server requests during PDML fallback --- .../snippets/partitioned-dml/application.rb | 32 ++---------- .../connection.rb | 7 +-- .../transaction.rb | 4 -- ...ner_active_record_with_mock_server_test.rb | 50 +++++++++++++++---- 4 files changed, 48 insertions(+), 45 deletions(-) diff --git a/examples/snippets/partitioned-dml/application.rb b/examples/snippets/partitioned-dml/application.rb index 87a2fa1e..ccf23073 100644 --- a/examples/snippets/partitioned-dml/application.rb +++ b/examples/snippets/partitioned-dml/application.rb @@ -10,7 +10,7 @@ require_relative "models/album" class Application - def self.run # rubocop:disable Metrics/AbcSize + def self.run singer_count = Singer.all.count album_count = Album.all.count puts "" @@ -25,6 +25,7 @@ def self.run # rubocop:disable Metrics/AbcSize count = Album.delete_all puts "Deleted #{count} albums" end + puts "" puts "Deleting all singers in the database using normal Read-Write transaction with PDML fallback" # @@ -39,15 +40,14 @@ def self.run # rubocop:disable Metrics/AbcSize # 3. The Fallback: The adapter then retries the ENTIRE code block in a new, # non-atomic Partitioned DML (PDML) transaction. # - # --- WARNING: CRITICAL USAGE REQUIREMENTS --- + # --- USAGE REQUIREMENTS --- # This implementation retries the whole transaction block without checking its contents. # The user of this feature is responsible for ensuring the following: # - # 1. SINGLE DML STATEMENT: The block SHOULD contain only ONE DML statement. + # 1. SINGLE DML STATEMENT: The block should contain only ONE DML statement. # If it contains more, the PDML retry will fail with a low-level `seqno` error. # - # 2. IDEMPOTENCY: The block MUST be "idempotent" (safe to run multiple times), - # as the code may be executed more than once. + # 2. IDEMPOTENCY: The DML statement must be idempotent. See https://cloud.google.com/spanner/docs/dml-partitioned#partitionable-idempotent for more information. # rubocop:disable Layout/LineLength # # 3. NON-ATOMIC: The retried PDML transaction is NOT atomic. Do not use this # for multi-step operations that must all succeed or fail together. @@ -56,28 +56,6 @@ def self.run # rubocop:disable Metrics/AbcSize count = Singer.delete_all puts "Deleted #{count} singers" end - Singer.transaction isolation: :fallback_to_pdml do - begin - singers_to_create = (1..10).map { |i| { first_name: "Test", last_name: "Singer #{i}" } } - Singer.create singers_to_create - puts " #{Singer.count} singers now in database." - - puts "\n Running a large delete operation with 'isolation: :fallback_to_pdml'..." - puts " NOTE: A real operation of this type on millions of rows could fail with a mutation limit error." - puts " The adapter would catch this error and automatically retry with a PDML transaction." - - Singer.transaction isolation: :fallback_to_pdml do - Singer.where("last_name LIKE 'Singer %'").delete_all - end - - puts "\n SUCCESS: The transaction completed successfully thanks to the PDML fallback." - puts " Remaining singers: #{Singer.count}" - rescue StandardError => e - puts "\n FAILED: The transaction unexpectedly failed with error: #{e.message}" - ensure - Singer.delete_all - end - end puts "" puts "Deleting all singers in the database using Partitioned DML" diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index f6e9fa56..f2cc9acc 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -255,9 +255,6 @@ def execute_sql_request sql, converted_params, types, selector, request_options end raise rescue Google::Cloud::Error => e - if TransactionMutationLimitExceededError.is_mutation_limit_error? e - raise - end # Check if it was the first statement in a transaction that included a BeginTransaction # option in the request. If so, execute an explicit BeginTransaction and then retry the # request without the BeginTransaction option. @@ -265,6 +262,10 @@ def execute_sql_request sql, converted_params, types, selector, request_options selector = create_transaction_after_failed_first_statement e retry end + + if TransactionMutationLimitExceededError.is_mutation_limit_error? e + raise + end # It was not the first statement, so propagate the error. raise end diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index d9bf9fd4..c2763c60 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -158,10 +158,6 @@ def mark_aborted @state = :ABORTED end - def is_pdml? - @isolation == :pdml - end - # Sets the underlying gRPC transaction to use for this Transaction. # This is used for queries/DML statements that inlined the BeginTransaction option and returned # a transaction in the metadata. diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 2fb57f55..0662a3c9 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -195,24 +195,50 @@ def test_selects_one_singer_without_transaction def test_update_all_on_table_with_sequence_falls_back_to_pdml update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" - mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") + @mock.push_error(update_sql, mutation_limit_error) - @mock.put_statement_result update_sql, StatementResult.new(1) + @mock.push_error(update_sql, mutation_limit_error) + @mock.put_statement_result(update_sql, StatementResult.new(1)) TableWithSequence.transaction isolation: :fallback_to_pdml do TableWithSequence.where(id: 1).update_all(name: "New Foo Name") end - update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } - assert_equal 2, update_requests.length, "Should have been two attempts for the UPDATE DML" - - pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + # A BeginTransactionRequest for a read-write transaction should have been sent + rw_begin_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && !req.options&.partitioned_dml } + assert_equal 1, rw_begin_requests.length, "Exactly one explicit read-write BeginTransactionRequest should have been sent" + + # A RollbackRequest should have been sent for the failed read-write transaction. + rollback_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::RollbackRequest) } + refute_nil rollback_request, "A RollbackRequest should have been sent after the second attempt failed" + + # We assert for 3 total attempts for the UPDATE DML. This complex sequence + # is a result of the error triggering two different retry/fallback mechanisms + # at different layers of the adapter. The flow is as follows: + # + # 1. Attempt #1 (Initial "Piggybacked" DML): The first `update_all` call is + # sent without a pre-existing transaction. This attempt is mocked to fail + # before a transaction ID is assigned. + # + # 2. Attempt #2 (Low-Level Retry): The `execute_sql_request` method catches + # this initial failure. It explicitly begins a new transaction by sending a + # `BeginTransactionRequest`, which generates a real transaction ID. This + # is the transaction that can, and will, be rolled back later. The method + # then `retry`s the `UPDATE` statement. We also mock this second attempt to + # fail so the error can propagate to the high-level fallback logic. + # + # 3. Attempt #3 (High-Level PDML Fallback): The main `transaction` method's + # `rescue` block finally sees the error. Because a transaction ID now exists + # from step #2, it is first rolled back Then, the successful fallback + # to a PDML transaction is initiated, running the `UPDATE` one last time. + # + update_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 3, update_requests.length, "Should have been three attempts for the UPDATE DML before the PDML fallback" + + # A PDML transaction should have been started for the final, successful attempt. + pdml_begin_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && req.options&.partitioned_dml } refute_nil pdml_begin_request, "A BeginTransactionRequest for PDML should have been sent" - - fallback_request = update_requests[1] - assert fallback_request.transaction&.id, "Fallback DML should run within a transaction and have an ID" - assert_nil fallback_request.transaction&.begin, "Fallback DML should use the existing PDML transaction, not begin a new one" end def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled @@ -220,6 +246,7 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") @mock.push_error(update_sql, mutation_limit_error) + @mock.push_error(update_sql, mutation_limit_error) err = assert_raises ActiveRecord::StatementInvalid do TableWithSequence.transaction do @@ -229,8 +256,9 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled assert_kind_of Google::Cloud::InvalidArgumentError, err.cause + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } - assert_equal 1, update_requests.length, "Should only have been one attempt for the UPDATE DML" + assert_equal 2, update_requests.length, "Should only have been two attempts for the UPDATE DML" pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } assert_nil pdml_begin_request, "No PDML transaction should have been started" From 1c118927d6d35d15e723692813fae7246e58db0c Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Wed, 25 Jun 2025 13:19:02 +0000 Subject: [PATCH 6/6] test: fix mock server test to not assert rollback grpc requests --- .../connection.rb | 7 ++- ...ner_active_record_with_mock_server_test.rb | 44 +++++-------------- 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index f2cc9acc..f6e9fa56 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -255,6 +255,9 @@ def execute_sql_request sql, converted_params, types, selector, request_options end raise rescue Google::Cloud::Error => e + if TransactionMutationLimitExceededError.is_mutation_limit_error? e + raise + end # Check if it was the first statement in a transaction that included a BeginTransaction # option in the request. If so, execute an explicit BeginTransaction and then retry the # request without the BeginTransaction option. @@ -262,10 +265,6 @@ def execute_sql_request sql, converted_params, types, selector, request_options selector = create_transaction_after_failed_first_statement e retry end - - if TransactionMutationLimitExceededError.is_mutation_limit_error? e - raise - end # It was not the first statement, so propagate the error. raise end diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 0662a3c9..c3005752 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -195,9 +195,9 @@ def test_selects_one_singer_without_transaction def test_update_all_on_table_with_sequence_falls_back_to_pdml update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" + mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") - @mock.push_error(update_sql, mutation_limit_error) @mock.push_error(update_sql, mutation_limit_error) @mock.put_statement_result(update_sql, StatementResult.new(1)) @@ -205,40 +205,19 @@ def test_update_all_on_table_with_sequence_falls_back_to_pdml TableWithSequence.where(id: 1).update_all(name: "New Foo Name") end - # A BeginTransactionRequest for a read-write transaction should have been sent - rw_begin_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && !req.options&.partitioned_dml } - assert_equal 1, rw_begin_requests.length, "Exactly one explicit read-write BeginTransactionRequest should have been sent" - - # A RollbackRequest should have been sent for the failed read-write transaction. - rollback_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::RollbackRequest) } - refute_nil rollback_request, "A RollbackRequest should have been sent after the second attempt failed" - - # We assert for 3 total attempts for the UPDATE DML. This complex sequence - # is a result of the error triggering two different retry/fallback mechanisms - # at different layers of the adapter. The flow is as follows: - # - # 1. Attempt #1 (Initial "Piggybacked" DML): The first `update_all` call is - # sent without a pre-existing transaction. This attempt is mocked to fail - # before a transaction ID is assigned. - # - # 2. Attempt #2 (Low-Level Retry): The `execute_sql_request` method catches - # this initial failure. It explicitly begins a new transaction by sending a - # `BeginTransactionRequest`, which generates a real transaction ID. This - # is the transaction that can, and will, be rolled back later. The method - # then `retry`s the `UPDATE` statement. We also mock this second attempt to - # fail so the error can propagate to the high-level fallback logic. - # - # 3. Attempt #3 (High-Level PDML Fallback): The main `transaction` method's - # `rescue` block finally sees the error. Because a transaction ID now exists - # from step #2, it is first rolled back Then, the successful fallback - # to a PDML transaction is initiated, running the `UPDATE` one last time. - # - update_requests = @mock.requests.select { |req| req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == update_sql } - assert_equal 3, update_requests.length, "Should have been three attempts for the UPDATE DML before the PDML fallback" + # The first attempt should have failed with a TransactionMutationLimitExceededError. + # The second attempt should have succeeded with a PDML transaction. + # So we should have two requests for the same DML statement. + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 2, update_requests.length, "Should have been two attempts for the UPDATE DML" # A PDML transaction should have been started for the final, successful attempt. pdml_begin_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && req.options&.partitioned_dml } refute_nil pdml_begin_request, "A BeginTransactionRequest for PDML should have been sent" + + fallback_request = update_requests[1] + assert fallback_request.transaction&.id, "Fallback DML should run within a transaction that has an ID (created by the PDML transaction)" + assert_nil fallback_request.transaction&.begin, "Fallback DML should use the existing PDML transaction, not begin a new one" end def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled @@ -246,7 +225,6 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") @mock.push_error(update_sql, mutation_limit_error) - @mock.push_error(update_sql, mutation_limit_error) err = assert_raises ActiveRecord::StatementInvalid do TableWithSequence.transaction do @@ -258,7 +236,7 @@ def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } - assert_equal 2, update_requests.length, "Should only have been two attempts for the UPDATE DML" + assert_equal 1, update_requests.length, "Should only have been two attempts for the UPDATE DML" pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } assert_nil pdml_begin_request, "No PDML transaction should have been started"