Python utilities for working with Avro data files, with support for local files, GCS, S3, and HTTP/HTTPS URLs.
- Unified URL Interface: Read and write Avro files transparently across local filesystem, GCS, S3, and HTTP/HTTPS
- CLI Tools: Comprehensive command-line tools for common Avro operations
- Python API: Rich programmatic interface for reading, writing, and transforming Avro data
- Partitioned I/O: Support for reading and writing partitioned/multi-file datasets
- Async Support: Non-blocking I/O primitives for high-throughput applications
- Parquet Conversion: Bidirectional conversion between Avro and Parquet formats
- Schema Operations: Schema extraction, validation, and evolution checking
pip install avrokit

# AWS S3 support
pip install avrokit[aws]
# Google Cloud Storage support
pip install avrokit[gcp]
# All extras (S3 + GCS)
pip install avrokit[all]

# Install from source
git clone https://github.com/brandtg/avrokit.git
cd avrokit
poetry env use python3.12
make install

# View statistics for an Avro file
avrokit stats gs://bucket/data.avro
# Convert Avro to JSON
avrokit tojson s3://bucket/data.avro > output.json
# Extract sample records
avrokit cat file:///path/to/data.avro --limit 10
# Get schema
avrokit getschema data.avro
# Convert to Parquet
avrokit toparquet data.avro output.parquet
# Concatenate multiple files
avrokit concat file1.avro file2.avro s3://bucket/file3.avro output.avro

from avrokit import avro_reader, avro_records, avro_writer, avro_schema, parse_url
# Read records from any URL
url = parse_url('gs://bucket/data.avro', mode='rb')
for record in avro_records(url):
    print(record)
# Write Avro data
schema = avro_schema({
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
})
url = parse_url('output.avro', mode='wb')
with avro_writer(url, schema) as writer:
    writer.append({'name': 'Alice', 'age': 30})
    writer.append({'name': 'Bob', 'age': 25})

avrokit is organized into four main layers:
Provides a unified abstraction over different storage backends. All URL types implement the URL protocol with common operations:
- URL (base protocol): Abstract interface for all URL types
- FileURL: Local filesystem access with glob pattern support
- GCSURL: Google Cloud Storage via google-cloud-storage
- S3URL: Amazon S3 via boto3
- HTTPURL: HTTP/HTTPS with range request support for streaming
The parse_url() factory function automatically instantiates the correct URL class based on the scheme:
from avrokit.url import parse_url
# All of these return the appropriate URL subclass
url1 = parse_url('file:///path/to/data.avro') # FileURL
url2 = parse_url('gs://bucket/data.avro') # GCSURL
url3 = parse_url('s3://bucket/data.avro') # S3URL
url4 = parse_url('https://example.com/data.avro') # HTTPURL

High-level Avro reading and writing primitives built on the URL layer:
Reading:
- avro_reader(url): Context manager for DataFileReader (see the sketch after this list)
- avro_records(url): Generator yielding records as dictionaries
- PartitionedAvroReader: Reads from multiple files or glob patterns as a single stream
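For example, avro_reader() pairs with parse_url() like this (a minimal sketch; the underlying DataFileReader is iterable):

from avrokit import avro_reader, parse_url

# Minimal sketch: avro_reader() wraps a DataFileReader, which is iterable,
# so records can be consumed directly inside the context manager.
url = parse_url('s3://bucket/data.avro', mode='rb')
with avro_reader(url) as reader:
    for record in reader:
        print(record)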
Writing:
- avro_writer(url, schema): Context manager for DataFileWriter
- PartitionedAvroWriter: Writes to multiple files with configurable size limits
- TimePartitionedAvroWriter: Time-based partitioning (e.g., hourly/daily files)
Schema Operations:
- avro_schema(dict): Create schema from dictionary
- read_avro_schema(url): Extract schema from file
- validate_avro_schema_evolution(): Check backward/forward compatibility (see the sketch after this list)
- add_avro_schema_fields(): Schema augmentation utilities
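As a sketch, a compatibility check can also be run against schemas built in memory with avro_schema() (a file-based variant appears later in this README):

from avrokit import avro_schema, validate_avro_schema_evolution

# Sketch: adding an optional field with a default is typically a
# backward-compatible change. Argument order mirrors the file-based
# schema evolution example later in this README.
old_schema = avro_schema({
    'type': 'record',
    'name': 'User',
    'fields': [{'name': 'name', 'type': 'string'}]
})
new_schema = avro_schema({
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'email', 'type': ['null', 'string'], 'default': None}
    ]
})
is_valid = validate_avro_schema_evolution(old_schema, new_schema, strategy='backward')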
Utilities:
- compact_avro_data(): Remove deleted/updated records by key (illustrative sketch below)
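The exact signature of compact_avro_data() is not documented here, so the call below is purely illustrative; the keys parameter is a hypothetical name:

from avrokit import compact_avro_data, parse_url

# Hypothetical usage sketch: keep only the latest record per key and drop
# superseded/deleted entries. Parameter names are illustrative, not the
# documented signature.
compact_avro_data(
    parse_url('events.avro', mode='rb'),
    parse_url('events-compacted.avro', mode='wb'),
    keys=['user_id']
)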
Non-blocking primitives for async applications:
- DeferredAvroWriter: Async writer that batches records and flushes in background
- BlockingQueueAvroReader: Queue-based async reader for producer/consumer patterns (see the sketch after this list)
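The sketch below illustrates the producer/consumer pattern that BlockingQueueAvroReader targets, using a plain asyncio.Queue and the documented avro_records() generator rather than the class's actual API:

import asyncio

from avrokit import avro_records, parse_url

async def producer(queue: asyncio.Queue) -> None:
    url = parse_url('data.avro', mode='rb')
    for record in avro_records(url):  # blocking read; acceptable for a sketch
        await queue.put(record)
    await queue.put(None)  # sentinel: signal end of stream

async def consumer(queue: asyncio.Queue) -> None:
    while (record := await queue.get()) is not None:
        print(record)

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())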
CLI commands implemented as classes following the Tool protocol:
| Tool | Command | Description |
|---|---|---|
| CatTool | cat | Extract sample records with optional random sampling |
| ConcatTool | concat | Concatenate multiple Avro files into one |
| CountTool | count | Count records in files |
| FileSortTool | filesort | Sort records by key across multiple files |
| FromParquetTool | fromparquet | Convert Parquet to Avro |
| GetMetaTool | getmeta | Extract file metadata (schema, compression, etc.) |
| GetSchemaTool | getschema | Extract and print schema |
| HttpServerTool | httpserver | Serve Avro files over HTTP with filtering/sampling |
| PartitionTool | partition | Split files into multiple partitions |
| RepairTool | repair | Fix corrupted Avro files |
| StatsTool | stats | Compute statistics (count, nulls, sizes) |
| ToJsonTool | tojson | Convert to newline-delimited JSON |
| ToParquetTool | toparquet | Convert to Parquet format |
avrokit --debug <command> # Enable debug logging
avrokit cat FILE [OPTIONS]
--limit N Maximum records to output (default: 10)
--sample-rate F Random sampling rate (0.0-1.0)

avrokit concat INPUT1 [INPUT2 ...] OUTPUT
INPUT can be local files, GCS, S3, or HTTP URLs
OUTPUT schema must be compatible with all inputs

avrokit count FILE [FILE ...]
Returns total record count across all files

avrokit filesort INPUT OUTPUT --keys KEY1 [KEY2 ...]
Sorts records by specified keys using external merge sort

avrokit fromparquet INPUT.parquet OUTPUT.avro

avrokit getmeta FILE
Outputs: schema, codec, sync marker, block count, etc.

avrokit getschema FILE
Outputs Avro schema as JSON

avrokit httpserver --port 8080 --files "*.avro"
Serves Avro files with filtering and sampling support

avrokit partition INPUT OUTPUT --count N
Splits INPUT into N approximately equal partitions

avrokit repair INPUT OUTPUT
Attempts to recover readable records from damaged files

avrokit stats FILE [FILE ...]
Outputs: record count, file sizes, null counts per field

avrokit tojson FILE [FILE ...]
One JSON record per line (newline-delimited JSON)

avrokit toparquet INPUT.avro OUTPUT.parquet
Type mapping: Avro types → Parquet types
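For reference, the conversion is conceptually similar to this standalone sketch using fastavro and pyarrow directly; it is shown for illustration only, not as avrokit's internal implementation:

import fastavro
import pyarrow as pa
import pyarrow.parquet as pq

# Standalone illustration of Avro -> Parquet conversion; avrokit's own
# implementation and type mapping may differ.
with open('data.avro', 'rb') as f:
    records = list(fastavro.reader(f))

pq.write_table(pa.Table.from_pylist(records), 'output.parquet')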
Development requirements:

- Python 3.12+
- Poetry
- Docker (for GCS/S3 integration tests)
# Set up Python environment
poetry env use python3.12
# Install dependencies
make install
# Pull fake-gcs-server for testing
docker pull fsouza/fake-gcs-server

# Run all tests
make test
# Run with coverage
make test-coverage
# Run tests in parallel (default)
poetry run pytest -n auto

# Format code
make format
# Lint code
make lint
# Type checking
make typecheck
# Add license headers
make license

make build  # Build wheel and source distribution

Releases are automated via GitHub Actions using Trusted Publishing (OIDC) - no API tokens needed.
One-time PyPI setup:
- Log into PyPI
- Go to your project → Publishing
- Add a new pending publisher:
  - Owner: brandtg
  - Repository: avrokit
  - Workflow name: release.yml
- Save the configuration
To release a new version:
- Update version in pyproject.toml (e.g., 0.1.1)
- Commit and push the version bump
- Go to GitHub → Releases → Draft a new release
- Create a new tag matching the version (e.g., v0.1.1)
- Click Publish release
The GitHub Actions workflow will automatically build and publish to PyPI.
from avrokit import PartitionedAvroReader, PartitionedAvroWriter, parse_url
# Read from multiple files as single stream
url = parse_url('data-*.avro', mode='rb')
with PartitionedAvroReader(url) as reader:
    for record in reader:
        process(record)
# Write to multiple files (roll every 10000 records)
url = parse_url('output/', mode='wb')
with PartitionedAvroWriter(url, schema, max_records=10000) as writer:
    for record in records:
        writer.append(record)
        if should_roll():
            writer.roll()  # Create new partition file

from avrokit import TimePartitionedAvroWriter, parse_url
from datetime import datetime
# Creates hourly files: output/2024/01/15/10.avro
url = parse_url('output/', mode='wb')
with TimePartitionedAvroWriter(
    url,
    schema,
    time_granularity='hour',
    time_field='timestamp'
) as writer:
    for record in records:
        writer.append(record, timestamp=record['timestamp'])

import asyncio
from avrokit.asyncio import DeferredAvroWriter
from avrokit import parse_url, avro_schema
async def write_async():
    url = parse_url('output.avro', mode='wb')
    schema = avro_schema({...})
    async with DeferredAvroWriter(url, schema) as writer:
        for record in records:
            await writer.append(record)
        # Flushes happen automatically in background

asyncio.run(write_async())

from avrokit import validate_avro_schema_evolution, read_avro_schema
from avrokit.url import parse_url
# Check if new schema is backward compatible
reader_schema = read_avro_schema(parse_url('old.avro', 'rb'))
writer_schema = read_avro_schema(parse_url('new.avro', 'rb'))
is_valid = validate_avro_schema_evolution(
    reader_schema,
    writer_schema,
    strategy='backward'  # or 'forward', 'full'
)

Apache-2.0 - See LICENSE for details.
Contributions welcome! Please ensure:
- Code follows the existing style (enforced by make format and make lint)
- All tests pass (make test)
- Type checks pass (make typecheck)
- License headers are present (make license)