This repository contains the scalable end-to-end (e2e) implementation of data augmentation for Kitana. The code is written in Python and includes sample data, sample execution code, and the data augmentation code.
Please follow the instructions below to run the code.
- Clone the repository
- Make sure you are in the correct directory:
```bash
cd kitana-e2e
```

- Run the following commands to install the required libraries:

```bash
# If you are using python venv:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

```bash
# If you are using conda, there is an environment.yml file in the repository:
conda env create -f environment.yml
```

- Run the following command to execute the code:

```bash
python sample_execution.py
```

The repository is organized as follows:

- `api/` - Interfaces for external modules to interact with the core functionality of the search engine.
- `config/` - Configuration settings for the project, including default paths, device settings, etc.
- `data_provider/` - Core modules for data management, handling buyer and seller data.
- `market/` - Loads the buyer and seller data.
- `models/` - Defines all data models used throughout the project, including loaders and specific models for buyers and sellers.
- `preprocessing/` - Data preprocessing utilities, ensuring data is clean and correctly formatted before entering the workflow.
- `resources/` - Manages and optimizes computing resources, ensuring efficient use of available hardware.
- `search/` - Core search engine functionality, implementing the algorithms that augment the buyer dataset with seller features.
- `sketches/` - Sketches for the data augmentation process, indexed by the `join_keys`.
- `statistics/` - Statistical tools and functions, including a linear regression model to measure the augmentation effect.
- `utils/` - General utilities used across the project for a variety of support tasks.
- `main.py` - The entry point of the project, initializing and starting the search engine.
This section outlines the typical execution flow within the codebase, from input data specifications to the generation of experiment results, focusing on how a single experiment run (like the one configured for `data/country_extend_1/seller`) proceeds.
- The process begins in `main.py`. This script is responsible for defining and triggering various experiment configurations.
- An experiment is typically defined by instantiating a `Config` object (from `search_engine.config`). This object aggregates several configuration dataclasses:
  - `DataConfig`: specifies the data inputs. Key parameters include:
    - `directory_path`: path to the directory containing seller CSV files (e.g., `data/country_extend_1/seller`).
    - `buyer_csv`: path to the buyer's CSV file.
    - `join_keys`: a list of lists specifying potential join columns (e.g., `[['country']]`).
    - `target_feature`: the column name in the buyer's data to be predicted.
    - `need_to_clean_data`: boolean; if true, data cleaning is performed.
    - `one_target_feature`: boolean; if true, buyer data is initially reduced to join keys and target.
  - `SearchConfig`: specifies search algorithm parameters. Key parameters:
    - `iterations`: number of features to select.
    - `fit_by_residual`: boolean; if true, the search aims to explain the residuals of a model built on the buyer's initial features.
    - `device`: `'cpu'` or `'cuda'`.
    - `batch_size`: for sketch processing; can be `'auto'`.
  - `ExperimentConfig`: specifies experiment logging and output.
  - `LoggingConfig`: configures logging.
- `main.py` then instantiates `ScaledExperiment` (from `search_engine.experiment.experiment`) with this `Config`.
- Finally, it calls the `run()` method on the `ScaledExperiment` instance and typically saves the returned results (often an `augplan` and `accuracy` list) to a JSON file.
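To make the shape of this configuration concrete, here is a minimal, self-contained sketch that mirrors the dataclasses described above. The field names follow the text, but the defaults and the exact `Config` layout are illustrative assumptions, not the repo's actual definitions (which live in `search_engine.config`):

```python
# Hypothetical mirror of the configuration dataclasses described in the text.
# Defaults are illustrative only; consult search_engine.config for the real ones.
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class DataConfig:
    directory_path: str = "data/country_extend_1/seller"
    buyer_csv: str = "buyer.csv"  # hypothetical default path
    join_keys: List[List[str]] = field(default_factory=lambda: [["country"]])
    target_feature: str = "target"
    need_to_clean_data: bool = True
    one_target_feature: bool = False


@dataclass
class SearchConfig:
    iterations: int = 10          # number of seller features to select
    fit_by_residual: bool = False
    device: str = "cpu"           # or 'cuda'
    batch_size: Union[int, str] = "auto"


@dataclass
class Config:
    data: DataConfig = field(default_factory=DataConfig)
    search: SearchConfig = field(default_factory=SearchConfig)


# An experiment definition then looks roughly like:
cfg = Config(data=DataConfig(join_keys=[["country"]], target_feature="gdp"))
print(cfg.data.directory_path)  # seller CSVs are discovered under this path
```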
The `ScaledExperiment` class (`search_engine/experiment/experiment.py`) orchestrates the entire experimental procedure. Its `run()` method executes the following steps:
- An instance of `PrepareBuyer` (from `search_engine.data_provider.buyer_data`) is created. `PrepareBuyer` inherits from `PrepareData` (`search_engine/data_provider/base_data.py`) and:
  - Loads the buyer CSV specified in `DataConfig.buyer_csv`.
  - Join Key Processing (`_check_join_keys`, `_construct_join_keys` in `PrepareData`):
    - Validates that the provided `join_keys` (e.g., `[['country']]`) exist in the buyer's DataFrame columns.
    - If a join key is multi-column (e.g., `['colA', 'colB']`), a new concatenated column (e.g., `'colA_colB'`) is created in the DataFrame to serve as a single string join key. A list of these processed string join keys is stored (e.g., `self.join_keys_in_string`).
  - Data Cleaning (`_data_cleaning` in `PrepareData`, if `need_to_clean_data` is true):
    - Converts columns to numeric where possible (coercing errors).
    - Removes columns with more than 40% missing values.
    - Fills remaining NaNs (e.g., with 0 or the mean, depending on context).
  - Buyer-Specific Column Selection (`PrepareBuyer.__init__`):
    - The buyer DataFrame is trimmed to include only:
      - The processed `join_keys_in_string`.
      - The `target_feature`.
      - Numerical columns identified during cleaning (or a predefined list if cleaning is off).
    - If `DataConfig.one_target_feature` is true, it is further trimmed to just the join keys and the target feature.
  - Calculates `buyer_key_domain`: a dictionary mapping each processed join key string to the set of its unique values in the buyer data.
- The `PrepareBuyer` instance is stored in `PrepareBuyerSellers` (a wrapper class).
- The experiment then iterates through all CSV files in `DataConfig.directory_path`. For each seller CSV:
  - An instance of `PrepareSeller` (from `search_engine.data_provider.seller_data`) is created. `PrepareSeller` also inherits from `PrepareData` and performs the same loading, join key processing, and data cleaning as `PrepareBuyer`. Seller columns are trimmed to join keys and numerical/specified features.
  - The `PrepareSeller` instance is added via `PrepareBuyerSellers.add_seller()`, which in turn calls `PrepareSellers.add_sellers()` (`search_engine/data_provider/sellers.py`).
  - Seller Filtering (`PrepareSellers.add_sellers`):
    - For each of the seller's processed join keys, it checks whether the seller's unique values for that key intersect the `buyer_key_domain` for the same key.
    - If a join key in the seller has no overlap with the buyer's values for that key, that join key column is dropped from the seller's DataFrame, and the key is removed from the seller's list of active join keys.
    - If a seller ends up with no valid (overlapping) join keys, it is effectively discarded.
  - Global Domain Update (`PrepareSellers.update_domain`):
    - If the seller is kept, its unique values for its valid join keys are used to update a global `join_key_domains` dictionary within `PrepareSellers`. This dictionary tracks all unique values seen for each join key across all valid, filtered sellers.
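The join-key construction and seller-filtering steps above can be sketched with plain pandas. Everything here (the `construct_join_key` helper, the sample frames) is hypothetical and only illustrates the semantics described in the text, not the repo's actual `PrepareData` code:

```python
# Illustrative sketch of two data-preparation steps: building a single
# concatenated string column for a multi-column join key, and dropping
# seller join keys whose values never overlap the buyer's key domain.
import pandas as pd


def construct_join_key(df: pd.DataFrame, key_cols: list) -> str:
    """For a multi-column key like ['colA', 'colB'], add a concatenated
    string column 'colA_colB' usable as a single join key."""
    if len(key_cols) == 1:
        return key_cols[0]
    name = "_".join(key_cols)
    df[name] = df[key_cols].astype(str).agg("_".join, axis=1)
    return name


buyer = pd.DataFrame({"country": ["US", "FR"], "year": [2020, 2021], "y": [1.0, 2.0]})
seller = pd.DataFrame({"country": ["FR", "DE"], "year": [2021, 2019], "x": [3.0, 4.0]})

key = construct_join_key(buyer, ["country", "year"])
construct_join_key(seller, ["country", "year"])

# Buyer key domain: processed join key -> set of its unique values.
buyer_key_domain = {key: set(buyer[key].unique())}

# Seller filtering: keep the join key only if its values intersect the buyer's.
overlap = set(seller[key].unique()) & buyer_key_domain[key]
if not overlap:
    # Key dropped; a seller with no remaining keys would be discarded.
    seller = seller.drop(columns=[key])

print(sorted(overlap))  # the shared key values
```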
- An instance of `DataMarket` (from `search_engine.market.data_market`) is created.
- Register Buyer (`DataMarket.register_buyer`):
  - The (cleaned and processed) buyer DataFrame and its `join_keys` (filtered by `PrepareBuyerSellers.filter_join_keys` against the global seller domain) are passed, along with the `target_feature` and the global `join_key_domains` (from `PrepareSellers`).
  - Initial Model & Residuals:
    - A linear regression model is fit on the buyer's initial non-join-key features against the `target_feature`. The R² of this baseline model is calculated and stored (this is the first value in the `accuracy` list of the experiment results).
    - If `SearchConfig.fit_by_residual` is true, the buyer's DataFrame for sketching purposes is replaced by a DataFrame containing only its join keys and the residuals from this initial model. The "target feature" for sketching becomes these residuals.
  - Buyer Sketch Creation: for each active join key of the buyer:
    - A `SketchBase` object (from `search_engine.sketches.base`) is obtained or created. This `SketchBase` is specific to the join key string and is shared by all sellers (and the buyer) using that key. It is initialized with the global `join_key_domains` for the key.
    - A `BuyerSketch` object (from `search_engine.sketches.sketch_buyer`) is created, and `BuyerSketch.register_this_buyer()` is called:
      - This calls `SketchBase._calibrate()`, which:
        - Calculates sums (X), sums of squares (X²), and counts (1) of the buyer's features (or residuals, if `fit_by_residual`) grouped by the join key values.
        - If not `fit_by_residual`, also calculates sums of cross-products of buyer features with the target (XY).
        - Normalizes these statistics (to means) and aligns/reindexes them to the full `join_key_domain`, creating PyTorch tensors. These tensors are the "buyer sketch" components.
      - Then `SketchBase._register_df()` is called to pass these tensors to a `SketchLoader` instance associated with the `SketchBase`. `SketchLoader` (`search_engine/sketches/loader.py`) stores the buyer sketch tensors in memory, typically in "batch 0", as buyers are assumed to be small.
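The calibration step can be illustrated with a small, self-contained pandas/numpy sketch. This is an assumed reading of the semantics described above (grouped sums, squares, and counts, normalized to means and reindexed to the global domain), not the repo's actual `_calibrate()` code, which additionally produces PyTorch tensors:

```python
# Illustrative sketch calibration: per-join-key sums (X), sums of squares
# (X^2), and counts (1), normalized to means and aligned to the global
# join_key_domain so every party's sketch has the same row order.
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "country": ["US", "US", "FR"],
    "feat": [1.0, 3.0, 5.0],
})
join_key_domain = ["DE", "FR", "US"]  # global domain across all datasets

g = df.groupby("country")["feat"]
counts = g.count().reindex(join_key_domain, fill_value=0)
sums = g.sum().reindex(join_key_domain, fill_value=0.0)
sum_sq = (df["feat"] ** 2).groupby(df["country"]).sum().reindex(
    join_key_domain, fill_value=0.0
)

# Normalize to means where the key is present (avoid division by zero
# for domain values the dataset never saw, e.g. "DE" here).
n = counts.to_numpy().astype(float)
mean_x = np.divide(sums.to_numpy(), n, out=np.zeros_like(n), where=n > 0)
mean_x2 = np.divide(sum_sq.to_numpy(), n, out=np.zeros_like(n), where=n > 0)

print(mean_x)  # per-key feature means aligned to ["DE", "FR", "US"]
```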
- Register Sellers (`DataMarket.register_seller`): for each valid (filtered) seller from `PrepareSellers`:
  - The seller's feature column names are prefixed with `seller_name_` to avoid collisions.
  - For each active join key of the seller:
    - A `SellerSketch` object (from `search_engine.sketches.sketch_seller`) is created, using the shared `SketchBase` for that join key, and `SellerSketch.register_this_seller()` is called:
      - If the seller has many features (more than `SketchBase.ram_batch_size`), its features are partitioned.
      - For each partition (or for all features, if there are few):
        - `SketchBase._calibrate()` is called to create sketch tensors (X, X², 1) for this set of seller features, grouped by join key, normalized, and aligned to the domain.
        - `SketchBase._register_df()` passes these tensors to the `SketchLoader`.
      - `SketchLoader` appends the seller feature sketch tensors to the appropriate batches (batching along the feature dimension). If a batch becomes full (reaches `batch_size` features), it can be offloaded to disk. `SketchLoader` also maintains a `feature_index_map` within the `SketchBase` to trace features in sketch batches back to their original seller and column name.
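The batching idea can be sketched as follows. This is a hypothetical stand-in for `SketchLoader`'s appending logic (the `register_features` helper, the fixed shapes, and the map layout are all assumptions), meant only to show batching along the feature dimension plus an index map back to the originating seller and column:

```python
# Hypothetical illustration of feature batching: seller sketch matrices
# (rows = join-key domain, columns = features) are packed into fixed-size
# batches, and feature_index_map records where each feature came from.
import numpy as np

batch_size = 3    # max features per batch
domain_size = 4   # rows = size of the join-key domain
batches = []      # list of (domain_size, batch_size) arrays
feature_index_map = {}  # (batch_idx, slot) -> (seller_name, column_name)
cursor = 0        # global feature counter across all sellers


def register_features(seller_name, sketch, col_names):
    """Append each feature column of `sketch` into the current batch,
    opening a new batch when the previous one is full."""
    global cursor
    for j, col in enumerate(col_names):
        b, slot = divmod(cursor, batch_size)
        if b == len(batches):  # start a new batch
            batches.append(np.zeros((domain_size, batch_size)))
        batches[b][:, slot] = sketch[:, j]
        feature_index_map[(b, slot)] = (seller_name, col)
        cursor += 1
        # A full batch could be offloaded to disk here (e.g. np.save).


register_features("seller_a", np.ones((4, 2)), ["seller_a_f1", "seller_a_f2"])
register_features("seller_b", np.full((4, 2), 2.0), ["seller_b_g1", "seller_b_g2"])

print(len(batches), feature_index_map[(1, 0)])
```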
- An instance of `SearchEngine` (from `search_engine.search.search_engine`) is created with the populated `DataMarket`, and `SearchEngine.start(iterations)` is called:
  - This initiates an iterative feature selection process for the number of `iterations` specified in `SearchConfig`.
  - In each iteration (`SearchEngine.search_one_iteration`):
    - It aims to find the single best seller feature to add to the current buyer model.
    - It iterates over each `join_key` common to the buyer and sellers.
    - It retrieves the buyer's current sketch tensors for that join key (Y, Y², 1, XY; or Residuals, Residuals², 1 when fitting by residual).
    - It iterates through all batches of seller sketch features (X, X², 1) for that join key from the `SketchLoader`.
    - R² Calculation using Sketches: for each candidate seller feature in a batch:
      - It performs "sketch joins" by element-wise multiplication and summation of the buyer's and seller's sketch tensors. This yields the sufficient statistics (covariances, etc.) needed for a linear regression model, as if the buyer data had been joined with that seller feature.
      - If `fit_by_residual` is true: it calculates the R² of regressing the buyer's current residuals onto the candidate seller feature.
      - If `fit_by_residual` is false: it calculates the R² of a model including all current buyer features plus the candidate seller feature, predicting the original buyer target. This involves more complex matrix operations (XᵀX, XᵀY, inversion) built from the sketch components.
      - Features that cause singularities, or that have already been selected, are skipped.
    - The seller feature (from any join key, any batch) that yields the highest R² improvement is chosen as the `best_feature` for this iteration. Its original `seller_id` and (prefixed) feature name are retrieved using `SketchBase.get_df_by_feature_index`.
    - Augmentation Plan Update: the chosen `(seller_id, iteration_number, seller_name, best_feature_name)` tuple is appended to `self.augplan`.
    - Update Buyer State (`SearchEngine._update_residual`):
      - The current buyer DataFrame is retrieved (from `DataMarket.buyer_dataset_for_residual` if fitting by residual, otherwise `DataMarket.buyer_id_to_df_and_name[0]["dataframe"]`, which reflects augmentations from previous iterations).
      - The original DataFrame for the chosen `seller_id` is retrieved, and the `best_feature` (with its seller-name prefix) is selected from it, along with the `join_key`.
      - The buyer DataFrame is joined with this seller feature on the `join_key`. The seller feature is typically aggregated (e.g., by mean) per `join_key` value before joining.
      - This augmented buyer DataFrame becomes the new basis for the buyer.
      - Crucially, the buyer is effectively re-registered in the `DataMarket` with this augmented data: `DataMarket.buyer_sketches` and related buyer info are reset, and `DataMarket.register_buyer()` is called again with the augmented buyer DataFrame. This recalculates the R² of the new model (now including the `best_feature`), and that R² is appended to `DataMarket.augplan_acc`. If `fit_by_residual`, new residuals are computed. New buyer sketches are generated and loaded.
  - The loop continues for the specified number of `iterations`.
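The sketch-join R² trick can be demonstrated in a toy setting. The sketch below assumes `fit_by_residual` is true and, for simplicity, one buyer row per join-key value, so per-key means stand in for the joined rows; the real implementation weights by per-key counts and operates on batched PyTorch tensors:

```python
# Toy "sketch join": with buyer and seller statistics aligned to the same
# join-key domain, the R^2 of regressing the buyer's residuals on a
# candidate seller feature is computed from element-wise products and sums
# of the sketch vectors -- no row-level join ever materializes.
import numpy as np

# Sketches aligned to a shared 5-value join-key domain.
r = np.array([1.0, 2.0, 3.0, 4.0, 5.0])    # buyer residual means per key
x = np.array([2.0, 4.0, 6.0, 8.0, 10.0])   # candidate seller feature means per key

n = len(r)
sum_x, sum_r = x.sum(), r.sum()
sum_xx = (x * x).sum()   # from the seller's X^2 sketch
sum_xr = (x * r).sum()   # the "sketch join": element-wise product, summed

# Ordinary least squares slope/intercept from sufficient statistics only.
beta = (n * sum_xr - sum_x * sum_r) / (n * sum_xx - sum_x ** 2)
alpha = (sum_r - beta * sum_x) / n
pred = alpha + beta * x
r2 = 1.0 - ((r - pred) ** 2).sum() / ((r - r.mean()) ** 2).sum()
print(round(r2, 6))  # x is an exact linear function of r here, so R^2 = 1.0
```

Candidates whose denominator (n * sum_xx - sum_x²) is zero would be the "singularity" cases the text says are skipped.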
- After the search loop, `plot_results()` may be called to generate plots (e.g., accuracy vs. iteration).
- The `run()` method returns a dictionary containing:
  - `'augplan'`: the list of selected features.
  - `'accuracy'`: the list of R² values, starting with the buyer's initial R² and followed by the R² after adding each feature from the `augplan`.
  - `'time_taken'`: the execution time.
This flow details how the system processes input data, transforms it into sketches, uses these sketches to efficiently find relevant features from sellers, and iteratively augments the buyer's dataset, tracking predictive accuracy along the way.