diff --git a/acceptance/cases/transactions/read_write_transactions_test.rb b/acceptance/cases/transactions/read_write_transactions_test.rb index 16fec677..ca362027 100644 --- a/acceptance/cases/transactions/read_write_transactions_test.rb +++ b/acceptance/cases/transactions/read_write_transactions_test.rb @@ -273,6 +273,35 @@ def test_create_record_with_sequence_using_mutations TableWithSequence.connection.use_client_side_id_for_mutations = reset_value end end + + def test_single_dml_succeeds_with_fallback_to_pdml_enabled + # This test verifies that a normal, successful DML statement works as + # expected when the fallback isolation is enabled. Because no mutation + # limit error occurs, the fallback to PDML should NOT be triggered. + create_test_records + assert_equal 1, Author.count + + Author.transaction isolation: :fallback_to_pdml do + Author.where(name: "David").delete_all + end + + assert_equal 0, Author.count, "The record should have been deleted" + end + + def test_other_errors_do_not_trigger_fallback + # This test ensures that if a transaction with fallback enabled fails + # for a reason OTHER than the mutation limit, it fails normally and + # does not attempt to fall back to PDML. + create_test_records + initial_author_count = Author.count + + assert_raises ActiveRecord::StatementInvalid do + Author.transaction isolation: :fallback_to_pdml do + Author.create! name: nil + end + end + assert_equal initial_author_count, Author.count, "Transaction should have rolled back" + end end end end diff --git a/examples/snippets/partitioned-dml/application.rb b/examples/snippets/partitioned-dml/application.rb index 83a4f792..ccf23073 100644 --- a/examples/snippets/partitioned-dml/application.rb +++ b/examples/snippets/partitioned-dml/application.rb @@ -26,6 +26,37 @@ def self.run puts "Deleted #{count} albums" end + puts "" + puts "Deleting all singers in the database using normal Read-Write transaction with PDML fallback" + # + # This example demonstrates using `isolation: :fallback_to_pdml`. + # + # --- HOW IT WORKS --- + # 1. Initial Attempt: The transaction starts as a normal, atomic, read-write transaction. + # + # 2. The Trigger: If that transaction fails with a `TransactionMutationLimitExceededError`, + # the adapter automatically catches the error. + # + # 3. The Fallback: The adapter then retries the ENTIRE code block in a new, + # non-atomic Partitioned DML (PDML) transaction. + # + # --- USAGE REQUIREMENTS --- + # This implementation retries the whole transaction block without checking its contents. + # The user of this feature is responsible for ensuring the following: + # + # 1. SINGLE DML STATEMENT: The block should contain only ONE DML statement. + # If it contains more, the PDML retry will fail with a low-level `seqno` error. + # + # 2. IDEMPOTENCY: The DML statement must be idempotent. See https://cloud.google.com/spanner/docs/dml-partitioned#partitionable-idempotent for more information. # rubocop:disable Layout/LineLength + # + # 3. NON-ATOMIC: The retried PDML transaction is NOT atomic. Do not use this + # for multi-step operations that must all succeed or fail together. + # + Singer.transaction isolation: :fallback_to_pdml do + count = Singer.delete_all + puts "Deleted #{count} singers" + end + puts "" puts "Deleting all singers in the database using Partitioned DML" Singer.transaction isolation: :pdml do diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 94e07d6c..42cf4165 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -7,6 +7,7 @@ # frozen_string_literal: true require "active_record/gem_version" +require "active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error" module ActiveRecord module ConnectionAdapters @@ -14,6 +15,7 @@ module Spanner module DatabaseStatements VERSION_7_1_0 = Gem::Version.create "7.1.0" RequestOptions = Google::Cloud::Spanner::V1::RequestOptions + TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError # DDL, DML and DQL Statements @@ -72,10 +74,16 @@ def execute_query_or_dml statement_type, sql, name, binds ActiveSupport::Dependencies.interlock.permit_concurrent_loads do if transaction_required transaction do - @connection.execute_query sql, params: params, types: types, request_options: request_options + @connection.execute_query sql, + params: params, + types: types, + request_options: request_options end else - @connection.execute_query sql, params: params, types: types, single_use_selector: selector, + @connection.execute_query sql, + params: params, + types: types, + single_use_selector: selector, request_options: request_options end end @@ -229,7 +237,7 @@ def execute_ddl statements # Transaction - def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs + def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs, &block # rubocop:disable Metrics/PerceivedComplexity commit_options = kwargs.delete :commit_options exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams @_spanner_begin_transaction_options = { @@ -254,8 +262,14 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs if err.cause.is_a? Google::Cloud::AbortedError sleep(delay_from_aborted(err) || (backoff *= 1.3)) retry + elsif TransactionMutationLimitExceededError.is_mutation_limit_error? err.cause + is_fallback_enabled = isolation == :fallback_to_pdml + raise unless is_fallback_enabled + @_spanner_begin_transaction_options[:isolation] = :pdml + retry + else + raise end - raise ensure # Clean up the instance variable to avoid leaking options. @_spanner_begin_transaction_options = nil @@ -272,7 +286,8 @@ def transaction_isolation_levels # These are not really isolation levels, but it is the only (best) way to pass in additional # transaction options to the connection. read_only: "READ_ONLY", - buffered_mutations: "BUFFERED_MUTATIONS" + buffered_mutations: "BUFFERED_MUTATIONS", + fallback_to_pdml: "FALLBACK_TO_PDML" } end @@ -302,19 +317,22 @@ def begin_db_transaction # (this is the same as :read_only) # def begin_isolated_db_transaction isolation - if isolation.is_a? Hash - raise "Unsupported isolation level: #{isolation}" unless - isolation[:timestamp] || isolation[:staleness] || isolation[:strong] + opts = @_spanner_begin_transaction_options || {} + # If isolation level is specified in the options, use that instead of the default isolation level. + isolation_option = opts[:isolation] || isolation + if isolation_option.is_a? Hash + raise "Unsupported isolation level: #{isolation_option}" unless + isolation_option[:timestamp] || isolation_option[:staleness] || isolation_option[:strong] raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`." \ - if isolation.count != 1 + if isolation_option.count != 1 else - raise "Unsupported isolation level: #{isolation}" unless - [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml].include? isolation + raise "Unsupported isolation level: #{isolation_option}" unless + [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml, + :fallback_to_pdml].include? isolation_option end - log "BEGIN #{isolation}" do - opts = @_spanner_begin_transaction_options || {} - @connection.begin_transaction isolation, **opts + log "BEGIN #{isolation_option}" do + @connection.begin_transaction isolation_option, **opts.except(:isolation) end end diff --git a/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb new file mode 100644 index 00000000..d1b9a8b0 --- /dev/null +++ b/lib/active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error.rb @@ -0,0 +1,25 @@ +# Copyright 2025 Google LLC +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. + +module Google + module Cloud + module Spanner + module Errors + # Custom exception raised when a transaction exceeds the mutation limit in Google Cloud Spanner. + # This provides a specific error class for a common, recoverable scenario. + class TransactionMutationLimitExceededError < ActiveRecord::StatementInvalid + ERROR_MESSAGE = "The transaction contains too many mutations".freeze + + def self.is_mutation_limit_error? error + return false if error.nil? + error.is_a?(Google::Cloud::InvalidArgumentError) && + error.message&.include?(ERROR_MESSAGE) + end + end + end + end + end +end diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index dc72c3f5..f6e9fa56 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -7,8 +7,11 @@ require "google/cloud/spanner" require "spanner_client_ext" require "activerecord_spanner_adapter/information_schema" +require_relative "../active_record/connection_adapters/spanner/errors/transaction_mutation_limit_exceeded_error" module ActiveRecordSpannerAdapter + TransactionMutationLimitExceededError = Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError + class Connection attr_reader :instance_id attr_reader :database_id @@ -252,6 +255,9 @@ def execute_sql_request sql, converted_params, types, selector, request_options end raise rescue Google::Cloud::Error => e + if TransactionMutationLimitExceededError.is_mutation_limit_error? e + raise + end # Check if it was the first statement in a transaction that included a BeginTransaction # option in the request. If so, execute an explicit BeginTransaction and then retry the # request without the BeginTransaction option. diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 80d88a06..c3005752 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -13,6 +13,7 @@ module MockServerTests CommitRequest = Google::Cloud::Spanner::V1::CommitRequest ExecuteSqlRequest = Google::Cloud::Spanner::V1::ExecuteSqlRequest + BeginTransactionRequest = Google::Cloud::Spanner::V1::BeginTransactionRequest class SpannerActiveRecordMockServerTest < BaseSpannerMockServerTest VERSION_7_1_0 = Gem::Version.create('7.1.0') @@ -192,6 +193,75 @@ def test_selects_one_singer_without_transaction end end + def test_update_all_on_table_with_sequence_falls_back_to_pdml + update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" + + mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") + + @mock.push_error(update_sql, mutation_limit_error) + @mock.put_statement_result(update_sql, StatementResult.new(1)) + + TableWithSequence.transaction isolation: :fallback_to_pdml do + TableWithSequence.where(id: 1).update_all(name: "New Foo Name") + end + + # The first attempt should have failed with a TransactionMutationLimitExceededError. + # The second attempt should have succeeded with a PDML transaction. + # So we should have two requests for the same DML statement. + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 2, update_requests.length, "Should have been two attempts for the UPDATE DML" + + # A PDML transaction should have been started for the final, successful attempt. + pdml_begin_request = @mock.requests.find { |req| req.is_a?(Google::Cloud::Spanner::V1::BeginTransactionRequest) && req.options&.partitioned_dml } + refute_nil pdml_begin_request, "A BeginTransactionRequest for PDML should have been sent" + + fallback_request = update_requests[1] + assert fallback_request.transaction&.id, "Fallback DML should run within a transaction that has an ID (created by the PDML transaction)" + assert_nil fallback_request.transaction&.begin, "Fallback DML should use the existing PDML transaction, not begin a new one" + end + + def test_no_fallback_to_pdml_on_table_with_sequence_when_disabled + update_sql = "UPDATE `table_with_sequence` SET `name` = @p1 WHERE `table_with_sequence`.`id` = @p2" + + mutation_limit_error = GRPC::InvalidArgument.new("The transaction contains too many mutations") + @mock.push_error(update_sql, mutation_limit_error) + + err = assert_raises ActiveRecord::StatementInvalid do + TableWithSequence.transaction do + TableWithSequence.where(id: 1).update_all(name: "This name will not be updated") + end + end + + assert_kind_of Google::Cloud::InvalidArgumentError, err.cause + + + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql == update_sql } + assert_equal 1, update_requests.length, "Should only have been two attempts for the UPDATE DML" + + pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + assert_nil pdml_begin_request, "No PDML transaction should have been started" + end + + def test_no_fallback_to_pdml_on_table_with_sequence_when_error_is_not_valid + + update_sql_regex = /UPDATE `table_with_sequence`/ + other_error = GRPC::AlreadyExists.new("This is some other database error") + @mock.push_error(update_sql_regex, other_error) + + err = assert_raises ActiveRecord::StatementInvalid do + TableWithSequence.transaction isolation: :fallback_to_pdml do + TableWithSequence.where(id: 1).update_all(name: "This name will not be updated") + end + end + + assert_kind_of Google::Cloud::InvalidArgumentError, err.cause + update_requests = @mock.requests.select { |req| req.is_a?(ExecuteSqlRequest) && req.sql.match(update_sql_regex) } + assert_equal 2, update_requests.length, "Should only have been one attempt for the UPDATE DML" + + pdml_begin_request = @mock.requests.find { |req| req.is_a?(BeginTransactionRequest) && req.options&.partitioned_dml } + assert_nil pdml_begin_request, "No PDML transaction should have been started" + end + def test_selects_singers_with_condition # This query does not use query parameters because the where clause is specified as a string. # ActiveRecord sees that as a SQL fragment that will disable the usage of prepared statements.