diff --git a/app-requirements.md b/app-requirements.md new file mode 100644 index 0000000..5f2534d --- /dev/null +++ b/app-requirements.md @@ -0,0 +1,14 @@ +- The /toggle endpoint updates the users' recommendations based on the most popular items in the city. +If you look at the /toggle endpoint, that's what runs the process that generates the user recommendations. + +- There is a boolean switch that changes the process that is run (popular/machine learning). +We need to have your recommendation algorithm implementation run every time that endpoint is called. + + +### So we need the following: + +1. Functions to query all the data you need for your algorithm (in order for this to work in production we need to be careful with memory. Does your algorithm work in chunks? Can it work by processing 10,000 rows of data at a time, or does it need all the data at once? Another solution would be to use a cluster computing framework, which might be the better way to go about it) + +2. Function that takes in the queried data and begins the recommendation algorithm you've made + +3. Function that populates the UserRecommendation table and associates the UserRecommendation ID with its corresponding Users. (When I was doing research, it was mentioned that some users are likely to get very similar recommendations. So in order to save data for production, they would give the same recommendations to three users that are very similar. So in the implementation the UserRecommendation ID can be associated with more than one user. This doesn't have to be the case if you don't want it; we can have a one-to-one relationship between UserRecommendation and the User.) How the popular recommendation currently works is: it creates one UserRecommendation object, and it gives that ID to every User in our database. \ No newline at end of file diff --git a/app/main/__init__.py b/app/main/__init__.py index 2acb0e9..d104d1d 100644 --- a/app/main/__init__.py +++ b/app/main/__init__.py @@ -9,7 +9,7 @@ def create_app(config_name): app = Flask(__name__) - DB.init() + DB.__init__() app.config.from_object(config_by_name[config_name]) flask_bcrypt.init_app(app) return app diff --git a/app/main/controller/user_recommendation_controller.py b/app/main/controller/user_recommendation_controller.py index b63a563..9b6aa53 100644 --- a/app/main/controller/user_recommendation_controller.py +++ b/app/main/controller/user_recommendation_controller.py @@ -19,7 +19,7 @@ def get(self): @api.route('/v1') class Recommendation(Resource): - + @api.doc('Gets a specific user\'s product recommendations') def get(self): user_id = int(request.args.get('userId', None)) diff --git a/app/main/dataModel/category.py b/app/main/dataModel/category.py index 9160015..3a49999 100644 --- a/app/main/dataModel/category.py +++ b/app/main/dataModel/category.py @@ -18,15 +18,17 @@ class Category(object): COLLECTION = "categories" def __init__(self, id, name): - self.id = id; + self.id = id self.name = name self.added_on = time.time() self.last_updated = self.added_on + # Adds the category info to the database if it is not already there. def insert(self): if not DB.find_one(Category.COLLECTION, {"_id": self.id}): DB.insert(collection=Category.COLLECTION, data=self.json()) + # Returns the info in JSON format.
def json(self): return { '_id': self.id, diff --git a/app/main/dataModel/merchant_product.py b/app/main/dataModel/merchant_product.py index eb7c6d2..c5934d0 100644 --- a/app/main/dataModel/merchant_product.py +++ b/app/main/dataModel/merchant_product.py @@ -21,7 +21,7 @@ class Merchant_Product(object): COLLECTION = "merchant_products" def __init__(self, id, merchant_id, product_id, price, currency, discounted_price): - self.id = id; + self.id = id self.merchant_id = merchant_id self.product_id = product_id self.price = price diff --git a/app/main/dataModel/user.py b/app/main/dataModel/user.py index 0775fd3..c5a770a 100644 --- a/app/main/dataModel/user.py +++ b/app/main/dataModel/user.py @@ -45,9 +45,15 @@ def json(self): 'recommendation_id': self.recommendation_id, 'city_id': self.city_id } + + # There was no insert method for the user data (was it missed?), so one is added here to mirror the other data models. + def insert(self): + if not DB.find_one(User.COLLECTION, {"_id": self.id}): + DB.insert(collection=User.COLLECTION, data=self.json()) + # Returns the id of the user. TODO: find where this function is used. def get_id(self): - self.getId() + return self.id def set_id(self, id): self.id = id diff --git a/app/main/dataModel/user_purchase.py b/app/main/dataModel/user_purchase.py index e9045fc..96e5c04 100644 --- a/app/main/dataModel/user_purchase.py +++ b/app/main/dataModel/user_purchase.py @@ -35,9 +35,9 @@ def insert(self): def json(self): return { '_id': self.id, - 'user_id': self.name, - 'product_id': self.retail_price, - 'purchased_count': self.categories, + 'user_id': self.user_id, + 'product_id': self.product_id, + 'purchased_count': self.purchased_count, 'added_on': self.added_on, 'last_updated': self.last_updated } diff --git a/app/main/database.py b/app/main/database.py index e39fdd2..cd17e82 100644 --- a/app/main/database.py +++ b/app/main/database.py @@ -9,7 +9,8 @@ class DB(object): DB_NAME = "recommendation" @staticmethod - def init(): + def __init__(): + # Connects to the database. client = pymongo.MongoClient(DB.URI) DB.DATABASE = client[DB.DB_NAME] diff --git a/app/main/model/vowpal_wabbit.py b/app/main/model/vowpal_wabbit.py new file mode 100644 index 0000000..4abf601 --- /dev/null +++ b/app/main/model/vowpal_wabbit.py @@ -0,0 +1,266 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +""" +This file provides a wrapper to run Vowpal Wabbit from the command line through python. +It is not recommended to use this approach in production, there are python bindings that can be installed from the +repository or pip or the command line can be used. This is merely to demonstrate vw usage in the example notebooks.
+""" + +import os +from subprocess import run +from tempfile import TemporaryDirectory +import pandas as pd + +from reco_utils.common.constants import ( + DEFAULT_USER_COL, + DEFAULT_ITEM_COL, + DEFAULT_RATING_COL, + DEFAULT_TIMESTAMP_COL, + DEFAULT_PREDICTION_COL, +) + + +class VW: + """Vowpal Wabbit Class""" + + def __init__( + self, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_timestamp=DEFAULT_TIMESTAMP_COL, + col_prediction=DEFAULT_PREDICTION_COL, + **kwargs, + ): + """Initialize model parameters + + Args: + col_user (str): user column name + col_item (str): item column name + col_rating (str): rating column name + col_timestamp (str): timestamp column name + col_prediction (str): prediction column name + """ + + # create temporary files + self.tempdir = TemporaryDirectory() + self.train_file = os.path.join(self.tempdir.name, "train.dat") + self.test_file = os.path.join(self.tempdir.name, "test.dat") + self.model_file = os.path.join(self.tempdir.name, "vw.model") + self.prediction_file = os.path.join(self.tempdir.name, "prediction.dat") + + # set DataFrame columns + self.col_user = col_user + self.col_item = col_item + self.col_rating = col_rating + self.col_timestamp = col_timestamp + self.col_prediction = col_prediction + + self.logistic = "logistic" in kwargs.values() + self.train_cmd = self.parse_train_params(params=kwargs) + self.test_cmd = self.parse_test_params(params=kwargs) + + @staticmethod + def to_vw_cmd(params): + """Convert dictionary of parameters to vw command line. + + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + cmd = ["vw"] + for k, v in params.items(): + if v is False: + # don't add parameters with a value == False + continue + + # add the correct hyphen to the parameter + cmd.append(f"-{k}" if len(k) == 1 else f"--{k}") + if v is not True: + # don't add an argument for parameters with value == True + cmd.append("{}".format(v)) + + return cmd + + def parse_train_params(self, params): + """Parse input hyper-parameters to build vw train commands + + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + # make a copy of the original hyper parameters + train_params = params.copy() + + # remove options that are handled internally, not supported, or test only parameters + invalid = [ + "data", + "final_regressor", + "invert_hash", + "readable_model", + "t", + "testonly", + "i", + "initial_regressor", + "link", + ] + + for option in invalid: + if option in train_params: + del train_params[option] + + train_params.update( + { + "d": self.train_file, + "f": self.model_file, + "quiet": params.get("quiet", True), + } + ) + return self.to_vw_cmd(params=train_params) + + def parse_test_params(self, params): + """Parse input hyper-parameters to build vw test commands + + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + # make a copy of the original hyper parameters + test_params = params.copy() + + # remove options that are handled internally, ot supported or train only parameters + invalid = [ + "data", + "f", + "final_regressor", + "initial_regressor", + "test_only", + "invert_hash", + "readable_model", + "b", + "bit_precision", + "holdout_off", + "c", + 
"cache", + "k", + "kill_cache", + "l", + "learning_rate", + "l1", + "l2", + "initial_t", + "power_t", + "decay_learning_rate", + "q", + "quadratic", + "cubic", + "i", + "interactions", + "rank", + "lrq", + "lrqdropout", + "oaa", + ] + for option in invalid: + if option in test_params: + del test_params[option] + + test_params.update( + { + "d": self.test_file, + "i": self.model_file, + "quiet": params.get("quiet", True), + "p": self.prediction_file, + "t": True, + } + ) + return self.to_vw_cmd(params=test_params) + + def to_vw_file(self, df, train=True): + """Convert Pandas DataFrame to vw input format file + + Args: + df (pd.DataFrame): input DataFrame + train (bool): flag for train mode (or test mode if False) + """ + + output = self.train_file if train else self.test_file + with open(output, "w") as f: + # extract columns and create a new dataframe + tmp = df[[self.col_rating, self.col_user, self.col_item]].reset_index() + + if train: + # we need to reset the rating type to an integer to simplify the vw formatting + tmp[self.col_rating] = tmp[self.col_rating].astype("int64") + + # convert rating to binary value + if self.logistic: + max_value = tmp[self.col_rating].max() + tmp[self.col_rating] = tmp[self.col_rating].apply( + lambda x: 2 * round(x / max_value) - 1 + ) + else: + tmp[self.col_rating] = "" + + # convert each row to VW input format (https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) + # [label] [tag]|[user namespace] [user id feature] |[item namespace] [movie id feature] + # label is the true rating, tag is a unique id for the example just used to link predictions to truth + # user and item namespaces separate features to support interaction features through command line options + for _, row in tmp.iterrows(): + f.write( + "{rating} {index}|user {userID} |item {itemID}\n".format( + rating=row[self.col_rating], + index=row["index"], + userID=row[self.col_user], + itemID=row[self.col_item], + ) + ) + + def fit(self, df): + """Train model + + Args: + df (pd.DataFrame): input training data + """ + + # write dataframe to disk in vw format + self.to_vw_file(df=df) + + # train model + run(self.train_cmd, check=True) + + def predict(self, df): + """Predict results + + Args: + df (pd.DataFrame): input test data + """ + + # write dataframe to disk in vw format + self.to_vw_file(df=df, train=False) + + # generate predictions + run(self.test_cmd, check=True) + + # read predictions + return df.join( + pd.read_csv( + self.prediction_file, + delim_whitespace=True, + names=[self.col_prediction], + index_col=1, + ) + ) + + def __del__(self): + self.tempdir.cleanup() diff --git a/app/main/service/UserRecommendationsService.py b/app/main/service/UserRecommendationsService.py index 80a2c2a..e3a4824 100644 --- a/app/main/service/UserRecommendationsService.py +++ b/app/main/service/UserRecommendationsService.py @@ -4,11 +4,14 @@ from ..repository.UserRecommendationRepository import UserRecommendationRepository from ..repository.UserRepository import UserRepository -class UserRecommendationService(object): +from ..util.data_cleaning import DataProcessing +import pandas as pd +class UserRecommendationService(object): - def __init__(self, recommendation_active = False): + def __init__(self, recommendation_active = False , process='purchase_count'): self.recommendation_active = recommendation_active + self.process = process def update_user_recommendations(self): @@ -18,11 +21,12 @@ def update_user_recommendations(self): self._generic_process() + ''' Queries for the most popular items in 
a given city. Inserts one entry to the user_recommendation database collection per city. Updates every User's recommendation_id to that of the inserted user_recommendation entry - + This is the generic recommendation, which is based on the popular items in the given city. ''' def _generic_process(self): #TODO: Change logic so that the entire process is not commited unless everything is ran correctly (in case of errors) @@ -33,13 +37,23 @@ def _generic_process(self): # list_of_city_users = self._get_all_city_users(city) self._set_user_recommendation(city) + ''' - Calls a function that begins the recommendation algorithm process + Calls a function that begins the recommendation algorithm process Should insert new recommendations in the user_recommendation database collection and update a User's recommendation_id. ''' def _machine_learning_process(self): - return None + purchase_count_explicit, purchase_count_implicit, users_explicit, users_implicit = DataProcessing().start() + + if self.process == 'purchase_count': + purchase_count = pd.DataFrame(purchase_count_explicit.groupby(['product_id'])['purchased_count'].sum()) + top10 = purchase_count.sort_values('purchased_count', ascending=False).head(10) + + UserRecommendationRepository.bulk_delete() + UserRecommendationRepository.insert(top10) + + return UserRecommendationRepository ''' diff --git a/app/main/util/data_cleaning.py b/app/main/util/data_cleaning.py new file mode 100644 index 0000000..d154eab --- /dev/null +++ b/app/main/util/data_cleaning.py @@ -0,0 +1,92 @@ +from ..dataModel.user_recommendation import UserRecommendation +from ..dataModel.category import Category +from ..dataModel.merchant_category import Merchant_Category +from ..dataModel.merchant_product import Merchant_Product +from ..dataModel.merchant import Merchant +from ..dataModel.product import Product +from ..dataModel.user_purchase import UserPurchase +from ..dataModel.user import User + +from ..database import DB + +import numpy as np +import pandas as pd + +def read_mongo(db, collection, no_id=True): + """ Read from Mongo and store the result in a DataFrame """ + + # Connect to MongoDB - we go through the DB class, which holds the database connection and provides helper functions for querying. + + # Make a query to the specific DB and Collection + data = db.find_all(collection) + # Construct the DataFrame from the returned documents + df = pd.DataFrame(list(data)) + + + # Delete the _id column from the dataframe, as it is of no use for the other collections. + # Only the user_id is needed; the _id generated in each of the other collections is not used. + if no_id: + del df['_id'] + # TODO: the added_on and last_updated columns also need to be handled. + + return df + + +class DataProcessing(object): + def __init__(self): + + self.user_df = read_mongo(DB, User.COLLECTION, no_id=False) + self.user_purchase_df = read_mongo(DB, UserPurchase.COLLECTION, no_id=True) # Not sure about no_id here if there are multiple entries for the same user + self.product_df = read_mongo(DB, Product.COLLECTION, no_id=False) + self.merchant_df = read_mongo(DB, Merchant.COLLECTION, no_id=False) + self.merchant_prod_df = read_mongo(DB, Merchant_Product.COLLECTION, no_id=True) + self.merchant_cat_df = read_mongo(DB, Merchant_Category.COLLECTION, no_id=True) + self.category_df = read_mongo(DB, Category.COLLECTION, no_id=False) + # the user_recommendation dataframe could be used for something like the toggle task.
+ self.user_rec_df = read_mongo(DB, UserRecommendation.COLLECTION, no_id=True) + + + def start(self): + + """ + The added_on and last_updated fields don't contribute anything useful to the model itself; at most they can be used + to check whether an item is still available in the shop while recommending, so those columns would only serve as an + affirmation that the item is available at the recommended shop. + """ + self.user_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.user_purchase_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.product_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.merchant_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.merchant_prod_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.merchant_cat_df.drop(columns=['added_on', 'last_updated'], inplace=True) + self.category_df.drop(columns=['added_on', 'last_updated'], inplace=True) + + # self.user_rec_df.drop(columns=['added_on', 'last_updated'], inplace=True) + + """ + The age column has NaNs and some very high values. Ages below 5 and above 90 do not make much sense, + and hence these are replaced with NaNs. + All the NaNs are then replaced with the mean age, and the column's data type is set to int. + """ + + self.user_df.loc[(self.user_df.age > 90) | (self.user_df.age < 5), 'age'] = np.nan + self.user_df.age = self.user_df.age.fillna(self.user_df.age.mean()) + self.user_df.age = self.user_df.age.astype(np.int32) + + + # we need to make sure the items that the user gives us are also present in our database. + + """ + The explicit purchases (purchased_count of 1-10) and the implicit ones (purchased_count of 0) have to be segregated now + """ + purchase_count_explicit = self.user_purchase_df[self.user_purchase_df.purchased_count != 0] + purchase_count_implicit = self.user_purchase_df[self.user_purchase_df.purchased_count == 0] + + users_explicit = self.user_df[self.user_df['_id'].isin(purchase_count_explicit.user_id)] + users_implicit = self.user_df[self.user_df['_id'].isin(purchase_count_implicit.user_id)] + + + return purchase_count_explicit, purchase_count_implicit, \ users_explicit, users_implicit + + diff --git a/requirements.txt b/requirements.txt index 1763521..67141a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,4 +21,6 @@ python-editor==1.0.4 pytz==2019.1 six==1.12.0 Werkzeug==0.15.5 -pymongo==3.5.1 \ No newline at end of file +pymongo==3.5.1 +numpy==1.18.0 +pandas==0.25.3 \ No newline at end of file
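
Regarding item 1 of app-requirements.md (whether the algorithm can work on 10,000 rows at a time instead of loading everything): below is a minimal sketch, not part of the diff above, of how the existing pymongo connection could be read chunk by chunk rather than materializing a whole collection into one DataFrame as read_mongo() does. The helper name read_mongo_chunked, the chunk size, and the absolute import paths are illustrative assumptions; the only things taken from this diff are DB.DATABASE (set up in app/main/database.py) and the product_id/purchased_count fields defined in user_purchase.py.

```python
import pandas as pd

# Assumed import paths; adjust to however the app package is laid out.
from app.main.database import DB
from app.main.dataModel.user_purchase import UserPurchase


def read_mongo_chunked(collection, chunk_size=10000, no_id=True):
    """Yield DataFrames of at most chunk_size documents from a Mongo collection."""
    # DB.DATABASE must already be initialized (see app/main/database.py).
    cursor = DB.DATABASE[collection].find({})
    batch = []
    for doc in cursor:
        if no_id:
            doc.pop("_id", None)  # drop Mongo's _id, mirroring read_mongo() in data_cleaning.py
        batch.append(doc)
        if len(batch) >= chunk_size:
            yield pd.DataFrame(batch)
            batch = []
    if batch:
        yield pd.DataFrame(batch)


# Example: aggregate purchased_count per product without holding every purchase in memory.
totals = pd.Series(dtype="float64")
for chunk in read_mongo_chunked(UserPurchase.COLLECTION):
    totals = totals.add(chunk.groupby("product_id")["purchased_count"].sum(), fill_value=0)
top10 = totals.sort_values(ascending=False).head(10)
```

If this chunked pattern is enough, it avoids pulling a cluster computing framework into the stack; if the algorithm truly needs all rows at once, the current read_mongo() approach (or Spark, as the requirements suggest) remains the fallback.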