Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: "1.8.5"
virtualenvs-create: true

- name: Install dependencies
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
if: steps.version-check.outputs.skip == 'false'
uses: snok/install-poetry@v1
with:
version: "1.8.5"
virtualenvs-create: true

- name: Install dependencies
Expand Down
98 changes: 98 additions & 0 deletions dev_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
Development runner to observe SDK behavior including SSE streaming and watchdog.

Usage:
REFORGE_SDK_KEY=your-key python dev_runner.py

Or set a specific config key to watch:
REFORGE_SDK_KEY=your-key python dev_runner.py my.config.key
"""

import logging
import sys
import time
import os

from sdk_reforge import ReforgeSDK, Options


def setup_logging() -> None:
"""Configure logging to show SDK internals."""
root_logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
)
root_logger.addHandler(handler)

# Set root to DEBUG to see everything
root_logger.setLevel(logging.DEBUG)

# Quiet down noisy libraries
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)


def main() -> None:
setup_logging()

sdk_key = os.environ.get("REFORGE_SDK_KEY")
if not sdk_key:
print("Error: REFORGE_SDK_KEY environment variable not set")
print("Usage: REFORGE_SDK_KEY=your-key python dev_runner.py [config.key]")
sys.exit(1)

# Optional: config key to watch
watch_key = sys.argv[1] if len(sys.argv) > 1 else None

print(f"Starting SDK with key: {sdk_key[:10]}...")
print(f"Watching config key: {watch_key or '(none)'}")
print("Press Ctrl+C to stop\n")
print("=" * 60)

options = Options(
sdk_key=sdk_key,
connection_timeout_seconds=10,
)

sdk = ReforgeSDK(options)
config_sdk = sdk.config_sdk()

print("=" * 60)
print("SDK initialized, entering main loop...")
print("=" * 60 + "\n")

try:
iteration = 0
while True:
iteration += 1

status_parts = [
f"[{iteration}]",
f"initialized={config_sdk.is_ready()}",
f"hwm={config_sdk.highwater_mark()}",
]

if watch_key:
try:
value = config_sdk.get(watch_key, default="<not found>")
status_parts.append(f"{watch_key}={value!r}")
except Exception as e:
status_parts.append(f"{watch_key}=<error: {e}>")

print(" | ".join(status_parts))
time.sleep(5)

except KeyboardInterrupt:
print("\n\nShutting down...")

sdk.close()
print("Done.")


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sdk-reforge"
version = "1.1.1"
version = "1.2.0"
description = "Python sdk for Reforge Feature Flags and Config as a Service: https://www.reforge.com"
license = "MIT"
authors = ["Michael Berkowitz <michael.berkowitz@gmail.com>", "James Kebinger <james.kebinger@reforge.com>"]
Expand Down
2 changes: 1 addition & 1 deletion sdk_reforge/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.1
1.2.0
129 changes: 82 additions & 47 deletions sdk_reforge/_sse_connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import base64
import time
from typing import Optional, Callable
from typing import Optional, Callable, TYPE_CHECKING

import sseclient # type: ignore
from requests import Response
from requests.exceptions import HTTPError

from sdk_reforge._internal_logging import InternalLogger
from sdk_reforge._requests import ApiClient, UnauthorizedException
from sdk_reforge._requests import ApiClient
from sdk_reforge._sse_watchdog import WatchdogResponseWrapper
import prefab_pb2 as Prefab
from sdk_reforge.config_sdk_interface import ConfigSDKInterface

if TYPE_CHECKING:
from sdk_reforge._sse_watchdog import SSEWatchdog

SHORT_CONNECTION_THRESHOLD = 2 # seconds
CONSECUTIVE_SHORT_CONNECTION_LIMIT = 2 # times
MIN_BACKOFF_TIME = 1 # seconds
Expand All @@ -29,68 +34,98 @@ def __init__(
api_client: ApiClient,
config_client: ConfigSDKInterface,
urls: list[str],
watchdog: Optional["SSEWatchdog"] = None,
):
self.api_client = api_client
self.config_client = config_client
self.sse_client: Optional[sseclient.SSEClient] = None
self.timing = Timing()
self.urls = urls
self.watchdog = watchdog

def streaming_loop(self) -> None:
too_short_connection_count = 0
backoff_time = MIN_BACKOFF_TIME

while self.config_client.continue_connection_processing():
try:
logger.debug("Starting streaming connection")
headers = {
"Last-Event-ID": f"{self.config_client.highwater_mark()}",
"accept": "text/event-stream",
}
response = self.api_client.resilient_request(
"/api/v2/sse/config",
headers=headers,
stream=True,
auth=("authuser", self.config_client.options.api_key),
timeout=(5, 60),
hosts=self.urls,
)
response.raise_for_status()
if response.ok:
elapsed_time = self.timing.time_execution(
lambda: self.process_response(response)
try:
while self.config_client.continue_connection_processing():
try:
logger.debug("Starting streaming connection")
headers = {
"Last-Event-ID": f"{self.config_client.highwater_mark()}",
"accept": "text/event-stream",
}
response = self.api_client.resilient_request(
"/api/v2/sse/config",
headers=headers,
stream=True,
auth=("authuser", self.config_client.options.api_key),
timeout=(5, 60),
hosts=self.urls,
)
if elapsed_time < SHORT_CONNECTION_THRESHOLD:
too_short_connection_count += 1
if (
too_short_connection_count
>= CONSECUTIVE_SHORT_CONNECTION_LIMIT
):
raise TooQuickConnectionException()
else:
too_short_connection_count = 0
backoff_time = MIN_BACKOFF_TIME
time.sleep(backoff_time)
except UnauthorizedException:
self.config_client.handle_unauthorized_response()
except TooQuickConnectionException as e:
logger.debug(f"Connection ended quickly: {str(e)}. Will apply backoff.")
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
time.sleep(backoff_time)
except Exception as e:
if not self.config_client.is_shutting_down():
logger.warning(
f"Streaming connection error: {str(e)}, Will retry in {backoff_time} seconds"
response.raise_for_status()
if response.ok:
elapsed_time = self.timing.time_execution(
lambda: self.process_response(response)
)
if elapsed_time < SHORT_CONNECTION_THRESHOLD:
too_short_connection_count += 1
if (
too_short_connection_count
>= CONSECUTIVE_SHORT_CONNECTION_LIMIT
):
raise TooQuickConnectionException()
else:
too_short_connection_count = 0
backoff_time = MIN_BACKOFF_TIME
time.sleep(backoff_time)
except TooQuickConnectionException as e:
logger.debug(
f"Connection ended quickly: {str(e)}. Will apply backoff."
)
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
time.sleep(backoff_time)

"""
Hand off a successful response here for processing
"""
except HTTPError as e:
# Check for unauthorized (401/403) responses
if e.response is not None and e.response.status_code in (401, 403):
logger.warning(
f"Received {e.response.status_code} response, stopping SSE"
)
self.config_client.handle_unauthorized_response()
else:
if not self.config_client.is_shutting_down():
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
logger.warning(
f"Streaming connection error ({type(e).__name__}): {str(e)}, "
f"Will retry in {backoff_time} seconds"
)
time.sleep(backoff_time)
except BaseException as e:
# Re-raise system exceptions that should terminate the thread
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
if not self.config_client.is_shutting_down():
backoff_time = min(backoff_time * 2, MAX_BACKOFF_TIME)
logger.warning(
f"Streaming connection error ({type(e).__name__}): {str(e)}, "
f"Will retry in {backoff_time} seconds"
)
time.sleep(backoff_time)
finally:
logger.info(
f"Streaming loop exited "
f"(shutdown={self.config_client.is_shutting_down()})"
)

def process_response(self, response: Response) -> None:
self.sse_client = sseclient.SSEClient(response)
"""Hand off a successful response here for processing."""
# Wrap response to track data received for watchdog
if self.watchdog:
wrapped_response = WatchdogResponseWrapper(response, self.watchdog.touch)
self.sse_client = sseclient.SSEClient(wrapped_response)
else:
self.sse_client = sseclient.SSEClient(response)

if self.sse_client is not None:
for event in self.sse_client.events():
if self.config_client.is_shutting_down():
Expand Down
Loading
Loading