diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dc6bfbc..e59fabf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,7 +11,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.13'] os: [ubuntu-latest] steps: diff --git a/pyproject.toml b/pyproject.toml index 0279a01..7906f74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sailhouse" -version = "0.1.0" +version = "0.1.1" description = "Python SDK for Sailhouse - Event Streaming Platform" readme = "README.md" authors = [ diff --git a/src/sailhouse/client.py b/src/sailhouse/client.py index 2693bff..3624f36 100644 --- a/src/sailhouse/client.py +++ b/src/sailhouse/client.py @@ -58,6 +58,31 @@ def __init__( "x-source": "sailhouse-python" }) + async def pull( + self, + topic: str, + subscription: str, + ) -> Event: + """Pull an event from a subscription, locking it for processing""" + url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/pull" + + response = self.session.post(url, timeout=self.timeout) + if response.status_code == 204: + return None + + if response.status_code != 200: + raise SailhouseError( + f"Failed to pull message: {response.status_code}") + + data = response.json() + return Event( + id=data['id'], + data=data['data'], + _topic=topic, + _subscription=subscription, + _client=self + ) + async def get_events( self, topic: str, @@ -143,6 +168,20 @@ async def acknowledge_message( raise SailhouseError( f"Failed to acknowledge message: {response.status_code}") + async def nack_message( + self, + topic: str, + subscription: str, + event_id: str + ) -> None: + """Nacknowledge a message""" + url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/{event_id}/nack" + + response = self.session.post(url, timeout=self.timeout) + if response.status_code not in (200, 204): + raise SailhouseError( + f"Failed to nack message: {response.status_code}") + async def subscribe( self, topic: str, @@ -156,9 +195,13 @@ async def subscribe( """Subscribe to events with polling""" while True: try: - events = await self.get_events(topic, subscription) - for event in events.events: + event = await self.pull(topic, subscription) + if event: await handler(event) + # We want to try and fetch another message immediately + # after processing the current one + continue + await asyncio.sleep(polling_interval) except Exception as e: if on_error: diff --git a/test/client_test.py b/test/client_test.py index 299b720..e578c0f 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -18,18 +18,18 @@ def client(): @pytest.fixture def mock_response(): class MockResponse: - def __init__(self, status_code, json_data): + def __init__(self, status_code, json_data=None): self.status_code = status_code self._json_data = json_data - self.text = json.dumps(json_data) + self.text = json.dumps(json_data) if json_data else "" def json(self): + if self._json_data is None: + raise ValueError("No JSON data available") return self._json_data return MockResponse -# Test Event class - def test_event_creation(): client = SailhouseClient(token="test-token") @@ -142,40 +142,62 @@ async def test_acknowledge_message_failure(client, mock_response): @pytest.mark.asyncio -async def test_subscribe_handler_called(client, mock_response): - test_events = { - "events": [ - {"id": "1", "data": {"message": "test1"}}, - ], - "offset": 0, - "limit": 10 +async def test_pull_success(client, mock_response): + test_event = { + "id": "1", + "data": {"message": "test1"} } + with patch.object(client.session, 'post', return_value=mock_response(200, test_event)): + event = await client.pull("test-topic", "test-sub") + + assert isinstance(event, Event) + assert event.id == "1" + assert event.data == {"message": "test1"} + assert event._topic == "test-topic" + assert event._subscription == "test-sub" + + +@pytest.mark.asyncio +async def test_pull_no_message(client, mock_response): + with patch.object(client.session, 'post', return_value=mock_response(204)): + event = await client.pull("test-topic", "test-sub") + assert event is None + + +@pytest.mark.asyncio +async def test_pull_failure(client, mock_response): + with patch.object(client.session, 'post', return_value=mock_response(400, {})): + with pytest.raises(SailhouseError): + await client.pull("test-topic", "test-sub") + + +@pytest.mark.asyncio +async def test_subscribe_handler_called(client, mock_response): handler = AsyncMock() - # Set up a mock that will return events once then exit - get_events_mock = AsyncMock() - get_events_mock.return_value = GetEventsResponse( - events=[Event( + # Mock pull to return one event and then None + pull_mock = AsyncMock() + pull_mock.side_effect = [ + Event( id="1", data={"message": "test1"}, _topic="test-topic", _subscription="test-sub", _client=client - )], - offset=0, - limit=10 - ) + ), + None + ] - with patch.object(client, 'get_events', get_events_mock): + with patch.object(client, 'pull', pull_mock): # Create a task for the subscription task = asyncio.create_task( client.subscribe( "test-topic", "test-sub", handler, - polling_interval=0.5, - exit_on_error=True # This will make it exit after first iteration + polling_interval=0.1, + exit_on_error=True ) ) @@ -196,19 +218,19 @@ async def test_subscribe_handler_called(client, mock_response): @pytest.mark.asyncio async def test_subscribe_with_error_handler(client): - error_handler = AsyncMock() + error_handler = Mock() # Changed from AsyncMock to Mock since we're not awaiting it handler = AsyncMock() # Create a mock that raises an exception - get_events_mock = AsyncMock(side_effect=Exception("Test error")) + pull_mock = AsyncMock(side_effect=Exception("Test error")) - with patch.object(client, 'get_events', get_events_mock): + with patch.object(client, 'pull', pull_mock): task = asyncio.create_task( client.subscribe( "test-topic", "test-sub", handler, - polling_interval=0.5, + polling_interval=0.1, on_error=error_handler, exit_on_error=True ) @@ -227,3 +249,37 @@ async def test_subscribe_with_error_handler(client): # Verify error handler was called error_handler.assert_called_once() + + +@pytest.mark.asyncio +async def test_subscribe_continuous_polling(client): + handler = AsyncMock() + + # Mock pull to return None (simulating no messages) + pull_mock = AsyncMock(return_value=None) + + with patch.object(client, 'pull', pull_mock): + task = asyncio.create_task( + client.subscribe( + "test-topic", + "test-sub", + handler, + polling_interval=0.1 + ) + ) + + # Wait for a few polling intervals + await asyncio.sleep(0.3) + + # Cancel the task + task.cancel() + + try: + await task + except asyncio.CancelledError: + pass + + # Verify handler was not called (since no messages were returned) + handler.assert_not_called() + # Verify pull was called multiple times + assert pull_mock.call_count > 1