Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/vertice-core/src/vertice_core/adk/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from vertice_core.providers.vertex_ai import VertexAIProvider
from vertice_core.memory.cortex.cortex import MemoryCortex
from vertice_core.messaging.events import SystemEvent, get_event_bus
from vertice_core.adk.registry import ApiRegistry


class VerticeAgent(abc.ABC):
Expand All @@ -32,6 +33,7 @@ def __init__(
# Injected providers
self._provider = VertexAIProvider(project=project, location=location, model_name=model)
self._cortex = MemoryCortex()
self._api_registry = ApiRegistry(project=project, location=location)

def emit_event(self, event_type: str, payload: Mapping[str, Any]) -> None:
"""
Expand Down
69 changes: 69 additions & 0 deletions packages/vertice-core/src/vertice_core/adk/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
ApiRegistry: Dynamic API Discovery for Vertice SDK.

Handles dynamic discovery of Google Cloud and Vertice services.
Designed to fail gracefully in non-GCP environments.
"""

from __future__ import annotations

import os
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)


class ApiRegistry:
"""
Registry for dynamic API service discovery.
"""

def __init__(self, project: Optional[str] = None, location: str = "global") -> None:
self.project = project
self.location = location
self._services: Dict[str, Any] = {}

# Initialize discovery during instantiation
self._init_discovery()

def _init_discovery(self) -> None:
"""
Initialize service discovery mechanisms.

This method is designed to be resilient:
1. Checks for GCP environment markers.
2. Attempts to load credentials (if applicable).
3. Swallows errors to prevent runtime crashes in offline/CI environments.
"""
try:
# Simple check for GCP context
has_creds = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
has_project = os.getenv("GOOGLE_CLOUD_PROJECT") or self.project

if not has_creds and not has_project:
logger.info(
"ApiRegistry: No GCP credentials/project found. "
"Running in disconnected/local mode."
)
return

# Placeholder for actual discovery logic (e.g., using google-api-python-client)
# In a real implementation, we would build clients here.
# for service in ["aiplatform", "storage"]:
# self._services[service] = discovery.build(service, ...)

logger.debug(f"ApiRegistry: Initialized for project={self.project}")

except Exception as e:
# Critical: Do not crash the agent if discovery fails
logger.warning(f"ApiRegistry: Discovery initialization failed (non-fatal): {e}")

def get_service(self, name: str) -> Optional[Any]:
"""Retrieve a discovered service client."""
return self._services.get(name)

def is_connected(self) -> bool:
"""Check if registry has successfully connected to cloud services."""
# Simplified check for now
return bool(self.project or os.getenv("GOOGLE_CLOUD_PROJECT"))
46 changes: 0 additions & 46 deletions reproduce_issue.py

This file was deleted.

54 changes: 54 additions & 0 deletions tests/unit/test_api_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
Unit tests for ApiRegistry and its integration with VerticeAgent.
"""

import pytest
from unittest.mock import patch, MagicMock
import os
from vertice_core.adk.registry import ApiRegistry
from vertice_core.adk.base import VerticeAgent


class TestApiRegistry:
def test_init_with_gcp_env(self):
"""Test initialization when GCP environment is detected."""
with patch.dict(
os.environ,
{"GOOGLE_APPLICATION_CREDENTIALS": "fake.json", "GOOGLE_CLOUD_PROJECT": "test-proj"},
):
registry = ApiRegistry()
assert registry.is_connected()

def test_init_without_gcp_env(self):
"""Test initialization when NO GCP environment is detected (should not crash)."""
# Clear specific env vars to simulate non-GCP
with patch.dict(os.environ, {}, clear=True):
registry = ApiRegistry()
# It should not crash, and be in disconnected mode.
assert not registry.is_connected()

def test_vertice_agent_integration(self):
"""Test that VerticeAgent instantiates ApiRegistry correctly."""

# We need to mock VertexAIProvider and MemoryCortex to avoid external calls/DB requirements
with (
patch("vertice_core.adk.base.VertexAIProvider"),
patch("vertice_core.adk.base.MemoryCortex"),
):

# Subclass abstract VerticeAgent
class MockAgent(VerticeAgent):
@property
def system_prompt(self):
return "sys"

async def query(self, *, input, **kwargs):
return {}

# Test with explicit project/location
agent = MockAgent(project="test-project", location="us-central1")

assert hasattr(agent, "_api_registry")
assert isinstance(agent._api_registry, ApiRegistry)
assert agent._api_registry.project == "test-project"
assert agent._api_registry.location == "us-central1"
Loading