Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "sailhouse"
version = "0.1.1"
version = "1.5.0"
description = "Python SDK for Sailhouse - Event Streaming Platform"
readme = "README.md"
authors = [
Expand Down
34 changes: 32 additions & 2 deletions src/sailhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,41 @@
from .client import SailhouseClient, GetEventsResponse, Event
from .client import SailhouseClient, GetEventsResponse, Event, WaitOptions, WaitGroup
from .exceptions import SailhouseError
from .admin import AdminClient, FilterCondition, ComplexFilter, Filter, RegisterResult
from .subscriber import SailhouseSubscriber, SubscriberOptions, SubscriptionHandler
from .push_subscriptions import (
PushSubscriptionVerifier,
PushSubscriptionVerificationError,
SignatureComponents,
PushSubscriptionHeaders,
PushSubscriptionPayload,
VerificationOptions,
verify_push_subscription_signature,
verify_push_subscription_signature_safe
)

__version__ = "0.1.0"
__version__ = "1.5.0"

__all__ = [
"SailhouseClient",
"Event",
"GetEventsResponse",
"SailhouseError",
"AdminClient",
"FilterCondition",
"ComplexFilter",
"Filter",
"RegisterResult",
"WaitOptions",
"WaitGroup",
"SailhouseSubscriber",
"SubscriberOptions",
"SubscriptionHandler",
"PushSubscriptionVerifier",
"PushSubscriptionVerificationError",
"SignatureComponents",
"PushSubscriptionHeaders",
"PushSubscriptionPayload",
"VerificationOptions",
"verify_push_subscription_signature",
"verify_push_subscription_signature_safe",
]
89 changes: 89 additions & 0 deletions src/sailhouse/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from dataclasses import dataclass
from typing import Dict, Any, Optional, List, Union, Literal
from .exceptions import SailhouseError
import requests


@dataclass
class FilterCondition:
path: str
condition: str
value: Any


@dataclass
class ComplexFilter:
conditions: List[FilterCondition]
operator: Literal["and", "or"] = "and"


# Union type for filters - can be boolean, None, or complex filter
Filter = Union[bool, None, ComplexFilter]


@dataclass
class RegisterResult:
outcome: Literal["created", "updated", "none"]


class AdminClient:
def __init__(self, sailhouse_client: 'SailhouseClient'):
self._client = sailhouse_client

def register_push_subscription(
self,
topic: str,
subscription: str,
endpoint: str,
**kwargs
) -> RegisterResult:
"""Register a push subscription for webhook delivery"""
url = f"{self._client.base_url}/admin/topics/{topic}/subscriptions/{subscription}/push"

body: Dict[str, Any] = {
"endpoint": endpoint
}

# Handle optional parameters
filter_condition = kwargs.get('filter_condition')
rate_limit = kwargs.get('rate_limit')
deduplication = kwargs.get('deduplication')

# Handle filter_condition - only add to body if explicitly provided
if 'filter_condition' in kwargs:
if isinstance(filter_condition, bool):
body["filter"] = filter_condition
elif filter_condition is None:
body["filter"] = None
elif isinstance(filter_condition, ComplexFilter):
body["filter"] = {
"conditions": [
{
"path": cond.path,
"condition": cond.condition,
"value": cond.value
}
for cond in filter_condition.conditions
],
"operator": filter_condition.operator
}

if rate_limit is not None:
body["rate_limit"] = rate_limit

if deduplication is not None:
body["deduplication"] = deduplication

response = self._client.session.post(
url,
json=body,
timeout=self._client.timeout
)

if response.status_code not in (200, 201):
raise SailhouseError(
f"Failed to register push subscription: {response.status_code} - {response.text}"
)

data = response.json()
return RegisterResult(outcome=data.get("outcome", "none"))
129 changes: 121 additions & 8 deletions src/sailhouse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,47 @@
import websockets
import asyncio
import json
import uuid

T = TypeVar('T')


@dataclass
class WaitOptions:
ttl: Optional[int] = None # TTL in seconds


@dataclass
class WaitGroup:
instance_id: str
client: 'SailhouseClient'

async def publish(
self,
topic: str,
data: Dict[str, Any],
*,
scheduled_time: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Publish an event under this waitgroup"""
await self.client.publish(
topic=topic,
data=data,
scheduled_time=scheduled_time,
metadata=metadata,
wait_group_instance_id=self.instance_id
)


@dataclass
class Event(Generic[T]):
id: str
data: Dict[str, Any]
_topic: str
_subscription: str
_client: 'SailhouseClient'
metadata: Optional[Dict[str, Any]] = None

def as_type(self, cls: type[T]) -> T:
"""Convert event data to specified type"""
Expand Down Expand Up @@ -57,14 +87,18 @@ def __init__(
"Authorization": token,
"x-source": "sailhouse-python"
})

# Import here to avoid circular import
from .admin import AdminClient
self.admin = AdminClient(self)

async def pull(
self,
topic: str,
subscription: str,
) -> Event:
"""Pull an event from a subscription, locking it for processing"""
url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/pull"
url = f"{self.base_url}/topics/{topic}/subscriptions/{subscription}/events/pull"

response = self.session.get(url, timeout=self.timeout)
if response.status_code == 204:
Expand All @@ -80,7 +114,8 @@ async def pull(
data=data['data'],
_topic=topic,
_subscription=subscription,
_client=self
_client=self,
metadata=data.get('metadata')
)

async def get_events(
Expand All @@ -101,7 +136,7 @@ async def get_events(
if time_window is not None:
params['time_window'] = time_window

url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events"
url = f"{self.base_url}/topics/{topic}/subscriptions/{subscription}/events"

response = self.session.get(url, params=params, timeout=self.timeout)
if response.status_code != 200:
Expand All @@ -115,7 +150,8 @@ async def get_events(
data=e['data'],
_topic=topic,
_subscription=subscription,
_client=self
_client=self,
metadata=e.get('metadata')
)
for e in data['events']
]
Expand All @@ -126,22 +162,50 @@ async def get_events(
limit=data.get('limit', 0)
)

def wait(self, options: Optional[WaitOptions] = None) -> WaitGroup:
"""Create a waitgroup for coordinated event publishing"""
instance_id = str(uuid.uuid4())

if options and options.ttl:
# Create waitgroup with TTL
url = f"{self.base_url}/waitgroups"
body = {
"instance_id": instance_id,
"ttl": options.ttl
}

response = self.session.post(
url,
json=body,
timeout=self.timeout
)

if response.status_code not in (200, 201):
raise SailhouseError(
f"Failed to create waitgroup: {response.status_code} - {response.text}"
)

return WaitGroup(instance_id=instance_id, client=self)

async def publish(
self,
topic: str,
data: Dict[str, Any],
*,
scheduled_time: Optional[datetime] = None,
metadata: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None,
wait_group_instance_id: Optional[str] = None
) -> None:
"""Publish an event to a topic"""
url = f"{self.BASE_URL}/topics/{topic}/events"
url = f"{self.base_url}/topics/{topic}/events"

body = {"data": data}
if scheduled_time:
body["send_at"] = scheduled_time.isoformat()
if metadata:
body["metadata"] = metadata
if wait_group_instance_id:
body["wait_group_instance_id"] = wait_group_instance_id

response = self.session.post(
url,
Expand All @@ -161,7 +225,7 @@ async def acknowledge_message(
event_id: str
) -> None:
"""Acknowledge a message"""
url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/{event_id}"
url = f"{self.base_url}/topics/{topic}/subscriptions/{subscription}/events/{event_id}"

response = self.session.post(url, timeout=self.timeout)
if response.status_code not in (200, 204):
Expand All @@ -175,7 +239,7 @@ async def nack_message(
event_id: str
) -> None:
"""Nacknowledge a message"""
url = f"{self.BASE_URL}/topics/{topic}/subscriptions/{subscription}/events/{event_id}/nack"
url = f"{self.base_url}/topics/{topic}/subscriptions/{subscription}/events/{event_id}/nack"

response = self.session.post(url, timeout=self.timeout)
if response.status_code not in (200, 204):
Expand Down Expand Up @@ -209,3 +273,52 @@ async def subscribe(
if exit_on_error:
break
continue

def subscriber(self, options=None):
"""Create a SailhouseSubscriber for long-running event processing"""
from .subscriber import SailhouseSubscriber, SubscriberOptions
return SailhouseSubscriber(self, options)

def verify_push_subscription(
self,
webhook_secret: str,
signature_header: str,
raw_body,
tolerance: int = 300
) -> bool:
"""Verify push subscription webhook signature"""
from .push_subscriptions import (
PushSubscriptionVerifier,
PushSubscriptionHeaders,
PushSubscriptionPayload,
VerificationOptions
)

verifier = PushSubscriptionVerifier(webhook_secret)
headers = PushSubscriptionHeaders(signature=signature_header)
payload = PushSubscriptionPayload(raw_body=raw_body)
options = VerificationOptions(tolerance=tolerance)

return verifier.verify(headers, payload, options)

def verify_push_subscription_safe(
self,
webhook_secret: str,
signature_header: str,
raw_body,
tolerance: int = 300
) -> bool:
"""Safe push subscription verification - returns boolean instead of raising"""
from .push_subscriptions import (
PushSubscriptionVerifier,
PushSubscriptionHeaders,
PushSubscriptionPayload,
VerificationOptions
)

verifier = PushSubscriptionVerifier(webhook_secret)
headers = PushSubscriptionHeaders(signature=signature_header)
payload = PushSubscriptionPayload(raw_body=raw_body)
options = VerificationOptions(tolerance=tolerance)

return verifier.verify_safe(headers, payload, options)
Loading