Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 54 additions & 61 deletions tplus/evm/managers/settle.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ def decrypt_settlement_approval_message(
SettlementApproval: The decrypted approval dictionary, or None if decryption/parsing fails.
"""
user = user or self.default_user
key = "approval"

try:
encrypted_data = message["encrypted_data"]
encrypted_data = message[key]
except KeyError as err:
self.logger.warning(f"Missing expected key 'encrypted_data' in approval message: {err}")
self.logger.warning(f"Missing expected key '{key}' in approval message: {err}")
return None

try:
Expand Down Expand Up @@ -216,10 +217,9 @@ async def init_settlement(
)

approval_task: asyncio.Task | None = None
handler: SettlementApprovalHandler | None = None

if on_approved or then_execute:
handler = SettlementApprovalHandler(self)

if then_execute:

async def _internal_on_approved(info, approval):
Expand All @@ -229,18 +229,17 @@ async def _internal_on_approved(info, approval):
else:
effective_callback = on_approved

async def approval_handling_task_fn():
try:
async with asyncio.timeout(12):
await handler.handle_approvals(
on_approval_received=effective_callback,
stop_at=1,
pending_settlements={expected_nonce: settlement_info},
)
except TimeoutError:
self.logger.info("Approval handler timed out")
handler = SettlementApprovalHandler(self)

approval_task = asyncio.create_task(approval_handling_task_fn())
approval_task = asyncio.create_task(
handler.handle_approvals(
on_approval_received=effective_callback,
stop_at=1,
pending_settlements={expected_nonce: settlement_info},
)
)

await handler.wait_until_subscribed()

self._approval_handling_tasks.setdefault(user.public_key, {})
self._approval_handling_tasks[user.public_key][expected_nonce] = approval_task
Expand Down Expand Up @@ -356,65 +355,59 @@ def __init__(
self.settlement_manager = settlement_manager
self.logger = settlement_manager.logger
self.on_approval_received = None
self._subscribed_event = asyncio.Event()

async def wait_until_subscribed(self):
"""Wait until the approval stream subscription is active."""
await self._subscribed_event.wait()

async def handle_approvals(
self,
on_approval_received: (
"Callable[[SettlementInfo | None, SettlementApproval], Awaitable[None] | None] | None"
) = None,
on_approval_received=None,
pending_settlements: dict[int, SettlementInfo] | None = None,
stop_at: int | None = None,
user: "UserPublicKey | None" = None,
) -> None:
"""
Continuously listen for settlement approvals and match them with pending settlements.

Args:
on_approval_received: Optional callback function that will be called when an approval
is received. Called with (settlement_info, approval_dict). If not provided,
approvals will just be logged.
pending_settlements: Dictionary mapping nonce -> SettlementInfo for settlements
waiting for approval. Approved settlements will be removed from this dict, if given.
To handle any settlement regardless, pass ``None`` or leave as default.
stop_at: The amount of approvals to handle before stopping.
user: Specify the user. Defaults to the default user.
"""
user = user or self.settlement_manager.default_user.public_key
self.logger.info(f"Starting approval handler for user {user}")

pending_settlements = pending_settlements or {}
amount_handled = 0

try:
async for message in self.settlement_manager.ce.settlements.stream_approvals(user):
stream = self.settlement_manager.ce.settlements.stream_approvals(user)
ait = stream.__aiter__()
pending_first = asyncio.create_task(ait.__anext__())
self._subscribed_event.set()

try:
message = await pending_first
except StopAsyncIteration:
return

while True:
approval = self.settlement_manager.decrypt_settlement_approval_message(message)
if approval is None:
continue

nonce = approval.inner.nonce
pending_settlements = pending_settlements or {}
settlement_info = pending_settlements.get(nonce)
if settlement_info is None:
self.logger.debug(f"Received approval for unknown nonce {nonce}, ignoring")
continue

else:
self.logger.info(f"Received approval for nonce {nonce}")
del pending_settlements[nonce]

callback = on_approval_received or self.on_approval_received
if callback:
try:
result = callback(settlement_info, approval)
if asyncio.iscoroutine(result):
await result
except Exception as err:
self.logger.error(
f"Error in on_approval_received callback for nonce {nonce}: {err}",
exc_info=True,
)

amount_handled += 1
if stop_at is not None and amount_handled >= stop_at:
break
if approval:
nonce = approval.inner.nonce
settlement_info = pending_settlements.get(nonce)

if settlement_info is None:
self.logger.debug(f"Received approval for unknown nonce {nonce}, ignoring")
else:
self.logger.info(f"Received approval for nonce {nonce}")
del pending_settlements[nonce]

callback = on_approval_received or self.on_approval_received
if callback:
result = callback(settlement_info, approval)
if asyncio.iscoroutine(result):
await result

amount_handled += 1
if stop_at and amount_handled >= stop_at:
break

message = await ait.__anext__()

except asyncio.TimeoutError:
self.logger.info("Approval handler timed out")