Skip to content

asyncpg storage (not done yet)#1

Open
codefather-labs wants to merge 8 commits intorequestum-team:masterfrom
codefather-labs:master
Open

asyncpg storage (not done yet)#1
codefather-labs wants to merge 8 commits intorequestum-team:masterfrom
codefather-labs:master

Conversation

@codefather-labs
Copy link

No description provided.

# todo postgres impl
self.on_task_callbacks.append(callback)

# self.subscribe_conn.add_listener(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

raise NotImplemented()


class StorageType:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Core logic layer should not know about storage interface implementations. I would move this enum to cli command where it is used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced

notify_q = f"NOTIFY {self.subscription_channel}, '{idn}';"
await connection.execute(notify_q)

# FIXME what is this ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed



@dataclass
class Task(ConsumedTask):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no one method from storage interface use this class, so it shouldn't be presented here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced to task_manager.storage.tasks

@@ -0,0 +1,21 @@
from dataclasses import dataclass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to move data types definitions to separate file, we should keep it in layer's package (task_manager.storage), for example we can consider next structure:

  • task_manager.storage.interface.py - Storage interface declaration
  • task_manager.datatypes.py - ConsumedTask and TransactionalResult declaration
  • task_manager._init_.py - import correspond items from subpackages

...or just keep it as it is in a single file for now

Copy link
Author

@codefather-labs codefather-labs Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will greatly disrupt the current structure. As I understand it, task_manager.core is a package of interfaces. I don’t see any point in transferring the storage interface from core to storages package, since this is an implementations package



@dataclass
class AsyncPGTask(Task):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this class is required, can we just pass ConsumedTask to the TransactionalResult instance?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the fields status, error, description, which are not in ConsumedTask

This means we need to either edit ConsumedTask, which is not correct from the point of view of solid, or modify it - inherit from it and extend it. Which is what was done.

I removed AsyncPGTask and replaced it with Task from which it was inherited


await self.pool.close() if self.pool else None

async def __create_transaction(self) -> Tuple[Transaction, Connection]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we use AsyncPG context manager isntead of this method?
async with connection.transaction()

Copy link
Author

@codefather-labs codefather-labs Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because some calls pass a transaction object along the stack. namely TransactionalResult has commit and rollback methods. In the AsyncPGConsumedTaskResult implementation, these methods commit or rollback transactions, respectively.

In this flow, if we use the context manager async with connection.transaction(), the transaction will be committed automatically immediately from the output of the context manager and the connection from the pool will be closed, even before calling the rollback and commit methods from AsyncPGConsumedTaskResult

Copy link
Author

@codefather-labs codefather-labs Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class AsyncPGConsumedTaskResult(TransactionalResult[ConsumedTask]):
def __init__(
self,
task: AsyncPGTask,
lock: Transaction
):
self.task = task
self.lock = lock
async def get_data(self) -> AsyncPGTask:
# TODO another returning dataclass impl ?
# TODO I think current AsyncPGTask impl not bed
return self.task
async def commit(self):
await self.lock.commit()
async def rollback(self):
await self.lock.rollback()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we transfer control of the transaction object along the stack to the TaskManager

async def take_pending_task(self, topics: list[str]):
if transaction := await self.storage.take_first_pending(topics):
await transaction.commit()
return await transaction.get_data()
return None
async def _on_new_task(self, idn: str):
if not (transaction := await self.storage.take_pending(idn)):
return
for session in self.sessions:
if session.deliver_task(await transaction.get_data()):
await transaction.commit()
return
await transaction.rollback()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's why we don't use a context manager here - we need to transfer control of the open transaction along the stack to the TaskManager and it then decides to commit it or do a rollback

status: int
error: None|str
description: str = ''
from task_manager.core.storage import (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file should bot be changed by this PR

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't understand. For what?
This is a ready-made InMemory implementation

@@ -0,0 +1,86 @@
import asyncio
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be removed? We already have a common test for both storage implementationsin this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

entrypoint.sh Outdated
@@ -0,0 +1,17 @@
# DEPRECATED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can remove this entrypoint.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -0,0 +1,8 @@
CREATE TABLE IF NOT EXISTS tasks (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file is related to postgresql storage implementation and should be placed in it's package resources folder.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants