Adding Checkpointer and Store Implementation of Amazon Bedrock AgentCore Memory #1935
Adding Checkpointer and Store Implementation of Amazon Bedrock AgentCore Memory #1935Hasnain Virk (hasnainvirk) wants to merge 4 commits intolangchain-ai:mainfrom
Conversation
🦋 Changeset detectedLatest commit: 963bb02 The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
- Add AgentCore Memory validation test initializer - Export deleteThreadTests for comprehensive testing - Fix type export for CheckpointSaverTestInitializer - Increase test timeouts for AWS API throttling (2 minutes) - Add CLI build support for validation runner These changes enable validation testing of AWS-based checkpoint implementations that may experience API throttling and require longer test execution times.
Implements both BaseCheckpointSaver and BaseStore for AWS Bedrock AgentCore Memory: - Full checkpoint persistence with thread isolation via sessionId - Supports all standard operations: getTuple, list, put, putWrites, deleteThread - Handles checkpoint_ns filtering and metadata search - Automatic retry logic with exponential backoff for AWS API throttling - Rate limiting (16.7 req/sec) to stay within AgentCore Memory limits - Unique actor ID generation for test isolation - Proper Unicode handling with TextEncoder/TextDecoder - Hierarchical namespace organization for data isolation - Metadata filtering and pagination for search operations - Complex JSON value support with proper serialization - Vector similarity search capabilities via RetrieveMemoryRecordsCommand - Consistent rate limiting and error handling - thread_id maps directly to sessionId in AgentCore Memory - actor_id provides user-level isolation across sessions - checkpoint_ns stored in event payload for namespace filtering - Store namespaces map to sessionId/actorId for hierarchical organization - Proper TypeScript interfaces for all AWS SDK interactions - No explicit 'any' types - uses proper type definitions - Comprehensive error handling with typed AWS error interfaces Includes comprehensive integration tests and validation test support.
Add AWS SDK dependencies and development tools required for the new checkpoint-aws-agentcore-memory package.
73814e0 to
d224eb8
Compare
| } | ||
|
|
||
| private async handlePut(op: PutOperation): Promise<void> { | ||
| if (op.value === null) { |
There was a problem hiding this comment.
The DeleteEvent API is already being used in saver.ts → deleteThread() via DeleteEventCommand. Since store items are stored as events, the same mechanism applies.
Right now store.delete() silently does nothing, which breaks the BaseStore contract. Any LangGraph workflow that calls store.delete(namespace, key) will think it succeeded but the item is still there.
you can ListEvents filtered by the key's metadata, find the matching event(s), then DeleteEvent to remove them.
| } | ||
| } | ||
|
|
||
| private async handleListNamespaces( |
There was a problem hiding this comment.
This returns a hardcoded empty array, which means store.listNamespaces() is non-functional. This is a required part of the BaseStore interface and other providers (Postgres, Redis) implement it fully with support for prefix, suffix, maxDepth, limit, offset.
| actorId: resolvedActorId, | ||
| sessionId, | ||
| includePayloads: true, | ||
| maxResults: 100, |
There was a problem hiding this comment.
For an active thread, it's easy to exceed 100 events — each putWrites call stores one event per channel, so 10 checkpoints with 10-channel writes = 100 events right at the boundary.
| * This store uses AgentCore Memory for persistent key-value storage with | ||
| * optional vector similarity search capabilities. | ||
| */ | ||
| export class AgentCoreMemoryStore extends BaseStore { |
There was a problem hiding this comment.
Other LangGraphJS store providers (PostgresStore, RedisStore) implement setup() / start() / stop() lifecycle methods. setup() typically validates the backing resource exists and is ready.
This store has no initialization step, so if someone passes an invalid memoryId or the Memory resource is still in CREATING status, they won't find out until the first get/put/search call fails with a cryptic error.
GetMemory API returns the resource's status field (CREATING | ACTIVE | FAILED | DELETING). A setup() method could call this to validate the memory is ACTIVE before any operations.
There was a problem hiding this comment.
can we add input validation for getSessionId() / getActorId()..
Memory API enforces strict patterns on these fields:
sessionId: pattern [a-zA-Z0-9][a-zA-Z0-9-]*, max 100 chars (CreateEvent docs)
actorId: pattern [a-zA-Z0-9][a-zA-Z0-9-/](?::[a-zA-Z0-9-_/]+)[a-zA-Z0-9-_/]*, max 255 chars
| } | ||
| } | ||
|
|
||
| async deleteThread(threadId: string, actorId?: string): Promise<void> { |
There was a problem hiding this comment.
Every other method in this class calls this.rateLimit() before API calls, but deleteThread doesn't
| } | ||
| } | ||
|
|
||
| private async performMetadataSearch( |
There was a problem hiding this comment.
Same pagination issue as getTuple — this calls ListEventsCommand with maxResults but doesn't handle nextToken. If a namespace has more than op.limit || 100 store events, the search silently returns incomplete results. ListEvents API supports pagination via nextToken and the docs recommend using it.
There was a problem hiding this comment.
checkpoint_ns is being stuffed into the event payload blob and filtered client-side, but our Memory has native branch support for this purpose. CreateEvent accepts a branch: { name, rootEventId } field and ListEvents supports filter.branch.name for server-side filtering. Using branches would avoid downloading ALL events across all namespaces just to filter to one.
There was a problem hiding this comment.
getTuple() / querySpecificSession() - API supports eventMetadata filters with EQUALS_TO operator — you could filter by type=checkpoint when you only need checkpoints
| } | ||
| } | ||
|
|
||
| private async performVectorSearch( |
There was a problem hiding this comment.
store.put() writes items as Events (via CreateEventCommand), but store.search() with a query string reads from Memory Records (via RetrieveMemoryRecordsCommand).
|
StartMemoryExtractionJob processes raw events and produces structured, searchable Memory Records. The implementation doesn't expose this capability at all. Even if it's out of scope for v1, it's worth noting in the README that extraction jobs need to be triggered separately for vector search to return results |
Implements both BaseCheckpointSaver and BaseStore for AWS Bedrock AgentCore Memory: