# appyter/orchestration/pg_dispatcher/__init__.py
import logging
logger = logging.getLogger(__name__)

from appyter.orchestration.cli import orchestration
from appyter.ext.click import click_option_setenv

def create_app(**kwargs):
  """Build the aiohttp application for the postgres-backed dispatcher.

  Mounts the core app under PREFIX (registering a redirect from the bare
  path when a prefix is configured) and attaches the socket.io server.

  :param kwargs: the normalized click option values (see ``dispatcher``)
  :return: the configured ``aiohttp.web.Application``
  """
  from aiohttp import web
  # FIX: import from *this* package (pg_dispatcher) — the original imported
  # appyter.orchestration.dispatcher.core/socketio, so the postgres-backed
  # queue in this package was never actually used.
  from appyter.orchestration.pg_dispatcher.core import core
  from appyter.orchestration.pg_dispatcher.socketio import socketio
  from appyter.ext.urllib import join_slash
  #
  logger.info('Initializing aiohttp...')
  config = dict(
    # FIX: click normalizes `--database-url` to the `database_url` kwarg;
    # `kwargs.get('database-url')` always returned None.
    DATABASE_URL=kwargs.get('database_url'),
    HOST=kwargs.get('host'),
    PORT=kwargs.get('port'),
    PROXY=kwargs.get('proxy'),
    JOBS=kwargs.get('jobs'),
    JOBS_PER_IMAGE=kwargs.get('jobs_per_image'),
    TIMEOUT=kwargs.get('timeout'),
    DEBUG=kwargs.get('debug'),
    PREFIX=kwargs.get('prefix', '').rstrip('/'),
  )
  if config['PREFIX']:
    app = web.Application()
    app['config'] = core['config'] = config
    #
    logger.info('Registering prefix redirect')
    async def redirect_to_prefix(request):
      # forward any top-level path to its prefixed equivalent
      path = request.match_info['path']
      raise web.HTTPFound(join_slash(app['config']['PREFIX'], path) + '/')
    app.router.add_get('/{path:[^/]*}', redirect_to_prefix)
    app.add_subapp(config['PREFIX'], core)
  else:
    # no prefix: serve the core application directly
    app = core
    app['config'] = config
  #
  logger.info('Initializing socketio...')
  socketio.attach(app, join_slash(app['config']['PREFIX'], 'socket.io'))
  #
  return app

@orchestration.command(help='Run the appyter dispatcher')
@click_option_setenv('--database-url', envvar='APPYTER_DATABASE_URL', type=str, required=True, help='The postgres database url')
@click_option_setenv('--prefix', envvar='APPYTER_PREFIX', default='/', help='Specify the prefix for which to mount the webserver onto')
@click_option_setenv('--host', envvar='APPYTER_HOST', default='127.0.0.1', help='The host the flask server should run on')
@click_option_setenv('--port', envvar='APPYTER_PORT', type=int, default=5000, help='The port this flask server should run on')
@click_option_setenv('--proxy', envvar='APPYTER_PROXY', type=bool, default=False, help='Whether this is running behind a proxy and the IP should be fixed for CORS')
@click_option_setenv('--jobs', envvar='APPYTER_JOBS', type=int, default=2, help='Number of concurrent jobs to dispatch')
@click_option_setenv('--jobs-per-image', envvar='APPYTER_JOBS_PER_IMAGE', type=int, default=1, help='Number of concurrent jobs to dispatch for any individual appyter image')
@click_option_setenv('--timeout', envvar='APPYTER_TIMEOUT', type=float, default=None, help='How long in seconds a job can run before it is cancelled')
@click_option_setenv('--debug', envvar='APPYTER_DEBUG', type=bool, default=True, help='Whether or not we should be in debugging mode, not for use in multi-tenant situations')
def dispatcher(*args, **kwargs):
  """CLI entry point: construct the app and serve it on the event loop."""
  from appyter.ext.aiohttp import run_app
  from appyter.ext.asyncio.event_loop import with_event_loop
  with with_event_loop():
    app = create_app(**kwargs)
    run_app(app, host=app['config']['HOST'], port=int(app['config']['PORT']))
# appyter/orchestration/pg_dispatcher/core.py
import asyncio
import datetime
import logging
import json
import asyncpg
from aiohttp import web
logger = logging.getLogger(__name__)

