Skip to content

Latest commit

 

History

History
1005 lines (802 loc) · 33.8 KB

File metadata and controls

1005 lines (802 loc) · 33.8 KB

oaDeviceAPI Integration Guide

Purpose: Comprehensive integration guide for consuming the unified device API Status: ✅ Production Ready - October 2025 Handover Target Audience: Developers integrating oaDeviceAPI with oaDashboard and other services

Table of Contents

  1. Quick Start Integration
  2. API Discovery & Platform Detection
  3. Core API Integration
  4. Platform-Specific Integration
  5. Error Handling & Resilience
  6. Security & Authentication
  7. Performance Optimization
  8. Testing & Validation

Quick Start Integration

Basic Setup

import httpx
import asyncio
from typing import Dict, Any, Optional

class DeviceAPIClient:
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)

    async def close(self):
        await self.client.aclose()

# Example usage
async def main():
    device = DeviceAPIClient("http://device-ip:9090")

    # Discover platform
    platform_info = await device.get_platform_info()
    print(f"Platform: {platform_info['platform']}")

    # Get health status
    health = await device.get_health()
    print(f"Status: {health['status']}")

    await device.close()

# Run the example
asyncio.run(main())

Environment Configuration

import os
from dataclasses import dataclass

@dataclass
class DeviceAPIConfig:
    base_url: str
    timeout: int = 30
    retry_attempts: int = 3
    retry_delay: float = 1.0

    @classmethod
    def from_env(cls) -> 'DeviceAPIConfig':
        return cls(
            base_url=os.getenv('DEVICE_API_URL', 'http://localhost:9090'),
            timeout=int(os.getenv('DEVICE_API_TIMEOUT', '30')),
            retry_attempts=int(os.getenv('DEVICE_API_RETRIES', '3')),
            retry_delay=float(os.getenv('DEVICE_API_RETRY_DELAY', '1.0'))
        )

API Discovery & Platform Detection

Platform Discovery Flow

async def discover_device_capabilities(client: DeviceAPIClient) -> Dict[str, Any]:
    """Complete device discovery and capability assessment."""

    # Step 1: Get platform information
    platform_info = await client.get_platform_info()
    platform = platform_info['platform']
    features = platform_info['features']

    # Step 2: Build capability map
    capabilities = {
        'platform': platform,
        'version': platform_info.get('version'),
        'endpoints': [],
        'features': {
            'health_monitoring': True,  # Always available
            'camera_support': features.get('camera', False),
            'tracker_integration': features.get('tracker', False),
            'camguard_integration': features.get('camguard', False),
            'screenshot_support': features.get('screenshot', False),
            'player_control': features.get('player', False)
        },
        'actions': {
            'reboot_supported': True,  # Always available
            'service_restart': platform == 'macos'  # macOS only
        }
    }

    # Step 3: Determine available endpoints
    if platform == 'macos':
        capabilities['endpoints'] = [
            '/macos/health',
            '/macos/cameras',
            '/macos/cameras/{id}/stream',
            '/macos/tracker/status',
            '/macos/camguard/status',
            '/macos/actions/restart-tracker'
        ]
    elif platform == 'orangepi':
        capabilities['endpoints'] = [
            '/orangepi/health',
            '/orangepi/screenshot',
            '/orangepi/player/status',
            '/orangepi/player/actions/restart'
        ]

    return capabilities

Health Check Integration

async def comprehensive_health_check(client: DeviceAPIClient) -> Dict[str, Any]:
    """Perform comprehensive health assessment."""

    health_results = {
        'device_healthy': False,
        'api_responsive': False,
        'platform_specific': {},
        'metrics': {},
        'timestamp': None
    }

    try:
        # Basic API health check
        basic_health = await client.get_health()
        health_results['api_responsive'] = True
        health_results['device_healthy'] = basic_health.get('status') == 'healthy'
        health_results['timestamp'] = basic_health.get('timestamp')

        # Platform-specific health
        platform_info = await client.get_platform_info()
        platform = platform_info['platform']

        if platform == 'macos':
            macos_health = await client.get_macos_health()
            health_results['platform_specific'] = {
                'cameras_available': macos_health.get('cameras', {}).get('count', 0),
                'tracker_running': macos_health.get('tracker', {}).get('status') == 'running',
                'camguard_running': macos_health.get('camguard', {}).get('status') == 'running',
                'temperature': macos_health.get('temperature', {}).get('current')
            }
        elif platform == 'orangepi':
            op_health = await client.get_orangepi_health()
            health_results['platform_specific'] = {
                'display_available': op_health.get('display', {}).get('connected', False),
                'player_running': op_health.get('player', {}).get('status') == 'running',
                'storage_usage': op_health.get('disk', {}).get('usage_percent')
            }

        # Extract key metrics
        health_results['metrics'] = {
            'cpu_usage': basic_health.get('cpu', {}).get('usage_percent'),
            'memory_usage': basic_health.get('memory', {}).get('usage_percent'),
            'disk_usage': basic_health.get('disk', {}).get('usage_percent'),
            'uptime': basic_health.get('uptime_seconds')
        }

    except Exception as e:
        health_results['error'] = str(e)
        health_results['device_healthy'] = False

    return health_results

