Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# pull official base image
FROM python:3.11.3-alpine as app

# set work directory
WORKDIR /usr/src/app

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

# install psycopg2
RUN apk update \
&& apk add --virtual build-deps gcc python3-dev musl-dev \
&& apk add postgresql postgresql-contrib postgresql-dev \
&& pip install psycopg2 \
&& pip install -U pip setuptools wheel \
&& apk del build-deps

RUN apk update && apk add python3-dev \
gcc \
libc-dev \
libffi-dev

# copy project
COPY . /usr/src/app/

# install dependencies
RUN pip install --upgrade pip
RUN pip install pipenv
RUN pipenv install


EXPOSE $APP_PORT
15 changes: 15 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
pydantic = "2.6"
aiohttp = "3.9"
asyncpg = "0.29"

[dev-packages]

[requires]
python_version = "3.11"
python_full_version = "3.11"
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,21 @@
# Task Manager
# Task Manager

## Quick start
```bash
# build
docker-compose build --no-cache
```
```bash
# up
docker-compose up
```
It starts `synthetic_test.py` by default.


## Docker env example
- POSTGRES_USER=task_manager
- POSTGRES_PASS=task_manager_password
- POSTGRES_NAME=task_manager
- POSTGRES_HOST=postgres
- POSTGRES_PORT=5432
- POSTGRES_HOST_AUTH_METHOD=trust
40 changes: 40 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
version: '3'


services:
app:
# restart: always
volumes:
- .:/usr/src/app/
env_file: .env
build:
context: .
dockerfile: Dockerfile

depends_on:
postgres:
condition: service_healthy
links:
- postgres
command: pipenv run python3 junk/synthetic_test.py asyncpg_storage

postgres:
image: postgres:15-alpine
container_name: "postgres"
volumes:
- .:/usr/src/app/
- postgres_data:/var/lib/postgresql/data
env_file:
- .env
ports:
- "5432:5432"
working_dir: /usr/src/app/
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U task_manager" ]
interval: 1s
timeout: 10s
retries: 0

volumes:
.: {}
postgres_data: {}
58 changes: 51 additions & 7 deletions junk/synthetic_test.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,53 @@
import asyncio
import os
import sys
from pathlib import Path
import argparse

# resolving import problem
parent_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(str(Path(parent_dir).resolve().parents[0]))

from task_manager.core import TaskManager
from task_manager.core import SessionClosedException
from task_manager.storage import InMemoryStorage
from task_manager.core.storage import StorageInterface


class StorageType:
IN_MEMORY_STORAGE = 'in_memory_storage'
ASYNCPG_STORAGE = 'asyncpg_storage'

@staticmethod
def choices():
return [
StorageType.IN_MEMORY_STORAGE,
StorageType.ASYNCPG_STORAGE
]


parser = argparse.ArgumentParser()

storage = InMemoryStorage()
parser.add_argument(
"storage_type",
default=StorageType.IN_MEMORY_STORAGE,
choices=StorageType.choices(),
help="Storage type for synthetic test",
)

args = parser.parse_args()

if args.storage_type == StorageType.IN_MEMORY_STORAGE:
from task_manager.storage import InMemoryStorage as storage
elif args.storage_type == StorageType.ASYNCPG_STORAGE:
from task_manager.storage import AsyncPGStorage as storage
else:
exit('Invalid storage type')

storage: StorageInterface = storage()
engine = TaskManager(storage)

loop = asyncio.get_event_loop()


def make_consumer(name, max_messages):
async def consumer():
Expand All @@ -16,14 +57,15 @@ async def consumer():
task = await session.consume_task(['test'])
print(f"consumer '{name}' got task '{task.idn}' with payload {task.payload} in '{task.topic}'")
await asyncio.sleep(3)
await engine.finish_task(task.idn, None)
print(f"consumer '{name}' finished task '{task.idn}' with payload {task.payload} in '{task.topic}'")

return consumer()


async def main():
asyncio.create_task(make_consumer("1 message", 1))
asyncio.create_task(make_consumer("permanent", 100))
loop.create_task(make_consumer("permanent", 100))
loop.create_task(make_consumer("1 message", 1))

await asyncio.sleep(0.1)

Expand Down Expand Up @@ -56,18 +98,19 @@ async def main():
await publisher.publish_task('test', {"id": 5})

await asyncio.sleep(1)
asyncio.create_task(make_consumer('new', 100))
loop.create_task(make_consumer('new', 100))

await asyncio.sleep(3.5)

# test session closing
session = engine.new_session()

async def close_session():
await asyncio.sleep(1)
print('clossing session')
session.close()

asyncio.create_task(close_session())
loop.create_task(close_session())
try:
print('waiting for new task')
task = await session.consume_task(['test'])
Expand All @@ -76,4 +119,5 @@ async def close_session():

await asyncio.sleep(2)

asyncio.run(main())

loop.run_until_complete(main())
2 changes: 1 addition & 1 deletion task_manager/core/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def close_session(self, session: Session):
async def create_task(self, queue, payload):
return await self.storage.create_task(queue, payload)

async def finish_task(self, idn: str, error: None|str, message: str = ''):
async def finish_task(self, idn: str, error: None | str, message: str = ''):
await self.storage.finish_task(idn, error, message)

async def take_pending_task(self, topics: list[str]):
Expand Down
11 changes: 2 additions & 9 deletions task_manager/core/storage.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
from dataclasses import dataclass
from typing import Callable, Awaitable, Any, TypeVar, Generic


@dataclass
class ConsumedTask:
idn: str
topic: str
payload: dict

from task_manager.core.tasks import ConsumedTask

OnTaskCallback = Callable[[str], Awaitable[Any]]
T = TypeVar('T')
Expand All @@ -28,7 +21,7 @@ class StorageInterface:
async def create_task(self, queue, payload) -> str:
raise NotImplemented()

async def finish_task(self, idn: str, error: None|str = None, message: str = ''):
async def finish_task(self, idn: str, error: None | str = None, message: str = ''):
raise NotImplemented()

async def take_pending(self, idn) -> TransactionalResult[ConsumedTask] | None:
Expand Down
8 changes: 8 additions & 0 deletions task_manager/core/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dataclasses import dataclass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to move data types definitions to separate file, we should keep it in layer's package (task_manager.storage), for example we can consider next structure:

  • task_manager.storage.interface.py - Storage interface declaration
  • task_manager.datatypes.py - ConsumedTask and TransactionalResult declaration
  • task_manager._init_.py - import correspond items from subpackages

...or just keep it as it is in a single file for now

Copy link
Author

@codefather-labs codefather-labs Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will greatly disrupt the current structure. As I understand it, task_manager.core is a package of interfaces. I don’t see any point in transferring the storage interface from core to storages package, since this is an implementations package



@dataclass
class ConsumedTask:
idn: str
topic: str
payload: dict
10 changes: 10 additions & 0 deletions task_manager/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
from uuid import UUID


class TaskWaiter:
Expand All @@ -8,3 +10,11 @@ def __init__(self, topics: list[str]):

async def wait(self):
return await asyncio.wait_for(self.result, None)


class UUIDEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return str(obj)
return json.JSONEncoder.default(self, obj)
3 changes: 2 additions & 1 deletion task_manager/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .in_memory import InMemoryStorage
from .in_memory import InMemoryStorage
from .asyncpg_storage import AsyncPGStorage
Loading