Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/amp/admin/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,32 @@ def delete(self, namespace: str, name: str) -> None:
"""
path = f'/datasets/{namespace}/{name}'
self._admin._request('DELETE', path)

def get_sync_progress(self, namespace: str, name: str, revision: str = 'latest') -> models.SyncProgressResponse:
"""Get sync progress for a dataset version.

Returns per-table sync progress including current block numbers,
job status, and file statistics. This is useful for monitoring
the progress of data extraction jobs.

Args:
namespace: Dataset namespace
name: Dataset name
revision: Version tag or semantic version (default: 'latest')

Returns:
SyncProgressResponse with sync progress for all tables

Raises:
DatasetNotFoundError: If dataset/version not found
GetSyncProgressError: If retrieval fails

Example:
>>> client = AdminClient('http://localhost:8080')
>>> progress = client.datasets.get_sync_progress('_', 'eth_firehose', 'latest')
>>> for table in progress.tables:
... print(f'{table.table_name}: block {table.current_block}, status: {table.job_status}')
"""
path = f'/datasets/{namespace}/{name}/versions/{revision}/sync-progress'
response = self._admin._request('GET', path)
return models.SyncProgressResponse.model_validate(response.json())
62 changes: 62 additions & 0 deletions src/amp/admin/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,68 @@ class DeployRequest(BaseModel):
"""


class TableSyncProgress(BaseModel):
"""
Sync progress information for a single table
"""

table_name: str
"""
Name of the table within the dataset
"""
current_block: Optional[int] = None
"""
Highest block number that has been synced (null if no data yet)
"""
start_block: Optional[int] = None
"""
Lowest block number that has been synced (null if no data yet)
"""
job_id: Optional[int] = None
"""
ID of the writer job (null if no active job)
"""
job_status: Optional[str] = None
"""
Status of the writer job (null if no active job)
"""
files_count: int
"""
Number of Parquet files written for this table
"""
total_size_bytes: int
"""
Total size of all Parquet files in bytes
"""


class SyncProgressResponse(BaseModel):
"""
API response containing sync progress information for a dataset
"""

dataset_namespace: str
"""
Dataset namespace
"""
dataset_name: str
"""
Dataset name
"""
revision: str
"""
Requested revision
"""
manifest_hash: str
"""
Resolved manifest hash
"""
tables: list[TableSyncProgress]
"""
Sync progress for each table in the dataset
"""


class WorkerDetailResponse(BaseModel):
"""
Detailed worker information returned by the API
Expand Down
Loading