A data stream reduction middleware that intelligently reduces large multivariate data streams to representative subsets while maintaining proportional characteristics.
Distilled uses spatial vector analysis and A/B testing strategies to continuously analyze streaming data and pass along a configurable fraction of the stream (10% by default) that accurately represents the full dataset's characteristics across a sliding time window.
- Proportional Representation: Maintains statistical accuracy of multivariate characteristics
- Sliding Time Window: Configurable time horizon (60-3600 seconds) for analysis
- Extensible Design: Object-oriented architecture with customizable grading functions
- Generator/Coroutine Pattern: Efficient streaming data processing
- Real-time Processing: Optimized for high-throughput data streams
- Data Ingestion: Receives batches of raw data points
- Vector Evaluation: Applies grading functions to extract characteristics
- Proportion Analysis: Calculates current vs. sent data proportions
- Optimal Selection: Uses vector analysis to select representative subset
- Time Horizon Management: Maintains FIFO queues with automatic cleanup
Raw Data Stream → Grading Functions → Vector Analysis → Optimal Selection → Reduced Stream
                         ↓                   ↓                   ↓
                     DataPoints         Proportions         Time Horizon
                                        Comparison           Management
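The Time Horizon Management stage lends itself to a simple illustration: keep points in a FIFO queue and evict anything older than the configured window. The sketch below shows only that idea; it is not the library's internal data structure.

from collections import deque
from typing import Any, List, Optional
import time

class SlidingWindow:
    """FIFO queue of (timestamp, point) pairs trimmed to a fixed time horizon."""

    def __init__(self, horizon_seconds: float):
        self.horizon = horizon_seconds
        self.queue = deque()  # oldest entries at the left

    def add(self, point: Any, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        self.queue.append((now, point))
        self._evict(now)

    def _evict(self, now: float) -> None:
        # Automatic cleanup: drop entries that fall outside the time horizon.
        while self.queue and now - self.queue[0][0] > self.horizon:
            self.queue.popleft()

    def points(self) -> List[Any]:
        return [point for _, point in self.queue]

With a 3600-second horizon this mirrors the default one-hour window used in the quick start below.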
pip install -r requirements.txt

from distilled import DistilledProcessor, NumericGrader, CategoricalGrader

# Define how to evaluate your data characteristics
grading_functions = {
    "age": NumericGrader("age", lambda person: float(person.age)),
    "gender": CategoricalGrader("gender", ["male", "female", "other"],
                                lambda person: person.gender.lower()),
    "income": NumericGrader("income", lambda person: person.income)
}

# Create processor
processor = DistilledProcessor(
    grading_functions=grading_functions,
    time_horizon_seconds=3600,  # 1 hour window
    reduction_percentage=0.1,   # 10% pass-through
    batch_size=100
)

# Process data batches
batch_data = get_your_data_batch()  # Your data source
selected_points = processor.process_batch(batch_data)
# selected_points now contains ~10% of the input that best represents the full dataset

DistilledProcessor is the main processor class, implementing the generator/coroutine pattern:
- process_batch(): Process a batch and return selected subset
- get_current_stats(): Get current proportion statistics
- reset(): Reset processor state
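Continuing the quick start above, a minimal processing loop might look like the sketch below. stream_batches and forward_downstream are placeholders for your own source and sink, and the exact fields returned by get_current_stats are not documented here.

# `processor` is the DistilledProcessor constructed in the quick start.
for batch in stream_batches():               # placeholder: your batch source
    selected = processor.process_batch(batch)
    forward_downstream(selected)             # placeholder: your downstream sink

    # Inspect how closely the sent data tracks the incoming proportions.
    print(processor.get_current_stats())

# Clear all internal state, e.g. between experiments.
processor.reset()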
Represents individual data points with metadata:
- data: Raw data payload
- timestamp: Processing timestamp
- vector_values: Evaluated characteristics
- sent_previously: Tracking flag
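A rough sketch of how such a point could be modeled as a dataclass. The field names mirror the list above, but the class name, types, and defaults are assumptions rather than the actual definition.

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class DataPoint:
    data: Any                      # raw data payload
    timestamp: float               # processing timestamp (epoch seconds assumed)
    vector_values: Dict[str, float] = field(default_factory=dict)  # evaluated characteristics
    sent_previously: bool = False  # tracking flag: already forwarded downstream?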
Define how to evaluate data characteristics:
- NumericGrader: Extracts numerical values
- CategoricalGrader: Classifies into predefined categories
- LambdaGrader: Quick wrapper for simple functions
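LambdaGrader does not appear in the quick start. A minimal usage sketch, assuming it takes a name plus a callable like the other two graders (the signature is an assumption):

from distilled import LambdaGrader

# Assumed signature: a name plus a simple extraction callable,
# mirroring NumericGrader / CategoricalGrader from the quick start.
grading_functions["name_length"] = LambdaGrader(
    "name_length",
    lambda person: len(person.name),
)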
The core algorithm works as follows:
- Gap Analysis: Calculate representation gaps between current and sent data
- Point Scoring: Score each candidate point's improvement potential
- Greedy Selection: Select points that best minimize representation gaps
- Tie Breaking: Prioritize smallest proportional buckets first
- Update Tracking: Maintain sent data proportions for next iteration
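A simplified, self-contained sketch of that loop for a single categorical characteristic. The helper names, scoring, and tie-breaking details are illustrative assumptions, not the library's actual implementation:

from collections import Counter

def greedy_select(candidates, bucket_of, sent_counts, k):
    """Pick k candidates whose buckets are most under-represented in the sent data."""
    # Gap analysis: target proportions come from the incoming batch.
    batch_counts = Counter(bucket_of(p) for p in candidates)
    total_batch = sum(batch_counts.values()) or 1

    selected = []
    remaining = list(candidates)
    for _ in range(min(k, len(remaining))):
        total_sent = sum(sent_counts.values()) or 1

        def gap(point):
            bucket = bucket_of(point)
            target = batch_counts[bucket] / total_batch  # proportion in incoming data
            current = sent_counts[bucket] / total_sent   # proportion already sent
            return target - current                      # positive = under-represented

        # Point scoring + greedy selection: biggest gap wins;
        # ties go to the bucket with the fewest points sent so far.
        best = max(remaining, key=lambda p: (gap(p), -sent_counts[bucket_of(p)]))
        remaining.remove(best)
        selected.append(best)

        # Update tracking so the next pick sees the new proportions.
        sent_counts[bucket_of(best)] += 1

    return selected

For example, greedy_select(batch, lambda p: p.gender, Counter(), 10) would keep ten points while nudging the gender mix toward the batch's own proportions.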
Controls how long data is retained for analysis:
processor = DistilledProcessor(
    grading_functions=functions,
    time_horizon_seconds=1800  # 30 minutes
)

Controls what percentage of data passes through:
processor = DistilledProcessor(
    grading_functions=functions,
    reduction_percentage=0.05  # 5% pass-through
)

Controls internal processing batch size:
processor = DistilledProcessor(
    grading_functions=functions,
    batch_size=50  # Smaller batches
)

See examples/basic_usage.py for a complete working example with sample data.
Run tests with:
python -m pytest tests/

Or run individual test files:

python tests/test_basic.py

Current Status: Architecture and API design complete with method stubs.
Next Steps:
- Implement grading function evaluation
- Implement proportion calculation algorithms
- Implement vector analysis and selection logic
- Implement time horizon management
- Add comprehensive testing
- Optimize performance
This is an open source project. Contributions are welcome!
MIT License - see LICENSE file for details.
Distilled - Intelligent data stream reduction for the modern data pipeline.