Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 193 additions & 1 deletion test/asynchronous/test_retryable_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@
from bson.int64 import Int64
from bson.raw_bson import RawBSONDocument
from bson.son import SON
from pymongo import MongoClient
from pymongo.errors import (
AutoReconnect,
ConnectionFailure,
OperationFailure,
NotPrimaryError,
ServerSelectionTimeoutError,
WriteConcernError,
)
from pymongo.monitoring import (
CommandFailedEvent,
CommandSucceededEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
Expand Down Expand Up @@ -601,5 +603,195 @@ def raise_connection_err_select_server(*args, **kwargs):
self.assertEqual(sent_txn_id, final_txn_id, msg)


class TestErrorPropagationAfterEncounteringMultipleErrors(AsyncIntegrationTest):
    """Check which server error ``insert_one`` surfaces after repeated failures.

    Every test configures one ``failCommand`` fail point up front and a second
    one from inside a ``CommandFailedEvent`` callback, then asserts on the code
    (and labels) of the error that the retryable insert finally raises. The
    tests differ only in which of the two errors carry ``NoWritesPerformed``.
    """

    # Only run against replica sets as mongos does not propagate the
    # NoWritesPerformed label to the drivers.
    @async_client_context.require_replica_set
    # Run against server versions 6.0 and above.
    @async_client_context.require_version_min(6, 0)  # type: ignore[untyped-decorator]
    async def asyncSetUp(self) -> None:
        """Create the synchronous client used to configure fail points."""
        await super().asyncSetUp()
        self.setup_client = MongoClient(**async_client_context.default_client_options)
        self.addCleanup(self.setup_client.close)

    # TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
    def configure_fail_point_sync(self, command_args, off=False) -> None:
        """Send a ``configureFailPoint: failCommand`` command via the sync client.

        :param command_args: Fields merged into the command (e.g. ``mode`` and
            ``data``). The caller's dict is not modified.
        :param off: When true, force ``mode`` to ``"off"`` and drop ``data`` so
            the fail point is disabled regardless of ``command_args``.
        """
        cmd = {"configureFailPoint": "failCommand"}
        cmd.update(command_args)
        if off:
            cmd["mode"] = "off"
            cmd.pop("data", None)
        self.setup_client.admin.command(cmd)

    async def _insert_one_with_fail_points(
        self, command_args, command_args_inner
    ) -> NotPrimaryError:
        """Run ``insert_one`` against two chained fail points and return its error.

        ``command_args`` is configured before the insert. ``command_args_inner``
        is configured from the listener when the first ``CommandFailedEvent``
        arrives. The fail point is always disabled before returning, even if
        the insert does not raise the expected ``NotPrimaryError``.

        :param command_args: Fail point that triggers the first failure.
        :param command_args_inner: Fail point configured after the first failure.
        :return: The ``NotPrimaryError`` raised by ``insert_one``.
        """
        # Create a client with retryWrites=true.
        listener = OvertCommandListener()

        def failed(event: CommandFailedEvent) -> None:
            # Configure the second fail point only if the failed event is for
            # the 91 error configured by ``command_args``; later failures are ignored.
            if listener.failed_events:
                return
            assert event.failure["code"] == 91
            self.configure_fail_point_sync(command_args_inner)
            listener.failed_events.append(event)

        listener.failed = failed

        client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])
        self.addAsyncCleanup(client.close)

        try:
            self.configure_fail_point_sync(command_args)

            # Attempt an insertOne operation on any record for any database and collection.
            # Expect the insertOne to fail with a server error.
            with self.assertRaises(NotPrimaryError) as exc:
                await client.test.test.insert_one({})
        finally:
            # Disable the fail point even when the expectation above fails, so an
            # "alwaysOn" fail point can never leak into subsequent tests.
            self.configure_fail_point_sync({}, off=True)

        return exc.exception

    async def test_01_drivers_return_the_correct_error_when_receiving_only_errors_without_NoWritesPerformed(
        self,
    ) -> None:
        # Configure a fail point with error code 91 (ShutdownInProgress) with the
        # RetryableError and SystemOverloadedError error labels.
        command_args = {
            "configureFailPoint": "failCommand",
            "mode": {"times": 1},
            "data": {
                "failCommands": ["insert"],
                "errorLabels": ["RetryableError", "SystemOverloadedError"],
                "errorCode": 91,
            },
        }

        # Via the command monitoring CommandFailedEvent, configure a fail point
        # with error code 10107 (NotWritablePrimary).
        command_args_inner = {
            "configureFailPoint": "failCommand",
            "mode": "alwaysOn",
            "data": {
                "failCommands": ["insert"],
                "errorCode": 10107,
                "errorLabels": ["RetryableError", "SystemOverloadedError"],
            },
        }

        error = await self._insert_one_with_fail_points(command_args, command_args_inner)

        # Assert that the error code of the server error is 10107.
        assert error.errors["code"] == 10107  # type:ignore[call-overload]

    async def test_02_drivers_return_the_correct_error_when_receiving_only_errors_with_NoWritesPerformed(
        self,
    ) -> None:
        # Configure a fail point with error code 91 (ShutdownInProgress) with the
        # RetryableError, SystemOverloadedError and NoWritesPerformed error labels.
        command_args = {
            "configureFailPoint": "failCommand",
            "mode": {"times": 1},
            "data": {
                "failCommands": ["insert"],
                "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"],
                "errorCode": 91,
            },
        }

        # Via the command monitoring CommandFailedEvent, configure a fail point
        # with error code `10107` (NotWritablePrimary) and a NoWritesPerformed label.
        command_args_inner = {
            "configureFailPoint": "failCommand",
            "mode": "alwaysOn",
            "data": {
                "failCommands": ["insert"],
                "errorCode": 10107,
                "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"],
            },
        }

        error = await self._insert_one_with_fail_points(command_args, command_args_inner)

        # Assert that the error code of the server error is 91.
        assert error.errors["code"] == 91  # type:ignore[call-overload]

    async def test_03_drivers_return_the_correct_error_when_receiving_some_errors_with_NoWritesPerformed_and_some_without_NoWritesPerformed(
        self,
    ) -> None:
        # Configure the client to listen to CommandFailedEvents. In the attached
        # listener, configure a fail point with error code `91` (ShutdownInProgress)
        # and the `NoWritesPerformed`, `RetryableError` and `SystemOverloadedError` labels.
        # NOTE: mode is "alwaysOn" rather than {"times": 1} so that this second
        # fail point also fails the backpressure retries; with {"times": 1} the
        # insert is retried and the error is never triggered.
        command_args_inner = {
            "configureFailPoint": "failCommand",
            "mode": "alwaysOn",
            "data": {
                "failCommands": ["insert"],
                "errorLabels": ["RetryableError", "SystemOverloadedError", "NoWritesPerformed"],
                "errorCode": 91,
            },
        }

        # Configure a fail point with error code `91` (ShutdownInProgress) with the
        # `RetryableError` and `SystemOverloadedError` error labels but without the
        # `NoWritesPerformed` error label.
        command_args = {
            "configureFailPoint": "failCommand",
            "mode": {"times": 1},
            "data": {
                "failCommands": ["insert"],
                "errorCode": 91,
                "errorLabels": ["RetryableError", "SystemOverloadedError"],
            },
        }

        error = await self._insert_one_with_fail_points(command_args, command_args_inner)

        # Assert that the error code of the server error is 91.
        assert error.errors["code"] == 91  # type:ignore[call-overload]
        # Assert that the error does not contain the error label `NoWritesPerformed`.
        assert "NoWritesPerformed" not in error.errors["errorLabels"]  # type:ignore[call-overload]


# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading
Loading