
fix(realtime): make AsyncRealtimeChannel.subscribe() await server acknowledgement #1394

Open
kelvinvelasquez-SDE wants to merge 4 commits into supabase:main from kelvinvelasquez-SDE:fix/subscribe-await-ack

@kelvinvelasquez-SDE kelvinvelasquez-SDE commented Feb 11, 2026

🚀 Description

This PR addresses a critical race condition in the realtime-py client where AsyncRealtimeChannel.subscribe() would return immediately without waiting for the server's acknowledgement (ACK).

The Problem

Previously, await channel.subscribe() sent the subscription request but did not wait for confirmation. Subsequent operations (such as broadcasting a message) could therefore fail or be dropped if executed immediately after "subscribing", because the channel had not yet been joined on the server side.

The Solution

I have changed subscribe() so that awaiting it suspends the caller, without blocking the event loop, until a definitive response is received from the server.

Key Implementation Details:

  • AsyncIO Future: Introduced an internal asyncio.Future that tracks the subscription state.
  • Event Handling: The future is resolved with a result on SUBSCRIBED and completed with an exception on CHANNEL_ERROR or TIMED_OUT.
  • Timeout Control: Added an optional timeout parameter to subscribe(), allowing callers to override the default socket timeout.
  • Backward Compatibility: The existing callback parameter is still supported and functions as before, ensuring no breaking changes for users relying on the callback pattern.
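
The pattern in the bullets above can be sketched roughly as follows. This is an illustration, not the code from this PR: `ToyChannel`, `SubscribeState`, and `simulate_server_reply` are hypothetical stand-ins for the real channel, `RealtimeSubscribeStates`, and the server round trip. It only shows how an `asyncio.Future` combined with `asyncio.wait_for` can make `subscribe()` suspend until a reply or a timeout, while still invoking an optional callback.

```python
import asyncio
from enum import Enum
from typing import Callable, Optional


class SubscribeState(Enum):
    # Hypothetical stand-in for RealtimeSubscribeStates.
    SUBSCRIBED = "SUBSCRIBED"
    CHANNEL_ERROR = "CHANNEL_ERROR"
    TIMED_OUT = "TIMED_OUT"


Callback = Callable[[SubscribeState, Optional[Exception]], None]


class ToyChannel:
    """Illustrative only; not the realtime-py implementation."""

    def __init__(self, default_timeout: float = 10.0) -> None:
        self.timeout = default_timeout
        self._on_reply: Optional[Callback] = None

    async def subscribe(
        self, callback: Optional[Callback] = None, timeout: Optional[float] = None
    ) -> "ToyChannel":
        future: asyncio.Future = asyncio.get_running_loop().create_future()

        def internal_callback(state: SubscribeState, err: Optional[Exception]) -> None:
            if callback is not None:
                callback(state, err)  # the existing callback pattern keeps working
            if future.done():
                return  # e.g. a late reply after the wait already timed out
            if state is SubscribeState.SUBSCRIBED:
                future.set_result(self)
            else:
                future.set_exception(err or Exception(state.value))

        self._on_reply = internal_callback
        wait_timeout = self.timeout if timeout is None else timeout
        try:
            # Suspends this coroutine (not the event loop) until a reply arrives.
            return await asyncio.wait_for(future, timeout=wait_timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"subscribe timed out after {wait_timeout}s") from None

    def simulate_server_reply(
        self, state: SubscribeState, err: Optional[Exception] = None
    ) -> None:
        # Plays the role of the server's join reply reaching the channel.
        if self._on_reply is not None:
            self._on_reply(state, err)


async def demo() -> bool:
    seen = []
    channel = ToyChannel()
    task = asyncio.create_task(
        channel.subscribe(callback=lambda state, err: seen.append(state))
    )
    await asyncio.sleep(0)  # let subscribe() start waiting
    channel.simulate_server_reply(SubscribeState.SUBSCRIBED)
    joined = await task
    return joined is channel and seen == [SubscribeState.SUBSCRIBED]
```

Running `asyncio.run(demo())` exercises the success path: the task stays pending until the simulated reply arrives, and the callback still observes the state change.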

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update

🔗 Related Issue

Fixes #1209

📸 Screenshots (if UI related)

N/A

✅ Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation (Docstrings updated)
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

🧪 Testing Strategy

  • Unit Tests: Added tests/test_issue_1209.py covering:
    • Successful subscription logic.
    • Error handling (server replies with error).
    • Timeout handling (server fails to reply).
  • Regression Tests: Updated tests/test_connection.py to ensure existing tests correctly simulate the server ACK, preventing regressions in the test suite.
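
The three cases listed above could be exercised along these lines. This is a hedged sketch with no test framework and no real client: `FakeChannel` and `handle_reply` are hypothetical names that stand in for the channel under test and for feeding a simulated server reply into it. The point is the shape of the test: start `subscribe()` as a background task, then deliver (or withhold) the reply.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for the channel under test."""

    def __init__(self) -> None:
        self._ack = None

    async def subscribe(self, timeout: float = 1.0) -> "FakeChannel":
        self._ack = asyncio.get_running_loop().create_future()
        try:
            await asyncio.wait_for(self._ack, timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("no reply from server") from None
        return self

    def handle_reply(self, ok: bool, reason: str = "") -> None:
        # Simulates a server reply being routed to the pending subscription.
        if self._ack is None or self._ack.done():
            return
        if ok:
            self._ack.set_result(None)
        else:
            self._ack.set_exception(RuntimeError(reason))


async def check_success() -> bool:
    channel = FakeChannel()
    task = asyncio.create_task(channel.subscribe())
    await asyncio.sleep(0)  # give subscribe() a chance to start waiting
    still_waiting = not task.done()
    channel.handle_reply(ok=True)
    return still_waiting and (await task) is channel


async def check_error() -> str:
    channel = FakeChannel()
    task = asyncio.create_task(channel.subscribe())
    await asyncio.sleep(0)
    channel.handle_reply(ok=False, reason="unauthorized")
    try:
        await task
    except RuntimeError as exc:
        return str(exc)
    return ""


async def check_timeout() -> bool:
    try:
        # No reply is ever delivered, so the wait must time out.
        await FakeChannel().subscribe(timeout=0.01)
    except TimeoutError:
        return True
    return False
```

Running `subscribe()` in a background task matters here: awaiting it directly in the test would suspend the test before it could deliver the simulated reply.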

Summary by CodeRabbit

  • New Features

    • Subscriptions can accept an optional timeout and now wait for server confirmation and binding reconciliation before completing.
  • Bug Fixes

    • Clearer timeout handling with descriptive TimeoutError and robust propagation of subscription errors.
    • Stronger validation and handling of server/client binding mismatches to avoid silent state inconsistencies.
  • Tests

    • New and updated tests covering subscribe success, error, timeout, and simulated server replies.

coderabbitai bot commented Feb 11, 2026

📝 Walkthrough

Subscribe now awaits server confirmation using a Python Future and optional timeout; an internal callback resolves or rejects the Future based on SUBSCRIBED, CHANNEL_ERROR, or timeout. Join/config payload construction and server binding reconciliation were adjusted. Tests added/updated for success, error, and timeout flows.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Subscription core (src/realtime/src/realtime/_async/channel.py) | subscribe signature gains timeout; introduces a Python Future and _internal_callback to coordinate completion; awaits server join_push and binding reconciliation; explicit TimeoutError handling; join/config payload now computes presence/access_token and validates server bindings. |
| Tests, connection (src/realtime/tests/test_connection.py) | Extended imports (ReplyMessage, SuccessReplyMessage, ReplyPostgresChanges); run channel.subscribe in background via asyncio.create_task; simulate server replies via channel._handle_message to drive subscription flow in existing tests. |
| Tests, issue 1209 (src/realtime/tests/test_issue_1209.py) | New test module with three async tests covering subscribe success, subscribe error, and subscribe timeout using mocked client and triggered join_push replies. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Channel as AsyncRealtimeChannel
    participant Future as SubscriptionFuture
    participant Server as RealtimeServer

    Client->>Channel: subscribe(callback, timeout)
    Channel->>Future: create Future
    Channel->>Channel: setup _internal_callback
    Channel->>Server: send join_push (join/config)
    activate Channel
    Channel->>Future: await (timeout)
    deactivate Channel

    alt Server responds SUBSCRIBED and bindings ok
        Server->>Channel: join_push SUCCESS (ReplyPostgresChanges)
        Channel->>Channel: _internal_callback(SUBSCRIBED)
        Channel->>Future: set_result(channel)
        Future-->>Client: returns AsyncRealtimeChannel
    else Server responds ERROR or binding mismatch
        Server->>Channel: join_push ERROR or invalid bindings
        Channel->>Channel: _internal_callback(CHANNEL_ERROR, exception)
        Channel->>Future: set_exception(exception)
        Future-->>Client: raises Exception
    else Timeout
        Future->>Channel: wait_for times out
        Channel->>Future: cancel / set_exception(TimeoutError)
        Future-->>Client: raises TimeoutError
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • o-santi
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 53.33% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main change: making AsyncRealtimeChannel.subscribe() await server acknowledgement. |
| Linked Issues check | ✅ Passed | The PR successfully addresses issue #1209's requirements: subscribe now awaits server responses, guarantees callback invocation before returning, includes timeout support, and prevents race conditions. |
| Out of Scope Changes check | ✅ Passed | All changes are scoped to fixing the subscribe race condition: internal Future coordination, timeout handling, server binding validation, and corresponding test updates. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 240-276: In on_join_push_ok, don't let an empty or None
server_postgres_changes silently clear client callbacks; after retrieving
server_postgres_changes check if self.postgres_changes_callbacks is non-empty
but server_postgres_changes is empty/None or has a different length/content, and
in that case call asyncio.create_task(self.unsubscribe()), invoke
_internal_callback(RealtimeSubscribeStates.CHANNEL_ERROR, Exception("mismatch
between server and client bindings for postgres changes")), and return;
otherwise proceed with the existing reconciliation loop that compares
server_binding fields to each postgres_callback and builds new_postgres_bindings
before assigning self.postgres_changes_callbacks and calling
_internal_callback(RealtimeSubscribeStates.SUBSCRIBED, None).
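
A rough sketch of the check this comment asks for, written as a standalone function so it can run on its own. `reconcile_bindings` is a hypothetical helper, and the compared field names (`event`, `schema`, `table`, `filter`) and the `id` field are assumptions for illustration; the real handler works on the channel's own binding objects and reports errors through `_internal_callback` instead of raising.

```python
from typing import Any, Dict, List, Optional

# Assumed set of fields that must agree between client and server bindings.
COMPARED_FIELDS = ("event", "schema", "table", "filter")
MISMATCH = "mismatch between server and client bindings for postgres changes"


def reconcile_bindings(
    client_bindings: List[Dict[str, Any]],
    server_bindings: Optional[List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """Return client bindings annotated with server ids, or raise on mismatch."""
    server_bindings = server_bindings or []

    # An empty, missing, shorter, or longer server list is an error rather
    # than a silent overwrite of the client's registered callbacks.
    if len(client_bindings) != len(server_bindings):
        raise ValueError(MISMATCH)

    reconciled = []
    for client, server in zip(client_bindings, server_bindings):
        if any(client.get(f) != server.get(f) for f in COMPARED_FIELDS):
            raise ValueError(MISMATCH)
        reconciled.append({**client, "id": server.get("id")})
    return reconciled


def is_mismatch(client: List[Dict[str, Any]], server: Any) -> bool:
    try:
        reconcile_bindings(client, server)
    except ValueError:
        return True
    return False
```

Checking the lengths before the loop covers both directions at once: a server reply with no bindings and a server reply with more bindings than the client registered.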

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 197-210: The _internal_callback function can exit early if the
user-supplied callback raises, causing subscribe() to hang; wrap the callback
invocation in a try/except block around the call to callback(status, err) inside
_internal_callback so any exception from the user callback is caught and ignored
(or captured separately) and does not prevent the subsequent future resolution
logic for RealtimeSubscribeStates.SUBSCRIBED, CHANNEL_ERROR, or TIMED_OUT;
ensure you still use the original err when calling future.set_exception for
CHANNEL_ERROR and that future.set_result/future.set_exception always runs even
if the callback raised.
- Around line 305-314: The timeout handler after await asyncio.wait_for(future,
timeout=wait_timeout) cancels the python future but leaves the channel's join
push active, allowing late ACKs to mark the channel joined; update the except
asyncio.TimeoutError block in the subscribe/join routine to also locate the join
push object created earlier (e.g., the local variable often named push or
join_push) and explicitly clean it up before raising: if the push exists and is
not yet resolved, call its cleanup API (e.g., unsubscribe(), cancel(), or
destroy() depending on the Push class implementation) or send a
leave/unsubscribe on the channel, then cancel the future and raise the
TimeoutError as before. Ensure you reference the same push variable used when
creating the join push so the join is properly torn down on python-level
timeouts.
- Around line 182-185: The push() method's docstring wrongly states the timeout
is in milliseconds; update the docstring to state timeout is in seconds to match
AsyncPush (which uses asyncio.sleep(self.timeout)) and the module's socket
timeout convention, and ensure any related mentions (e.g., in AsyncPush or
Channel.push references) consistently document "seconds" for the timeout
parameter.
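
The first point above (a raising user callback must not prevent the future from completing) might look like this in isolation. It is a sketch under assumptions: `make_internal_callback` is a hypothetical factory, states are plain strings in place of `RealtimeSubscribeStates`, and logging the callback error is one possible choice where the comment allows either ignoring or capturing it.

```python
import asyncio
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


def make_internal_callback(
    future: "asyncio.Future[Any]",
    user_callback: Optional[Callable[[str, Optional[Exception]], None]],
    result: Any,
) -> Callable[[str, Optional[Exception]], None]:
    def internal_callback(status: str, err: Optional[Exception]) -> None:
        if user_callback is not None:
            try:
                user_callback(status, err)
            except Exception:
                # A bug in user code must not leave subscribe() hanging.
                logger.exception("subscribe callback raised; continuing")

        if future.done():
            return
        if status == "SUBSCRIBED":
            future.set_result(result)
        elif status in ("CHANNEL_ERROR", "TIMED_OUT"):
            # Propagate the original error, not the callback's exception.
            future.set_exception(err or Exception(status))

    return internal_callback


async def demo() -> str:
    future = asyncio.get_running_loop().create_future()

    def bad_callback(status: str, err: Optional[Exception]) -> None:
        raise ValueError("bug in user code")

    callback = make_internal_callback(future, bad_callback, result="joined")
    callback("SUBSCRIBED", None)
    return await asyncio.wait_for(future, timeout=1)
```

In `demo()`, the user callback raises, yet the future still resolves, so the awaiting caller is released instead of waiting for the timeout.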

…ck errors, clean up join_push on timeout, fix docstring
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 310-322: The timeout selection uses a falsy check so a
caller-passed 0 is ignored; change the logic to use an explicit None check
(e.g., set wait_timeout = timeout if timeout is not None else self.timeout) in
the method in realtime/_async/channel.py where wait_timeout is computed (the
block referencing future, self.timeout, and self.join_push.destroy()). This
preserves intentional 0 or other falsy timeouts while still defaulting to
self.timeout when timeout is not provided.
- Around line 245-291: The on_join_push_ok handler does not detect when
server_postgres_changes contains more bindings than the client registered
(postgres_changes_callbacks), so add an explicit length check and treat any size
mismatch as an error: inside on_join_push_ok (the function handling the join
reply), after reading server_postgres_changes and before assigning
self.postgres_changes_callbacks/new_postgres_bindings, compare
len(server_postgres_changes) to len(self.postgres_changes_callbacks) (or
len(new_postgres_bindings) vs original callbacks) and if server has extra
entries call asyncio.create_task(self.unsubscribe()) and invoke
_internal_callback(RealtimeSubscribeStates.CHANNEL_ERROR, Exception(...)) to
surface the mismatch, keeping the existing mismatch message and flow used
elsewhere in this function.
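
The difference between the falsy check and the explicit `None` check in the first point can be shown with two small functions. Both names are hypothetical, and `timeout or default` is an assumed form of the falsy check the comment describes.

```python
from typing import Optional


def pick_timeout_falsy(timeout: Optional[float], default: float) -> float:
    # Falsy check: a caller-supplied 0 is treated like "not provided".
    return timeout or default


def pick_timeout(timeout: Optional[float], default: float) -> float:
    # Explicit None check: only a missing timeout falls back to the default.
    return timeout if timeout is not None else default


print(pick_timeout_falsy(0, 10.0))  # 10.0, the caller's 0 is ignored
print(pick_timeout(0, 10.0))        # 0, the caller's 0 is preserved
print(pick_timeout(None, 10.0))     # 10.0, default applies
```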

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 223-227: The bug is that the bound method
self.presence._has_callback_attached is referenced instead of called, so
presence_enabled is always truthy; update the assignment in the Channel (or
relevant class) where presence_enabled is computed to call the method
(self.presence._has_callback_attached()) and combine its boolean result with
presence.get("enabled", False) before setting presence["enabled"]; ensure you
reference the existing symbols presence_enabled,
self.presence._has_callback_attached, and presence in your fix so behavior only
enables presence when the callback is actually attached or the config enables
it.

Comment on lines +223 to +227
presence_enabled = self.presence._has_callback_attached or presence.get(
"enabled", False
)
presence["enabled"] = presence_enabled

⚠️ Potential issue | 🟠 Major

Call _has_callback_attached(); current code always enables presence.
Line 223 references the bound method instead of invoking it, so presence_enabled is always truthy and presence gets enabled for all channels.

🐛 Proposed fix
-        presence_enabled = self.presence._has_callback_attached or presence.get(
+        presence_enabled = self.presence._has_callback_attached() or presence.get(
             "enabled", False
         )
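
Why the missing parentheses matter can be reproduced in a few lines. This sketch assumes, as the review comment states, that `_has_callback_attached` is a regular method; the `Presence` class here is a hypothetical stand-in. If the real attribute were declared with `@property`, the original expression without parentheses would already be correct.

```python
class Presence:
    """Hypothetical stand-in for the channel's presence helper."""

    def __init__(self) -> None:
        self._callbacks = []

    def _has_callback_attached(self) -> bool:
        return bool(self._callbacks)


presence = Presence()
config = {}  # no "enabled" key, so the config does not ask for presence

# Referencing the method yields a bound method object, which is always truthy.
buggy = presence._has_callback_attached or config.get("enabled", False)

# Calling it yields the actual boolean result.
fixed = presence._has_callback_attached() or config.get("enabled", False)

print(bool(buggy))  # True, even though no callback is attached
print(fixed)        # False
```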

Development

Successfully merging this pull request may close these issues.

AsyncRealtimeChannel.subscribe returns without waiting for an answer

1 participant