From b7e92546c7ee44d8643c1692cd2fd70f62f6d238 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Wed, 11 Jun 2025 18:08:17 +0000 Subject: [PATCH 1/2] feat: Adding support for exclude_txn_from_change_streams option --- .../spanner/database_statements.rb | 14 ++++++++--- .../connection.rb | 6 +++-- .../transaction.rb | 10 +++++--- lib/spanner_client_ext.rb | 5 ++-- .../transaction_test.rb | 9 +++++++ ...ner_active_record_with_mock_server_test.rb | 24 +++++++++++++++++++ 6 files changed, 58 insertions(+), 10 deletions(-) diff --git a/lib/active_record/connection_adapters/spanner/database_statements.rb b/lib/active_record/connection_adapters/spanner/database_statements.rb index 105f8e33..94e07d6c 100644 --- a/lib/active_record/connection_adapters/spanner/database_statements.rb +++ b/lib/active_record/connection_adapters/spanner/database_statements.rb @@ -231,7 +231,10 @@ def execute_ddl statements def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs commit_options = kwargs.delete :commit_options - + exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams + @_spanner_begin_transaction_options = { + exclude_txn_from_change_streams: exclude_from_streams + } if !requires_new && current_transaction.joinable? return super end @@ -253,6 +256,9 @@ def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs retry end raise + ensure + # Clean up the instance variable to avoid leaking options. + @_spanner_begin_transaction_options = nil end end @@ -272,7 +278,8 @@ def transaction_isolation_levels def begin_db_transaction log "BEGIN" do - @connection.begin_transaction + opts = @_spanner_begin_transaction_options || {} + @connection.begin_transaction nil, **opts end end @@ -306,7 +313,8 @@ def begin_isolated_db_transaction isolation end log "BEGIN #{isolation}" do - @connection.begin_transaction isolation + opts = @_spanner_begin_transaction_options || {} + @connection.begin_transaction isolation, **opts end end diff --git a/lib/activerecord_spanner_adapter/connection.rb b/lib/activerecord_spanner_adapter/connection.rb index 40100157..dc72c3f5 100644 --- a/lib/activerecord_spanner_adapter/connection.rb +++ b/lib/activerecord_spanner_adapter/connection.rb @@ -276,9 +276,11 @@ def create_transaction_after_failed_first_statement original_error # Transactions - def begin_transaction isolation = nil + def begin_transaction isolation = nil, **options raise "Nested transactions are not allowed" if current_transaction&.active? - self.current_transaction = Transaction.new self, isolation || @isolation_level + exclude_from_streams = options.fetch :exclude_txn_from_change_streams, false + self.current_transaction = Transaction.new self, isolation || @isolation_level, + exclude_txn_from_change_streams: exclude_from_streams current_transaction.begin current_transaction end diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 7ca80d89..4e4f07bd 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -8,10 +8,11 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state attr_reader :commit_options + attr_accessor :exclude_txn_from_change_streams - def initialize connection, isolation, commit_options = nil + def initialize connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false @connection = connection @isolation = isolation @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash) @@ -19,6 +20,7 @@ def initialize connection, isolation, commit_options = nil @sequence_number = 0 @mutations = [] @commit_options = commit_options + @exclude_txn_from_change_streams = exclude_txn_from_change_streams end def active? @@ -63,7 +65,8 @@ def begin @begin_transaction_selector = Google::Cloud::Spanner::V1::TransactionSelector.new \ begin: Google::Cloud::Spanner::V1::TransactionOptions.new( read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new, - isolation_level: grpc_isolation + isolation_level: grpc_isolation, + exclude_txn_from_change_streams: @exclude_txn_from_change_streams ) end @state = :STARTED @@ -114,7 +117,8 @@ def commit if @committable && @grpc_transaction @connection.session.commit_transaction @grpc_transaction, @mutations, - commit_options: commit_options + commit_options: commit_options, + exclude_txn_from_change_streams: exclude_txn_from_change_streams end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e diff --git a/lib/spanner_client_ext.rb b/lib/spanner_client_ext.rb index f70305b0..45b21ef3 100644 --- a/lib/spanner_client_ext.rb +++ b/lib/spanner_client_ext.rb @@ -23,14 +23,15 @@ def create_session instance_id, database_id, labels: nil end class Session - def commit_transaction transaction, mutations = [], commit_options: nil + def commit_transaction transaction, mutations = [], commit_options: nil, exclude_txn_from_change_streams: false ensure_service! resp = service.commit( path, mutations, transaction_id: transaction.transaction_id, - commit_options: commit_options + commit_options: commit_options, + exclude_txn_from_change_streams: exclude_txn_from_change_streams ) @last_updated_at = Time.now Convert.timestamp_to_time resp.commit_timestamp diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index e87a0a42..14b5462f 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -42,6 +42,15 @@ def test_commit_options assert_equal 1000, commit_options[:max_commit_delay] end + def test_exclude_txn_from_change_streams + transaction.begin + transaction.exclude_txn_from_change_streams = true + assert transaction.exclude_txn_from_change_streams + transaction.commit + assert_equal :COMMITTED, transaction.state + assert transaction.exclude_txn_from_change_streams + end + def test_no_nested_transactions transaction.begin diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 79f945e7..997b5b43 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -1335,6 +1335,30 @@ def test_upsert_all_dml assert_equal 1, execute_requests.length end + def test_upsert_all_dml_with_exclude_from_change_streams + sql = "INSERT OR UPDATE INTO `singers` (`id`,`first_name`,`last_name`) " + + "VALUES (1, 'Dave', 'Allison'), (2, 'Alice', 'Davidson'), (3, 'Rene', 'Henderson')" + @mock.put_statement_result sql, StatementResult.new(3) + values = [ + {id: 1, first_name: "Dave", last_name: "Allison"}, + {id: 2, first_name: "Alice", last_name: "Davidson"}, + {id: 3, first_name: "Rene", last_name: "Henderson"}, + ] + Singer.transaction(exclude_txn_from_change_streams: true) do + Singer.upsert_all values + end + execute_requests = @mock.requests.select { |req| + req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == sql + } + assert_equal 1, execute_requests.length + exec_req = execute_requests.first + + refute_nil exec_req.transaction + begin_opts = exec_req.transaction&.begin + refute_nil begin_opts + assert_equal true, begin_opts.exclude_txn_from_change_streams + end + def test_binary_id user = User.create!( email: "test@example.com", From e2aa445202a7afe7ebb1524a9c4b290c91cfffd8 Mon Sep 17 00:00:00 2001 From: Akash Anand Date: Thu, 12 Jun 2025 08:05:58 +0000 Subject: [PATCH 2/2] test: updating tests to verify that exclude_txn_from_change_streams option is set correctly --- .../transaction.rb | 4 ++-- lib/spanner_client_ext.rb | 5 ++-- .../transaction_test.rb | 4 ++-- ...ner_active_record_with_mock_server_test.rb | 24 +++++++++---------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/lib/activerecord_spanner_adapter/transaction.rb b/lib/activerecord_spanner_adapter/transaction.rb index 4e4f07bd..c2763c60 100644 --- a/lib/activerecord_spanner_adapter/transaction.rb +++ b/lib/activerecord_spanner_adapter/transaction.rb @@ -8,6 +8,7 @@ module ActiveRecordSpannerAdapter class Transaction attr_reader :state attr_reader :commit_options + attr_reader :begin_transaction_selector attr_accessor :exclude_txn_from_change_streams @@ -117,8 +118,7 @@ def commit if @committable && @grpc_transaction @connection.session.commit_transaction @grpc_transaction, @mutations, - commit_options: commit_options, - exclude_txn_from_change_streams: exclude_txn_from_change_streams + commit_options: commit_options end @state = :COMMITTED rescue Google::Cloud::NotFoundError => e diff --git a/lib/spanner_client_ext.rb b/lib/spanner_client_ext.rb index 45b21ef3..f70305b0 100644 --- a/lib/spanner_client_ext.rb +++ b/lib/spanner_client_ext.rb @@ -23,15 +23,14 @@ def create_session instance_id, database_id, labels: nil end class Session - def commit_transaction transaction, mutations = [], commit_options: nil, exclude_txn_from_change_streams: false + def commit_transaction transaction, mutations = [], commit_options: nil ensure_service! resp = service.commit( path, mutations, transaction_id: transaction.transaction_id, - commit_options: commit_options, - exclude_txn_from_change_streams: exclude_txn_from_change_streams + commit_options: commit_options ) @last_updated_at = Time.now Convert.timestamp_to_time resp.commit_timestamp diff --git a/test/activerecord_spanner_adapter/transaction_test.rb b/test/activerecord_spanner_adapter/transaction_test.rb index 14b5462f..cc859c43 100644 --- a/test/activerecord_spanner_adapter/transaction_test.rb +++ b/test/activerecord_spanner_adapter/transaction_test.rb @@ -43,12 +43,12 @@ def test_commit_options end def test_exclude_txn_from_change_streams - transaction.begin transaction.exclude_txn_from_change_streams = true assert transaction.exclude_txn_from_change_streams + transaction.begin + assert_equal true, transaction.begin_transaction_selector.begin.exclude_txn_from_change_streams transaction.commit assert_equal :COMMITTED, transaction.state - assert transaction.exclude_txn_from_change_streams end def test_no_nested_transactions diff --git a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb index 997b5b43..80d88a06 100644 --- a/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb +++ b/test/activerecord_spanner_mock_server/spanner_active_record_with_mock_server_test.rb @@ -1335,21 +1335,19 @@ def test_upsert_all_dml assert_equal 1, execute_requests.length end - def test_upsert_all_dml_with_exclude_from_change_streams - sql = "INSERT OR UPDATE INTO `singers` (`id`,`first_name`,`last_name`) " + - "VALUES (1, 'Dave', 'Allison'), (2, 'Alice', 'Davidson'), (3, 'Rene', 'Henderson')" - @mock.put_statement_result sql, StatementResult.new(3) - values = [ - {id: 1, first_name: "Dave", last_name: "Allison"}, - {id: 2, first_name: "Alice", last_name: "Davidson"}, - {id: 3, first_name: "Rene", last_name: "Henderson"}, - ] - Singer.transaction(exclude_txn_from_change_streams: true) do - Singer.upsert_all values + def test_create_with_sequence_and_exclude_from_change_streams + sql = "INSERT INTO `table_with_sequence` (`name`) VALUES (@p1) THEN RETURN `id`" + @mock.put_statement_result sql, MockServerTests::create_id_returning_result_set(1, 1) + + record = TableWithSequence.transaction(exclude_txn_from_change_streams: true) do + TableWithSequence.create(name: "Foo") end - execute_requests = @mock.requests.select { |req| + + assert_equal 1, record.id + execute_requests = @mock.requests.select do |req| req.is_a?(Google::Cloud::Spanner::V1::ExecuteSqlRequest) && req.sql == sql - } + end + assert_equal 1, execute_requests.length exec_req = execute_requests.first