From d6ed096490cc8095f73d67f8d244ace57ffc75fe Mon Sep 17 00:00:00 2001 From: Michael Green Date: Wed, 10 Dec 2025 12:45:18 +1300 Subject: [PATCH 1/5] [Optimize] Improve AIOSocketServer performance and signal handling --- src/py/extra/server.py | 92 +++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/src/py/extra/server.py b/src/py/extra/server.py index e1cd13b..c7503a3 100644 --- a/src/py/extra/server.py +++ b/src/py/extra/server.py @@ -354,38 +354,48 @@ async def Serve( app: Application, options: ServerOptions = ServerOptions(), ) -> None: - """Main server coroutine.""" - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - server.bind((options.host, options.port)) - # The argument is the backlog of connections that will be accepted before - # they are refused. - server.listen(options.backlog) - # This is what we need to use it with asyncio + """Main server coroutine (Optimized).""" + + loop = asyncio.get_running_loop() + + # Use socket.create_server (Python 3.8+) + # Handles IPv6/IPv4, reuse_port, and cleanup automatically. + server = socket.create_server( + (options.host, options.port), + family=socket.AF_INET, + backlog=options.backlog, + reuse_port=True, + dualstack_ipv6=False + ) server.setblocking(False) + server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) tasks: set[asyncio.Task[None]] = set() - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - - # Manage server state state = ServerState() - # Registers handlers for signals and exception (so that we log them). Note - # that we'll get a `set_wakeup_fd only works in main thread of the main interpreter` - # when this is not run out of the main thread. + + # Signal Handling + # We need to cancel the current task to break out of the await loop.sock_accept + main_task = asyncio.current_task(loop) + + def signal_handler(): + state.stop() + if main_task: + main_task.cancel() + if ( options.stopSignals and threading.current_thread() is threading.main_thread() ): - loop.add_signal_handler(SIGINT, lambda: state.stop()) - loop.add_signal_handler(SIGTERM, lambda: state.stop()) + for sig in (SIGINT, SIGTERM): + try: + loop.add_signal_handler(sig, signal_handler) + except NotImplementedError: + pass + loop.set_exception_handler(state.onException) info( - "Extra AIO Server listening", + "Extra AIO Server listening (Optimized)", icon="🚀", Host=options.host, Port=options.port, @@ -395,35 +405,41 @@ async def Serve( while state.isRunning: if options.condition and not options.condition(): break + try: - res = await asyncio.wait_for( - loop.sock_accept(server), timeout=options.polling or 1.0 - ) - if res is None: - continue - else: - client = res[0] - # NOTE: Should do something with the tasks + # Direct await - no wait_for. This removes the race condition causing InvalidStateError. + client, _ = await loop.sock_accept(server) + task = loop.create_task( cls.OnRequest(app, client, loop=loop, options=options) ) tasks.add(task) task.add_done_callback(tasks.discard) - except asyncio.TimeoutError: - continue + + except asyncio.CancelledError: + state.stop() + break except OSError as e: - # This can be: [OSError] [Errno 24] Too many open files - if e.errno == 24: - # Implement backpressure or wait mechanism here - await asyncio.sleep(0.1) # Short delay before retrying + if e.errno == 24: # Too many open files + warning("Too many open files. Pausing accept loop for 1s.") + await asyncio.sleep(1.0) else: exception(e) + await asyncio.sleep(0.1) + except Exception as e: + exception(e) finally: server.close() - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + + if tasks: + info(f"Cancelling {len(tasks)} active tasks...") + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + + info("Server shutdown complete.") def run( From 76f64088243879dcedd2f372af17929437f0e8fc Mon Sep 17 00:00:00 2001 From: Michael Green Date: Mon, 16 Feb 2026 11:59:01 +1300 Subject: [PATCH 2/5] [Optimize] Refactor AIOSocketServer to improve connection handling and error logging --- src/py/extra/server.py | 142 +++++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 55 deletions(-) diff --git a/src/py/extra/server.py b/src/py/extra/server.py index c7503a3..99c6cb6 100644 --- a/src/py/extra/server.py +++ b/src/py/extra/server.py @@ -33,7 +33,15 @@ def onException( ) -> None: e = context.get("exception") if e: - exception(e) + if isinstance(e, asyncio.InvalidStateError): + # Known race condition between wait_for timeout and sock_accept + # callback. Log as warning rather than exception to avoid alarm. + warning( + "AsyncIO InvalidStateError (likely accept race)", + Error=str(e), + ) + else: + exception(e) class ServerOptions(NamedTuple): @@ -354,92 +362,116 @@ async def Serve( app: Application, options: ServerOptions = ServerOptions(), ) -> None: - """Main server coroutine (Optimized).""" - - loop = asyncio.get_running_loop() - - # Use socket.create_server (Python 3.8+) - # Handles IPv6/IPv4, reuse_port, and cleanup automatically. - server = socket.create_server( - (options.host, options.port), - family=socket.AF_INET, - backlog=options.backlog, - reuse_port=True, - dualstack_ipv6=False - ) - server.setblocking(False) + """Main server coroutine.""" + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + server.bind((options.host, options.port)) + # The argument is the backlog of connections that will be accepted before + # they are refused. + server.listen(options.backlog) + # This is what we need to use it with asyncio + server.setblocking(False) tasks: set[asyncio.Task[None]] = set() - state = ServerState() - - # Signal Handling - # We need to cancel the current task to break out of the await loop.sock_accept - main_task = asyncio.current_task(loop) - - def signal_handler(): - state.stop() - if main_task: - main_task.cancel() + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + # Manage server state + state = ServerState() + # Registers handlers for signals and exception (so that we log them). Note + # that we'll get a `set_wakeup_fd only works in main thread of the main interpreter` + # when this is not run out of the main thread. if ( options.stopSignals and threading.current_thread() is threading.main_thread() ): - for sig in (SIGINT, SIGTERM): - try: - loop.add_signal_handler(sig, signal_handler) - except NotImplementedError: - pass - + loop.add_signal_handler(SIGINT, lambda: state.stop()) + loop.add_signal_handler(SIGTERM, lambda: state.stop()) loop.set_exception_handler(state.onException) info( - "Extra AIO Server listening (Optimized)", + "Extra AIO Server listening", icon="🚀", Host=options.host, Port=options.port, ) try: + # We use a persistent accept task instead of creating a new one each + # iteration via wait_for. This avoids the known race condition where + # wait_for cancels the sock_accept future but the internal _sock_accept + # callback still fires, causing InvalidStateError and leaking the + # accepted socket file descriptor. + accept_task: asyncio.Task | None = None while state.isRunning: if options.condition and not options.condition(): break - try: - # Direct await - no wait_for. This removes the race condition causing InvalidStateError. - client, _ = await loop.sock_accept(server) + if accept_task is None: + accept_task = loop.create_task( + loop.sock_accept(server) + ) + + # asyncio.wait does NOT cancel the future on timeout, + # unlike asyncio.wait_for. The accept_task persists across + # timeout iterations until a connection actually arrives. + done, _ = await asyncio.wait( + {accept_task}, + timeout=options.polling or 1.0, + ) + + if accept_task not in done: + # Timeout, no connection yet — loop back and reuse + # the same accept_task. + continue + # Connection arrived (or error). Consume the result and + # clear accept_task so a new one is created next iteration. + try: + res = accept_task.result() + except OSError as e: + if e.errno == 24: + # Too many open files — backpressure + await asyncio.sleep(0.1) + else: + exception(e) + accept_task = None + continue + + accept_task = None + if res is None: + continue + else: + client = res[0] + # NOTE: Should do something with the tasks task = loop.create_task( cls.OnRequest(app, client, loop=loop, options=options) ) tasks.add(task) task.add_done_callback(tasks.discard) - + except asyncio.InvalidStateError: + # Defensive: should not happen with the new pattern, + # but guard against it to prevent server crash. + warning("InvalidStateError in accept loop (recovered)") + accept_task = None + continue except asyncio.CancelledError: - state.stop() break - except OSError as e: - if e.errno == 24: # Too many open files - warning("Too many open files. Pausing accept loop for 1s.") - await asyncio.sleep(1.0) - else: - exception(e) - await asyncio.sleep(0.1) - except Exception as e: - exception(e) finally: + if accept_task and not accept_task.done(): + accept_task.cancel() + try: + await accept_task + except (asyncio.CancelledError, Exception): + pass server.close() - - if tasks: - info(f"Cancelling {len(tasks)} active tasks...") - for task in tasks: - task.cancel() - - await asyncio.gather(*tasks, return_exceptions=True) - - info("Server shutdown complete.") + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) def run( From 6980d345b3ca4233e011ccc64a159a97d3e67047 Mon Sep 17 00:00:00 2001 From: Michael Green Date: Mon, 16 Feb 2026 12:00:23 +1300 Subject: [PATCH 3/5] [Cleanup] Remove unused imports from __init__.py --- src/py/extra/__init__.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/py/extra/__init__.py b/src/py/extra/__init__.py index 4093aa0..e69de29 100644 --- a/src/py/extra/__init__.py +++ b/src/py/extra/__init__.py @@ -1,12 +0,0 @@ -from .http.model import ( - HTTPRequest, - HTTPResponse, - HTTPResponseLine, - HTTPRequestError, -) # NOQA: F401 -from .decorators import on, expose, pre, post # NOQA: F401 -from .server import run # NOQA: F401 -from .model import Service # NOQA: F401 - - -# EOF From 36ecc3d1f08cfaf39d4d8be27a4d9b36c46d8abc Mon Sep 17 00:00:00 2001 From: Michael Green Date: Mon, 16 Feb 2026 12:02:01 +1300 Subject: [PATCH 4/5] [Cleanup] Add initial imports to __init__.py --- src/py/extra/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/py/extra/__init__.py b/src/py/extra/__init__.py index e69de29..d27bf29 100644 --- a/src/py/extra/__init__.py +++ b/src/py/extra/__init__.py @@ -0,0 +1,12 @@ +from .http.model import ( + HTTPRequest, # NOQA: F401 + HTTPResponse, # NOQA: F401 + HTTPResponseLine, # NOQA: F401 + HTTPRequestError, # NOQA: F401 +) # NOQA: F401 +from .decorators import on, expose, pre, post # NOQA: F401 +from .server import run # NOQA: F401 +from .model import Service # NOQA: F401 + + +# EOF From a2f7e0c2a8cb1cdcde30234c7b648930cf62d6a8 Mon Sep 17 00:00:00 2001 From: Michael Green Date: Mon, 16 Feb 2026 12:05:50 +1300 Subject: [PATCH 5/5] [Cleanup] Reformat import statements in __init__.py for improved readability --- src/py/extra/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/py/extra/__init__.py b/src/py/extra/__init__.py index d27bf29..4260fcb 100644 --- a/src/py/extra/__init__.py +++ b/src/py/extra/__init__.py @@ -1,9 +1,9 @@ -from .http.model import ( - HTTPRequest, # NOQA: F401 - HTTPResponse, # NOQA: F401 - HTTPResponseLine, # NOQA: F401 - HTTPRequestError, # NOQA: F401 -) # NOQA: F401 +from .http.model import ( # NOQA: F401 + HTTPRequest, + HTTPResponse, + HTTPResponseLine, + HTTPRequestError, +) from .decorators import on, expose, pre, post # NOQA: F401 from .server import run # NOQA: F401 from .model import Service # NOQA: F401