Simple, reliable, and scalable pub/sub system based on FastAPI and PostgreSQL
fastpubsub is a lightweight publish-subscribe messaging system built with FastAPI and PostgreSQL. It provides a simple HTTP API for message publishing and subscription management with powerful features like message filtering, delivery guarantees, dead-letter queues, and automatic retries with exponential backoff. The system is built with asyncio for efficient concurrent operations and uses SQLAlchemy's async engine with psycopg's native async support.
- 🎯 Topic-based messaging - Organize messages by topics
- 🔍 Message filtering - Subscribe to specific messages using JSON-based filters
- 🔄 Automatic retries - Configurable retry logic with exponential backoff
- 💀 Dead Letter Queue (DLQ) - Handle failed messages gracefully
- 📊 Metrics & Monitoring - Built-in subscription metrics and Prometheus support
- 🐳 Docker-ready - Easy deployment with Docker
- 🔒 Reliable delivery - Acknowledgment and negative-acknowledgment support
- 🧹 Automatic cleanup - Background jobs for message maintenance
fastpubsub uses PostgreSQL as its backend, leveraging stored procedures and JSONB capabilities for high-performance message routing and filtering. The architecture consists of:
- API Server: Asynchronous RESTful HTTP API for all operations
- Database: PostgreSQL with custom functions for message management, accessed via async SQLAlchemy
- Cleanup Workers: Background jobs for message maintenance
All commands use the official Docker image from Docker Hub.
- Docker installed
- PostgreSQL database (can also run in Docker)
First, you need to run database migrations:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub db-migrate
Run the API server:
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub server
The API will be available at http://localhost:8000. You can access the interactive API documentation at http://localhost:8000/docs.
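The FASTPUBSUB_DATABASE_URL value used in the commands above is a SQLAlchemy-style URL, so reserved characters such as @, / or : in the user name or password generally need to be percent-encoded. A minimal sketch of building the URL safely follows; build_database_url is a hypothetical helper for illustration and is not part of fastpubsub.

```python
from urllib.parse import quote


def build_database_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build a postgresql+psycopg URL, percent-encoding the credentials."""
    # safe="" makes quote() escape every reserved character, including "/" and "@".
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"postgresql+psycopg://{safe_user}:{safe_password}@{host}:{port}/{database}"


if __name__ == "__main__":
    # Example credentials are placeholders, not real defaults of the Docker image.
    print(build_database_url("fastpubsub", "p@ss/word", "localhost", 5432, "fastpubsub"))
```

The resulting string can be passed to docker run with -e FASTPUBSUB_DATABASE_URL=... exactly as in the commands above.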
Apply database migrations to set up or upgrade the schema:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub db-migrate
This command creates all necessary tables, indexes, and stored procedures.
Start the HTTP API server:
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub server
The server runs with Gunicorn and Uvicorn workers for production-grade performance.
Remove acknowledged messages older than a specified threshold:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub cleanup_acked_messages
This removes acked messages to prevent database bloat. By default, messages older than 1 hour (3600 seconds) are deleted.
Release messages that are stuck in "delivered" state (locked but not acked/nacked):
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
allisson/fastpubsub cleanup_stuck_messages
This handles cases where a consumer crashed without acknowledging messages. By default, messages locked for more than 60 seconds are released.
It's recommended to run cleanup commands periodically using cron or a scheduler like Kubernetes CronJob:
# Example: Run cleanup_acked_messages every hour
0 * * * * docker run --rm -e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' allisson/fastpubsub cleanup_acked_messages
# Example: Run cleanup_stuck_messages every 5 minutes
*/5 * * * * docker run --rm -e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' allisson/fastpubsub cleanup_stuck_messages
Generate a secure random secret key for authentication:
docker run --rm allisson/fastpubsub generate_secret_key
Output:
new_secret=a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6
Use this secret key to set the FASTPUBSUB_AUTH_SECRET_KEY environment variable.
Create a new client with API credentials:
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='your-secret-key' \
allisson/fastpubsub create_client "My Application" "*" true
Arguments:
- Client name (e.g., "My Application")
- Scopes (e.g., "*" for admin, or "topics:create topics:read")
- Is active flag (true or false)
Output:
client_id=550e8400-e29b-41d4-a716-446655440000
client_secret=a1b2c3d4e5f6g7h8
Save the client_id and client_secret securely - the secret cannot be retrieved later.
Configure fastpubsub using environment variables. All variables are prefixed with FASTPUBSUB_.
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_DATABASE_URL | PostgreSQL connection URL (required) | - |
| FASTPUBSUB_DATABASE_ECHO | Enable SQLAlchemy query logging | false |
| FASTPUBSUB_DATABASE_POOL_SIZE | Connection pool size | 5 |
| FASTPUBSUB_DATABASE_MAX_OVERFLOW | Max overflow connections | 10 |
| FASTPUBSUB_DATABASE_POOL_PRE_PING | Test connections before use | true |
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_LOG_LEVEL | Log level (debug, info, warning, error) | info |
| FASTPUBSUB_LOG_FORMATTER | Log format string | See below |
Default log format:
asctime=%(asctime)s level=%(levelname)s pathname=%(pathname)s line=%(lineno)s message=%(message)s
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_SUBSCRIPTION_MAX_ATTEMPTS | Maximum delivery attempts before DLQ | 5 |
| FASTPUBSUB_SUBSCRIPTION_BACKOFF_MIN_SECONDS | Minimum retry delay (seconds) | 5 |
| FASTPUBSUB_SUBSCRIPTION_BACKOFF_MAX_SECONDS | Maximum retry delay (seconds) | 300 |
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_API_DEBUG | Enable debug mode | false |
| FASTPUBSUB_API_HOST | Server bind host | 0.0.0.0 |
| FASTPUBSUB_API_PORT | Server port | 8000 |
| FASTPUBSUB_API_NUM_WORKERS | Number of Gunicorn workers | 1 |
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_AUTH_ENABLED | Enable authentication | false |
| FASTPUBSUB_AUTH_SECRET_KEY | Secret key for JWT signing (required if auth enabled) | None |
| FASTPUBSUB_AUTH_ALGORITHM | JWT signing algorithm | HS256 |
| FASTPUBSUB_AUTH_ACCESS_TOKEN_EXPIRE_MINUTES | Access token expiration time in minutes | 30 |
| Variable | Description | Default |
|---|---|---|
| FASTPUBSUB_CLEANUP_ACKED_MESSAGES_OLDER_THAN_SECONDS | Delete acked messages older than (seconds) | 3600 |
| FASTPUBSUB_CLEANUP_STUCK_MESSAGES_LOCK_TIMEOUT_SECONDS | Release messages locked longer than (seconds) | 60 |
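To show how the variables and defaults from the tables above fit together, here is a minimal sketch that reads a subset of them from the environment. The Settings dataclass, load_settings function, and the accepted boolean spellings are assumptions made for illustration; they are not fastpubsub's own config.py.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    # Accepted spellings are an assumption; the server may parse booleans differently.
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    database_url: str
    log_level: str
    api_port: int
    api_num_workers: int
    auth_enabled: bool
    subscription_max_attempts: int


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read FASTPUBSUB_* variables, falling back to the documented defaults."""
    prefix = "FASTPUBSUB_"
    # DATABASE_URL has no default in the table, so a missing value is an error.
    database_url = env.get(prefix + "DATABASE_URL")
    if not database_url:
        raise ValueError("FASTPUBSUB_DATABASE_URL is required")
    return Settings(
        database_url=database_url,
        log_level=env.get(prefix + "LOG_LEVEL", "info"),
        api_port=int(env.get(prefix + "API_PORT", "8000")),
        api_num_workers=int(env.get(prefix + "API_NUM_WORKERS", "1")),
        auth_enabled=_as_bool(env.get(prefix + "AUTH_ENABLED", "false")),
        subscription_max_attempts=int(env.get(prefix + "SUBSCRIPTION_MAX_ATTEMPTS", "5")),
    )
```

Passing a plain dict instead of os.environ makes the defaults easy to check in a test or a deployment script.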
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_LOG_LEVEL='info' \
-e FASTPUBSUB_API_NUM_WORKERS='4' \
-e FASTPUBSUB_SUBSCRIPTION_MAX_ATTEMPTS='10' \
allisson/fastpubsub server
fastpubsub supports optional JWT-based authentication to secure API access. When authentication is disabled (default), all API endpoints are accessible without credentials. When enabled, clients must authenticate using the OAuth2 client credentials flow.
Authentication uses a scope-based permission system. Scopes can be global or object-specific:
Global Scopes:
- "*" - Admin mode, full access to all resources and operations
- topics:create - Can create new topics
- topics:read - Can list or get topics
- topics:delete - Can delete topics
- topics:publish - Can publish messages to topics
- subscriptions:create - Can create new subscriptions
- subscriptions:read - Can list or get subscriptions
- subscriptions:delete - Can delete subscriptions
- subscriptions:consume - Can consume messages from subscriptions
- clients:create - Can create new clients
- clients:update - Can update clients
- clients:read - Can list or get clients
- clients:delete - Can delete clients
Object-Specific Scopes:
You can restrict access to specific resources by appending the resource ID to the scope:
- topics:publish:my-topic-id - Can only publish to the topic with ID "my-topic-id"
- subscriptions:consume:my-subscription - Can only consume from the subscription with ID "my-subscription"
Multiple scopes can be combined, separated by spaces: topics:create topics:read subscriptions:read
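The scope rules above can be sketched as a small check: a request is allowed if the client holds the admin scope, the global scope for the action, or the object-specific scope for that exact resource. The is_allowed function is a hypothetical illustration of these documented rules, not the server's actual authorization code.

```python
from typing import Optional


def is_allowed(granted_scopes: str, action: str, resource_id: Optional[str] = None) -> bool:
    """Return True if the space-separated scopes permit the action on the resource."""
    granted = set(granted_scopes.split())
    # "*" is admin mode; a global scope such as "topics:publish" covers every resource.
    if "*" in granted or action in granted:
        return True
    # Otherwise only an object-specific scope for this exact resource ID matches.
    return resource_id is not None and f"{action}:{resource_id}" in granted


if __name__ == "__main__":
    print(is_allowed("topics:publish:my-topic-id", "topics:publish", "my-topic-id"))
    print(is_allowed("topics:publish:my-topic-id", "topics:publish", "other-topic"))
```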
Request:
POST /oauth/token
Content-Type: application/json
{
"client_id": "550e8400-e29b-41d4-a716-446655440000",
"client_secret": "a1b2c3d4e5f6g7h8"
}
Response: 201 Created
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 1800,
"scope": "topics:create topics:read"
}
Include the access token in the Authorization header for authenticated requests:
curl -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." \
http://localhost:8000/topics
Clients represent applications or services that access the API. Each client has credentials (client_id and client_secret) and a set of scopes that define its permissions.
POST /clients
Authorization: Bearer <token>
Request Body:
{
"name": "My Application",
"scopes": "topics:create topics:read subscriptions:consume",
"is_active": true
}
Response: 201 Created
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"secret": "a1b2c3d4e5f6g7h8"
}
Note: The client secret is only returned once during creation. Store it securely.
GET /clients/{id}
Authorization: Bearer <token>
Response: 200 OK
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "My Application",
"scopes": "topics:create topics:read",
"is_active": true,
"token_version": 1,
"created_at": "2025-12-29T15:30:00Z",
"updated_at": "2025-12-29T15:30:00Z"
}
GET /clients?offset=0&limit=10
Authorization: Bearer <token>
Response: 200 OK
{
"data": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "My Application",
"scopes": "topics:create topics:read",
"is_active": true,
"token_version": 1,
"created_at": "2025-12-29T15:30:00Z",
"updated_at": "2025-12-29T15:30:00Z"
}
]
}
PUT /clients/{id}
Authorization: Bearer <token>
Request Body:
{
"name": "Updated Application Name",
"scopes": "topics:read subscriptions:read",
"is_active": true
}
Response: 200 OK
Note: Updating a client increments its token_version, which invalidates all existing access tokens for that client.
DELETE /clients/{id}
Authorization: Bearer <token>
Response: 204 No Content
Topics are channels where messages are published.
POST /topics
Request Body:
{
"id": "user-events"
}
Response: 201 Created
{
"id": "user-events",
"created_at": "2025-12-29T15:30:00Z"
}
GET /topics/{id}
Response: 200 OK
GET /topics?offset=0&limit=10
Response: 200 OK
{
"data": [
{
"id": "user-events",
"created_at": "2025-12-29T15:30:00Z"
}
]
}
DELETE /topics/{id}
Response: 204 No Content
POST /topics/{id}/messages
Request Body:
[
{
"event": "user.created",
"user_id": "123",
"country": "BR"
},
{
"event": "user.updated",
"user_id": "456",
"country": "US"
}
]
Response: 204 No Content
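As an illustration of the publish endpoint above, the following sketch builds the HTTP request with Python's standard library without sending it. The build_publish_request helper is hypothetical; the base URL and token are placeholders you would replace with your own values.

```python
import json
from typing import Optional
from urllib.parse import quote
from urllib.request import Request


def build_publish_request(
    base_url: str,
    topic_id: str,
    messages: list,
    token: Optional[str] = None,
) -> Request:
    """Build (but do not send) a POST /topics/{id}/messages request."""
    headers = {"Content-Type": "application/json"}
    if token:
        # Only needed when the server runs with FASTPUBSUB_AUTH_ENABLED=true.
        headers["Authorization"] = f"Bearer {token}"
    url = f"{base_url.rstrip('/')}/topics/{quote(topic_id, safe='')}/messages"
    # The endpoint takes a JSON array, one element per message.
    body = json.dumps(messages).encode("utf-8")
    return Request(url, data=body, headers=headers, method="POST")


if __name__ == "__main__":
    request = build_publish_request(
        "http://localhost:8000",
        "user-events",
        [{"event": "user.created", "user_id": "123", "country": "BR"}],
    )
    print(request.get_method(), request.full_url)
```

Passing the returned object to urllib.request.urlopen would send it; per the endpoint description, a successful publish returns 204 No Content.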
Subscriptions receive messages from topics, optionally filtered.
POST /subscriptions
Request Body:
{
"id": "user-processor",
"topic_id": "user-events",
"filter": {"country": ["BR", "US"]},
"max_delivery_attempts": 5,
"backoff_min_seconds": 5,
"backoff_max_seconds": 300
}
Response: 201 Created
Filter Examples:
- {"country": ["BR", "US"]} - Messages where country is BR or US
- {"event": ["user.created"]} - Only user.created events
- {"premium": [true]} - Only premium users
- {} or null - No filtering, receive all messages
GET /subscriptions/{id}
Response: 200 OK
GET /subscriptions?offset=0&limit=10
Response: 200 OK
DELETE /subscriptions/{id}
Response: 204 No Content
Retrieve messages from a subscription:
GET /subscriptions/{id}/messages?consumer_id=worker-1&batch_size=10
Response: 200 OK
{
"data": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"subscription_id": "user-processor",
"payload": {
"event": "user.created",
"user_id": "123",
"country": "BR"
},
"delivery_attempts": 1,
"created_at": "2025-12-29T15:30:00Z"
}
]
}
Parameters:
- consumer_id: Unique identifier for the consumer (required)
- batch_size: Number of messages to retrieve (default: 10, max: 100)
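A small sketch of how a client might assemble the consume URL while enforcing the parameter limits listed above. The build_consume_url helper is hypothetical, and the lower bound of 1 for batch_size is an assumption (only the default and maximum are documented).

```python
from urllib.parse import quote, urlencode


def build_consume_url(base_url: str, subscription_id: str, consumer_id: str, batch_size: int = 10) -> str:
    """Return the GET URL for fetching a batch of messages from a subscription."""
    if not consumer_id:
        raise ValueError("consumer_id is required")
    # Documented maximum is 100; the minimum of 1 is assumed here.
    if not 1 <= batch_size <= 100:
        raise ValueError("batch_size must be between 1 and 100")
    query = urlencode({"consumer_id": consumer_id, "batch_size": batch_size})
    path = f"/subscriptions/{quote(subscription_id, safe='')}/messages"
    return f"{base_url.rstrip('/')}{path}?{query}"


if __name__ == "__main__":
    print(build_consume_url("http://localhost:8000", "user-processor", "worker-1"))
```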
Mark messages as successfully processed:
POST /subscriptions/{id}/acks
Request Body:
[
"550e8400-e29b-41d4-a716-446655440000",
"660e8400-e29b-41d4-a716-446655440001"
]
Response: 204 No Content
Mark messages for retry:
POST /subscriptions/{id}/nacks
Request Body:
[
"550e8400-e29b-41d4-a716-446655440000"
]
Response: 204 No Content
NACKed messages will be retried with exponential backoff until max_delivery_attempts is reached.
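To make the retry schedule concrete, here is a sketch of one common exponential backoff formula, clamped between backoff_min_seconds and backoff_max_seconds. The doubling factor is an assumption: the actual delay calculation lives in fastpubsub's database functions and may differ (for example, by using a different base or adding jitter).

```python
def backoff_delay(attempt: int, min_seconds: int = 5, max_seconds: int = 300) -> int:
    """Delay before the next retry, doubling per attempt and capped at max_seconds."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    # attempt 1 -> min_seconds, attempt 2 -> 2x, attempt 3 -> 4x, ... up to the cap.
    return min(max_seconds, min_seconds * 2 ** (attempt - 1))


if __name__ == "__main__":
    # With the default settings (min 5, max 300, 5 attempts):
    print([backoff_delay(n) for n in range(1, 6)])  # [5, 10, 20, 40, 80]
```

Under this assumed formula the cap of 300 seconds is first reached on the seventh attempt, so it only matters when max_delivery_attempts is raised above the default.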
Messages that exceed max_delivery_attempts are moved to the DLQ.
GET /subscriptions/{id}/dlq?offset=0&limit=10
Response: 200 OK
Move messages back to the subscription queue for reprocessing:
POST /subscriptions/{id}/dlq/reprocess
Request Body:
[
"550e8400-e29b-41d4-a716-446655440000"
]
Response: 204 No Content
GET /subscriptions/{id}/metrics
Response: 200 OK
{
"subscription_id": "user-processor",
"available": 150,
"delivered": 25,
"acked": 1000,
"dlq": 5
}
Metrics:
- available: Messages ready to be consumed
- delivered: Messages currently locked by consumers
- acked: Total acknowledged messages
- dlq: Messages in the dead letter queue
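One way to use these counters is to derive a simple backlog summary for alerting. The sketch below works on the response shape shown above; summarize_metrics and its output keys are hypothetical names chosen for this example.

```python
def summarize_metrics(metrics: dict) -> dict:
    """Derive a backlog summary from a subscription metrics response."""
    # Messages not yet acknowledged: waiting to be fetched plus currently locked.
    pending = metrics["available"] + metrics["delivered"]
    return {
        "subscription_id": metrics["subscription_id"],
        "pending": pending,
        "has_dead_letters": metrics["dlq"] > 0,
    }


if __name__ == "__main__":
    example = {
        "subscription_id": "user-processor",
        "available": 150,
        "delivered": 25,
        "acked": 1000,
        "dlq": 5,
    }
    print(summarize_metrics(example))
```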
Health check endpoints are useful for monitoring and orchestration systems like Kubernetes.
Check if the application is alive:
GET /liveness
Response: 200 OK
{
"status": "alive"
}
The liveness endpoint always returns a successful response if the application is running. Use this endpoint to determine if the application needs to be restarted.
Check if the application is ready to handle requests:
GET /readiness
Response: 200 OK
{
"status": "ready"
}
Error Response: 503 Service Unavailable
{
"detail": "database is down"
}
The readiness endpoint checks if the database connection is healthy. Use this endpoint to determine if the application should receive traffic.
Get Prometheus-compatible metrics for monitoring:
GET /metrics
Response: 200 OK (Prometheus text format)
# HELP http_requests_total Total number of requests by method, path and status
# TYPE http_requests_total counter
http_requests_total{method="GET",path="/topics",status="200"} 42.0
...
The metrics endpoint exposes application metrics in Prometheus format, including:
- HTTP request counts and latencies
- Request duration histograms
- Active requests gauge
- And other standard FastAPI metrics
You can configure Prometheus to scrape this endpoint for monitoring and alerting.
# 1. Generate a secret key
docker run --rm allisson/fastpubsub generate_secret_key
# Output: new_secret=a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6
# 2. Start the server with authentication enabled
docker run -p 8000:8000 \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6' \
allisson/fastpubsub server
# 3. Create an admin client (requires initial client creation via CLI or direct DB access)
docker run --rm \
-e FASTPUBSUB_DATABASE_URL='postgresql+psycopg://YOUR_USER:YOUR_PASSWORD@YOUR_HOST:5432/YOUR_DATABASE' \
-e FASTPUBSUB_AUTH_ENABLED='true' \
-e FASTPUBSUB_AUTH_SECRET_KEY='a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6' \
allisson/fastpubsub create_client "Admin Client" "*" true
# Output:
# client_id=550e8400-e29b-41d4-a716-446655440000
# client_secret=a1b2c3d4e5f6g7h8
# 4. Get an access token
curl -X POST http://localhost:8000/oauth/token \
-H "Content-Type: application/json" \
-d '{
"client_id": "550e8400-e29b-41d4-a716-446655440000",
"client_secret": "a1b2c3d4e5f6g7h8"
}'
# Output: {"access_token": "eyJhbGc...", "token_type": "Bearer", "expires_in": 1800, "scope": "*"}
# 5. Use the token to access protected endpoints
TOKEN="eyJhbGc..."
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/topics
# 1. Create a topic
curl -X POST http://localhost:8000/topics \
-H "Content-Type: application/json" \
-d '{"id": "notifications"}'
# 2. Create a subscription
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "email-sender",
"topic_id": "notifications"
}'
# 3. Publish messages
curl -X POST http://localhost:8000/topics/notifications/messages \
-H "Content-Type: application/json" \
-d '[
{"type": "email", "to": "user@example.com", "subject": "Welcome!"}
]'
# 4. Consume messages
curl "http://localhost:8000/subscriptions/email-sender/messages?consumer_id=worker-1&batch_size=10"
# 5. Acknowledge messages
curl -X POST http://localhost:8000/subscriptions/email-sender/acks \
-H "Content-Type: application/json" \
-d '["550e8400-e29b-41d4-a716-446655440000"]'
# Assuming you have an admin token
ADMIN_TOKEN="eyJhbGc..."
# 1. Create a client that can only publish to a specific topic
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Publisher Service",
"scopes": "topics:publish:notifications",
"is_active": true
}'
# 2. Create a client that can only consume from a specific subscription
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Consumer Service",
"scopes": "subscriptions:consume:email-sender",
"is_active": true
}'
# 3. Create a client with multiple permissions
curl -X POST http://localhost:8000/clients \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Multi-Purpose Service",
"scopes": "topics:create topics:read topics:publish subscriptions:create subscriptions:read",
"is_active": true
}'
# Create a subscription that only receives messages from BR and US
curl -X POST http://localhost:8000/subscriptions \
-H "Content-Type: application/json" \
-d '{
"id": "regional-processor",
"topic_id": "user-events",
"filter": {"country": ["BR", "US"]}
}'
# Publish messages - only BR/US messages will be routed to this subscription
curl -X POST http://localhost:8000/topics/user-events/messages \
-H "Content-Type: application/json" \
-d '[
{"event": "user.login", "user_id": "1", "country": "BR"},
{"event": "user.login", "user_id": "2", "country": "JP"},
{"event": "user.login", "user_id": "3", "country": "US"}
]'
The subscription will only receive messages with country set to "BR" or "US" (messages for users 1 and 3, not user 2).
The filter feature uses a simple JSON style where keys are field names and values are arrays of acceptable values:
{
"filter": {
"event_type": ["order.created", "order.updated"],
"priority": ["high", "critical"],
"region": ["us-east", "us-west"]
}
}
This filter matches messages that have:
- event_type equal to "order.created" OR "order.updated"
- AND priority equal to "high" OR "critical"
- AND region equal to "us-east" OR "us-west"
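The matching rule described above (OR within a field's list of values, AND across fields) can be sketched in a few lines of Python. This is only a client-side illustration of the semantics; the real filtering happens inside PostgreSQL, and matches_filter is a hypothetical name.

```python
from typing import Any, Dict, List, Optional


def matches_filter(payload: Dict[str, Any], message_filter: Optional[Dict[str, List[Any]]]) -> bool:
    """Return True if the payload satisfies every key in the filter."""
    # An empty or null filter means "receive all messages".
    if not message_filter:
        return True
    # Every filter key must be present (AND) with one of the accepted values (OR).
    return all(
        key in payload and payload[key] in accepted
        for key, accepted in message_filter.items()
    )


if __name__ == "__main__":
    order_filter = {
        "event_type": ["order.created", "order.updated"],
        "priority": ["high", "critical"],
    }
    print(matches_filter({"event_type": "order.created", "priority": "high"}, order_filter))
    print(matches_filter({"event_type": "order.created", "priority": "low"}, order_filter))
```

The sketch treats a missing field as a non-match, which is an assumption about how the server handles payloads that lack a filtered key.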
# Check metrics to see if there are DLQ messages
curl "http://localhost:8000/subscriptions/email-sender/metrics"
# List DLQ messages
curl "http://localhost:8000/subscriptions/email-sender/dlq"
# Reprocess DLQ messages after fixing the issue
curl -X POST http://localhost:8000/subscriptions/email-sender/dlq/reprocess \
-H "Content-Type: application/json" \
-d '["550e8400-e29b-41d4-a716-446655440000"]'
# Check if the application is alive (for restart decisions)
curl "http://localhost:8000/liveness"
# Check if the application is ready to serve traffic
curl "http://localhost:8000/readiness"
Kubernetes example configuration:
livenessProbe:
  httpGet:
    path: /liveness
    port: 8000
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /readiness
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 5
# Access Prometheus metrics
curl "http://localhost:8000/metrics"
Prometheus scrape configuration:
scrape_configs:
  - job_name: 'fastpubsub'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics'
    scrape_interval: 15s
- Publish: Messages are published to a topic
- Route: Messages are routed to all subscriptions for that topic
- Filter: Subscriptions with filters only receive matching messages
- Consume: Consumers fetch messages in batches
- Process: Consumer processes the message
- ACK/NACK: Consumer acknowledges success or requests retry
- Retry: Failed messages are retried with exponential backoff
- DLQ: Messages exceeding max attempts move to the dead letter queue
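The consume, process, and ACK/NACK steps of this flow can be sketched as a single batch-handling function. The HTTP calls are injected as plain callables so the logic stays independent of any client library; process_batch, handler, ack, and nack are hypothetical names, with ack and nack standing in for POST /subscriptions/{id}/acks and POST /subscriptions/{id}/nacks.

```python
from typing import Any, Callable, Dict, List, Tuple


def process_batch(
    messages: List[Dict[str, Any]],
    handler: Callable[[Dict[str, Any]], None],
    ack: Callable[[List[str]], None],
    nack: Callable[[List[str]], None],
) -> Tuple[List[str], List[str]]:
    """Run handler on each message payload, then ACK successes and NACK failures."""
    acked: List[str] = []
    nacked: List[str] = []
    for message in messages:
        try:
            handler(message["payload"])
        except Exception:
            # A failed message is NACKed so the server can retry it with backoff.
            nacked.append(message["id"])
        else:
            acked.append(message["id"])
    # Send at most one ACK and one NACK request per batch.
    if acked:
        ack(acked)
    if nacked:
        nack(nacked)
    return acked, nacked
```

A worker would call this in a loop with the "data" array returned by GET /subscriptions/{id}/messages. Because a message may be delivered again after a crash or lock timeout, the handler should be safe to run more than once for the same message.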
- Always acknowledge messages: Use ACK for success, NACK for retriable failures
- Use unique consumer IDs: Helps with debugging and metrics
- Handle idempotency: Messages may be delivered more than once
- Implement timeouts: Don't let message processing hang indefinitely
- Monitor DLQ: Regularly check and handle dead-letter messages
- Batch consumption: Use an appropriate batch_size for your workload
- Multiple workers: Run multiple consumers with different consumer_id values
- Optimize filters: More specific filters reduce unnecessary message delivery
- Regular cleanup: Schedule cleanup jobs to maintain database performance
- Connection pooling: Configure appropriate pool sizes for your load
- Run cleanup workers: Essential for production deployments
- Monitor metrics: Track available, delivered, acked, and DLQ counts
- Set appropriate timeouts: Configure backoff settings based on your use case
- Database backups: Regular PostgreSQL backups are crucial
- Enable authentication: Set FASTPUBSUB_AUTH_ENABLED=true for production deployments
- Secure secret keys: Generate strong secret keys using the generate_secret_key command
- Principle of least privilege: Grant clients only the scopes they need
- Rotate credentials: Regularly update client secrets by recreating clients
- Token management: Access tokens expire after 30 minutes by default (configurable)
- Revoke access: Update a client to increment its token_version and invalidate all existing tokens
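Since access tokens expire and can be invalidated when a client is updated, a consumer or publisher usually needs to cache its token and refresh it on demand. The sketch below shows one way to do that; TokenCache is a hypothetical helper, fetch_token stands in for a call to POST /oauth/token that returns the parsed JSON response, and the 60-second refresh margin is an arbitrary choice.

```python
import time
from typing import Any, Callable, Dict, Optional


class TokenCache:
    """Cache an access token and refresh it shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Dict[str, Any]],
        clock: Callable[[], float] = time.monotonic,
        refresh_margin_seconds: float = 60.0,
    ) -> None:
        self._fetch_token = fetch_token
        self._clock = clock
        self._margin = refresh_margin_seconds
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            response = self._fetch_token()
            self._token = response["access_token"]
            # Refresh a little early so a request never carries an expired token.
            self._expires_at = now + response["expires_in"] - self._margin
        return self._token

    def invalidate(self) -> None:
        # Call this after a 401 response, e.g. when token_version was incremented.
        self._token = None
```

Calling invalidate() after a 401 and then get() again forces a fresh token request, which covers the case where the client was updated on the server.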
Once the server is running, you can access the interactive API documentation:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
This project is licensed under the MIT License - see the LICENSE file for details.
Contributions are welcome! Please feel free to submit issues and pull requests.
If you're having trouble connecting to the database from Docker:
- Use host.docker.internal instead of localhost when running on Docker Desktop
- Ensure your PostgreSQL allows connections from Docker networks
- Check firewall rules if using a remote database
- Verify the subscription exists and is properly configured
- Check if filters are too restrictive
- Look at metrics to see message counts
- Ensure cleanup_stuck_messages is running if consumers crashed
- Increase the number of API workers (FASTPUBSUB_API_NUM_WORKERS)
- Run multiple consumer instances
- Check database connection pool settings
- Review and optimize your subscription filters
401 Unauthorized / Invalid Token:
- Verify that FASTPUBSUB_AUTH_ENABLED=true is set on the server
- Ensure you're using a valid access token obtained from /oauth/token
/oauth/token - Check that the token hasn't expired (default: 30 minutes)
- Verify the client is still active and hasn't been deleted or disabled
- If the client was updated, old tokens are invalidated - request a new token
403 Forbidden / Insufficient Scope:
- Check that the client has the required scope for the operation
- For object-specific operations, ensure the scope includes the resource ID
- Use the "*" scope for admin/testing purposes (not recommended for production)
- Example: To publish to topic "events", the client needs the topics:publish or topics:publish:events scope
Missing FASTPUBSUB_AUTH_SECRET_KEY:
- Generate a secret key using docker run --rm allisson/fastpubsub generate_secret_key
- Set it as an environment variable: FASTPUBSUB_AUTH_SECRET_KEY=your-generated-key
FASTPUBSUB_AUTH_SECRET_KEY=your-generated-key - The same secret key must be used across all server instances
This section is for developers who want to contribute to fastpubsub or run it locally without Docker.
- Python 3.14+: The project requires Python 3.14 or later
- uv: Fast Python package installer and resolver (installation guide)
- PostgreSQL 14+: Local PostgreSQL instance for development
- make: For running Makefile commands (usually pre-installed on Unix-like systems)
- Clone the repository:
git clone https://github.com/allisson/fastpubsub.git
cd fastpubsub
- Start a local PostgreSQL instance (optional):
If you don't have PostgreSQL running, you can use the provided Makefile command:
make start-postgresql
This starts a PostgreSQL container with default credentials:
- User: fastpubsub
- Password: fastpubsub
- Database: fastpubsub
- Port: 5432
To stop and remove the PostgreSQL container later:
make remove-postgresql
- Set up environment variables:
Copy the sample environment file and adjust as needed:
cp env.sample .env
Edit .env to configure your local database connection and other settings.
- Install dependencies:
# Install uv if you haven't already
pip install uv
# Install project dependencies (including development dependencies)
uv sync
This creates a virtual environment at .venv and installs all required packages.
- Run database migrations:
make run-db-migrate
Or manually:
PYTHONPATH=./ uv run python fastpubsub/main.py db-migrate
Run the full test suite:
make test
Or manually with pytest:
uv run pytest -v
For coverage reporting:
uv run pytest -v --cov=fastpubsub --cov-report=term-missing
The project uses ruff for linting and formatting, along with pre-commit hooks.
Run linting:
make lint
This runs all pre-commit hooks including:
- Ruff linting and formatting
- Various file checks (trailing whitespace, YAML/JSON validation, etc.)
- MyPy type checking
Install pre-commit hooks (recommended):
uv run pre-commit install
After installation, the hooks will run automatically on every commit.
Manual formatting:
# Format code with ruff
uv run ruff format .
# Run ruff checks with auto-fix
uv run ruff check --fix .
Start the development server:
make run-server
Or manually:
PYTHONPATH=./ uv run python fastpubsub/main.py server
The API will be available at:
- Server: http://localhost:8000
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
Create a new migration:
make create-migration
This generates a new migration file in migrations/versions/. Edit the file to define your schema changes.
Apply migrations:
make run-db-migrate
Build the Docker image:
make docker-build
Or manually:
docker build --rm -t fastpubsub .
- Create a feature branch:
git checkout -b feature/your-feature-name
- Make your changes and test locally:
# Run linting
make lint
# Run tests
make test
# Start the server to manually test
make run-server
- Commit your changes:
The pre-commit hooks will automatically run linting and checks. Ensure all checks pass.
git add .
git commit -m "Your commit message"
- Push and create a pull request:
git push origin feature/your-feature-name
fastpubsub/
├── fastpubsub/ # Main application package
│ ├── api/ # FastAPI routes and API logic
│ ├── services/ # Business logic and services
│ ├── config.py # Configuration management
│ ├── database.py # Database connection and migrations
│ ├── models.py # Pydantic models
│ ├── main.py # CLI entry point
│ └── ...
├── migrations/ # Alembic database migrations
│ └── versions/ # Migration files
├── tests/ # Test suite
│ ├── api/ # API tests
│ ├── services/ # Service tests
│ └── ...
├── Dockerfile # Production Docker image
├── Makefile # Development commands
├── pyproject.toml # Project metadata and dependencies
├── ruff.toml # Ruff linter configuration
├── .pre-commit-config.yaml # Pre-commit hooks configuration
└── README.md # This file
| Command | Description |
|---|---|
| make test | Run the test suite with pytest |
| make lint | Run pre-commit hooks (linting, formatting, checks) |
| make start-postgresql | Start a local PostgreSQL Docker container |
| make remove-postgresql | Stop and remove the PostgreSQL container |
| make create-migration | Create a new Alembic migration file |
| make run-db-migrate | Apply database migrations |
| make run-server | Start the development server |
| make docker-build | Build the Docker image locally |
- Virtual Environment: The project uses uv, which automatically manages a virtual environment in .venv/
- Python Version: Ensure you're using Python 3.14+ as specified in pyproject.toml
- Environment Variables: All configuration is done via environment variables prefixed with FASTPUBSUB_
- IDE Setup: Consider configuring your IDE to use the .venv/bin/python interpreter
- Database: The test suite uses the same database configured in your .env file
Made with ❤️ using FastAPI and PostgreSQL