async def create_pool(DATABASE_URL):
  """Create an asyncpg connection pool and ensure the job_queue schema exists.

  :param DATABASE_URL: postgres connection url
  :return: an ``asyncpg.Pool``
  """
  pool = await asyncpg.create_pool(
    DATABASE_URL,
    min_size=1,
    max_size=10,
    command_timeout=10,
  )
  # FIX: the original wrapped the DDL in a blanket `except Exception: pass`,
  # which also swallowed genuine connection/permission failures. Using
  # IF NOT EXISTS makes schema creation idempotent without hiding errors.
  async with pool.acquire() as conn:
    await conn.execute(
      """
      CREATE TABLE IF NOT EXISTS job_queue (
        id BIGSERIAL PRIMARY KEY,
        queue TEXT NOT NULL DEFAULT 'default',
        payload JSONB NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
        attempts INT NOT NULL DEFAULT 0,
        max_attempts INT NOT NULL DEFAULT 3,
        last_error TEXT,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
      );
      """,
    )
    await conn.execute(
      """
      CREATE INDEX IF NOT EXISTS idx_job_queue_pending
        ON job_queue (status, run_at, id)
        WHERE status = 'pending';

      CREATE INDEX IF NOT EXISTS idx_job_queue_running_queue
        ON job_queue (queue)
        WHERE status = 'running';

      CREATE INDEX IF NOT EXISTS idx_job_queue_queue
        ON job_queue (queue);
      """,
    )
  return pool

async def list(pool):
  """Yield every job record that is not yet done.

  NOTE(review): shadows the builtin ``list`` at module scope; name kept
  for interface compatibility with existing callers.
  """
  async with pool.acquire() as conn:
    # FIX: asyncpg cursors can only be created inside a transaction; the
    # original raised NoActiveSQLTransactionError on first use.
    async with conn.transaction():
      async for record in conn.cursor(
        """
        SELECT *
        FROM job_queue
        WHERE status != 'done';
        """,
      ):
        yield record

async def enqueue(pool, payload, queue="default", run_at=None):
  """Insert a job into the queue.

  :param payload: the job body — a dict (serialized here) or a JSON string
  :param queue: the logical queue (per-image concurrency bucket)
  :param run_at: optional earliest execution time (defaults to now())
  :return: the number of pending jobs ahead of this one in its queue
  """
  # asyncpg has no default dict->jsonb codec; accept either a dict or a
  # pre-encoded JSON string so callers don't have to care.
  if not isinstance(payload, str):
    payload = json.dumps(payload)
  async with pool.acquire() as conn:
    row = await conn.fetchrow(
      """
      INSERT INTO job_queue (queue, payload, run_at)
      VALUES ($1, $2, COALESCE($3, now()))
      RETURNING
        id,
        queue,
        run_at,
        (
          SELECT count(*)
          FROM job_queue q
          WHERE
            q.queue = job_queue.queue
            AND q.status = 'pending'
            AND (
              q.run_at < job_queue.run_at
              OR (
                q.run_at = job_queue.run_at
                AND q.id < job_queue.id
              )
            )
        ) AS pending_ahead
      """,
      queue,
      payload,
      run_at,
    )
    return row["pending_ahead"]
async def dequeue(pool, max_per_image):
  """Atomically claim the oldest runnable pending job, or return None.

  A job is runnable when it is pending, due, and its queue has fewer than
  ``max_per_image`` jobs currently running. ``FOR UPDATE SKIP LOCKED``
  makes this safe under concurrent workers.
  """
  async with pool.acquire() as conn:
    async with conn.transaction():
      row = await conn.fetchrow(
        """
        WITH candidate AS (
          SELECT jq.id, jq.queue
          FROM job_queue jq
          WHERE
            jq.status = 'pending'
            AND jq.run_at <= now()
            AND NOT EXISTS (
              SELECT 1
              FROM job_queue jq2
              WHERE
                jq2.queue = jq.queue
                AND jq2.status = 'running'
              GROUP BY jq2.queue
              HAVING count(*) >= $1
            )
          ORDER BY jq.id
          FOR UPDATE SKIP LOCKED
          LIMIT 1
        )
        UPDATE job_queue j
        SET
          status = 'running',
          attempts = attempts + 1,
          updated_at = now()
        FROM candidate c
        WHERE j.id = c.id
        RETURNING
          j.id,
          j.queue,
          j.payload
        """,
        max_per_image
      )
      return row

async def mark_done(pool, job_id):
  """Mark a claimed job as successfully completed."""
  async with pool.acquire() as conn:
    await conn.execute(
      """
      UPDATE job_queue
      SET status = 'done',
          updated_at = now()
      WHERE id = $1
      """,
      job_id,
    )

