diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..85e9afe
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,35 @@
+# pull official base image
+FROM python:3.11.3-alpine as app
+
+# set work directory
+WORKDIR /usr/src/app
+
+# set environment variables
+ENV PYTHONDONTWRITEBYTECODE 1
+ENV PYTHONUNBUFFERED 1
+
+# install psycopg2
+RUN apk update \
+    && apk add --virtual build-deps gcc python3-dev musl-dev \
+    && apk add postgresql postgresql-contrib postgresql-dev \
+    && pip install psycopg2 \
+    && pip install -U pip setuptools wheel \
+    && apk del build-deps
+
+RUN apk update && apk add python3-dev \
+    gcc \
+    libc-dev \
+    libffi-dev
+
+# copy project
+COPY . /usr/src/app/
+
+# install dependencies
+RUN pip install --upgrade pip
+RUN pip install pipenv
+RUN pipenv install
+
+
+# APP_PORT is assumed to come from the build environment; 8000 is only a fallback
+ARG APP_PORT=8000
+EXPOSE $APP_PORT
diff --git a/Pipfile b/Pipfile
new file mode 100644
index 0000000..fdedaa2
--- /dev/null
+++ b/Pipfile
@@ -0,0 +1,15 @@
+[[source]]
+url = "https://pypi.org/simple"
+verify_ssl = true
+name = "pypi"
+
+[packages]
+pydantic = "==2.6"
+aiohttp = "==3.9"
+asyncpg = "==0.29"
+
+[dev-packages]
+
+[requires]
+python_version = "3.11"
+python_full_version = "3.11"
diff --git a/README.md b/README.md
index b5086a9..a940ba1 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,28 @@
-# Task Manager
\ No newline at end of file
+# Task Manager
+
+## Quick start
+```bash
+# build
+docker-compose build --no-cache
+```
+```bash
+# up
+docker-compose up
+```
+By default this runs `junk/synthetic_test.py` against the `asyncpg_storage` backend.
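+
+To run the synthetic test locally instead (a sketch, assuming Python 3.11 and
+pipenv on the host; the `in_memory_storage` backend needs no database):
+```bash
+pipenv install
+pipenv run python3 junk/synthetic_test.py in_memory_storage
+```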
+
+
+## Docker env example
+- POSTGRES_USER=task_manager
+- POSTGRES_PASS=task_manager_password
+- POSTGRES_NAME=task_manager
+- POSTGRES_HOST=postgres
+- POSTGRES_PORT=5432
+- POSTGRES_HOST_AUTH_METHOD=trust
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..772c2c9
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,39 @@
+version: '3'
+
+
+services:
+  app:
+#    restart: always
+    volumes:
+      - .:/usr/src/app/
+    env_file: .env
+    build:
+      context: .
+      dockerfile: Dockerfile
+
+    depends_on:
+      postgres:
+        condition: service_healthy
+    links:
+      - postgres
+    command: pipenv run python3 junk/synthetic_test.py asyncpg_storage
+
+  postgres:
+    image: postgres:15-alpine
+    container_name: "postgres"
+    volumes:
+      - .:/usr/src/app/
+      - postgres_data:/var/lib/postgresql/data
+    env_file:
+      - .env
+    ports:
+      - "5432:5432"
+    working_dir: /usr/src/app/
+    healthcheck:
+      test: [ "CMD-SHELL", "pg_isready -U task_manager" ]
+      interval: 1s
+      timeout: 10s
+      retries: 10
+
+volumes:
+  postgres_data: {}
\ No newline at end of file
diff --git a/junk/synthetic_test.py b/junk/synthetic_test.py
index c2247fe..24d957c 100644
--- a/junk/synthetic_test.py
+++ b/junk/synthetic_test.py
@@ -1,12 +1,56 @@
 import asyncio
+import os
+import sys
+from pathlib import Path
+import argparse
+
+# make the task_manager package importable when this file is run directly
+parent_dir = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(str(Path(parent_dir).resolve().parents[0]))
 
 from task_manager.core import TaskManager
 from task_manager.core import SessionClosedException
-from task_manager.storage import InMemoryStorage
+from task_manager.core.storage import StorageInterface
+
+
+class StorageType:
+    IN_MEMORY_STORAGE = 'in_memory_storage'
+    ASYNCPG_STORAGE = 'asyncpg_storage'
+
+    @staticmethod
+    def choices():
+        return [
+            StorageType.IN_MEMORY_STORAGE,
+            StorageType.ASYNCPG_STORAGE
+        ]
+
+
+parser = argparse.ArgumentParser()
 
-storage = InMemoryStorage()
+parser.add_argument(
+    "storage_type",
+    nargs="?", default=StorageType.IN_MEMORY_STORAGE,
+    choices=StorageType.choices(),
+    help="Storage type for synthetic test",
+)
+
+args = parser.parse_args()
+
+if args.storage_type == StorageType.IN_MEMORY_STORAGE:
+    from task_manager.storage import InMemoryStorage as storage_cls
+elif args.storage_type == StorageType.ASYNCPG_STORAGE:
+    from task_manager.storage import AsyncPGStorage as storage_cls
+else:
+    exit('Invalid storage type')
+
+storage: StorageInterface = storage_cls()
 engine = TaskManager(storage)
 
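+# NOTE: AsyncPGStorage drives this same default event loop from its __init__
+# (via run_until_complete), which is why the test keeps one shared loop and
+# finishes with loop.run_until_complete(main()) instead of asyncio.run().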
+loop = asyncio.get_event_loop()
+
 
 def make_consumer(name, max_messages):
     async def consumer():
@@ -16,14 +60,15 @@ async def consumer():
             task = await session.consume_task(['test'])
             print(f"consumer '{name}' got task '{task.idn}' with payload {task.payload} in '{task.topic}'")
             await asyncio.sleep(3)
+            await engine.finish_task(task.idn, None)
             print(f"consumer '{name}' finished task '{task.idn}' with payload {task.payload} in '{task.topic}'")
 
     return consumer()
 
 
 async def main():
-    asyncio.create_task(make_consumer("1 message", 1))
-    asyncio.create_task(make_consumer("permanent", 100))
+    loop.create_task(make_consumer("permanent", 100))
+    loop.create_task(make_consumer("1 message", 1))
 
     await asyncio.sleep(0.1)
 
@@ -56,18 +101,19 @@ async def main():
     await publisher.publish_task('test', {"id": 5})
     await asyncio.sleep(1)
 
-    asyncio.create_task(make_consumer('new', 100))
+    loop.create_task(make_consumer('new', 100))
 
     await asyncio.sleep(3.5)
 
     # test session closing
     session = engine.new_session()
 
+
     async def close_session():
         await asyncio.sleep(1)
         print('clossing session')
         session.close()
 
-    asyncio.create_task(close_session())
+    loop.create_task(close_session())
     try:
         print('waiting for new task')
         task = await session.consume_task(['test'])
@@ -76,4 +122,5 @@ async def close_session():
     await asyncio.sleep(2)
 
-
-asyncio.run(main())
+
+
+loop.run_until_complete(main())
diff --git a/task_manager/core/manager.py b/task_manager/core/manager.py
index 6366967..0f3708f 100644
--- a/task_manager/core/manager.py
+++ b/task_manager/core/manager.py
@@ -22,7 +22,7 @@ def close_session(self, session: Session):
     async def create_task(self, queue, payload):
         return await self.storage.create_task(queue, payload)
 
-    async def finish_task(self, idn: str, error: None|str, message: str = ''):
+    async def finish_task(self, idn: str, error: None | str, message: str = ''):
         await self.storage.finish_task(idn, error, message)
 
     async def take_pending_task(self, topics: list[str]):
diff --git a/task_manager/core/storage.py b/task_manager/core/storage.py
index 3ad80d4..748493e 100644
--- a/task_manager/core/storage.py
+++ b/task_manager/core/storage.py
@@ -1,13 +1,6 @@
-from dataclasses import dataclass
 from typing import Callable, Awaitable, Any, TypeVar, Generic
 
-
-@dataclass
-class ConsumedTask:
-    idn: str
-    topic: str
-    payload: dict
-
+from task_manager.core.tasks import ConsumedTask
 
 OnTaskCallback = Callable[[str], Awaitable[Any]]
 T = TypeVar('T')
@@ -28,7 +21,7 @@ class StorageInterface:
     async def create_task(self, queue, payload) -> str:
         raise NotImplemented()
 
-    async def finish_task(self, idn: str, error: None|str = None, message: str = ''):
+    async def finish_task(self, idn: str, error: None | str = None, message: str = ''):
         raise NotImplemented()
 
     async def take_pending(self, idn) -> TransactionalResult[ConsumedTask] | None:
diff --git a/task_manager/core/tasks.py b/task_manager/core/tasks.py
new file mode 100644
index 0000000..50669d4
--- /dev/null
+++ b/task_manager/core/tasks.py
@@ -0,0 +1,8 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class ConsumedTask:
+    idn: str
+    topic: str
+    payload: dict
diff --git a/task_manager/core/utils.py b/task_manager/core/utils.py
index ca431d4..c6387a2 100644
--- a/task_manager/core/utils.py
+++ b/task_manager/core/utils.py
@@ -1,4 +1,6 @@
 import asyncio
+import json
+from uuid import UUID
 
 
 class TaskWaiter:
@@ -8,3 +10,11 @@ def __init__(self, topics: list[str]):
 
     async def wait(self):
         return await asyncio.wait_for(self.result, None)
+
+
+class UUIDEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, UUID):
+            # represent UUID values as plain strings
+            return str(obj)
+        return json.JSONEncoder.default(self, obj)
diff --git a/task_manager/storage/__init__.py b/task_manager/storage/__init__.py
index bc3c528..4b45b7b 100644
--- a/task_manager/storage/__init__.py
+++ b/task_manager/storage/__init__.py
@@ -1 +1,2 @@
-from .in_memory import InMemoryStorage
\ No newline at end of file
+from .in_memory import InMemoryStorage
+from .asyncpg_storage import AsyncPGStorage
\ No newline at end of file
diff --git a/task_manager/storage/asyncpg_storage.py b/task_manager/storage/asyncpg_storage.py
new file mode 100644
index 0000000..a489e78
--- /dev/null
+++ b/task_manager/storage/asyncpg_storage.py
@@ -0,0 +1,295 @@
+import asyncio
+import json
+import os
+from pathlib import Path
+import uuid
+from typing import Optional, Tuple, List
+
+import asyncpg
+from asyncpg import Connection, Pool
+from asyncpg.transaction import Transaction
+
+from task_manager.core.tasks import ConsumedTask
+from task_manager.core.storage import (
+    StorageInterface, OnTaskCallback, TransactionalResult
+)
+from task_manager.storage.tasks import Task, TaskStatus, create_task_from_dict
+from task_manager.storage.utils import generator
+
+
+class AsyncPGConsumedTaskResult(TransactionalResult[ConsumedTask]):
+    def __init__(
+            self,
+            task: Task,
+            transaction: Transaction,
+            connection: Connection,
+            pool: Pool
+    ):
+        self.task = task
+        self.transaction = transaction
+        self.connection = connection
+        self.pool = pool
+
+    async def get_data(self) -> Task:
+        # TODO: a dedicated result dataclass could be introduced later,
+        # but the current Task dataclass works fine as the returned value
+        return self.task
+
+    async def commit(self):
+        await self.transaction.commit()
+        # the connection was acquired in take_pending/take_first_pending;
+        # give it back to the pool once the transaction is over
+        await self.pool.release(self.connection)
+
+    async def rollback(self):
+        await self.transaction.rollback()
+        await self.pool.release(self.connection)
+
+
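+# Storage backed by PostgreSQL via asyncpg: tasks live in the "tasks" table,
+# new tasks are announced with NOTIFY on a channel, and a dedicated listener
+# connection turns those notifications into on_task callbacks while a pool
+# serves the regular queries.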
+class AsyncPGStorage(StorageInterface):
+    def __init__(self, subscription_channel: str = 'tasks'):
+        self.pool: Optional[Pool] = None
+        self.subscription_channel = subscription_channel
+        self.subscribe_conn: Optional[Connection] = None
+        self.connection_data = {
+            "user": os.environ.get("POSTGRES_USER", "postgres"),
+            "password": os.environ.get("POSTGRES_PASS", "task_manager_password"),
+            "database": os.environ.get("POSTGRES_NAME", "task_manager"),
+            "host": os.environ.get("POSTGRES_HOST", "127.0.0.1"),
+            "port": os.environ.get("POSTGRES_PORT", "5432"),
+        }
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(
+            self.create_connection()
+        )
+        loop.run_until_complete(
+            self.__create_schema()
+        )
+        loop.run_until_complete(
+            self.create_subscriber()
+        )
+
+        self.on_task_callbacks: List[OnTaskCallback] = []
+
+    async def __create_schema(self):
+        project_dir = str(Path(
+            os.path.dirname(os.path.realpath(__file__))
+        ).resolve().parents[1])
+
+        sql_migrations_dir_path = os.path.join(
+            project_dir, 'task_manager', 'storage', 'resources', 'sql_migrations'
+        )
+        files = []
+
+        # migrations are read synchronously on purpose: this runs once at startup
+        for migration_file in sorted(os.listdir(sql_migrations_dir_path)):
+            with open(os.path.join(sql_migrations_dir_path, migration_file)) as file:
+                files.append(file.read())
+
+        async with self.pool.acquire() as connection:
+            connection: Connection
+
+            await connection.execute(
+                "\n".join(files)
+            )
+
+    async def __subscription_callback(
+            self,
+            connection: Connection,
+            pid: int,
+            channel: str,
+            payload: str
+    ):
+        """
+        Listener callback registered with asyncpg, receiving:
+        **connection**: the Connection the callback is registered with;
+        **pid**: PID of the Postgres server that sent the notification;
+        **channel**: name of the channel the notification was sent to;
+        **payload**: task idn (str[uuid]).
+        """
+
+        async for callback in generator(self.on_task_callbacks):
+            print(f"callback from postgres: {payload}")
+            await callback(payload)
+
+    async def create_subscriber(self):
+        await self.subscribe_conn.add_listener(
+            channel=self.subscription_channel,
+            callback=self.__subscription_callback
+        )
+
+    async def create_connection(self):
+        if not self.pool or self.pool.is_closing():
+            self.pool = await asyncpg.create_pool(
+                **self.connection_data,
+                max_size=10
+            )
+        if not self.subscribe_conn or self.subscribe_conn.is_closed():
+            self.subscribe_conn = await asyncpg.connect(
+                **self.connection_data
+            )
+
+    async def close_connection(self):
+        # the listener connection is standalone (asyncpg.connect), not from the pool
+        if self.subscribe_conn and not self.subscribe_conn.is_closed():
+            await self.subscribe_conn.close()
+        if self.pool and not self.pool.is_closing():
+            await self.pool.close()
+
+    async def __create_transaction(self) -> Tuple[Transaction, Connection]:
+        """
+        for transfer of transaction control flow
+        to the Session -> AsyncPGConsumedTaskResult -> (commit/rollback)
+        """
+        conn: Connection = await self.pool.acquire()
+        transaction: Transaction = conn.transaction()
+        await transaction.start()
+
+        return transaction, conn
+
+    async def create_task(self, queue, payload) -> str:
+        """
+        idn UUID PRIMARY KEY,
+        topic VARCHAR (50) NOT NULL,
+        payload JSON NOT NULL,
+        status int NOT NULL,
+        error VARCHAR (255) NULL,
+        description VARCHAR (255) NULL
+        """
+        idn = str(uuid.uuid4())
+        # parameterized so quotes in the payload can't break the statement
+        q = (
+            "INSERT INTO tasks (idn, topic, payload, status) "
+            "VALUES ($1, $2, $3, $4)"
+        )
+
+        async with self.pool.acquire() as connection:
+            connection: Connection
+            await connection.execute(q, idn, queue, json.dumps(payload), TaskStatus.NEW)
+
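+            # announce the new task: the NOTIFY payload is the task idn, which
+            # __subscription_callback hands to every registered on_task callback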
+            if self.on_task_callbacks:
+                notify_q = f"NOTIFY {self.subscription_channel}, '{idn}';"
+                await connection.execute(notify_q)
+
+        return idn
+
+    async def take_pending(self, idn) -> TransactionalResult[ConsumedTask] | None:
+        """
+        idn UUID PRIMARY KEY,
+        topic VARCHAR (50) NOT NULL,
+        payload JSON NOT NULL,
+        status int NOT NULL,
+        error VARCHAR (255) NULL,
+        description VARCHAR (255) NULL
+        """
+
+        transaction, conn = await self.__create_transaction()
+
+        q = (
+            "WITH selected_for_update as ("
+            "SELECT * FROM tasks "
+            f"WHERE (idn='{idn}' and status={TaskStatus.NEW}) "
+            "FOR UPDATE"
+            ") "
+            "UPDATE tasks "
+            f"SET status = {TaskStatus.IN_PROGRESS} "
+            f"FROM selected_for_update "
+            f"WHERE (tasks.idn=selected_for_update.idn) "
+            f"RETURNING * ;"
+        )
+
+        rec: asyncpg.Record = await conn.fetchrow(q)
+
+        if rec:
+            task = create_task_from_dict(dict(rec))
+            return AsyncPGConsumedTaskResult(
+                task,
+                transaction,
+                conn,
+                self.pool
+            )
+
+        await transaction.rollback()
+        await self.pool.release(conn)
+
+        return None
+
+    async def take_first_pending(self, topics: list[str]) -> TransactionalResult[ConsumedTask] | None:
+        """
+        idn UUID PRIMARY KEY,
+        topic VARCHAR (50) NOT NULL,
+        payload JSON NOT NULL,
+        status int NOT NULL,
+        error VARCHAR (255) NULL,
+        description VARCHAR (255) NULL
+        """
+
+        transaction, conn = await self.__create_transaction()
+
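+        # classic Postgres queue pattern: lock a single candidate row with
+        # FOR UPDATE SKIP LOCKED so concurrent consumers neither block on nor
+        # double-take the same task, then flip it to IN_PROGRESS in one statement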
+        q = (
+            "WITH selected_for_update as ("
+            "SELECT * FROM tasks "
+            "WHERE ("
+            f"status = {TaskStatus.NEW} and "
+            f"topic in ({str([f'{i}' for i in topics])[1:-1]})) "
+            "LIMIT 1 "
+            "FOR UPDATE SKIP LOCKED"
+            ") "
+            "UPDATE tasks "
+            f"SET status = {TaskStatus.IN_PROGRESS} "
+            f"FROM selected_for_update "
+            f"WHERE tasks.idn = selected_for_update.idn "
+            f"RETURNING * ;"
+        )
+
+        rec: asyncpg.Record = await conn.fetchrow(q)
+
+        if rec:
+            task = create_task_from_dict(dict(rec))
+            return AsyncPGConsumedTaskResult(
+                task,
+                transaction,
+                conn,
+                self.pool
+            )
+
+        await transaction.rollback()
+        await self.pool.release(conn)
+
+        return None
+
+    async def finish_task(self, idn: str, error: None | str = None, message: str = ''):
+        # parameterized so error / message strings are passed safely;
+        # empty strings are stored as NULL
+        q = (
+            "WITH selected_for_update as ("
+            "SELECT * FROM tasks "
+            "WHERE idn = $1 "
+            "FOR UPDATE "
+            ") "
+            "UPDATE tasks "
+            f"SET status = {TaskStatus.FINISHED}, "
+            "error = $2, "
+            "description = $3 "
+            "FROM selected_for_update "
+            "WHERE tasks.idn = selected_for_update.idn "
+            "RETURNING *;"
+        )
+
+        async with self.pool.acquire() as connection:
+            connection: Connection
+
+            async with connection.transaction():
+                rec: asyncpg.Record = await connection.fetchrow(
+                    q, idn, error or None, message or None
+                )
+
+                if not rec:
+                    # todo: add exceptions inheritance level to interface and raise appropriate one
+                    # raising here makes the transaction context manager roll back
+                    raise Exception(f"Can't finish task. Task '{idn}' not found in storage")
+
+    def add_on_task_callback(self, callback: OnTaskCallback):
+        self.on_task_callbacks.append(callback)
diff --git a/task_manager/storage/in_memory.py b/task_manager/storage/in_memory.py
index 7a4e458..2d361c3 100644
--- a/task_manager/storage/in_memory.py
+++ b/task_manager/storage/in_memory.py
@@ -1,22 +1,11 @@
 import asyncio
 import uuid
-from dataclasses import dataclass
 
-from task_manager.core.storage import StorageInterface, ConsumedTask, OnTaskCallback, TransactionalResult
-
-NEW = 0
-IN_PROGRESS = 1
-FINISHED = 2
-
-
-@dataclass
-class Task:
-    idn: str
-    topic: str
-    payload: dict
-    status: int
-    error: None|str
-    description: str = ''
+from task_manager.core.storage import (
+    StorageInterface, TransactionalResult,
+    ConsumedTask, OnTaskCallback
+)
+from task_manager.storage.tasks import Task, TaskStatus
 
 
 class ConsumedTaskResult(TransactionalResult[ConsumedTask]):
@@ -31,7 +20,7 @@ async def commit(self):
         self.lock.release()
 
     async def rollback(self):
-        self.task.status = NEW
+        self.task.status = TaskStatus.NEW
         self.lock.release()
 
 
@@ -43,7 +32,7 @@ def __init__(self):
         self.on_task_callbacks = []
 
     async def create_task(self, queue, payload) -> str:
-        task = Task(str(uuid.uuid4()), queue, payload, NEW, None)
+        task = Task(str(uuid.uuid4()), queue, payload, TaskStatus.NEW, None)
         self.tasks.append(task)
 
         for callback in self.on_task_callbacks:
@@ -55,8 +44,8 @@ async def take_pending(self, idn) -> TransactionalResult[ConsumedTask] | None:
         await self.lock.acquire()
 
         for task in self.tasks:
-            if task.idn == idn and task.status == NEW:
-                task.status = IN_PROGRESS
+            if task.idn == idn and task.status == TaskStatus.NEW:
+                task.status = TaskStatus.IN_PROGRESS
                 return ConsumedTaskResult(
                     task,
                     self.lock
@@ -67,10 +56,10 @@ async def take_pending(self, idn) -> TransactionalResult[ConsumedTask] | None:
 
     async def take_first_pending(self, topics: list[str]) -> TransactionalResult[ConsumedTask] | None:
         await self.lock.acquire()
-
+
         for task in self.tasks:
-            if task.topic in topics and task.status == NEW:
-                task.status = IN_PROGRESS
+            if task.topic in topics and task.status == TaskStatus.NEW:
+                task.status = TaskStatus.IN_PROGRESS
                 return ConsumedTaskResult(
                     task,
                     self.lock
@@ -79,10 +68,10 @@ async def take_first_pending(self, topics: list[str]) -> TransactionalResult[Con
         self.lock.release()
         return None
 
-    async def finish_task(self, idn: str, error: None|str = None, message: str = ''):
+    async def finish_task(self, idn: str, error: None | str = None, message: str = ''):
         for task in self.tasks:
             if task.idn == idn:
-                task.status = FINISHED
+                task.status = TaskStatus.FINISHED
                 task.error = error
                 task.message = message
                 return
diff --git a/task_manager/storage/resources/sql_migrations/001_create_schema.sql b/task_manager/storage/resources/sql_migrations/001_create_schema.sql
new file mode 100644
index 0000000..a7bf931
--- /dev/null
+++ b/task_manager/storage/resources/sql_migrations/001_create_schema.sql
@@ -0,0 +1,8 @@
+CREATE TABLE IF NOT EXISTS tasks (
+    idn UUID PRIMARY KEY,
+    topic VARCHAR (50) NOT NULL,
+    payload JSON NOT NULL,
+    status int NOT NULL,
+    error VARCHAR (255) NULL,
+    description VARCHAR (255) NULL
+)
diff --git a/task_manager/storage/tasks.py b/task_manager/storage/tasks.py
new file mode 100644
index 0000000..631b631
--- /dev/null
+++ b/task_manager/storage/tasks.py
@@ -0,0 +1,26 @@
+import json
+from dataclasses import dataclass
+
+from task_manager.core import ConsumedTask
+from task_manager.core.utils import UUIDEncoder
+
+
+class TaskStatus:
+    NEW = 0
+    IN_PROGRESS = 1
+    FINISHED = 2
+
+
+@dataclass
+class Task(ConsumedTask):
+    status: int
+    error: None | str
+    description: str = ''
+
+
+def create_task_from_dict(data: dict):
+    if isinstance(data.get('payload'), str):
+        data['payload'] = json.loads(data['payload'])
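+    # round-trip through json with UUIDEncoder so the UUID primary key coming
+    # back from asyncpg is converted to a plain string before building the dataclass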
+    return Task(**json.loads(json.dumps(data, cls=UUIDEncoder)))
diff --git a/task_manager/storage/utils.py b/task_manager/storage/utils.py
new file mode 100644
index 0000000..0a66a47
--- /dev/null
+++ b/task_manager/storage/utils.py
@@ -0,0 +1,6 @@
+from typing import Iterable
+
+
+async def generator(iterable: Iterable):
+    for i in iterable:
+        yield i