Core API Integration

Complete DeviceAPI Client Implementation

import httpx
import asyncio
from typing import Dict, Any, Optional, List
from datetime import datetime

class DeviceAPIClient:
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)
        self._platform_info = None

    async def close(self):
        await self.client.aclose()

    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Internal request method with error handling."""
        url = f"{self.base_url}{endpoint}"

        try:
            response = await self.client.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            raise Exception(f"HTTP {e.response.status_code}: {e.response.text}")
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {str(e)}")

    # Core Endpoints
    async def get_api_info(self) -> Dict[str, Any]:
        """Get API information and capabilities."""
        return await self._request("GET", "/")

    async def get_platform_info(self) -> Dict[str, Any]:
        """Get platform detection information."""
        if not self._platform_info:
            self._platform_info = await self._request("GET", "/platform")
        return self._platform_info

    async def get_health(self) -> Dict[str, Any]:
        """Get basic health information."""
        return await self._request("GET", "/health")

    async def reboot_system(self) -> Dict[str, Any]:
        """Reboot the system."""
        return await self._request("POST", "/actions/reboot")

    # macOS-specific endpoints
    async def get_macos_health(self) -> Dict[str, Any]:
        """Get macOS-specific health information."""
        return await self._request("GET", "/macos/health")

    async def list_cameras(self) -> List[Dict[str, Any]]:
        """List available cameras (macOS only)."""
        return await self._request("GET", "/macos/cameras")

    async def get_camera_status(self) -> Dict[str, Any]:
        """Get camera availability status."""
        return await self._request("GET", "/macos/cameras/status")

    async def get_camera_stream_url(self, camera_id: str) -> str:
        """Get MJPEG stream URL for camera."""
        return f"{self.base_url}/macos/cameras/{camera_id}/stream"

    async def get_tracker_status(self) -> Dict[str, Any]:
        """Get oaTracker service status."""
        return await self._request("GET", "/macos/tracker/status")

    async def get_tracker_stats(self) -> Dict[str, Any]:
        """Get tracker runtime statistics."""
        return await self._request("GET", "/macos/tracker/stats")

    async def get_camguard_status(self) -> Dict[str, Any]:
        """Get CamGuard recording status."""
        return await self._request("GET", "/macos/camguard/status")

    async def restart_tracker(self) -> Dict[str, Any]:
        """Restart tracker service."""
        return await self._request("POST", "/macos/actions/restart-tracker")

    # OrangePi-specific endpoints
    async def get_orangepi_health(self) -> Dict[str, Any]:
        """Get OrangePi-specific health information."""
        return await self._request("GET", "/orangepi/health")

    async def capture_screenshot(self) -> bytes:
        """Capture display screenshot (OrangePi only)."""
        response = await self.client.get(f"{self.base_url}/orangepi/screenshot")
        response.raise_for_status()
        return response.content

    async def get_player_status(self) -> Dict[str, Any]:
        """Get video player status."""
        return await self._request("GET", "/orangepi/player/status")

    async def restart_player(self) -> Dict[str, Any]:
        """Restart video player."""
        return await self._request("POST", "/orangepi/player/actions/restart")

oaDashboard Integration Pattern

class DeviceManager:
    """Manages multiple devices for oaDashboard integration."""

    def __init__(self):
        self.devices: Dict[str, DeviceAPIClient] = {}
        self.device_capabilities: Dict[str, Dict[str, Any]] = {}

    async def add_device(self, device_id: str, url: str) -> bool:
        """Add a device to management."""
        try:
            client = DeviceAPIClient(url)

            # Test connectivity and discover capabilities
            capabilities = await discover_device_capabilities(client)

            self.devices[device_id] = client
            self.device_capabilities[device_id] = capabilities

            return True
        except Exception as e:
            print(f"Failed to add device {device_id}: {e}")
            return False

    async def get_all_device_health(self) -> Dict[str, Dict[str, Any]]:
        """Get health status for all managed devices."""
        health_results = {}

        tasks = []
        for device_id, client in self.devices.items():
            task = asyncio.create_task(
                comprehensive_health_check(client)
            )
            tasks.append((device_id, task))

        for device_id, task in tasks:
            try:
                health_results[device_id] = await task
            except Exception as e:
                health_results[device_id] = {
                    'device_healthy': False,
                    'error': str(e)
                }

        return health_results

    async def execute_platform_specific_action(
        self,
        device_id: str,
        action: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Execute platform-specific actions."""

        if device_id not in self.devices:
            raise ValueError(f"Device {device_id} not found")

        client = self.devices[device_id]
        capabilities = self.device_capabilities[device_id]
        platform = capabilities['platform']

        try:
            if platform == 'macos':
                if action == 'restart_tracker':
                    return await client.restart_tracker()
                elif action == 'get_camera_list':
                    return await client.list_cameras()
                elif action == 'get_tracker_status':
                    return await client.get_tracker_status()
                elif action == 'get_camguard_status':
                    return await client.get_camguard_status()

            elif platform == 'orangepi':
                if action == 'restart_player':
                    return await client.restart_player()
                elif action == 'capture_screenshot':
                    return await client.capture_screenshot()
                elif action == 'get_player_status':
                    return await client.get_player_status()

            # Common actions
            elif action == 'reboot':
                return await client.reboot_system()

            else:
                raise ValueError(f"Unsupported action '{action}' for platform '{platform}'")

        except Exception as e:
            return {'error': str(e), 'success': False}

    async def close_all(self):
        """Close all device connections."""
        for client in self.devices.values():
            await client.close()
        self.devices.clear()
        self.device_capabilities.clear()