async def mark_failed(pool, job_id, error):
  """Record a job failure: retry after 30s, or fail permanently.

  The job goes back to 'pending' (with run_at pushed 30 seconds out) until
  it has exhausted max_attempts, after which it is marked 'failed'.
  """
  async with pool.acquire() as conn:
    await conn.execute(
      """
      UPDATE job_queue
      SET
        status = CASE
          WHEN attempts >= max_attempts THEN 'failed'
          ELSE 'pending'
        END,
        last_error = $1,
        updated_at = now(),
        run_at = now() + interval '30 seconds'
      WHERE id = $2
      """,
      error,
      job_id,
    )

async def worker(pool, handler, max_per_queue=1, poll_interval=1):
  """Poll the queue forever, dispatching each claimed job to ``handler``.

  :param handler: async callable receiving the claimed job record
  :param max_per_queue: per-queue running-job cap (see ``dequeue``)
  :param poll_interval: seconds to sleep when the queue is empty
  """
  while True:
    # FIX: the original let any dequeue/database error propagate, silently
    # and permanently killing this worker task on the first transient
    # failure. Log and retry instead. (asyncio.CancelledError is a
    # BaseException and still cancels the task cleanly.)
    try:
      job = await dequeue(pool, max_per_queue)
    except Exception:
      logger.exception('dequeue failed; retrying')
      await asyncio.sleep(poll_interval)
      continue
    if job is None:
      await asyncio.sleep(poll_interval)
      continue
    try:
      await handler(job)
      await mark_done(pool, job['id'])
    except Exception as e:
      # FIX: failures were previously recorded but never logged
      logger.exception(f"job {job['id']} failed")
      await mark_failed(pool, job['id'], str(e))

async def handle_job(job):
  """Execute a claimed job: run it via its executor, forwarding messages
  to the emitter named in the payload."""
  from appyter.ext.emitter import url_to_emitter
  from appyter.execspec.core import url_to_executor
  # payload is stored as JSON text in the jsonb column (asyncpg default codec)
  payload = json.loads(job['payload'])
  async with url_to_executor(payload['executor']) as executor:
    async with url_to_emitter(payload['url']) as emitter:
      logger.info(f"Dispatching job {job['id']}")
      async for msg in executor._run(**payload):
        await emitter(msg)
core = web.Application()
routes = web.RouteTableDef()

@core.cleanup_ctx.append
async def _(app):
  """Start the db pool and worker tasks on startup; tear them down on cleanup."""
  app["db"] = await create_pool(app['config']['DATABASE_URL'])
  app["worker_tasks"] = [
    asyncio.create_task(worker(app["db"], handle_job, max_per_queue=int(app['config']['JOBS_PER_IMAGE'])))
    for _ in range(int(app['config']['JOBS']))
  ]
  try:
    yield
  finally:
    for task in app["worker_tasks"]:
      task.cancel()
    # FIX: wait for the cancellations to land before closing the pool —
    # otherwise a worker may still hold a connection while the pool shuts down
    await asyncio.gather(*app["worker_tasks"], return_exceptions=True)
    await app["db"].close()

def _json_dumps(obj):
  # asyncpg Records carry timestamptz values that json.dumps cannot encode;
  # default=str is deliberate here to render them (ISO-ish) as strings
  return json.dumps(obj, default=str)

@routes.get('/')
async def on_get(request):
  """Report all non-done jobs plus the current (UTC) server time."""
  # FIX: the original guarded this with `async with core['tasks'].lock:`
  # but no 'tasks' key is ever set on the app — every GET raised KeyError.
  return web.json_response(dict(
    # Records are not JSON-serializable; convert each to a plain dict
    tasks=[dict(record) async for record in list(request.app['db'])],
    # FIX: datetime.now().replace(tzinfo=utc) mislabels *local* time as UTC;
    # request an aware UTC timestamp directly
    ts=datetime.datetime.now(datetime.timezone.utc).isoformat(),
  ), dumps=_json_dumps)

@routes.post('/')
async def on_submit(request):
  """Accept a job submission; respond with its position in the queue."""
  job = await request.json()
  # FIX: serialize for the jsonb column (asyncpg's default codec expects
  # JSON text, not a dict), and fall back to the 'default' queue when no
  # executor is given — an explicit None would violate the NOT NULL
  # constraint on job_queue.queue.
  qsize = await enqueue(request.app['db'], json.dumps(job), job.get('executor') or 'default')
  return web.json_response(qsize)

core.add_routes(routes)

# --- appyter/orchestration/pg_dispatcher/socketio.py (from the same patch) ---
# from appyter.ext.socketio import AsyncServer
# socketio = AsyncServer(async_mode='aiohttp')
#
# --- pyproject.toml hunk (from the same patch) ---
# requires-python = ">=3.9,<4"
# [project.optional-dependencies]
# pg_orchestrator = ["asyncpg (>=0.31.0,<0.32.0)"]