diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 105f8e33..94e07d6c 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -231,7 +231,10 @@ def execute_ddl statements def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs commit_options = kwargs.delete :commit_options - + exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams + @_spanner_begin_transaction_options = { + exclude_txn_from_change_streams: exclude_from_streams + } if !requires_new && current_transaction.joinable? return super end @@ -253,6 +256,9 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs retry end raise + ensure + # Clean up the instance variable to avoid leaking options. + @_spanner_begin_transaction_options = nil end end @@ -272,7 +278,8 @@ def transaction_isolation_levels def begin_db_transaction log "BEGIN" do - @connection.begin_transaction + opts = @_spanner_begin_transaction_options || {} + @connection.begin_transaction nil, **opts end end @@ -306,7 +313,8 @@ def begin_isolated_db_transaction isolation end log "BEGIN #{isolation}" do - @connection.begin_transaction isolation + opts = @_spanner_begin_transaction_options || {} + @connection.begin_transaction isolation, **opts end end diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index 40100157..dc72c3f5 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -276,9 +276,11 @@ def create_transaction_after_failed_first_statement original_error # Transactions - def begin_transaction isolation = nil + def begin_transaction isolation = nil, **options raise "Nested transactions are not allowed" if current_transaction&.active? - self.current_transaction = Transaction.new self, isolation || @isolation_level + exclude_from_streams = options.fetch :exclude_txn_from_change_streams, false + self.current_transaction = Transaction.new self, isolation || @isolation_level, + exclude_txn_from_change_streams: exclude_from_streams current_transaction.begin current_transaction end diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 7ca80d89..c2763c60 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -8,10 +8,12 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state attr_reader :commit_options + attr_reader :begin_transaction_selector + attr_accessor :exclude_txn_from_change_streams - def initialize connection, isolation, commit_options = nil + def initialize connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @@ -19,6 +21,7 @@ def initialize connection, isolation, commit_options = nil @sequence_number = 0 @mutations = [] @commit_options = commit_options + @exclude_txn_from_change_streams = exclude_txn_from_change_streams end def active? @@ -63,7 +66,8 @@ def begin @begin_transaction_selector = Google::Cloud::Spanner::V1::TransactionSelector.new \ begin: Google::Cloud::Spanner::V1::TransactionOptions.new( read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new, - isolation_level: grpc_isolation + isolation_level: grpc_isolation, + exclude_txn_from_change_streams: @exclude_txn_from_change_streams ) end @state = :STARTED diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index e87a0a42..cc859c43 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -42,6 +42,15 @@ def test_commit_options assert_equal 1000, commit_options[:max_commit_delay] end + def test_exclude_txn_from_change_streams + transaction.exclude_txn_from_change_streams = true + assert transaction.exclude_txn_from_change_streams + transaction.begin + assert_equal true, transaction.begin_transaction_selector.begin.exclude_txn_from_change_streams + transaction.commit + assert_equal :COMMITTED, transaction.state + end + def test_no_nested_transactions transaction.begin diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 79f945e7..80d88a06 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -1335,6 +1335,28 @@ def test_upsert_all_dml assert_equal 1, execute_requests.length end + def test_create_with_sequence_and_exclude_from_change_streams + sql = "INSERT INTO `table_with_sequence` (`name`) VALUES (@p1) THEN RETURN `id`" + @mock.put_statement_result sql, MockServerTests::create_id_returning_result_set(1, 1) + + record = TableWithSequence.transaction(exclude_txn_from_change_streams: true) do + TableWithSequence.create(name: "Foo") + end + + assert_equal 1, record.id + execute_requests = @mock.requests.select do |req| + req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == sql + end + + assert_equal 1, execute_requests.length + exec_req = execute_requests.first + + refute_nil exec_req.transaction + begin_opts = exec_req.transaction&.begin + refute_nil begin_opts + assert_equal true, begin_opts.exclude_txn_from_change_streams + end + def test_binary_id user = User.create!( email: "test@example.com",