-
Notifications
You must be signed in to change notification settings - Fork 0
refactor: replace custom DynamoDB checkpointer with langgraph-checkpoint-aws #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6993256
2444f9b
0b2b5d2
4767158
b97fdb8
87fdd92
7c34be5
68bb4e0
d096f2f
4c9957e
8ebcd51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,133 @@ | ||||||||||||||||||||||||
| #!/usr/bin/env uv run python3 | ||||||||||||||||||||||||
| """Script to empty DynamoDB table and S3 bucket using aioboto3.""" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| from __future__ import annotations | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||
| import sys | ||||||||||||||||||||||||
| from pathlib import Path | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
pokryfka marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||
| import aioboto3 | ||||||||||||||||||||||||
| except ImportError: | ||||||||||||||||||||||||
| print("Error: aioboto3 not found. Please install it with 'pip install aioboto3'.") | ||||||||||||||||||||||||
| sys.exit(1) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def load_env(): | ||||||||||||||||||||||||
| """Load .env file from the project root.""" | ||||||||||||||||||||||||
| env_path = Path(".env") | ||||||||||||||||||||||||
| if env_path.exists(): | ||||||||||||||||||||||||
| print(f"Loading environment from {env_path}") | ||||||||||||||||||||||||
| with open(env_path) as f: | ||||||||||||||||||||||||
|
Comment on lines
+18
to
+22
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If this script is launched outside repository root, Proposed fix def load_env():
"""Load .env file from the project root."""
- env_path = Path(".env")
+ env_path = Path(__file__).resolve().parents[1] / ".env"📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| for line in f: | ||||||||||||||||||||||||
| line = line.strip() | ||||||||||||||||||||||||
| if line and not line.startswith("#") and "=" in line: | ||||||||||||||||||||||||
| key, value = line.split("=", 1) | ||||||||||||||||||||||||
| key = key.strip() | ||||||||||||||||||||||||
| value = value.strip() | ||||||||||||||||||||||||
| if len(value) >= 2 and ( | ||||||||||||||||||||||||
| (value.startswith('"') and value.endswith('"')) or | ||||||||||||||||||||||||
| (value.startswith("'") and value.endswith("'")) | ||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||
| value = value[1:-1] | ||||||||||||||||||||||||
| os.environ.setdefault(key, value) | ||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||
| print("No .env file found in current directory.") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def empty_dynamodb(table_name, region, endpoint_url): | ||||||||||||||||||||||||
| """Scan and delete all items from a DynamoDB table.""" | ||||||||||||||||||||||||
| session = aioboto3.Session() | ||||||||||||||||||||||||
| async with session.resource("dynamodb", region_name=region, endpoint_url=endpoint_url) as dynamodb: | ||||||||||||||||||||||||
| table = await dynamodb.Table(table_name) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||
| # Getting key schema directly from describe_table to avoid property coroutine issues | ||||||||||||||||||||||||
| desc = await table.meta.client.describe_table(TableName=table_name) | ||||||||||||||||||||||||
| key_names = [k['AttributeName'] for k in desc['Table']['KeySchema']] | ||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||
| print(f"Error accessing table {table_name}: {e}") | ||||||||||||||||||||||||
| return | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| print(f"Scanning table {table_name}...") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| count = 0 | ||||||||||||||||||||||||
| scan_params = {} | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||
| response = await table.scan(**scan_params) | ||||||||||||||||||||||||
| items = response.get('Items', []) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if items: | ||||||||||||||||||||||||
| async with table.batch_writer() as batch: | ||||||||||||||||||||||||
| for item in items: | ||||||||||||||||||||||||
| await batch.delete_item(Key={k: item[k] for k in key_names}) | ||||||||||||||||||||||||
| count += 1 | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if 'LastEvaluatedKey' not in response: | ||||||||||||||||||||||||
| break | ||||||||||||||||||||||||
| scan_params['ExclusiveStartKey'] = response['LastEvaluatedKey'] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| print(f"Deleted {count} items from DynamoDB table {table_name}.") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def empty_s3(bucket_name, region, endpoint_url): | ||||||||||||||||||||||||
| """Delete all objects and versions from an S3 bucket.""" | ||||||||||||||||||||||||
| session = aioboto3.Session() | ||||||||||||||||||||||||
| async with session.resource("s3", region_name=region, endpoint_url=endpoint_url) as s3: | ||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||
| bucket = await s3.Bucket(bucket_name) | ||||||||||||||||||||||||
| print(f"Deleting all objects and versions from bucket {bucket_name}...") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| deleted_count = 0 | ||||||||||||||||||||||||
| delete_batch = [] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Delete all versions (including current ones and delete markers) | ||||||||||||||||||||||||
| async for version in bucket.object_versions.all(): | ||||||||||||||||||||||||
| # In aioboto3 resource collections, meta.data is usually pre-populated. | ||||||||||||||||||||||||
| # Accessing raw attributes from meta.data avoids coroutine property issues. | ||||||||||||||||||||||||
| key = version.meta.data.get('Key') | ||||||||||||||||||||||||
| version_id = version.meta.data.get('VersionId') | ||||||||||||||||||||||||
| delete_batch.append({'Key': key, 'VersionId': version_id}) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if len(delete_batch) >= 1000: | ||||||||||||||||||||||||
| await bucket.delete_objects(Delete={'Objects': delete_batch}) | ||||||||||||||||||||||||
| deleted_count += len(delete_batch) | ||||||||||||||||||||||||
| delete_batch = [] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Final batch | ||||||||||||||||||||||||
| if delete_batch: | ||||||||||||||||||||||||
| await bucket.delete_objects(Delete={'Objects': delete_batch}) | ||||||||||||||||||||||||
| deleted_count += len(delete_batch) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| print(f"Deleted {deleted_count} object(s)/version(s) from S3 bucket {bucket_name}.") | ||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||
| print(f"Error emptying bucket {bucket_name}: {e}") | ||||||||||||||||||||||||
| import traceback | ||||||||||||||||||||||||
| traceback.print_exc() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def main(): | ||||||||||||||||||||||||
| load_env() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| table_name = os.environ.get("DYNAMODB_TABLE_NAME") | ||||||||||||||||||||||||
| bucket_name = os.environ.get("S3_CHECKPOINT_BUCKET") | ||||||||||||||||||||||||
| region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or "us-east-1" | ||||||||||||||||||||||||
| endpoint_url = os.environ.get("AWS_ENDPOINT_URL") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if not table_name and not bucket_name: | ||||||||||||||||||||||||
| print("Error: DYNAMODB_TABLE_NAME or S3_CHECKPOINT_BUCKET not found in .env or environment.") | ||||||||||||||||||||||||
| sys.exit(1) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| tasks = [] | ||||||||||||||||||||||||
| if table_name: | ||||||||||||||||||||||||
| print(f"--- DynamoDB ({region}) ---") | ||||||||||||||||||||||||
| tasks.append(empty_dynamodb(table_name, region, endpoint_url)) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if bucket_name: | ||||||||||||||||||||||||
| print(f"\n--- S3 ({region}) ---") | ||||||||||||||||||||||||
| tasks.append(empty_s3(bucket_name, region, endpoint_url)) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if tasks: | ||||||||||||||||||||||||
| await asyncio.gather(*tasks) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| if __name__ == "__main__": | ||||||||||||||||||||||||
| asyncio.run(main()) | ||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: pokryfka/coding-agent
Length of output: 1069
Use the Python module path in this class reference.
Line 24 currently shows
langgraph-checkpoint-aws.DynamoDBSaver, but the Python import path islanggraph_checkpoint_aws.DynamoDBSaver(with underscores, not hyphens).Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents