asyncpg storage (not done yet)#1
asyncpg storage (not done yet)#1codefather-labs wants to merge 8 commits intorequestum-team:masterfrom
Conversation
| # todo postgres impl | ||
| self.on_task_callbacks.append(callback) | ||
|
|
||
| # self.subscribe_conn.add_listener( |
task_manager/core/storage.py
Outdated
| raise NotImplemented() | ||
|
|
||
|
|
||
| class StorageType: |
There was a problem hiding this comment.
Core logic layer should not know about storage interface implementations. I would move this enum to cli command where it is used.
| notify_q = f"NOTIFY {self.subscription_channel}, '{idn}';" | ||
| await connection.execute(notify_q) | ||
|
|
||
| # FIXME what is this ? |
task_manager/core/tasks.py
Outdated
|
|
||
|
|
||
| @dataclass | ||
| class Task(ConsumedTask): |
There was a problem hiding this comment.
no one method from storage interface use this class, so it shouldn't be presented here
There was a problem hiding this comment.
replaced to task_manager.storage.tasks
| @@ -0,0 +1,21 @@ | |||
| from dataclasses import dataclass | |||
There was a problem hiding this comment.
if we want to move data types definitions to separate file, we should keep it in layer's package (task_manager.storage), for example we can consider next structure:
- task_manager.storage.interface.py - Storage interface declaration
- task_manager.datatypes.py - ConsumedTask and TransactionalResult declaration
- task_manager._init_.py - import correspond items from subpackages
...or just keep it as it is in a single file for now
There was a problem hiding this comment.
I think this will greatly disrupt the current structure. As I understand it, task_manager.core is a package of interfaces. I don’t see any point in transferring the storage interface from core to storages package, since this is an implementations package
|
|
||
|
|
||
| @dataclass | ||
| class AsyncPGTask(Task): |
There was a problem hiding this comment.
Not sure this class is required, can we just pass ConsumedTask to the TransactionalResult instance?
There was a problem hiding this comment.
We need the fields status, error, description, which are not in ConsumedTask
This means we need to either edit ConsumedTask, which is not correct from the point of view of solid, or modify it - inherit from it and extend it. Which is what was done.
I removed AsyncPGTask and replaced it with Task from which it was inherited
|
|
||
| await self.pool.close() if self.pool else None | ||
|
|
||
| async def __create_transaction(self) -> Tuple[Transaction, Connection]: |
There was a problem hiding this comment.
why don't we use AsyncPG context manager isntead of this method?
async with connection.transaction()
There was a problem hiding this comment.
Because some calls pass a transaction object along the stack. namely TransactionalResult has commit and rollback methods. In the AsyncPGConsumedTaskResult implementation, these methods commit or rollback transactions, respectively.
In this flow, if we use the context manager async with connection.transaction(), the transaction will be committed automatically immediately from the output of the context manager and the connection from the pool will be closed, even before calling the rollback and commit methods from AsyncPGConsumedTaskResult
There was a problem hiding this comment.
task-manager/task_manager/storage/asyncpg_storage.py
Lines 31 to 49 in 8c8b6fd
There was a problem hiding this comment.
we transfer control of the transaction object along the stack to the TaskManager
task-manager/task_manager/core/manager.py
Lines 28 to 43 in 8c8b6fd
There was a problem hiding this comment.
that's why we don't use a context manager here - we need to transfer control of the open transaction along the stack to the TaskManager and it then decides to commit it or do a rollback
| status: int | ||
| error: None|str | ||
| description: str = '' | ||
| from task_manager.core.storage import ( |
There was a problem hiding this comment.
this file should bot be changed by this PR
There was a problem hiding this comment.
Didn't understand. For what?
This is a ready-made InMemory implementation
test_postgres_storage.py
Outdated
| @@ -0,0 +1,86 @@ | |||
| import asyncio | |||
There was a problem hiding this comment.
Should it be removed? We already have a common test for both storage implementationsin this PR.
entrypoint.sh
Outdated
| @@ -0,0 +1,17 @@ | |||
| # DEPRECATED | |||
There was a problem hiding this comment.
I guess we can remove this entrypoint.
| @@ -0,0 +1,8 @@ | |||
| CREATE TABLE IF NOT EXISTS tasks ( | |||
There was a problem hiding this comment.
this file is related to postgresql storage implementation and should be placed in it's package resources folder.
No description provided.