diff --git a/tplus/evm/managers/settle.py b/tplus/evm/managers/settle.py index 8beb132..78ecb9d 100644 --- a/tplus/evm/managers/settle.py +++ b/tplus/evm/managers/settle.py @@ -126,11 +126,12 @@ def decrypt_settlement_approval_message( SettlementApproval: The decrypted approval dictionary, or None if decryption/parsing fails. """ user = user or self.default_user + key = "approval" try: - encrypted_data = message["encrypted_data"] + encrypted_data = message[key] except KeyError as err: - self.logger.warning(f"Missing expected key 'encrypted_data' in approval message: {err}") + self.logger.warning(f"Missing expected key '{key}' in approval message: {err}") return None try: @@ -216,10 +217,9 @@ async def init_settlement( ) approval_task: asyncio.Task | None = None + handler: SettlementApprovalHandler | None = None if on_approved or then_execute: - handler = SettlementApprovalHandler(self) - if then_execute: async def _internal_on_approved(info, approval): @@ -229,18 +229,17 @@ async def _internal_on_approved(info, approval): else: effective_callback = on_approved - async def approval_handling_task_fn(): - try: - async with asyncio.timeout(12): - await handler.handle_approvals( - on_approval_received=effective_callback, - stop_at=1, - pending_settlements={expected_nonce: settlement_info}, - ) - except TimeoutError: - self.logger.info("Approval handler timed out") + handler = SettlementApprovalHandler(self) - approval_task = asyncio.create_task(approval_handling_task_fn()) + approval_task = asyncio.create_task( + handler.handle_approvals( + on_approval_received=effective_callback, + stop_at=1, + pending_settlements={expected_nonce: settlement_info}, + ) + ) + + await handler.wait_until_subscribed() self._approval_handling_tasks.setdefault(user.public_key, {}) self._approval_handling_tasks[user.public_key][expected_nonce] = approval_task @@ -356,65 +355,59 @@ def __init__( self.settlement_manager = settlement_manager self.logger = settlement_manager.logger self.on_approval_received = None + self._subscribed_event = asyncio.Event() + + async def wait_until_subscribed(self): + """Wait until the approval stream subscription is active.""" + await self._subscribed_event.wait() async def handle_approvals( self, - on_approval_received: ( - "Callable[[SettlementInfo | None, SettlementApproval], Awaitable[None] | None] | None" - ) = None, + on_approval_received=None, pending_settlements: dict[int, SettlementInfo] | None = None, stop_at: int | None = None, user: "UserPublicKey | None" = None, ) -> None: - """ - Continuously listen for settlement approvals and match them with pending settlements. - - Args: - on_approval_received: Optional callback function that will be called when an approval - is received. Called with (settlement_info, approval_dict). If not provided, - approvals will just be logged. - pending_settlements: Dictionary mapping nonce -> SettlementInfo for settlements - waiting for approval. Approved settlements will be removed from this dict, if given. - To handle any settlement regardless, pass ``None`` or leave as default. - stop_at: The amount of approvals to handle before stopping. - user: Specify the user. Defaults to the default user. - """ user = user or self.settlement_manager.default_user.public_key self.logger.info(f"Starting approval handler for user {user}") + + pending_settlements = pending_settlements or {} amount_handled = 0 try: - async for message in self.settlement_manager.ce.settlements.stream_approvals(user): + stream = self.settlement_manager.ce.settlements.stream_approvals(user) + ait = stream.__aiter__() + pending_first = asyncio.create_task(ait.__anext__()) + self._subscribed_event.set() + + try: + message = await pending_first + except StopAsyncIteration: + return + + while True: approval = self.settlement_manager.decrypt_settlement_approval_message(message) - if approval is None: - continue - - nonce = approval.inner.nonce - pending_settlements = pending_settlements or {} - settlement_info = pending_settlements.get(nonce) - if settlement_info is None: - self.logger.debug(f"Received approval for unknown nonce {nonce}, ignoring") - continue - - else: - self.logger.info(f"Received approval for nonce {nonce}") - del pending_settlements[nonce] - - callback = on_approval_received or self.on_approval_received - if callback: - try: - result = callback(settlement_info, approval) - if asyncio.iscoroutine(result): - await result - except Exception as err: - self.logger.error( - f"Error in on_approval_received callback for nonce {nonce}: {err}", - exc_info=True, - ) - - amount_handled += 1 - if stop_at is not None and amount_handled >= stop_at: - break + if approval: + nonce = approval.inner.nonce + settlement_info = pending_settlements.get(nonce) + + if settlement_info is None: + self.logger.debug(f"Received approval for unknown nonce {nonce}, ignoring") + else: + self.logger.info(f"Received approval for nonce {nonce}") + del pending_settlements[nonce] + + callback = on_approval_received or self.on_approval_received + if callback: + result = callback(settlement_info, approval) + if asyncio.iscoroutine(result): + await result + + amount_handled += 1 + if stop_at and amount_handled >= stop_at: + break + + message = await ait.__anext__() except asyncio.TimeoutError: self.logger.info("Approval handler timed out")