Platform-Specific Integration

macOS Integration Patterns

class MacDeviceIntegration:
    """macOS-specific device integration for camera and tracker services."""

    def __init__(self, client: DeviceAPIClient):
        self.client = client

    async def setup_camera_monitoring(self) -> Dict[str, Any]:
        """Setup camera monitoring and streaming."""

        # Check camera availability
        camera_status = await self.client.get_camera_status()
        if not camera_status.get('connected', False):
            return {'error': 'No cameras available', 'cameras': []}

        # Get available cameras
        cameras = await self.client.list_cameras()

        # Build camera monitoring configuration
        camera_config = {
            'total_cameras': len(cameras),
            'active_cameras': [],
            'stream_urls': [],
            'health_check_interval': 30  # seconds
        }

        for camera in cameras:
            camera_id = camera['id']
            stream_url = self.client.get_camera_stream_url(camera_id)

            camera_config['active_cameras'].append({
                'id': camera_id,
                'name': camera.get('name', f'Camera {camera_id}'),
                'resolution': camera.get('resolution'),
                'fps': camera.get('fps'),
                'stream_url': stream_url
            })

        return camera_config

    async def monitor_tracker_service(self) -> Dict[str, Any]:
        """Monitor oaTracker service health and performance."""

        tracker_status = await self.client.get_tracker_status()
        tracker_stats = await self.client.get_tracker_stats()

        monitoring_data = {
            'service_running': tracker_status.get('status') == 'running',
            'uptime_seconds': tracker_status.get('uptime_seconds', 0),
            'last_detection': tracker_stats.get('last_detection_time'),
            'total_detections': tracker_stats.get('total_detections', 0),
            'processing_fps': tracker_stats.get('processing_fps', 0),
            'memory_usage': tracker_stats.get('memory_usage_mb', 0),
            'model_version': tracker_stats.get('model_version'),
            'alerts': []
        }

        # Generate alerts for potential issues
        if not monitoring_data['service_running']:
            monitoring_data['alerts'].append({
                'level': 'error',
                'message': 'Tracker service is not running'
            })

        if monitoring_data['processing_fps'] < 10:
            monitoring_data['alerts'].append({
                'level': 'warning',
                'message': f'Low processing FPS: {monitoring_data["processing_fps"]}'
            })

        if monitoring_data['memory_usage'] > 1000:  # > 1GB
            monitoring_data['alerts'].append({
                'level': 'warning',
                'message': f'High memory usage: {monitoring_data["memory_usage"]}MB'
            })

        return monitoring_data

    async def setup_camguard_integration(self) -> Dict[str, Any]:
        """Setup CamGuard recording service integration."""

        camguard_status = await self.client.get_camguard_status()

        integration_config = {
            'service_running': camguard_status.get('status') == 'running',
            'recording_active': camguard_status.get('recording', False),
            'storage_info': camguard_status.get('storage', {}),
            'configuration': {
                'recording_quality': camguard_status.get('quality', 'medium'),
                'retention_days': camguard_status.get('retention_days', 7),
                'motion_detection': camguard_status.get('motion_detection', True)
            },
            'alerts': []
        }

        # Check storage space
        storage_usage = integration_config['storage_info'].get('usage_percent', 0)
        if storage_usage > 90:
            integration_config['alerts'].append({
                'level': 'critical',
                'message': f'Storage almost full: {storage_usage}%'
            })
        elif storage_usage > 80:
            integration_config['alerts'].append({
                'level': 'warning',
                'message': f'St getting full: {storage_usage}%'
            })

        return integration_config

