diff --git a/src/py/extra/__init__.py b/src/py/extra/__init__.py
index 4093aa0..4260fcb 100644
--- a/src/py/extra/__init__.py
+++ b/src/py/extra/__init__.py
@@ -1,9 +1,9 @@
-from .http.model import (
+from .http.model import ( # NOQA: F401
     HTTPRequest,
     HTTPResponse,
     HTTPResponseLine,
     HTTPRequestError,
-) # NOQA: F401
+)
 from .decorators import on, expose, pre, post # NOQA: F401
 from .server import run # NOQA: F401
 from .model import Service # NOQA: F401
diff --git a/src/py/extra/server.py b/src/py/extra/server.py
index e1cd13b..99c6cb6 100644
--- a/src/py/extra/server.py
+++ b/src/py/extra/server.py
@@ -33,7 +33,15 @@ def onException(
 ) -> None:
     e = context.get("exception")
     if e:
-        exception(e)
+        if isinstance(e, asyncio.InvalidStateError):
+            # Known race condition between wait_for timeout and sock_accept
+            # callback. Log as warning rather than exception to avoid alarm.
+            warning(
+                "AsyncIO InvalidStateError (likely accept race)",
+                Error=str(e),
+            )
+        else:
+            exception(e)
 
 
 class ServerOptions(NamedTuple):
@@ -392,13 +400,48 @@ async def Serve(
     )
 
     try:
+        # We use a persistent accept task instead of creating a new one each
+        # iteration via wait_for. This avoids the known race condition where
+        # wait_for cancels the sock_accept future but the internal _sock_accept
+        # callback still fires, causing InvalidStateError and leaking the
+        # accepted socket file descriptor.
+        accept_task: asyncio.Task | None = None
         while state.isRunning:
             if options.condition and not options.condition():
                 break
             try:
-                res = await asyncio.wait_for(
-                    loop.sock_accept(server), timeout=options.polling or 1.0
+                if accept_task is None:
+                    accept_task = loop.create_task(
+                        loop.sock_accept(server)
+                    )
+
+                # asyncio.wait does NOT cancel the future on timeout,
+                # unlike asyncio.wait_for. The accept_task persists across
+                # timeout iterations until a connection actually arrives.
+                done, _ = await asyncio.wait(
+                    {accept_task},
+                    timeout=options.polling or 1.0,
                 )
+
+                if accept_task not in done:
+                    # Timeout, no connection yet — loop back and reuse
+                    # the same accept_task.
+                    continue
+
+                # Connection arrived (or error). Consume the result and
+                # clear accept_task so a new one is created next iteration.
+                try:
+                    res = accept_task.result()
+                except OSError as e:
+                    if e.errno == 24:
+                        # Too many open files — backpressure
+                        await asyncio.sleep(0.1)
+                    else:
+                        exception(e)
+                    accept_task = None
+                    continue
+
+                accept_task = None
                 if res is None:
                     continue
                 else:
@@ -409,17 +452,22 @@ async def Serve(
                     )
                     tasks.add(task)
                     task.add_done_callback(tasks.discard)
 
-            except asyncio.TimeoutError:
+            except asyncio.InvalidStateError:
+                # Defensive: should not happen with the new pattern,
+                # but guard against it to prevent server crash.
+                warning("InvalidStateError in accept loop (recovered)")
+                accept_task = None
                 continue
-            except OSError as e:
-                # This can be: [OSError] [Errno 24] Too many open files
-                if e.errno == 24:
-                    # Implement backpressure or wait mechanism here
-                    await asyncio.sleep(0.1) # Short delay before retrying
-                else:
-                    exception(e)
+            except asyncio.CancelledError:
+                break
     finally:
+        if accept_task and not accept_task.done():
+            accept_task.cancel()
+            try:
+                await accept_task
+            except (asyncio.CancelledError, Exception):
+                pass
         server.close()
         for task in tasks:
             task.cancel()