diff --git a/test/asynchronous/test_retryable_writes.py b/test/asynchronous/test_retryable_writes.py index ddb1d39eb7..6b1b8ef681 100644 --- a/test/asynchronous/test_retryable_writes.py +++ b/test/asynchronous/test_retryable_writes.py @@ -43,14 +43,16 @@ from bson.int64 import Int64 from bson.raw_bson import RawBSONDocument from bson.son import SON +from pymongo import MongoClient from pymongo.errors import ( AutoReconnect, ConnectionFailure, - OperationFailure, + NotPrimaryError, ServerSelectionTimeoutError, WriteConcernError, ) from pymongo.monitoring import ( + CommandFailedEvent, CommandSucceededEvent, ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, @@ -601,5 +603,195 @@ def raise_connection_err_select_server(*args, **kwargs): self.assertEqual(sent_txn_id, final_txn_id, msg) +class TestErrorPropagationAfterEncounteringMultipleErrors(AsyncIntegrationTest): + # Only run against replica sets as mongos does not propagate the NoWritesPerformed label to the drivers. + @async_client_context.require_replica_set + # Run against server versions 6.0 and above. + @async_client_context.require_version_min(6, 0) # type: ignore[untyped-decorator] + async def asyncSetUp(self) -> None: + await super().asyncSetUp() + self.setup_client = MongoClient(**async_client_context.default_client_options) + self.addCleanup(self.setup_client.close) + + # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround. + def configure_fail_point_sync(self, command_args, off=False) -> None: + cmd = {"configureFailPoint": "failCommand"} + cmd.update(command_args) + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + self.setup_client.admin.command(cmd) + + async def test_01_drivers_return_the_correct_error_when_receiving_only_errors_without_NoWritesPerformed( + self + ) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Via the command monitoring CommandFailedEvent, configure a fail point with error code 10107 (NotWritablePrimary). + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 10107, + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) + self.addAsyncCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + with self.assertRaises(NotPrimaryError) as exc: + await client.test.test.insert_one({}) + + # Assert that the error code of the server error is 10107. + assert exc.exception.errors["code"] == 10107 # type:ignore[call-overload] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + async def test_02_drivers_return_the_correct_error_when_receiving_only_errors_with_NoWritesPerformed( + self + ) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + "errorCode": 91, + }, + } + + # Via the command monitoring CommandFailedEvent, configure a fail point with error code `10107` (NotWritablePrimary) + # and a NoWritesPerformed label. + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 10107, + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + if listener.failed_events: + return + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) + self.addAsyncCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + with self.assertRaises(NotPrimaryError) as exc: + await client.test.test.insert_one({}) + + # Assert that the error code of the server error is 91. + assert exc.exception.errors["code"] == 91 # type:ignore[call-overload] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + async def test_03_drivers_return_the_correct_error_when_receiving_some_errors_with_NoWritesPerformed_and_some_without_NoWritesPerformed( + self + ) -> None: + # TODO: read the expected behavior and add breakpoint() to the retry loop + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (NotWritablePrimary) and the `NoWritesPerformed`, `RetryableError` and `SystemOverloadedError` labels. + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and + # `SystemOverloadedError` error labels but without the `NoWritesPerformed` error label. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = await self.async_rs_client(retryWrites=True, event_listeners=[listener]) + self.addAsyncCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + from pymongo.errors import OperationFailure + + with self.assertRaises(Exception) as exc: + await client.test.test.insert_one({}) + + # Assert that the error code of the server error is 91. + assert exc.exception.errors["code"] == 91 + # Assert that the error does not contain the error label `NoWritesPerformed`. + assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index a74a3e8030..dd55d371dc 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -43,14 +43,16 @@ from bson.int64 import Int64 from bson.raw_bson import RawBSONDocument from bson.son import SON +from pymongo import MongoClient from pymongo.errors import ( AutoReconnect, ConnectionFailure, - OperationFailure, + NotPrimaryError, ServerSelectionTimeoutError, WriteConcernError, ) from pymongo.monitoring import ( + CommandFailedEvent, CommandSucceededEvent, ConnectionCheckedOutEvent, ConnectionCheckOutFailedEvent, @@ -597,5 +599,195 @@ def raise_connection_err_select_server(*args, **kwargs): self.assertEqual(sent_txn_id, final_txn_id, msg) +class TestErrorPropagationAfterEncounteringMultipleErrors(IntegrationTest): + # Only run against replica sets as mongos does not propagate the NoWritesPerformed label to the drivers. + @client_context.require_replica_set + # Run against server versions 6.0 and above. + @client_context.require_version_min(6, 0) # type: ignore[untyped-decorator] + def setUp(self) -> None: + super().setUp() + self.setup_client = MongoClient(**client_context.default_client_options) + self.addCleanup(self.setup_client.close) + + # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround. + def configure_fail_point_sync(self, command_args, off=False) -> None: + cmd = {"configureFailPoint": "failCommand"} + cmd.update(command_args) + if off: + cmd["mode"] = "off" + cmd.pop("data", None) + self.setup_client.admin.command(cmd) + + def test_01_drivers_return_the_correct_error_when_receiving_only_errors_without_NoWritesPerformed( + self + ) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError"], + "errorCode": 91, + }, + } + + # Via the command monitoring CommandFailedEvent, configure a fail point with error code 10107 (NotWritablePrimary). + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 10107, + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(retryWrites=True, event_listeners=[listener]) + self.addCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + with self.assertRaises(NotPrimaryError) as exc: + client.test.test.insert_one({}) + + # Assert that the error code of the server error is 10107. + assert exc.exception.errors["code"] == 10107 # type:ignore[call-overload] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + def test_02_drivers_return_the_correct_error_when_receiving_only_errors_with_NoWritesPerformed( + self + ) -> None: + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure a fail point with error code 91 (ShutdownInProgress) with the RetryableError and SystemOverloadedError error labels. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + "errorCode": 91, + }, + } + + # Via the command monitoring CommandFailedEvent, configure a fail point with error code `10107` (NotWritablePrimary) + # and a NoWritesPerformed label. + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorCode": 10107, + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + if listener.failed_events: + return + # Configure the 10107 fail point command only if the the failed event is for the 91 error configured in step 2. + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(retryWrites=True, event_listeners=[listener]) + self.addCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + with self.assertRaises(NotPrimaryError) as exc: + client.test.test.insert_one({}) + + # Assert that the error code of the server error is 91. + assert exc.exception.errors["code"] == 91 # type:ignore[call-overload] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + def test_03_drivers_return_the_correct_error_when_receiving_some_errors_with_NoWritesPerformed_and_some_without_NoWritesPerformed( + self + ) -> None: + # TODO: read the expected behavior and add breakpoint() to the retry loop + # Create a client with retryWrites=true. + listener = OvertCommandListener() + + # Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error + # code `91` (NotWritablePrimary) and the `NoWritesPerformed`, `RetryableError` and `SystemOverloadedError` labels. + command_args_inner = { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": ["insert"], + "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"], + "errorCode": 91, + }, + } + + # Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and + # `SystemOverloadedError` error labels but without the `NoWritesPerformed` error label. + command_args = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "errorCode": 91, + "errorLabels": ["RetryableError", "SystemOverloadedError"], + }, + } + + def failed(event: CommandFailedEvent) -> None: + # Configure the fail point command only if the the failed event is for the 91 error configured in step 2. + if listener.failed_events: + return + assert event.failure["code"] == 91 + self.configure_fail_point_sync(command_args_inner) + listener.failed_events.append(event) + + listener.failed = failed + + client = self.rs_client(retryWrites=True, event_listeners=[listener]) + self.addCleanup(client.close) + + self.configure_fail_point_sync(command_args) + + # Attempt an insertOne operation on any record for any database and collection. + # Expect the insertOne to fail with a server error. + from pymongo.errors import OperationFailure + + with self.assertRaises(Exception) as exc: + client.test.test.insert_one({}) + + # Assert that the error code of the server error is 91. + assert exc.exception.errors["code"] == 91 + # Assert that the error does not contain the error label `NoWritesPerformed`. + assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"] + + # Disable the fail point. + self.configure_fail_point_sync({}, off=True) + + if __name__ == "__main__": unittest.main()