Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.13']
os: [ubuntu-latest]

steps:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "sailhouse"
version = "0.1.0"
version = "0.1.1"
description = "Python SDK for Sailhouse - Event Streaming Platform"
readme = "README.md"
authors = [
Expand Down
47 changes: 45 additions & 2 deletions src/sailhouse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ def __init__(
"x-source": "sailhouse-python"
})

async def pull(
self,
topic: str,
subscription: str,
) -> Event:
"""Pull an event from a subscription, locking it for processing"""
url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/pull"

response = self.session.post(url, timeout=self.timeout)
if response.status_code == 204:
return None

if response.status_code != 200:
raise SailhouseError(
f"Failed to pull message: {response.status_code}")

data = response.json()
return Event(
id=data['id'],
data=data['data'],
_topic=topic,
_subscription=subscription,
_client=self
)

async def get_events(
self,
topic: str,
Expand Down Expand Up @@ -143,6 +168,20 @@ async def acknowledge_message(
raise SailhouseError(
f"Failed to acknowledge message: {response.status_code}")

async def nack_message(
self,
topic: str,
subscription: str,
event_id: str
) -> None:
"""Nacknowledge a message"""
url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/{event_id}/nack"

response = self.session.post(url, timeout=self.timeout)
if response.status_code not in (200, 204):
raise SailhouseError(
f"Failed to nack message: {response.status_code}")

async def subscribe(
self,
topic: str,
Expand All @@ -156,9 +195,13 @@ async def subscribe(
"""Subscribe to events with polling"""
while True:
try:
events = await self.get_events(topic, subscription)
for event in events.events:
event = await self.pull(topic, subscription)
if event:
await handler(event)
# We want to try and fetch another message immediately
# after processing the current one
continue

await asyncio.sleep(polling_interval)
except Exception as e:
if on_error:
Expand Down
108 changes: 82 additions & 26 deletions test/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ def client():
@pytest.fixture
def mock_response():
class MockResponse:
def __init__(self, status_code, json_data):
def __init__(self, status_code, json_data=None):
self.status_code = status_code
self._json_data = json_data
self.text = json.dumps(json_data)
self.text = json.dumps(json_data) if json_data else ""

def json(self):
if self._json_data is None:
raise ValueError("No JSON data available")
return self._json_data

return MockResponse

# Test Event class


def test_event_creation():
client = SailhouseClient(token="test-token")
Expand Down Expand Up @@ -142,40 +142,62 @@ async def test_acknowledge_message_failure(client, mock_response):


@pytest.mark.asyncio
async def test_subscribe_handler_called(client, mock_response):
test_events = {
"events": [
{"id": "1", "data": {"message": "test1"}},
],
"offset": 0,
"limit": 10
async def test_pull_success(client, mock_response):
test_event = {
"id": "1",
"data": {"message": "test1"}
}

with patch.object(client.session, 'post', return_value=mock_response(200, test_event)):
event = await client.pull("test-topic", "test-sub")

assert isinstance(event, Event)
assert event.id == "1"
assert event.data == {"message": "test1"}
assert event._topic == "test-topic"
assert event._subscription == "test-sub"


@pytest.mark.asyncio
async def test_pull_no_message(client, mock_response):
with patch.object(client.session, 'post', return_value=mock_response(204)):
event = await client.pull("test-topic", "test-sub")
assert event is None


@pytest.mark.asyncio
async def test_pull_failure(client, mock_response):
with patch.object(client.session, 'post', return_value=mock_response(400, {})):
with pytest.raises(SailhouseError):
await client.pull("test-topic", "test-sub")


@pytest.mark.asyncio
async def test_subscribe_handler_called(client, mock_response):
handler = AsyncMock()

# Set up a mock that will return events once then exit
get_events_mock = AsyncMock()
get_events_mock.return_value = GetEventsResponse(
events=[Event(
# Mock pull to return one event and then None
pull_mock = AsyncMock()
pull_mock.side_effect = [
Event(
id="1",
data={"message": "test1"},
_topic="test-topic",
_subscription="test-sub",
_client=client
)],
offset=0,
limit=10
)
),
None
]

with patch.object(client, 'get_events', get_events_mock):
with patch.object(client, 'pull', pull_mock):
# Create a task for the subscription
task = asyncio.create_task(
client.subscribe(
"test-topic",
"test-sub",
handler,
polling_interval=0.5,
exit_on_error=True # This will make it exit after first iteration
polling_interval=0.1,
exit_on_error=True
)
)

Expand All @@ -196,19 +218,19 @@ async def test_subscribe_handler_called(client, mock_response):

@pytest.mark.asyncio
async def test_subscribe_with_error_handler(client):
error_handler = AsyncMock()
error_handler = Mock() # Changed from AsyncMock to Mock since we're not awaiting it
handler = AsyncMock()

# Create a mock that raises an exception
get_events_mock = AsyncMock(side_effect=Exception("Test error"))
pull_mock = AsyncMock(side_effect=Exception("Test error"))

with patch.object(client, 'get_events', get_events_mock):
with patch.object(client, 'pull', pull_mock):
task = asyncio.create_task(
client.subscribe(
"test-topic",
"test-sub",
handler,
polling_interval=0.5,
polling_interval=0.1,
on_error=error_handler,
exit_on_error=True
)
Expand All @@ -227,3 +249,37 @@ async def test_subscribe_with_error_handler(client):

# Verify error handler was called
error_handler.assert_called_once()


@pytest.mark.asyncio
async def test_subscribe_continuous_polling(client):
handler = AsyncMock()

# Mock pull to return None (simulating no messages)
pull_mock = AsyncMock(return_value=None)

with patch.object(client, 'pull', pull_mock):
task = asyncio.create_task(
client.subscribe(
"test-topic",
"test-sub",
handler,
polling_interval=0.1
)
)

# Wait for a few polling intervals
await asyncio.sleep(0.3)

# Cancel the task
task.cancel()

try:
await task
except asyncio.CancelledError:
pass

# Verify handler was not called (since no messages were returned)
handler.assert_not_called()
# Verify pull was called multiple times
assert pull_mock.call_count > 1