Purpose: Comprehensive integration guide for consuming the unified device API Status: ✅ Production Ready - October 2025 Handover Target Audience: Developers integrating oaDeviceAPI with oaDashboard and other services
- Quick Start Integration
- API Discovery & Platform Detection
- Core API Integration
- Platform-Specific Integration
- Error Handling & Resilience
- Security & Authentication
- Performance Optimization
- Testing & Validation
import httpx
import asyncio
from typing import Dict, Any, Optional
class DeviceAPIClient:
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
async def close(self):
await self.client.aclose()
# Example usage
async def main():
device = DeviceAPIClient("http://device-ip:9090")
# Discover platform
platform_info = await device.get_platform_info()
print(f"Platform: {platform_info['platform']}")
# Get health status
health = await device.get_health()
print(f"Status: {health['status']}")
await device.close()
# Run the example
asyncio.run(main())import os
from dataclasses import dataclass
@dataclass
class DeviceAPIConfig:
base_url: str
timeout: int = 30
retry_attempts: int = 3
retry_delay: float = 1.0
@classmethod
def from_env(cls) -> 'DeviceAPIConfig':
return cls(
base_url=os.getenv('DEVICE_API_URL', 'http://localhost:9090'),
timeout=int(os.getenv('DEVICE_API_TIMEOUT', '30')),
retry_attempts=int(os.getenv('DEVICE_API_RETRIES', '3')),
retry_delay=float(os.getenv('DEVICE_API_RETRY_DELAY', '1.0'))
)async def discover_device_capabilities(client: DeviceAPIClient) -> Dict[str, Any]:
"""Complete device discovery and capability assessment."""
# Step 1: Get platform information
platform_info = await client.get_platform_info()
platform = platform_info['platform']
features = platform_info['features']
# Step 2: Build capability map
capabilities = {
'platform': platform,
'version': platform_info.get('version'),
'endpoints': [],
'features': {
'health_monitoring': True, # Always available
'camera_support': features.get('camera', False),
'tracker_integration': features.get('tracker', False),
'camguard_integration': features.get('camguard', False),
'screenshot_support': features.get('screenshot', False),
'player_control': features.get('player', False)
},
'actions': {
'reboot_supported': True, # Always available
'service_restart': platform == 'macos' # macOS only
}
}
# Step 3: Determine available endpoints
if platform == 'macos':
capabilities['endpoints'] = [
'/macos/health',
'/macos/cameras',
'/macos/cameras/{id}/stream',
'/macos/tracker/status',
'/macos/camguard/status',
'/macos/actions/restart-tracker'
]
elif platform == 'orangepi':
capabilities['endpoints'] = [
'/orangepi/health',
'/orangepi/screenshot',
'/orangepi/player/status',
'/orangepi/player/actions/restart'
]
return capabilitiesasync def comprehensive_health_check(client: DeviceAPIClient) -> Dict[str, Any]:
"""Perform comprehensive health assessment."""
health_results = {
'device_healthy': False,
'api_responsive': False,
'platform_specific': {},
'metrics': {},
'timestamp': None
}
try:
# Basic API health check
basic_health = await client.get_health()
health_results['api_responsive'] = True
health_results['device_healthy'] = basic_health.get('status') == 'healthy'
health_results['timestamp'] = basic_health.get('timestamp')
# Platform-specific health
platform_info = await client.get_platform_info()
platform = platform_info['platform']
if platform == 'macos':
macos_health = await client.get_macos_health()
health_results['platform_specific'] = {
'cameras_available': macos_health.get('cameras', {}).get('count', 0),
'tracker_running': macos_health.get('tracker', {}).get('status') == 'running',
'camguard_running': macos_health.get('camguard', {}).get('status') == 'running',
'temperature': macos_health.get('temperature', {}).get('current')
}
elif platform == 'orangepi':
op_health = await client.get_orangepi_health()
health_results['platform_specific'] = {
'display_available': op_health.get('display', {}).get('connected', False),
'player_running': op_health.get('player', {}).get('status') == 'running',
'storage_usage': op_health.get('disk', {}).get('usage_percent')
}
# Extract key metrics
health_results['metrics'] = {
'cpu_usage': basic_health.get('cpu', {}).get('usage_percent'),
'memory_usage': basic_health.get('memory', {}).get('usage_percent'),
'disk_usage': basic_health.get('disk', {}).get('usage_percent'),
'uptime': basic_health.get('uptime_seconds')
}
except Exception as e:
health_results['error'] = str(e)
health_results['device_healthy'] = False
return health_resultsimport httpx
import asyncio
from typing import Dict, Any, Optional, List
from datetime import datetime
class DeviceAPIClient:
def __init__(self, base_url: str, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout)
self._platform_info = None
async def close(self):
await self.client.aclose()
async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""Internal request method with error handling."""
url = f"{self.base_url}{endpoint}"
try:
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise Exception(f"HTTP {e.response.status_code}: {e.response.text}")
except httpx.RequestError as e:
raise Exception(f"Request failed: {str(e)}")
# Core Endpoints
async def get_api_info(self) -> Dict[str, Any]:
"""Get API information and capabilities."""
return await self._request("GET", "/")
async def get_platform_info(self) -> Dict[str, Any]:
"""Get platform detection information."""
if not self._platform_info:
self._platform_info = await self._request("GET", "/platform")
return self._platform_info
async def get_health(self) -> Dict[str, Any]:
"""Get basic health information."""
return await self._request("GET", "/health")
async def reboot_system(self) -> Dict[str, Any]:
"""Reboot the system."""
return await self._request("POST", "/actions/reboot")
# macOS-specific endpoints
async def get_macos_health(self) -> Dict[str, Any]:
"""Get macOS-specific health information."""
return await self._request("GET", "/macos/health")
async def list_cameras(self) -> List[Dict[str, Any]]:
"""List available cameras (macOS only)."""
return await self._request("GET", "/macos/cameras")
async def get_camera_status(self) -> Dict[str, Any]:
"""Get camera availability status."""
return await self._request("GET", "/macos/cameras/status")
async def get_camera_stream_url(self, camera_id: str) -> str:
"""Get MJPEG stream URL for camera."""
return f"{self.base_url}/macos/cameras/{camera_id}/stream"
async def get_tracker_status(self) -> Dict[str, Any]:
"""Get oaTracker service status."""
return await self._request("GET", "/macos/tracker/status")
async def get_tracker_stats(self) -> Dict[str, Any]:
"""Get tracker runtime statistics."""
return await self._request("GET", "/macos/tracker/stats")
async def get_camguard_status(self) -> Dict[str, Any]:
"""Get CamGuard recording status."""
return await self._request("GET", "/macos/camguard/status")
async def restart_tracker(self) -> Dict[str, Any]:
"""Restart tracker service."""
return await self._request("POST", "/macos/actions/restart-tracker")
# OrangePi-specific endpoints
async def get_orangepi_health(self) -> Dict[str, Any]:
"""Get OrangePi-specific health information."""
return await self._request("GET", "/orangepi/health")
async def capture_screenshot(self) -> bytes:
"""Capture display screenshot (OrangePi only)."""
response = await self.client.get(f"{self.base_url}/orangepi/screenshot")
response.raise_for_status()
return response.content
async def get_player_status(self) -> Dict[str, Any]:
"""Get video player status."""
return await self._request("GET", "/orangepi/player/status")
async def restart_player(self) -> Dict[str, Any]:
"""Restart video player."""
return await self._request("POST", "/orangepi/player/actions/restart")class DeviceManager:
"""Manages multiple devices for oaDashboard integration."""
def __init__(self):
self.devices: Dict[str, DeviceAPIClient] = {}
self.device_capabilities: Dict[str, Dict[str, Any]] = {}
async def add_device(self, device_id: str, url: str) -> bool:
"""Add a device to management."""
try:
client = DeviceAPIClient(url)
# Test connectivity and discover capabilities
capabilities = await discover_device_capabilities(client)
self.devices[device_id] = client
self.device_capabilities[device_id] = capabilities
return True
except Exception as e:
print(f"Failed to add device {device_id}: {e}")
return False
async def get_all_device_health(self) -> Dict[str, Dict[str, Any]]:
"""Get health status for all managed devices."""
health_results = {}
tasks = []
for device_id, client in self.devices.items():
task = asyncio.create_task(
comprehensive_health_check(client)
)
tasks.append((device_id, task))
for device_id, task in tasks:
try:
health_results[device_id] = await task
except Exception as e:
health_results[device_id] = {
'device_healthy': False,
'error': str(e)
}
return health_results
async def execute_platform_specific_action(
self,
device_id: str,
action: str,
**kwargs
) -> Dict[str, Any]:
"""Execute platform-specific actions."""
if device_id not in self.devices:
raise ValueError(f"Device {device_id} not found")
client = self.devices[device_id]
capabilities = self.device_capabilities[device_id]
platform = capabilities['platform']
try:
if platform == 'macos':
if action == 'restart_tracker':
return await client.restart_tracker()
elif action == 'get_camera_list':
return await client.list_cameras()
elif action == 'get_tracker_status':
return await client.get_tracker_status()
elif action == 'get_camguard_status':
return await client.get_camguard_status()
elif platform == 'orangepi':
if action == 'restart_player':
return await client.restart_player()
elif action == 'capture_screenshot':
return await client.capture_screenshot()
elif action == 'get_player_status':
return await client.get_player_status()
# Common actions
elif action == 'reboot':
return await client.reboot_system()
else:
raise ValueError(f"Unsupported action '{action}' for platform '{platform}'")
except Exception as e:
return {'error': str(e), 'success': False}
async def close_all(self):
"""Close all device connections."""
for client in self.devices.values():
await client.close()
self.devices.clear()
self.device_capabilities.clear()class MacDeviceIntegration:
"""macOS-specific device integration for camera and tracker services."""
def __init__(self, client: DeviceAPIClient):
self.client = client
async def setup_camera_monitoring(self) -> Dict[str, Any]:
"""Setup camera monitoring and streaming."""
# Check camera availability
camera_status = await self.client.get_camera_status()
if not camera_status.get('connected', False):
return {'error': 'No cameras available', 'cameras': []}
# Get available cameras
cameras = await self.client.list_cameras()
# Build camera monitoring configuration
camera_config = {
'total_cameras': len(cameras),
'active_cameras': [],
'stream_urls': [],
'health_check_interval': 30 # seconds
}
for camera in cameras:
camera_id = camera['id']
stream_url = self.client.get_camera_stream_url(camera_id)
camera_config['active_cameras'].append({
'id': camera_id,
'name': camera.get('name', f'Camera {camera_id}'),
'resolution': camera.get('resolution'),
'fps': camera.get('fps'),
'stream_url': stream_url
})
return camera_config
async def monitor_tracker_service(self) -> Dict[str, Any]:
"""Monitor oaTracker service health and performance."""
tracker_status = await self.client.get_tracker_status()
tracker_stats = await self.client.get_tracker_stats()
monitoring_data = {
'service_running': tracker_status.get('status') == 'running',
'uptime_seconds': tracker_status.get('uptime_seconds', 0),
'last_detection': tracker_stats.get('last_detection_time'),
'total_detections': tracker_stats.get('total_detections', 0),
'processing_fps': tracker_stats.get('processing_fps', 0),
'memory_usage': tracker_stats.get('memory_usage_mb', 0),
'model_version': tracker_stats.get('model_version'),
'alerts': []
}
# Generate alerts for potential issues
if not monitoring_data['service_running']:
monitoring_data['alerts'].append({
'level': 'error',
'message': 'Tracker service is not running'
})
if monitoring_data['processing_fps'] < 10:
monitoring_data['alerts'].append({
'level': 'warning',
'message': f'Low processing FPS: {monitoring_data["processing_fps"]}'
})
if monitoring_data['memory_usage'] > 1000: # > 1GB
monitoring_data['alerts'].append({
'level': 'warning',
'message': f'High memory usage: {monitoring_data["memory_usage"]}MB'
})
return monitoring_data
async def setup_camguard_integration(self) -> Dict[str, Any]:
"""Setup CamGuard recording service integration."""
camguard_status = await self.client.get_camguard_status()
integration_config = {
'service_running': camguard_status.get('status') == 'running',
'recording_active': camguard_status.get('recording', False),
'storage_info': camguard_status.get('storage', {}),
'configuration': {
'recording_quality': camguard_status.get('quality', 'medium'),
'retention_days': camguard_status.get('retention_days', 7),
'motion_detection': camguard_status.get('motion_detection', True)
},
'alerts': []
}
# Check storage space
storage_usage = integration_config['storage_info'].get('usage_percent', 0)
if storage_usage > 90:
integration_config['alerts'].append({
'level': 'critical',
'message': f'Storage almost full: {storage_usage}%'
})
elif storage_usage > 80:
integration_config['alerts'].append({
'level': 'warning',
'message': f'St getting full: {storage_usage}%'
})
return integration_configclass OrangePiDeviceIntegration:
"""OrangePi-specific device integration for display and player services."""
def __init__(self, client: DeviceAPIClient):
self.client = client
async def setup_display_monitoring(self) -> Dict[str, Any]:
"""Setup display monitoring and screenshot capabilities."""
health = await self.client.get_orangepi_health()
display_config = {
'display_connected': health.get('display', {}).get('connected', False),
'resolution': health.get('display', {}).get('resolution'),
'refresh_rate': health.get('display', {}).get('refresh_rate'),
'screenshot_capability': True,
'capture_interval': 60, # seconds between screenshots
'storage_path': health.get('temp_dir', '/tmp')
}
return display_config
async def capture_display_screenshot(self) -> Dict[str, Any]:
"""Capture and store display screenshot."""
try:
screenshot_data = await self.client.capture_screenshot()
# Store screenshot (example: save to file or upload)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"screenshot_{timestamp}.jpg"
# In a real implementation, you would save to cloud storage
# or process the image as needed
return {
'success': True,
'filename': filename,
'size_bytes': len(screenshot_data),
'timestamp': timestamp,
'content_type': 'image/jpeg'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'timestamp': datetime.now().isoformat()
}
async def monitor_player_service(self) -> Dict[str, Any]:
"""Monitor video player service status and health."""
player_status = await self.client.get_player_status()
monitoring_data = {
'service_running': player_status.get('status') == 'running',
'current_media': player_status.get('current_media'),
'playback_state': player_status.get('playback_state', 'stopped'),
'volume': player_status.get('volume', 50),
'position_seconds': player_status.get('position_seconds', 0),
'duration_seconds': player_status.get('duration_seconds', 0),
'uptime_seconds': player_status.get('uptime_seconds', 0),
'last_error': player_status.get('last_error'),
'alerts': []
}
# Generate alerts
if not monitoring_data['service_running']:
monitoring_data['alerts'].append({
'level': 'error',
'message': 'Player service is not running'
})
if monitoring_data['last_error']:
monitoring_data['alerts'].append({
'level': 'warning',
'message': f'Player error: {monitoring_data["last_error"]}'
})
return monitoring_data
async def execute_player_control(self, action: str, **kwargs) -> Dict[str, Any]:
"""Execute player control actions."""
try:
if action == 'restart':
return await self.client.restart_player()
else:
return {
'success': False,
'error': f'Unsupported player action: {action}'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}import asyncio
from typing import Callable, Any
from functools import wraps
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations."""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(delay * (2 ** attempt)) # Exponential backoff
else:
break
raise last_exception
return wrapper
return decorator
class ResilientDeviceClient:
"""Device client with built-in resilience features."""
def __init__(self, base_url: str, timeout: int = 30):
self.client = DeviceAPIClient(base_url, timeout)
self.circuit_breaker = CircuitBreaker()
@retry_on_failure(max_attempts=3, delay=1.0)
async def get_health_with_fallback(self) -> Dict[str, Any]:
"""Get health status with circuit breaker protection."""
if self.circuit_breaker.is_open():
return {
'status': 'unknown',
'error': 'Circuit breaker is open',
'fallback_mode': True
}
try:
health = await self.client.get_health()
self.circuit_breaker.record_success()
return health
except Exception as e:
self.circuit_breaker.record_failure()
return {
'status': 'error',
'error': str(e),
'fallback_mode': False
}
class CircuitBreaker:
"""Simple circuit breaker implementation."""
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def is_open(self) -> bool:
if self.state == 'OPEN':
if (time.time() - self.last_failure_time) > self.recovery_timeout:
self.state = 'HALF_OPEN'
return False
return True
return False
def record_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'class DeviceHealthMonitor:
"""Continuous health monitoring for device fleet."""
def __init__(self, device_manager: DeviceManager):
self.device_manager = device_manager
self.health_history: Dict[str, List[Dict[str, Any]]] = {}
self.alert_callbacks: List[Callable] = []
async def start_monitoring(self, interval: int = 60):
"""Start continuous health monitoring."""
while True:
try:
health_data = await self.device_manager.get_all_device_health()
for device_id, health in health_data.items():
await self._process_device_health(device_id, health)
await asyncio.sleep(interval)
except Exception as e:
print(f"Health monitoring error: {e}")
await asyncio.sleep(interval)
async def _process_device_health(self, device_id: str, health: Dict[str, Any]):
"""Process individual device health data."""
# Store health history
if device_id not in self.health_history:
self.health_history[device_id] = []
self.health_history[device_id].append({
'timestamp': datetime.now().isoformat(),
'health': health
})
# Keep only last 100 entries
if len(self.health_history[device_id]) > 100:
self.health_history[device_id] = self.health_history[device_id][-100:]
# Check for alerts
await self._check_alerts(device_id, health)
async def _check_alerts(self, device_id: str, health: Dict[str, Any]):
"""Check for health alerts and trigger callbacks."""
alerts = []
# Device offline
if not health.get('api_responsive', False):
alerts.append({
'device_id': device_id,
'level': 'critical',
'message': 'Device is not responding',
'timestamp': datetime.now().isoformat()
})
# High resource usage
metrics = health.get('metrics', {})
if metrics.get('cpu_usage', 0) > 90:
alerts.append({
'device_id': device_id,
'level': 'warning',
'message': f'High CPU usage: {metrics["cpu_usage"]}%',
'timestamp': datetime.now().isoformat()
})
if metrics.get('memory_usage', 0) > 90:
alerts.append({
'device_id': device_id,
'level': 'warning',
'message': f'High memory usage: {metrics["memory_usage"]}%',
'timestamp': datetime.now().isoformat()
})
# Trigger alert callbacks
for alert in alerts:
for callback in self.alert_callbacks:
try:
await callback(alert)
except Exception as e:
print(f"Alert callback error: {e}")class SecureDeviceClient:
"""Secure device client with Tailscale authentication."""
def __init__(self, base_url: str, tailscale_subnet: str = "100.x.x.x/8"):
self.base_url = base_url
self.tailscale_subnet = tailscale_subnet
self.client = httpx.AsyncClient(
headers={'User-Agent': 'oaDashboard/1.0'},
timeout=30
)
async def verify_tailscale_connection(self) -> bool:
"""Verify connection is through Tailscale."""
try:
# Check if source IP is in Tailscale subnet
response = await self.client.get(f"{self.base_url}/platform")
platform_info = response.json()
# The API should reject non-Tailscale connections
return response.status_code == 200
except Exception:
return False
async def secure_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""Make secure request with additional validation."""
# Add security headers
headers = kwargs.get('headers', {})
headers.update({
'X-Request-ID': str(uuid.uuid4()),
'X-Client-Version': '1.0.0'
})
kwargs['headers'] = headers
# Make request
response = await self.client.request(method, f"{self.base_url}{endpoint}", **kwargs)
# Validate response
if response.status_code == 403:
raise Exception("Access denied - not in Tailscale subnet")
response.raise_for_status()
return response.json()import aioredis
from typing import Optional
class OptimizedDeviceManager:
"""Optimized device manager with connection pooling and caching."""
def __init__(self, redis_url: Optional[str] = None):
self.connection_pool = httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
)
self.redis_client: Optional[aioredis.Redis] = None
self.cache_ttl = 60 # seconds
async def initialize(self):
"""Initialize connection pool and cache."""
if os.getenv('REDIS_URL'):
self.redis_client = aioredis.from_url(os.getenv('REDIS_URL'))
async def get_cached_health(self, device_url: str) -> Optional[Dict[str, Any]]:
"""Get cached health data if available."""
if not self.redis_client:
return None
cache_key = f"health:{device_url}"
cached_data = await self.redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
return None
async def cache_health_data(self, device_url: str, health_data: Dict[str, Any]):
"""Cache health data for future requests."""
if not self.redis_client:
return
cache_key = f"health:{device_url}"
await self.redis_client.setex(
cache_key,
self.cache_ttl,
json.dumps(health_data)
)
async def close(self):
"""Close connection pool and Redis client."""
await self.connection_pool.aclose()
if self.redis_client:
await self.redis_client.close()import pytest
from unittest.mock import AsyncMock, patch
class TestDeviceAPIIntegration:
"""Integration tests for Device API client."""
@pytest.fixture
async def mock_client(self):
"""Create mock device client for testing."""
with patch('httpx.AsyncClient') as mock_client_class:
mock_client = AsyncMock()
mock_client_class.return_value = mock_client
client = DeviceAPIClient("http://test-device:9090")
client.client = mock_client
yield client, mock_client
@pytest.mark.asyncio
async def test_platform_discovery(self, mock_client):
"""Test platform discovery functionality."""
client, http_mock = mock_client
# Mock platform response
http_mock.request.return_value.json.return_value = {
'platform': 'macos',
'features': {
'camera': True,
'tracker': True,
'camguard': True
}
}
# Test platform discovery
capabilities = await discover_device_capabilities(client)
assert capabilities['platform'] == 'macos'
assert capabilities['features']['camera_support'] is True
assert capabilities['features']['tracker_integration'] is True
assert len(capabilities['endpoints']) > 0
@pytest.mark.asyncio
async def test_health_monitoring(self, mock_client):
"""Test health monitoring functionality."""
client, http_mock = mock_client
# Mock health responses
http_mock.request.side_effect = [
{'platform': 'macos', 'features': {'camera': True}},
{'status': 'healthy', 'cpu': {'usage_percent': 45.2}},
{
'cameras': {'count': 2},
'tracker': {'status': 'running'},
'camguard': {'status': 'running'}
}
]
# Test health monitoring
health = await comprehensive_health_check(client)
assert health['api_responsive'] is True
assert health['device_healthy'] is True
assert 'platform_specific' in health
assert 'metrics' in health
# Manual testing script
async def manual_integration_test():
"""Manual integration test for development."""
# Test with actual device
device_url = "http://192.168.100.10:9090" # Replace with actual device IP
async with DeviceAPIClient(device_url) as client:
try:
# Test basic connectivity
api_info = await client.get_api_info()
print(f"Connected to: {api_info['name']} v{api_info['version']}")
# Test platform detection
platform_info = await client.get_platform_info()
print(f"Platform: {platform_info['platform']}")
print(f"Features: {platform_info['features']}")
# Test health monitoring
health = await client.get_health()
print(f"Health Status: {health['status']}")
# Platform-specific tests
if platform_info['platform'] == 'macos':
cameras = await client.list_cameras()
print(f"Available cameras: {len(cameras)}")
tracker_status = await client.get_tracker_status()
print(f"Tracker status: {tracker_status['status']}")
elif platform_info['platform'] == 'orangepi':
player_status = await client.get_player_status()
print(f"Player status: {player_status['status']}")
print("✅ All integration tests passed")
except Exception as e:
print(f"❌ Integration test failed: {e}")
if __name__ == "__main__":
asyncio.run(manual_integration_test())Integration Guide Version: 1.0 Last Updated: October 2025 Handover Status: ✅ Complete Target Audience: oaDashboard developers and system integrators Testing Status: ✅ Comprehensive test suite included