Production-Ready Implementation of Google's Context Engineering: Sessions & Memory Framework

Real-world multi-agent collaboration with shared memory architecture and Ollama phi4-mini LLM integration.
The A2A Memory System is an Agent-to-Agent communication system that implements the principles of Google's "Context Engineering: Sessions & Memory" whitepaper in production. It enables intelligent agents to collaborate through shared memory architecture, real-time session management, and context-aware processing, using Ollama's phi4-mini model with a 128k-token context window.
Unlike traditional chatbot systems, the A2A Memory System creates a persistent, shared cognitive space where multiple AI agents can:
- Share memories across sessions - Knowledge persists beyond individual conversations
- Collaborate on complex tasks - Agents coordinate and distribute work based on specialization
- Maintain contextual awareness - 128k token context enables deep understanding
- Evolve knowledge over time - Memory system learns and adapts from interactions
Based on Google's Context Engineering framework, our implementation follows these architectural patterns:
```mermaid
graph TB
    subgraph "Frontend Layer"
        UI[Next.js Interface]
        RL[Real-time Logs]
    end
    subgraph "API Gateway"
        FL[Flask REST API]
        CORS[CORS Middleware]
        EP[15 API Endpoints]
    end
    subgraph "A2A Memory Core"
        MSS[Memory Storage Service]
        SMS[Session Management Service]
        A2AB[A2A Bridge Protocol]
    end
    subgraph "LLM Integration"
        OL[Ollama Service]
        PHI[phi4-mini Model]
        CTX[128k Context Window]
    end
    subgraph "Agent Network"
        AG1[Alex - Coordinator]
        AG2[Sam - Order Mgmt]
        AG3[Jordan - Tech Support]
    end

    UI --> FL
    FL --> MSS
    FL --> SMS
    FL --> A2AB
    FL --> OL
    OL --> PHI
    A2AB --> AG1
    A2AB --> AG2
    A2AB --> AG3
    AG1 -.->|shared memory| AG2
    AG2 -.->|shared memory| AG3
    AG3 -.->|shared memory| AG1
```
Our memory system implements five distinct memory types based on cognitive science research:

**Declarative Memory**
- Purpose: Facts, policies, static knowledge
- Example: "Premium customers eligible for 48h expedited refunds"
- Scope: Company-wide policies, customer data, product information
- Persistence: Long-term, high confidence (0.9+)

**Procedural Memory**
- Purpose: Workflows, step-by-step processes
- Example: "1) Verify order → 2) Check eligibility → 3) Process refund → 4) Send confirmation"
- Scope: Operational procedures, troubleshooting steps
- Persistence: Medium-term, evolves with process improvements

**Episodic Memory**
- Purpose: Specific interaction history, customer journey events
- Example: "Customer CUST001 had shipping delay 3 months ago, resolved with 4/5 satisfaction"
- Scope: Individual customer interactions, case histories
- Persistence: Long-term for important customers, archived for others

**Entity Memory**
- Purpose: Structured information about specific entities
- Example: "Customer CUST001: Premium tier, email preference, active account"
- Scope: Customer profiles, product details, agent capabilities
- Persistence: Long-term, updated dynamically

**Collaborative Memory**
- Purpose: Agent capabilities, coordination protocols
- Example: "For premium escalations: coordinate order + technical teams"
- Scope: Inter-agent collaboration patterns, escalation workflows
- Persistence: Long-term, optimized through use
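As a sketch, this taxonomy maps naturally onto an enum plus a dataclass. The class and field names below are illustrative, not the project's actual `models.py` definitions:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List

class MemoryType(Enum):
    DECLARATIVE = "declarative"      # facts, policies, static knowledge
    PROCEDURAL = "procedural"        # workflows, step-by-step processes
    EPISODIC = "episodic"            # specific interaction history
    ENTITY = "entity"                # structured info about specific entities
    COLLABORATIVE = "collaborative"  # agent capabilities, coordination protocols

@dataclass
class Memory:
    content: str
    memory_type: MemoryType
    confidence: float = 0.9          # declarative facts default to high confidence
    tags: List[str] = field(default_factory=list)

# A declarative fact from the examples above
m = Memory("Premium customers eligible for 48h expedited refunds",
           MemoryType.DECLARATIVE, tags=["refund", "premium"])
```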
Following Google's Context Engineering guidelines, we implement sophisticated session management:
```python
class SessionManager:
    def __init__(self, max_tokens=128000, compaction_strategy=None):
        self.max_tokens = max_tokens  # phi4-mini's 128k context
        self.compaction_strategy = compaction_strategy or TokenBasedCompaction()

    def manage_context(self, session_id: str) -> ContextWindow:
        # Implement sliding window + summarization for long sessions
        # Preserve high-importance memories during compaction
        # Maintain conversational coherence across agents
        ...
```

- Token-Based Compaction: Removes oldest content when approaching the 128k limit
- Sliding Window: Maintains recent N conversation turns with full context
- Semantic Summarization: Compresses old content while preserving key information
- Priority-Based Retention: Keeps high-confidence memories during compaction
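A minimal sketch of the token-based strategy: drop the oldest low-priority turns until the session fits the budget. The 4-characters-per-token estimate and the `priority` field are assumptions for illustration, not the project's implementation:

```python
def estimate_tokens(text: str) -> int:
    # Crude heuristic: ~4 characters per token (an assumption, not the tokenizer)
    return max(1, len(text) // 4)

class TokenBasedCompaction:
    """Drop the oldest low-priority turns until the session fits the budget."""

    def __init__(self, max_tokens: int = 128_000, reserve: int = 8_000):
        self.budget = max_tokens - reserve  # leave headroom for the next reply

    def compact(self, turns: list) -> list:
        total = sum(estimate_tokens(t["text"]) for t in turns)
        kept = list(turns)
        # Remove from the front (oldest first), skipping high-priority turns
        i = 0
        while total > self.budget and i < len(kept):
            if kept[i].get("priority", 0) < 2:
                total -= estimate_tokens(kept.pop(i)["text"])
            else:
                i += 1
        return kept
```

High-priority turns survive compaction even when they are the oldest content, which is what "priority-based retention" means in practice.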
```python
class MemoryManager:
    def add_memory(self, memory: Memory) -> str:
        # Vector embedding for semantic search
        embedding = self.embedding_service.encode(memory.content)
        memory.embedding = embedding

        # Store with confidence weighting
        self.vector_store.add(memory)

        # Update memory relationships
        self.build_memory_graph(memory)
        return memory.id  # return the stored memory's identifier

    def search_memories(self, query: str, top_k=5) -> List[Memory]:
        # Semantic similarity search using embeddings
        query_embedding = self.embedding_service.encode(query)
        similar_memories = self.vector_store.similarity_search(
            query_embedding,
            top_k=top_k,
            confidence_threshold=0.7
        )
        return similar_memories
```

The A2A protocol enables seamless communication between specialized agents:
```mermaid
sequenceDiagram
    participant User
    participant Alex as Alex (Coordinator)
    participant Sam as Sam (Order Mgmt)
    participant Jordan as Jordan (Tech Support)
    participant Memory as Shared Memory

    User->>Alex: "Customer order refund request"
    Alex->>Memory: Query customer history
    Memory-->>Alex: Customer profile + previous interactions
    Alex->>Alex: Analyze task complexity
    Alex->>Sam: A2A Task: "Check order status for CUST001"
    Alex->>Jordan: A2A Task: "Verify system status"
    Sam->>Memory: Access order database memories
    Jordan->>Memory: Access technical knowledge
    Sam-->>Alex: "Order delayed, refund eligible"
    Jordan-->>Alex: "Systems operational"
    Alex->>Memory: Store coordination results
    Alex-->>User: Coordinated response with full context
```
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class A2AMessage:
    id: str
    source_agent: str
    target_agent: str
    task_type: TaskType
    content: str
    context: Dict[str, Any]
    priority: Priority
    memory_refs: List[str]  # Referenced shared memories
    session_id: str
    timestamp: datetime
```

Alex (Coordinator) uses intelligent task distribution:
```python
def distribute_task(self, task: str) -> List[A2AMessage]:
    # Analyze task complexity and requirements
    task_analysis = self.llm_service.analyze_task(task)

    # Determine required agent specializations
    required_skills = self.extract_skill_requirements(task_analysis)

    # Route to appropriate agents based on capabilities
    messages = []
    if 'order_management' in required_skills:
        messages.append(self.create_message(target='sam', task=task))
    if 'technical_support' in required_skills:
        messages.append(self.create_message(target='jordan', task=task))
    return messages
```

```
backend/
├── src/
│   ├── models.py             # Memory, Session, A2A message models
│   ├── memory_manager.py     # Memory storage and retrieval
│   ├── session_manager.py    # Session lifecycle management
│   ├── a2a_integration.py    # Agent-to-agent communication
│   ├── llm_service.py        # Ollama phi4-mini integration
│   └── embedding_service.py  # Vector embeddings for semantic search
├── api.py                    # Flask REST API (15 endpoints)
└── requirements.txt          # Python dependencies
```
```
# Agent Communication
POST /api/agents/{agent_id}/chat              # LLM-powered agent chat
POST /api/agents/collaborate                  # Multi-agent collaboration
GET  /api/agents/{agent_id}/memories          # Agent memory access

# Memory Management
POST /api/memory/ingest                       # Manual memory ingestion
POST /api/memory/demo/prepare                 # Demo scenario setup
POST /api/agents/{agent_id}/memories/search   # Semantic memory search

# Session Management
GET  /api/sessions                            # Active session monitoring
GET  /api/logs                                # Real-time agent communication

# A2A Protocol
POST /api/memory/sync                         # Cross-agent memory sync
POST /api/memory/broadcast                    # Memory broadcasting
```

```python
class LLMService:
    def __init__(self, model_name="phi4-mini", max_context_tokens=128000):
        self.model_name = model_name
        self.max_context = max_context_tokens
        self.ollama_client = OllamaClient()

    def generate_response(self, prompt: str, context_memories: List[Memory]) -> str:
        # Construct context-aware prompt with relevant memories
        enhanced_prompt = self.build_memory_enhanced_prompt(prompt, context_memories)

        # Generate response using phi4-mini's 128k context
        response = self.ollama_client.generate(
            model=self.model_name,
            prompt=enhanced_prompt,
            options={"num_ctx": self.max_context}
        )
        return response['response']
```

```
frontend/
├── src/
│   ├── app/
│   │   ├── agents/page.tsx   # Resizable agent interface
│   │   └── logs/page.tsx     # Real-time communication logs
│   ├── components/ui/        # shadcn/ui components
│   └── lib/utils.ts          # Utility functions
├── tailwind.config.js        # Styling configuration
└── package.json              # Node.js dependencies
```
- Resizable Layout: Drag-to-resize panels for optimal workspace organization
- Real-time Updates: WebSocket-like polling for live agent communication
- Memory Visualization: Interactive display of shared memory states
- Multi-Agent Demo: Pre-configured scenarios for testing collaboration
With phi4-mini's 128k token context window:
- Average conversation: 2,000-8,000 tokens
- Memory capacity: 50-100 stored memories per session
- Multi-agent context: Support for 3-5 agents simultaneously
- Session duration: 4-6 hours of continuous interaction
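As a back-of-envelope check that these figures fit comfortably in the window (the per-memory and per-agent token costs below are assumptions for illustration):

```python
# Back-of-envelope context budgeting for a 128k-token window
MAX_CONTEXT = 128_000
conversation = 8_000   # upper end of a typical conversation (see above)
memories = 100 * 150   # 100 stored memories at ~150 tokens each (assumption)
agents = 5 * 1_000     # ~1k tokens of instructions per agent, 5 agents (assumption)

used = conversation + memories + agents
headroom = MAX_CONTEXT - used
print(headroom)  # 100000 tokens left for responses and session growth
```

Even at the upper end of each estimate, well over half the window remains free, which is why compaction only triggers late in long sessions.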
```python
# Memory compression and deduplication
class MemoryOptimizer:
    def compress_session(self, session_id: str) -> CompactionResult:
        memories = self.get_session_memories(session_id)

        # Remove duplicate information
        deduplicated = self.remove_duplicates(memories)

        # Summarize low-importance episodic memories
        compressed = self.summarize_episodes(deduplicated)

        # Preserve high-confidence procedural and declarative memories
        preserved = self.preserve_critical_knowledge(compressed)

        return CompactionResult(
            original_count=len(memories),
            compressed_count=len(preserved),
            compression_ratio=len(preserved) / len(memories)
        )
```

- Horizontal scaling: Multiple agent instances with shared memory backend
- Memory partitioning: Domain-specific memory namespaces
- Context caching: Pre-computed embeddings for faster retrieval
- Session sharding: Distribute sessions across multiple servers
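Memory partitioning, for instance, can be as simple as routing reads and writes through domain namespaces. This sketch uses a plain dict rather than the project's vector store; the class and method names are illustrative:

```python
from collections import defaultdict
from typing import Dict, List

class PartitionedMemoryStore:
    """Domain-specific namespaces so agents only scan relevant partitions."""

    def __init__(self):
        self._partitions: Dict[str, List[str]] = defaultdict(list)

    def add(self, namespace: str, content: str) -> None:
        self._partitions[namespace].append(content)

    def search(self, namespace: str, keyword: str) -> List[str]:
        # Only the named partition is scanned, keeping lookups bounded
        # regardless of how many other domains exist.
        return [m for m in self._partitions[namespace]
                if keyword.lower() in m.lower()]

store = PartitionedMemoryStore()
store.add("orders", "Order ORD123 delayed in transit")
store.add("support", "Email gateway operational")
```

In production the namespace would select a vector-store collection rather than a list, but the isolation property is the same.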
Initial Task: "Customer is reporting that their recent order hasn't arrived and they're asking for a refund. They previously mentioned preferring email communication and are a premium member."
```bash
# 1. Prepare demo memories
curl -X POST http://localhost:5001/api/memory/demo/prepare
# Response: 5 memories ingested across 6 agents
```

```json
{
    "success": true,
    "memories_ingested": 5,
    "agents_updated": ["alex", "sam", "jordan", "concierge", "order", "support"],
    "memories": [
        {
            "type": "entity",
            "content": "Premium customer CUST001: Email preference, expedited service eligibility"
        },
        {
            "type": "procedural",
            "content": "Premium refund workflow: 1) Verify → 2) Check eligibility → 3) Process → 4) Confirm"
        }
    ]
}
```

- Alex (Coordinator) receives the customer issue
- Memory Query: Accesses shared memories about premium customers and refund policies
- Task Distribution:
  - Sends order analysis request to Sam
  - Sends system verification request to Jordan
- Specialized Processing:
  - Sam checks order status using procedural memory workflows
  - Jordan verifies system status using technical knowledge
- Coordinated Response: Alex synthesizes specialist input into a comprehensive solution
```json
{
    "session_id": "session_1768232151",
    "agents_involved": ["Alex", "Sam", "Jordan"],
    "responses": {
        "alex_coordination": "I'll coordinate our team to handle this premium customer issue. Based on shared memory, customer prefers email communication and requires expedited service.",
        "sam_analysis": "Order management analysis: Customer eligible for 48h expedited refund per premium policy. Processing refund according to established workflow.",
        "jordan_technical": "Technical verification: All customer-facing systems operational. Email notification systems ready for premium customer communication.",
        "final_response": "Thank you for contacting us. Our team has collaborated to provide comprehensive assistance based on your premium status and email communication preference. We're processing your expedited refund and will send confirmation within 24 hours."
    },
    "memories_shared": 5,
    "logs_generated": 12
}
```

```bash
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh

# Pull phi4-mini model
ollama pull phi4-mini

# Verify 128k context capability
ollama show phi4-mini
```

```bash
cd backend
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt
python api.py
```

```bash
cd frontend
npm install
npm run dev
```

```bash
# 1. Prepare shared memories
curl -X POST http://localhost:5001/api/memory/demo/prepare

# 2. Test agent collaboration
curl -X POST http://localhost:5001/api/agents/collaborate \
  -H "Content-Type: application/json" \
  -d '{"task": "Customer order refund - premium member prefers email"}'

# 3. Access web interface
open http://localhost:3000/agents
```

```
# Search for relevant memories using natural language
POST /api/agents/alex/memories/search
{
    "query": "premium customer email communication policy",
    "top_k": 5,
    "confidence_threshold": 0.75
}
```

```python
class MemoryEvolution:
    def track_memory_changes(self, memory_id: str) -> MemoryHistory:
        # Track how memories are updated, refined, or deprecated over time
        # Useful for understanding system learning patterns
        return self.memory_store.get_history(memory_id)
```

```python
class PersistentMemoryService:
    def persist_critical_memories(self, session_id: str):
        # Identify high-value memories for long-term storage
        # Maintain knowledge across system restarts
        critical_memories = self.identify_critical_knowledge(session_id)
        self.long_term_storage.save(critical_memories)
```

Our system uses advanced vector embeddings for semantic memory retrieval:
```python
class SemanticMemoryRetrieval:
    def __init__(self):
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.vector_store = ChromaDB()

    def create_memory_embedding(self, memory: Memory) -> np.ndarray:
        # Combine content and metadata for richer embeddings
        enriched_text = f"{memory.content} {memory.memory_type.value} {' '.join(memory.tags)}"
        return self.embedding_model.encode(enriched_text)

    def find_related_memories(self, query: str, context: Dict) -> List[Memory]:
        # Context-aware retrieval considering current conversation state
        query_embedding = self.embedding_model.encode(query)

        # Apply context filters (agent capabilities, session history)
        filtered_results = self.vector_store.similarity_search(
            query_embedding,
            where={"scope": "shared", "confidence": {"$gte": 0.7}},
            n_results=10
        )
        return self.rank_by_relevance(filtered_results, context)
```

```python
class SessionStateManager:
    def __init__(self):
        self.active_sessions = {}
        self.session_store = RedisStore()  # For production scalability

    def manage_session_lifecycle(self, session_id: str):
        session = self.get_or_create_session(session_id)

        # Monitor token usage against 128k context limit
        if session.token_count > 100000:  # ~78% of context used
            self.trigger_compaction(session_id)

        # Update session state
        session.last_activity = datetime.utcnow()
        session.memory_access_count += 1

        # Persist changes
        self.session_store.save(session_id, session)
```

```python
class A2ABridge:
    def __init__(self):
        self.message_queue = AsyncQueue()
        self.agent_registry = AgentRegistry()

    async def route_message(self, message: A2AMessage):
        # Intelligent message routing based on agent capabilities
        target_agent = self.agent_registry.get_agent(message.target_agent)

        if not target_agent.is_available():
            # Implement fallback routing or queuing
            await self.queue_for_retry(message)
            return

        # Enrich message with relevant shared memories
        relevant_memories = await self.find_relevant_memories(
            message.content,
            message.source_agent,
            message.target_agent
        )
        enriched_message = self.enrich_with_context(message, relevant_memories)

        # Route to target agent
        response = await target_agent.process_message(enriched_message)

        # Log interaction for system learning
        await self.log_interaction(message, response)
        return response
```

This implementation is based on key insights from:
- Google's Context Engineering: Sessions & Memory - Memory organization patterns, session management strategies
- Cognitive Science Research - Declarative vs procedural memory classification
- Multi-Agent Systems Theory - Coordination protocols, task distribution algorithms
- Vector Database Design - Semantic search, embedding strategies
- LLM Context Management - Token optimization, context window utilization
- Production A2A Protocol: First open-source implementation of multi-agent memory sharing
- 128k Context Utilization: Optimal strategies for phi4-mini's extended context window
- Real-time Memory Evolution: Dynamic memory updating and relationship building
- Resizable Agent Interface: Novel UX for multi-agent interaction monitoring
- Graph Neural Networks: Enhanced memory relationship modeling
- Federated Learning: Multi-organization agent collaboration
- Quantum-Safe Encryption: Secure inter-agent communication
- Auto-Scaling: Dynamic agent spawning based on workload
- Memory Consolidation: Sleep-like memory optimization cycles
- Emotional Memory: Sentiment-aware memory storage and retrieval
- Causal Reasoning: Understanding cause-effect relationships in memories
- Meta-Learning: Agents learning how to learn more effectively
We welcome contributions! Key areas for improvement:
- Memory Algorithms: Novel compression and retrieval strategies
- Agent Specialization: New agent types and capabilities
- UI/UX: Enhanced visualization of agent interactions
- Performance: Optimization for large-scale deployments
MIT License - see LICENSE for details
- Google Research Team - Context Engineering: Sessions & Memory whitepaper
- Ollama Team - phi4-mini model and local LLM infrastructure
- Next.js & Vercel - Frontend framework and development tools
- Flask Community - Robust backend API framework