PYTHON-5212 - Do not hold Topology lock while resetting pool #2301
NoahStapp merged 10 commits into mongodb:master
Conversation
    for conn in sockets:
        await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
    if not _IS_SYNC:
        await asyncio.gather(
We probably want to use return_exceptions=True here to ensure all tasks complete.
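For illustration, a hedged sketch (toy Conn class, not driver code) of what return_exceptions=True buys here: gather then waits for every close to finish and hands any raised exceptions back as values, instead of propagating the first failure while the remaining closes are still in flight.

    from __future__ import annotations

    import asyncio

    class Conn:
        def __init__(self, ok: bool) -> None:
            self.ok = ok

        async def close_conn(self, reason: str) -> None:
            await asyncio.sleep(0)
            if not self.ok:
                raise OSError("close failed")

    async def main() -> None:
        conns = [Conn(True), Conn(False), Conn(True)]
        results = await asyncio.gather(
            *(conn.close_conn("POOL_CLOSED") for conn in conns),
            return_exceptions=True,
        )
        # All three closes ran to completion; the one failure comes back as a value
        # that can be logged or ignored rather than aborting the wait.
        failures = [r for r in results if isinstance(r, BaseException)]
        assert len(failures) == 1

    asyncio.run(main())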
    for conn in close_conns:
        await conn.close_conn(ConnectionClosedReason.IDLE)
    if not _IS_SYNC:
        await asyncio.gather(
        await self._process_change(server_description, reset_pool, interrupt_connections)
    # Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close.
    if self._opened and self._description.has_server(server_description.address) and reset_pool:
        server = self._servers.get(server_description.address)
The has_server -> _servers.get pattern is not safe to do here (https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use)
We also don't need to check for closed/opened because pool.reset is safe to call even after close().
Instead we can do:
    if reset_pool:
        server = self._servers.get(server_description.address)
        if server:
            await server.pool.reset(interrupt_connections=interrupt_connections)
That was my thought as well, working on adding such a test today.
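As a toy illustration of the hazard the Wikipedia link describes (assumed names, not the actual Topology code): a membership check followed by a second lookup leaves a window in which another task can remove the entry, while a single .get() has no such window.

    from __future__ import annotations

    import asyncio

    servers: dict[tuple[str, int], str] = {("localhost", 27017): "server"}

    async def racy(address: tuple[str, int]) -> str | None:
        if address in servers:          # time of check
            await asyncio.sleep(0)      # suspension point: another task may remove the entry here
            return servers[address]     # time of use: can raise KeyError
        return None

    async def safe(address: tuple[str, int]) -> str | None:
        server = servers.get(address)   # single lookup, no check-to-use window
        if server is not None:
            return server               # e.g. reset its pool in the real code
        return None

    asyncio.run(safe(("localhost", 27017)))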
    elapsed = time.monotonic() - start_time
    latencies.append(elapsed)
    if elapsed >= close_delay:
        break
If elapsed is always < close_delay, this loop will never exit?
Could we replace this with an exit condition like this?:
latencies = []
should_exit = []

async def run_task():
    ...
    if should_exit:
        break
    ...

# Wait until all idle connections are closed to simulate real-world conditions
await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10)
should_exit.append(True)
await task.join()
The classic list-as-mutable-state technique!
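A tiny illustration of the idiom (illustrative names only): an empty list is falsy, so a closure can poll it as a stop flag, and another coroutine flips it simply by appending.

    should_exit: list[bool] = []

    def keep_going() -> bool:
        return not should_exit      # True while nothing has been appended

    assert keep_going()
    should_exit.append(True)        # signal from the main test body
    assert not keep_going()

An asyncio.Event would express the same intent; a plain list has the advantage of working unchanged in the synchronized (non-async) version of the test, which is presumably why it is suggested here.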
    # Wait until all idle connections are closed to simulate real-world conditions
    await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10)
    # No operation latency should significantly exceed close_delay
    self.assertLessEqual(max(latencies), close_delay * 2.0)
I worry this test will be flaky. A single op can easily take >100 ms in our CI depending on the host (mac/windows). Could you increase the close_delay to 0.1 and increase this line to close_delay * 5? Since there are 10 connections to close, * 5 should still catch the regression, right?
I would expect close_delay * 5 to catch regressions consistently, yeah.
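Roughly, the arithmetic behind that expectation (a sketch using the numbers from the thread, assuming each close is delayed by close_delay and that, with the regression, the closes would block operations serially under the lock):

    close_delay = 0.1                                           # seconds each simulated close takes
    n_connections = 10                                          # idle connections being closed
    threshold = close_delay * 5                                 # proposed assertion bound: 0.5 s
    stall_if_closed_under_lock = n_connections * close_delay    # ~1.0 s worst-case stall
    assert stall_if_closed_under_lock > threshold               # so a regression would still trip the check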
            minPoolSize=10,
        )
        server = await (await client._get_topology()).select_server(
            readable_server_selector, _Op.TEST
readable_server_selector -> writeable_server_selector. The test uses the primary read preference, so we should wait for 10 connections to the primary node.
Because only the primary is writeable? Makes sense.
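A toy model of that exchange (not pymongo's ServerDescription/selector machinery): in a replica set only the primary accepts writes, so filtering by writeability narrows the candidates to exactly the node that a primary read preference, and hence the test's 10 pooled connections, targets.

    from dataclasses import dataclass

    @dataclass
    class Node:
        address: tuple[str, int]
        is_primary: bool

        @property
        def is_writable(self) -> bool:
            return self.is_primary          # secondaries are readable but not writable

    def writable_only(nodes: list[Node]) -> list[Node]:
        return [n for n in nodes if n.is_writable]

    nodes = [Node(("a", 27017), True), Node(("b", 27017), False), Node(("c", 27017), False)]
    assert [n.address for n in writable_only(nodes)] == [("a", 27017)]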
test/test_client.py (outdated)
            MongoClient(["host1", "host2"], directConnection=True)

    @unittest.skipIf("PyPy" in sys.version, "PYTHON-2927 fails often on PyPy")
    @skipIf(os.environ.get("DEBUG_LOG"), "Enabling debug logs breaks this test")
Was this intentionally added?
Whoops, this was added for testing purposes. It's intended to be done in a separate ticket.
        await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1)
        # Wait until all idle connections are closed to simulate real-world conditions
        await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10)
        should_exit.append(True)
The test should join the task here, after should_exit.append(True), otherwise we may miss a latency. Also, one more thing:
# Wait until all idle connections are closed to simulate real-world conditions
await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10)
# Wait for one more find to complete, then shutdown the task.
n = len(latencies)
await async_wait_until(lambda: len(latencies) >= n + 1, "run one more find")
should_exit.append(True)
await task.join()
I see, that way we ensure that the operations are still working after the pool reset completes.
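Putting the pieces together, a self-contained toy version of the final shape (simplified stand-ins, not the actual test: the sleep replaces a find() round trip, wait_until replaces the suite's async_wait_until helper, and the event waits are elided): record latencies in a background task, prove at least one more operation completes after the pool reset, then flip the flag and join.

    import asyncio
    import time

    async def main() -> None:
        latencies: list[float] = []
        should_exit: list[bool] = []

        async def worker() -> None:
            while not should_exit:
                start = time.monotonic()
                await asyncio.sleep(0.01)            # stand-in for one find() round trip
                latencies.append(time.monotonic() - start)

        async def wait_until(predicate, timeout: float = 5.0) -> None:
            # simplified stand-in for the test suite's async_wait_until helper
            deadline = time.monotonic() + timeout
            while not predicate():
                if time.monotonic() > deadline:
                    raise TimeoutError("condition not met")
                await asyncio.sleep(0.005)

        task = asyncio.create_task(worker())
        # ... the real test waits for the heartbeat-failed and connection-closed events here ...
        n = len(latencies)
        await wait_until(lambda: len(latencies) >= n + 1)   # one more op completes after the reset
        should_exit.append(True)
        await task                                          # join before inspecting latencies
        assert latencies and max(latencies) < 1.0

    asyncio.run(main())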