OrangePi Integration Patterns

class OrangePiDeviceIntegration:
    """OrangePi-specific device integration for display and player services."""

    def __init__(self, client: DeviceAPIClient):
        self.client = client

    async def setup_display_monitoring(self) -> Dict[str, Any]:
        """Setup display monitoring and screenshot capabilities."""

        health = await self.client.get_orangepi_health()

        display_config = {
            'display_connected': health.get('display', {}).get('connected', False),
            'resolution': health.get('display', {}).get('resolution'),
            'refresh_rate': health.get('display', {}).get('refresh_rate'),
            'screenshot_capability': True,
            'capture_interval': 60,  # seconds between screenshots
            'storage_path': health.get('temp_dir', '/tmp')
        }

        return display_config

    async def capture_display_screenshot(self) -> Dict[str, Any]:
        """Capture and store display screenshot."""

        try:
            screenshot_data = await self.client.capture_screenshot()

            # Store screenshot (example: save to file or upload)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"screenshot_{timestamp}.jpg"

            # In a real implementation, you would save to cloud storage
            # or process the image as needed

            return {
                'success': True,
                'filename': filename,
                'size_bytes': len(screenshot_data),
                'timestamp': timestamp,
                'content_type': 'image/jpeg'
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    async def monitor_player_service(self) -> Dict[str, Any]:
        """Monitor video player service status and health."""

        player_status = await self.client.get_player_status()

        monitoring_data = {
            'service_running': player_status.get('status') == 'running',
            'current_media': player_status.get('current_media'),
            'playback_state': player_status.get('playback_state', 'stopped'),
            'volume': player_status.get('volume', 50),
            'position_seconds': player_status.get('position_seconds', 0),
            'duration_seconds': player_status.get('duration_seconds', 0),
            'uptime_seconds': player_status.get('uptime_seconds', 0),
            'last_error': player_status.get('last_error'),
            'alerts': []
        }

        # Generate alerts
        if not monitoring_data['service_running']:
            monitoring_data['alerts'].append({
                'level': 'error',
                'message': 'Player service is not running'
            })

        if monitoring_data['last_error']:
            monitoring_data['alerts'].append({
                'level': 'warning',
                'message': f'Player error: {monitoring_data["last_error"]}'
            })

        return monitoring_data

    async def execute_player_control(self, action: str, **kwargs) -> Dict[str, Any]:
        """Execute player control actions."""

        try:
            if action == 'restart':
                return await self.client.restart_player()
            else:
                return {
                    'success': False,
                    'error': f'Unsupported player action: {action}'
                }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

Error Handling & Resilience

Robust Error Handling

import asyncio
from typing import Callable, Any
from functools import wraps

def retry_on_failure(max_attempts: int = 3, delay: float = 1.0):
    """Decorator for retrying failed operations."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
                    else:
                        break

            raise last_exception

        return wrapper
    return decorator

class ResilientDeviceClient:
    """Device client with built-in resilience features."""

    def __init__(self, base_url: str, timeout: int = 30):
        self.client = DeviceAPIClient(base_url, timeout)
        self.circuit_breaker = CircuitBreaker()

    @retry_on_failure(max_attempts=3, delay=1.0)
    async def get_health_with_fallback(self) -> Dict[str, Any]:
        """Get health status with circuit breaker protection."""

        if self.circuit_breaker.is_open():
            return {
                'status': 'unknown',
                'error': 'Circuit breaker is open',
                'fallback_mode': True
            }

        try:
            health = await self.client.get_health()
            self.circuit_breaker.record_success()
            return health

        except Exception as e:
            self.circuit_breaker.record_failure()
            return {
                'status': 'error',
                'error': str(e),
                'fallback_mode': False
            }

class CircuitBreaker:
    """Simple circuit breaker implementation."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def is_open(self) -> bool:
        if self.state == 'OPEN':
            if (time.time() - self.last_failure_time) > self.recovery_timeout:
                self.state = 'HALF_OPEN'
                return False
            return True
        return False

    def record_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

Health Monitoring Integration

class DeviceHealthMonitor:
    """Continuous health monitoring for device fleet."""

    def __init__(self, device_manager: DeviceManager):
        self.device_manager = device_manager
        self.health_history: Dict[str, List[Dict[str, Any]]] = {}
        self.alert_callbacks: List[Callable] = []

    async def start_monitoring(self, interval: int = 60):
        """Start continuous health monitoring."""

        while True:
            try:
                health_data = await self.device_manager.get_all_device_health()

                for device_id, health in health_data.items():
                    await self._process_device_health(device_id, health)

                await asyncio.sleep(interval)

            except Exception as e:
                print(f"Health monitoring error: {e}")
                await asyncio.sleep(interval)

    async def _process_device_health(self, device_id: str, health: Dict[str, Any]):
        """Process individual device health data."""

        # Store health history
        if device_id not in self.health_history:
            self.health_history[device_id] = []

        self.health_history[device_id].append({
            'timestamp': datetime.now().isoformat(),
            'health': health
        })

        # Keep only last 100 entries
        if len(self.health_history[device_id]) > 100:
            self.health_history[device_id] = self.health_history[device_id][-100:]

        # Check for alerts
        await self._check_alerts(device_id, health)

    async def _check_alerts(self, device_id: str, health: Dict[str, Any]):
        """Check for health alerts and trigger callbacks."""

        alerts = []

        # Device offline
        if not health.get('api_responsive', False):
            alerts.append({
                'device_id': device_id,
                'level': 'critical',
                'message': 'Device is not responding',
                'timestamp': datetime.now().isoformat()
            })

        # High resource usage
        metrics = health.get('metrics', {})
        if metrics.get('cpu_usage', 0) > 90:
            alerts.append({
                'device_id': device_id,
                'level': 'warning',
                'message': f'High CPU usage: {metrics["cpu_usage"]}%',
                'timestamp': datetime.now().isoformat()
            })

        if metrics.get('memory_usage', 0) > 90:
            alerts.append({
                'device_id': device_id,
                'level': 'warning',
                'message': f'High memory usage: {metrics["memory_usage"]}%',
                'timestamp': datetime.now().isoformat()
            })

        # Trigger alert callbacks
        for alert in alerts:
            for callback in self.alert_callbacks:
                try:
                    await callback(alert)
                except Exception as e:
                    print(f"Alert callback error: {e}")

Security & Authentication

Tailscale Security Integration

class SecureDeviceClient:
    """Secure device client with Tailscale authentication."""

    def __init__(self, base_url: str, tailscale_subnet: str = "100.x.x.x/8"):
        self.base_url = base_url
        self.tailscale_subnet = tailscale_subnet
        self.client = httpx.AsyncClient(
            headers={'User-Agent': 'oaDashboard/1.0'},
            timeout=30
        )

    async def verify_tailscale_connection(self) -> bool:
        """Verify connection is through Tailscale."""

        try:
            # Check if source IP is in Tailscale subnet
            response = await self.client.get(f"{self.base_url}/platform")
            platform_info = response.json()

            # The API should reject non-Tailscale connections
            return response.status_code == 200

        except Exception:
            return False

    async def secure_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make secure request with additional validation."""

        # Add security headers
        headers = kwargs.get('headers', {})
        headers.update({
            'X-Request-ID': str(uuid.uuid4()),
            'X-Client-Version': '1.0.0'
        })
        kwargs['headers'] = headers

        # Make request
        response = await self.client.request(method, f"{self.base_url}{endpoint}", **kwargs)

        # Validate response
        if response.status_code == 403:
            raise Exception("Access denied - not in Tailscale subnet")

        response.raise_for_status()
        return response.json()

Performance Optimization

Connection Pooling and Caching

import aioredis
from typing import Optional

class OptimizedDeviceManager:
    """Optimized device manager with connection pooling and caching."""

    def __init__(self, redis_url: Optional[str] = None):
        self.connection_pool = httpx.AsyncClient(
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
        )
        self.redis_client: Optional[aioredis.Redis] = None
        self.cache_ttl = 60  # seconds

    async def initialize(self):
        """Initialize connection pool and cache."""
        if os.getenv('REDIS_URL'):
            self.redis_client = aioredis.from_url(os.getenv('REDIS_URL'))

    async def get_cached_health(self, device_url: str) -> Optional[Dict[str, Any]]:
        """Get cached health data if available."""

        if not self.redis_client:
            return None

        cache_key = f"health:{device_url}"
        cached_data = await self.redis_client.get(cache_key)

        if cached_data:
            return json.loads(cached_data)

        return None

    async def cache_health_data(self, device_url: str, health_data: Dict[str, Any]):
        """Cache health data for future requests."""

        if not self.redis_client:
            return

        cache_key = f"health:{device_url}"
        await self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(health_data)
        )

    async def close(self):
        """Close connection pool and Redis client."""
        await self.connection_pool.aclose()
        if self.redis_client:
            await self.redis_client.close()

Testing & Validation

Integration Testing Framework

import pytest
from unittest.mock import AsyncMock, patch

class TestDeviceAPIIntegration:
    """Integration tests for Device API client."""

    @pytest.fixture
    async def mock_client(self):
        """Create mock device client for testing."""
        with patch('httpx.AsyncClient') as mock_client_class:
            mock_client = AsyncMock()
            mock_client_class.return_value = mock_client

            client = DeviceAPIClient("http://test-device:9090")
            client.client = mock_client

            yield client, mock_client

    @pytest.mark.asyncio
    async def test_platform_discovery(self, mock_client):
        """Test platform discovery functionality."""

        client, http_mock = mock_client

        # Mock platform response
        http_mock.request.return_value.json.return_value = {
            'platform': 'macos',
            'features': {
                'camera': True,
                'tracker': True,
                'camguard': True
            }
        }

        # Test platform discovery
        capabilities = await discover_device_capabilities(client)

        assert capabilities['platform'] == 'macos'
        assert capabilities['features']['camera_support'] is True
        assert capabilities['features']['tracker_integration'] is True
        assert len(capabilities['endpoints']) > 0

    @pytest.mark.asyncio
    async def test_health_monitoring(self, mock_client):
        """Test health monitoring functionality."""

        client, http_mock = mock_client

        # Mock health responses
        http_mock.request.side_effect = [
            {'platform': 'macos', 'features': {'camera': True}},
            {'status': 'healthy', 'cpu': {'usage_percent': 45.2}},
            {
                'cameras': {'count': 2},
                'tracker': {'status': 'running'},
                'camguard': {'status': 'running'}
            }
        ]

        # Test health monitoring
        health = await comprehensive_health_check(client)

        assert health['api_responsive'] is True
        assert health['device_healthy'] is True
        assert 'platform_specific' in health
        assert 'metrics' in health

# Manual testing script
async def manual_integration_test():
    """Manual integration test for development."""

    # Test with actual device
    device_url = "http://192.168.100.10:9090"  # Replace with actual device IP

    async with DeviceAPIClient(device_url) as client:
        try:
            # Test basic connectivity
            api_info = await client.get_api_info()
            print(f"Connected to: {api_info['name']} v{api_info['version']}")

            # Test platform detection
            platform_info = await client.get_platform_info()
            print(f"Platform: {platform_info['platform']}")
            print(f"Features: {platform_info['features']}")

            # Test health monitoring
            health = await client.get_health()
            print(f"Health Status: {health['status']}")

            # Platform-specific tests
            if platform_info['platform'] == 'macos':
                cameras = await client.list_cameras()
                print(f"Available cameras: {len(cameras)}")

                tracker_status = await client.get_tracker_status()
                print(f"Tracker status: {tracker_status['status']}")

            elif platform_info['platform'] == 'orangepi':
                player_status = await client.get_player_status()
                print(f"Player status: {player_status['status']}")

            print("✅ All integration tests passed")

        except Exception as e:
            print(f"❌ Integration test failed: {e}")

if __name__ == "__main__":
    asyncio.run(manual_integration_test())

Integration Guide Version: 1.0 Last Updated: October 2025 Handover Status: ✅ Complete Target Audience: oaDashboard developers and system integrators Testing Status: ✅ Comprehensive test suite included