-
Notifications
You must be signed in to change notification settings - Fork 0
[Optimize] Improve AIOSocketServer performance and signal handling #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d6ed096
76f6408
6980d34
36ecc3d
a2f7e0c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -33,7 +33,15 @@ def onException( | |||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||||
| e = context.get("exception") | ||||||||||||||||||||||||||||||||||||||||
| if e: | ||||||||||||||||||||||||||||||||||||||||
| exception(e) | ||||||||||||||||||||||||||||||||||||||||
| if isinstance(e, asyncio.InvalidStateError): | ||||||||||||||||||||||||||||||||||||||||
| # Known race condition between wait_for timeout and sock_accept | ||||||||||||||||||||||||||||||||||||||||
| # callback. Log as warning rather than exception to avoid alarm. | ||||||||||||||||||||||||||||||||||||||||
| warning( | ||||||||||||||||||||||||||||||||||||||||
| "AsyncIO InvalidStateError (likely accept race)", | ||||||||||||||||||||||||||||||||||||||||
| Error=str(e), | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| exception(e) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| class ServerOptions(NamedTuple): | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -392,13 +400,48 @@ async def Serve( | |||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| # We use a persistent accept task instead of creating a new one each | ||||||||||||||||||||||||||||||||||||||||
| # iteration via wait_for. This avoids the known race condition where | ||||||||||||||||||||||||||||||||||||||||
| # wait_for cancels the sock_accept future but the internal _sock_accept | ||||||||||||||||||||||||||||||||||||||||
| # callback still fires, causing InvalidStateError and leaking the | ||||||||||||||||||||||||||||||||||||||||
| # accepted socket file descriptor. | ||||||||||||||||||||||||||||||||||||||||
| accept_task: asyncio.Task | None = None | ||||||||||||||||||||||||||||||||||||||||
| while state.isRunning: | ||||||||||||||||||||||||||||||||||||||||
| if options.condition and not options.condition(): | ||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| res = await asyncio.wait_for( | ||||||||||||||||||||||||||||||||||||||||
| loop.sock_accept(server), timeout=options.polling or 1.0 | ||||||||||||||||||||||||||||||||||||||||
| if accept_task is None: | ||||||||||||||||||||||||||||||||||||||||
| accept_task = loop.create_task( | ||||||||||||||||||||||||||||||||||||||||
| loop.sock_accept(server) | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| # asyncio.wait does NOT cancel the future on timeout, | ||||||||||||||||||||||||||||||||||||||||
| # unlike asyncio.wait_for. The accept_task persists across | ||||||||||||||||||||||||||||||||||||||||
| # timeout iterations until a connection actually arrives. | ||||||||||||||||||||||||||||||||||||||||
| done, _ = await asyncio.wait( | ||||||||||||||||||||||||||||||||||||||||
| {accept_task}, | ||||||||||||||||||||||||||||||||||||||||
| timeout=options.polling or 1.0, | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+408
to
+423
|
||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if accept_task not in done: | ||||||||||||||||||||||||||||||||||||||||
| # Timeout, no connection yet — loop back and reuse | ||||||||||||||||||||||||||||||||||||||||
| # the same accept_task. | ||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| # Connection arrived (or error). Consume the result and | ||||||||||||||||||||||||||||||||||||||||
| # clear accept_task so a new one is created next iteration. | ||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| res = accept_task.result() | ||||||||||||||||||||||||||||||||||||||||
| except OSError as e: | ||||||||||||||||||||||||||||||||||||||||
| if e.errno == 24: | ||||||||||||||||||||||||||||||||||||||||
| # Too many open files — backpressure | ||||||||||||||||||||||||||||||||||||||||
| await asyncio.sleep(0.1) | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| exception(e) | ||||||||||||||||||||||||||||||||||||||||
| accept_task = None | ||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| accept_task = None | ||||||||||||||||||||||||||||||||||||||||
| if res is None: | ||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -409,17 +452,22 @@ async def Serve( | |||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| tasks.add(task) | ||||||||||||||||||||||||||||||||||||||||
| task.add_done_callback(tasks.discard) | ||||||||||||||||||||||||||||||||||||||||
| except asyncio.TimeoutError: | ||||||||||||||||||||||||||||||||||||||||
| except asyncio.InvalidStateError: | ||||||||||||||||||||||||||||||||||||||||
| # Defensive: should not happen with the new pattern, | ||||||||||||||||||||||||||||||||||||||||
| # but guard against it to prevent server crash. | ||||||||||||||||||||||||||||||||||||||||
| warning("InvalidStateError in accept loop (recovered)") | ||||||||||||||||||||||||||||||||||||||||
| accept_task = None | ||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||
| except OSError as e: | ||||||||||||||||||||||||||||||||||||||||
| # This can be: [OSError] [Errno 24] Too many open files | ||||||||||||||||||||||||||||||||||||||||
| if e.errno == 24: | ||||||||||||||||||||||||||||||||||||||||
| # Implement backpressure or wait mechanism here | ||||||||||||||||||||||||||||||||||||||||
| await asyncio.sleep(0.1) # Short delay before retrying | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| exception(e) | ||||||||||||||||||||||||||||||||||||||||
| except asyncio.CancelledError: | ||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||||||||||||
| if accept_task and not accept_task.done(): | ||||||||||||||||||||||||||||||||||||||||
| accept_task.cancel() | ||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| await accept_task | ||||||||||||||||||||||||||||||||||||||||
| except (asyncio.CancelledError, Exception): | ||||||||||||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+462
to
+470
|
||||||||||||||||||||||||||||||||||||||||
| break | |
| finally: | |
| if accept_task and not accept_task.done(): | |
| accept_task.cancel() | |
| try: | |
| await accept_task | |
| except (asyncio.CancelledError, Exception): | |
| pass | |
| # Ensure that task cancellation propagates to the caller | |
| # while still triggering a clean shutdown. | |
| state.stop() | |
| raise | |
| finally: | |
| if accept_task and not accept_task.done(): | |
| accept_task.cancel() | |
| try: | |
| await accept_task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event-loop exception handler now downgrades all
InvalidStateErrors to a warning without a stack trace or the loop-provided context message. This risks hiding real InvalidStateError bugs outside the accept loop and makes debugging harder; consider logging thecontext.get('message')and/or includingstack=Truefor this warning (or otherwise scoping this suppression to the accept-loop path only).