From b978caf1a9711adebc1968ef61ef93b9b79ea8df Mon Sep 17 00:00:00 2001 From: Jean Paul Torre Date: Mon, 19 Aug 2019 17:35:49 +0200 Subject: [PATCH 1/7] Add pymongo, add database models for tables, add dev directory with endpoints --- Makefile | 21 +++++ app/__init__.py | 6 +- app/main/__init__.py | 10 +-- app/main/config.py | 8 +- app/main/controller/dev_controller.py | 84 +++++++++++++++++++ ...ser_controller.py => health_controller.py} | 0 app/main/dataModel/__init__.py | 0 app/main/dataModel/category.py | 36 ++++++++ app/main/dataModel/merchant.py | 38 +++++++++ app/main/dataModel/merchant_category.py | 39 +++++++++ app/main/dataModel/merchant_product.py | 40 +++++++++ app/main/dataModel/product.py | 45 ++++++++++ app/main/database.py | 26 ++++++ app/main/util/dto.py | 3 +- docker-compose.yml | 25 ++++++ manage.py | 6 +- requirements.txt | 4 +- 17 files changed, 369 insertions(+), 22 deletions(-) create mode 100644 Makefile create mode 100644 app/main/controller/dev_controller.py rename app/main/controller/{user_controller.py => health_controller.py} (100%) create mode 100644 app/main/dataModel/__init__.py create mode 100644 app/main/dataModel/category.py create mode 100644 app/main/dataModel/merchant.py create mode 100644 app/main/dataModel/merchant_category.py create mode 100644 app/main/dataModel/merchant_product.py create mode 100644 app/main/dataModel/product.py create mode 100644 app/main/database.py create mode 100644 docker-compose.yml diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7f9b493 --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +.PHONY: clean system-packages python-packages install tests run all + +clean: + find . -type f -name '*.pyc' -delete + find . -type f -name '*.log' -delete + +system-packages: + sudo apt install python-pip -y + +python-packages: + pip install -r requirements.txt + +install: system-packages python-packages + +tests: + python manage.py test + +run: + python manage.py run + +all: clean install tests run diff --git a/app/__init__.py b/app/__init__.py index 25cc998..3aa3c9d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -1,7 +1,9 @@ from flask_restplus import Api from flask import Blueprint -from .main.controller.user_controller import api2 as health_ns +from .main.controller.health_controller import api2 as health_ns +from .main.controller.dev_controller import api as dev_ns + blueprint = Blueprint('api', __name__) @@ -12,4 +14,4 @@ ) api.add_namespace(health_ns, path='/health') - +api.add_namespace(dev_ns, path='/dev') diff --git a/app/main/__init__.py b/app/main/__init__.py index 6e361ee..2acb0e9 100644 --- a/app/main/__init__.py +++ b/app/main/__init__.py @@ -1,17 +1,15 @@ from flask import Flask -from flask_sqlalchemy import SQLAlchemy from flask_bcrypt import Bcrypt - +from .database import DB from .config import config_by_name -db = SQLAlchemy() + flask_bcrypt = Bcrypt() def create_app(config_name): app = Flask(__name__) + DB.init() app.config.from_object(config_by_name[config_name]) - db.init_app(app) flask_bcrypt.init_app(app) - - return app \ No newline at end of file + return app diff --git a/app/main/config.py b/app/main/config.py index 1e4dbb2..de00166 100644 --- a/app/main/config.py +++ b/app/main/config.py @@ -13,18 +13,16 @@ class Config: class DevelopmentConfig(Config): # uncomment the line below to use postgres - # SQLALCHEMY_DATABASE_URI = postgres_local_base DEBUG = True - SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_main.db') - SQLALCHEMY_TRACK_MODIFICATIONS = False 
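For orientation, a minimal sketch of how the pieces above fit together at boot time; the entry point mirrors manage.py further down in this patch, and the host/port are illustrative choices matching the 5000:5000 mapping in docker-compose.yml below.

    from app import blueprint
    from app.main import create_app

    # 'dev' maps to DevelopmentConfig via config_by_name; create_app() also calls
    # DB.init(), so the pymongo client is ready before the first request is served.
    app = create_app('dev')
    app.register_blueprint(blueprint)

    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=5000)  # illustrative; see docker-compose.yml below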
+ MONGO_DB_URI = "mongodb://mongodb:27017/" class TestingConfig(Config): DEBUG = True TESTING = True - SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_test.db') +# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'flask_boilerplate_test.db') PRESERVE_CONTEXT_ON_EXCEPTION = False - SQLALCHEMY_TRACK_MODIFICATIONS = False +# SQLALCHEMY_TRACK_MODIFICATIONS = False class ProductionConfig(Config): diff --git a/app/main/controller/dev_controller.py b/app/main/controller/dev_controller.py new file mode 100644 index 0000000..683d40b --- /dev/null +++ b/app/main/controller/dev_controller.py @@ -0,0 +1,84 @@ +from flask import request +from flask_restplus import Resource +from ..dataModel.category import Category +from ..dataModel.merchant import Merchant +from ..database import DB +from bson.json_util import dumps +import json +from bson.objectid import ObjectId + +from ..util.dto import DevEndpoint + +api = DevEndpoint.api + +@api.route('/v1/category') +@api.doc(params={'id': 'Category ID', 'name': 'Category Name'}) +class CategoryAPI(Resource): + @api.doc('Gets all the categories in the database') + def get(self): + """Gets all the categories in the categories collection""" + categories = DB.find_all("categories") + return dumps(categories) + + @api.doc('Adds a category to the database') + def post(self): + """Adds category to the database.""" + id = request.args.get('id', '') + name = request.args.get('name', '') + new_category = Category(id, name) + new_category.insert() + return ({'message': 'Successfully Added'}, 200) + +@api.route('/v1/merchant') +@api.doc(params={'id': 'Merchant ID', 'name': 'Merchant Name', 'location': 'Merchant Address'}) +class MerchantAPI(Resource): + @api.doc('Gets all the merchants in the database') + def get(self): + """Gets all the categories in the categories collection""" + merchants = DB.find_all("merchants") + return dumps(merchants) + + @api.doc('Adds a merchant to the database') + def post(self): + """Adds merchant to the database.""" + id = request.args.get('id', '') + name = request.args.get('name', '') + location = request.args.get('location', '') + new_merchant = Merchant(id, name, location) + new_merchant.insert() + return ({'message': 'Successfully Added'}, 200) + + +@api.route('/v1/product') +class ProductAPI(Resource): + @api.doc('Gets all the products in the database') + def get(self): + """Gets all the products in the products collection""" + products = DB.find_all("products") + return dumps(products) + + +@api.route('/v1/merchant_category') +class MerchantCategoryAPI(Resource): + @api.doc('Gets all the merchant categories in the database') + def get(self): + """Gets all the products in the products collection""" + merchant_categories = DB.find_all("merchant_categories") + return dumps(merchant_categories) + + +@api.route('/v1/merchant_product') +class MerchantProductAPI(Resource): + @api.doc('Gets all the merchant products in the database') + def get(self): + """Gets all the products in the products collection""" + merchant_categories = DB.find_all("merchant_products") + return dumps(merchant_categories) + + +class Encoder(json.JSONEncoder): + def default(self): + if isinstance(obj, ObjectId): + return str(obj) + else: + return obj diff --git a/app/main/controller/user_controller.py b/app/main/controller/health_controller.py similarity index 100% rename from app/main/controller/user_controller.py rename to app/main/controller/health_controller.py diff --git a/app/main/dataModel/__init__.py 
b/app/main/dataModel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/main/dataModel/category.py b/app/main/dataModel/category.py new file mode 100644 index 0000000..9160015 --- /dev/null +++ b/app/main/dataModel/category.py @@ -0,0 +1,36 @@ +import time + +from ..database import DB + +''' +Category Data +Example: +{ + “id” : 1, + “name”: “soda”, + “added_on”: 12342352 + “last_updated”: 12342942, +} + +''' +class Category(object): + + COLLECTION = "categories" + + def __init__(self, id, name): + self.id = id; + self.name = name + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(Category.COLLECTION, {"_id": self.id}): + DB.insert(collection=Category.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'name': self.name, + 'added_on': self.added_on, + 'last_updated': self.last_updated, + } diff --git a/app/main/dataModel/merchant.py b/app/main/dataModel/merchant.py new file mode 100644 index 0000000..1ce72b0 --- /dev/null +++ b/app/main/dataModel/merchant.py @@ -0,0 +1,38 @@ +import time + +from ..database import DB + +''' +Merchant Data +Example: +{ + “id” : 1, + “name”: “Food Lion”, + “location”: “Calle de Zaragoze 4, 28201 Madrid, Madrid”, + “added_on”: 12342352 + “last_updated”: 12342942, +} +''' +class Merchant(object): + + COLLECTION = "merchants" + + def __init__(self, id, name, location): + self.id = id; + self.name = name + self.location = location + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(Merchant.COLLECTION, {"_id": self.id}): + DB.insert(collection=Merchant.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'name': self.name, + 'location': self.location, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } diff --git a/app/main/dataModel/merchant_category.py b/app/main/dataModel/merchant_category.py new file mode 100644 index 0000000..e125431 --- /dev/null +++ b/app/main/dataModel/merchant_category.py @@ -0,0 +1,39 @@ +import time + +from ..database import DB + +''' +Merchant Category Data +Example: + { + “id”: 1, + “mechant_id”: 245, + “category_id”: 452 + “priceLevel”: 1 + } +''' +class Merchant_Category(object): + + COLLECTION = "merchant_categories" + + def __init__(self, id, merchant_id, category_id, price_level): + self.id = id; + self.merchant_id = merchant_id + self.category_id = category_id + self.price_level = price_level + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(Merchant_Category.COLLECTION, {"_id": self.id}): + DB.insert(collection=Merchant_Category.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'merchant_id': self.merchant_id, + 'category_id': self.category_id, + 'price_level': self.price_level, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } diff --git a/app/main/dataModel/merchant_product.py b/app/main/dataModel/merchant_product.py new file mode 100644 index 0000000..5b510c1 --- /dev/null +++ b/app/main/dataModel/merchant_product.py @@ -0,0 +1,40 @@ +import time + +from ..database import DB + +''' +Merchant Product Data +Example: + { + “id”: 1, + “mechant_id”: 245, + “product_id”: 452 + “price”: {currency: “$”, “amount”: 12.45} + } +''' +class Merchant_Product(object): + + COLLECTION = "merchant_products" + + def __init__(self, id, merchant_id, product_id, price, currency): + self.id = id; + self.merchant_id = merchant_id + 
self.product_id = product_id + self.price = price + self.currency = currency + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(Merchant_Product.COLLECTION, {"_id": self.id}): + DB.insert(collection=Merchant_Product.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'merchant_id': self.merchant_id, + 'product_id': self.product_id, + 'price': {'currency': '$', “amount”: 12.45}, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } diff --git a/app/main/dataModel/product.py b/app/main/dataModel/product.py new file mode 100644 index 0000000..517f8f5 --- /dev/null +++ b/app/main/dataModel/product.py @@ -0,0 +1,45 @@ +import time + +from ..database import DB + +''' +Product Data +Example: +{ + “id” : 1, + “name”: “12oz Coke”, + “retail_price”: 1.50, + “categories”: [ + “beverage”, + “soda” + ], + “added_on”: 12342352 + “last_updated”: 12342942, +} + +''' +class Product(object): + + COLLECTION = "products" + + def __init__(self, id, name, retail_price, categories): + self.id = id + self.name = name + self.retail_price = retail_price + self.categories = categories + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(Product.COLLECTION, {"_id": self.id}): + DB.insert(collection=Product.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'name': self.name, + 'retail_price': self.retail_price, + 'categories': self.categories, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } diff --git a/app/main/database.py b/app/main/database.py new file mode 100644 index 0000000..a7de370 --- /dev/null +++ b/app/main/database.py @@ -0,0 +1,26 @@ +import pymongo + +''' +Creates a database connection and introduces helper functions that call pymongo functions +''' +class DB(object): + + URI = "mongodb://mongodb:27017/" + DB_NAME = "recommendation" + + @staticmethod + def init(): + client = pymongo.MongoClient(DB.URI) + DB.DATABASE = client[DB.DB_NAME] + + @staticmethod + def insert(collection, data): + DB.DATABASE[collection].insert(data) + + @staticmethod + def find_one(collection, query): + return DB.DATABASE[collection].find_one(query) + + @staticmethod + def find_all(collection): + return DB.DATABASE[collection].find() diff --git a/app/main/util/dto.py b/app/main/util/dto.py index 3cfb240..dfa8503 100644 --- a/app/main/util/dto.py +++ b/app/main/util/dto.py @@ -3,4 +3,5 @@ class Health: api = Namespace('health', description='Checks Recommendation Service Health') - +class DevEndpoint: + api = Namespace('dev', description='Used for testing, wont be publicly available') diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ab75ca8 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,25 @@ +version: '3.7' +services: + + recommendation: + build: + context: . 
+ dockerfile: Dockerfile + ports: + - "5000:5000" + depends_on: + - mongodb + + mongodb: + image: mongo:3.4.22 + container_name: "mongodb" + ports: + - 27017:27017 + volumes: + - ./data/db:/data/db" + environment: + - MONGO_DATA_DIR=/data/db + - MONGO_LOG_DIR=/dev/null + - MONGODB_USER="user" + - MONGODB_USER="pass" + diff --git a/manage.py b/manage.py index 07f5997..54a2820 100644 --- a/manage.py +++ b/manage.py @@ -5,7 +5,7 @@ from flask_script import Manager from app import blueprint -from app.main import create_app, db +from app.main import create_app app = create_app(os.getenv('BOILERPLATE_ENV') or 'dev') app.register_blueprint(blueprint) @@ -14,10 +14,6 @@ manager = Manager(app) -migrate = Migrate(app, db) - -manager.add_command('db', MigrateCommand) - @manager.command def run(): diff --git a/requirements.txt b/requirements.txt index bb76ba0..1763521 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,6 @@ Flask==1.1.1 Flask-Migrate==2.5.2 flask-restplus==0.12.1 Flask-Script==2.0.6 -Flask-SQLAlchemy==2.4.0 Flask-Testing==0.7.1 itsdangerous==1.1.0 Jinja2==2.10.1 @@ -21,6 +20,5 @@ python-dateutil==2.8.0 python-editor==1.0.4 pytz==2019.1 six==1.12.0 -SQLAlchemy==1.3.6 Werkzeug==0.15.5 - +pymongo==3.5.1 \ No newline at end of file From a6c7f756bf911d6483c3f4bb554084798bd8a64a Mon Sep 17 00:00:00 2001 From: Jean Paul Torre Date: Tue, 20 Aug 2019 13:06:09 +0200 Subject: [PATCH 2/7] Add missing tables, add missing keys --- app/main/dataModel/merchant_category.py | 8 +++-- app/main/dataModel/merchant_product.py | 15 +++++--- app/main/dataModel/user.py | 46 +++++++++++++++++++++++++ app/main/dataModel/user_purchase.py | 43 +++++++++++++++++++++++ 4 files changed, 105 insertions(+), 7 deletions(-) create mode 100644 app/main/dataModel/user.py create mode 100644 app/main/dataModel/user_purchase.py diff --git a/app/main/dataModel/merchant_category.py b/app/main/dataModel/merchant_category.py index e125431..9451a2f 100644 --- a/app/main/dataModel/merchant_category.py +++ b/app/main/dataModel/merchant_category.py @@ -8,8 +8,10 @@ { “id”: 1, “mechant_id”: 245, - “category_id”: 452 - “priceLevel”: 1 + “category_id”: 452, + “priceLevel”: 1, + “added_on”: 12342352, + “last_updated”: 12342942 } ''' class Merchant_Category(object): @@ -17,7 +19,7 @@ class Merchant_Category(object): COLLECTION = "merchant_categories" def __init__(self, id, merchant_id, category_id, price_level): - self.id = id; + self.id = id self.merchant_id = merchant_id self.category_id = category_id self.price_level = price_level diff --git a/app/main/dataModel/merchant_product.py b/app/main/dataModel/merchant_product.py index 5b510c1..eb7c6d2 100644 --- a/app/main/dataModel/merchant_product.py +++ b/app/main/dataModel/merchant_product.py @@ -8,20 +8,25 @@ { “id”: 1, “mechant_id”: 245, - “product_id”: 452 - “price”: {currency: “$”, “amount”: 12.45} + “product_id”: 452, + "currency": "$", + “price”: 12.45, + "discounted_price": 10.15, + “added_on”: 12342352, + “last_updated”: 12342942 } ''' class Merchant_Product(object): COLLECTION = "merchant_products" - def __init__(self, id, merchant_id, product_id, price, currency): + def __init__(self, id, merchant_id, product_id, price, currency, discounted_price): self.id = id; self.merchant_id = merchant_id self.product_id = product_id self.price = price self.currency = currency + self.discounted_price = discounted_price self.added_on = time.time() self.last_updated = self.added_on @@ -34,7 +39,9 @@ def json(self): '_id': self.id, 'merchant_id': self.merchant_id, 'product_id': 
self.product_id, - 'price': {'currency': '$', “amount”: 12.45}, + 'currency': self.currency, + 'price': self.price, + 'discounted_price': self.discounted_price, 'added_on': self.added_on, 'last_updated': self.last_updated } diff --git a/app/main/dataModel/user.py b/app/main/dataModel/user.py new file mode 100644 index 0000000..3112acb --- /dev/null +++ b/app/main/dataModel/user.py @@ -0,0 +1,46 @@ +import time + +from ..database import DB + +''' +Product Data +Example: +{ + “_id”: 2, + “nationality”: 3, + "age": 22, + “average_spent”: 24.37, + “added_on”: 12342352 + “last_updated”: 12342942, + } + + + +''' +class User(object): + + COLLECTION = "user" + + def __init__(self, user_id, age, nationality, average_spent): + self.id = user_id + self.age = age + self.user_id = user_id + self.nationality = nationality + self.average_spent = average_spent + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(User.COLLECTION, {"_id": self.id}): + DB.insert(collection=User.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'nationality': self.nationality, + 'age': self.age, + 'average_spent': self.average_spent, + 'purchased_count': self.categories, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } diff --git a/app/main/dataModel/user_purchase.py b/app/main/dataModel/user_purchase.py new file mode 100644 index 0000000..e9045fc --- /dev/null +++ b/app/main/dataModel/user_purchase.py @@ -0,0 +1,43 @@ +import time + +from ..database import DB + +''' +Product Data +Example: +{ + “id”: 1, + “user_id”: 2, + “product_id”: 3, + “purchased_count”: 4 + "added_on”: 12342352”, + “last_updated”: 12342942 + } + + +''' +class UserPurchase(object): + + COLLECTION = "user_purchases" + + def __init__(self, id, user_id, product_id, purchased_count): + self.id = id + self.user_id = user_id + self.product_id = product_id + self.purchased_count = purchased_count + self.added_on = time.time() + self.last_updated = self.added_on + + def insert(self): + if not DB.find_one(UserPurchase.COLLECTION, {"_id": self.id}): + DB.insert(collection=UserPurchase.COLLECTION, data=self.json()) + + def json(self): + return { + '_id': self.id, + 'user_id': self.name, + 'product_id': self.retail_price, + 'purchased_count': self.categories, + 'added_on': self.added_on, + 'last_updated': self.last_updated + } From 373989bfbbdc7a7357aa76c230647f222c3969e0 Mon Sep 17 00:00:00 2001 From: sayoojbk Date: Mon, 23 Sep 2019 23:20:57 +0530 Subject: [PATCH 3/7] implemented vowpal wabbit code to be refactored for deployment --- .vscode/settings.json | 3 + recommendation-model/configs/constants.py | 41 ++ recommendation-model/utils/dataset.py | 234 ++++++ recommendation-model/utils/evaluation.py | 677 ++++++++++++++++++ .../utils/python_splitters.py | 280 ++++++++ .../vowpal_wabbit_deep_dive.py | 371 ++++++++++ 6 files changed, 1606 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 recommendation-model/configs/constants.py create mode 100644 recommendation-model/utils/dataset.py create mode 100644 recommendation-model/utils/evaluation.py create mode 100644 recommendation-model/utils/python_splitters.py create mode 100644 recommendation-model/vowpal_wabbit_deep_dive.py diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..3c3a83b --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.pythonPath": "/home/sayooj/anaconda3/bin/python" +} \ No newline at end of file diff --git 
a/recommendation-model/configs/constants.py b/recommendation-model/configs/constants.py new file mode 100644 index 0000000..10e64eb --- /dev/null +++ b/recommendation-model/configs/constants.py @@ -0,0 +1,41 @@ +# ------------------------------------------------------------------------------------------------------ +# Licensed under MIT License +# Written by sayooj_bk +# ------------------------------------------------------------------------------------------------------- + +# Default column names + +# This is the user meta-data which user provides. +DEFAULT_USER_COL = 'userId' +DEFAULT_USER_BUDGET = "userBudget" +DEFAULT_PURCHASED_COUNT = "purchase_count" +DEFAULT_NATIONALITY = "nationality" + + +# The product data +DEFAULT_RETAIL_PRICE = "itemPrice" +DEFAULT_ITEM_COL = "itemId" # This is what item user prefers from the list of items available. +DEFAULT_CATEGORY = "itemCategory" + + + +# The merchant data scheme +MERCHANT_ID = "merchantId" +MERCHANT_NAME = "merchantName" +LOCATION = "location" + + + +# Merchant product data +# - MERCHANT_ID , DEFAULT_ITEM_COL, DEFAULT_RETAIL_PRICE , DISCOUNTED_PRICE +MERCHANT_PRODUCT_PRICE = "merchantProductPrice" +DISCOUNTED_PRICE = "discountedPrice" + +# Merchant category data +MERCHANT_CATEGORY_ID = "merchantCategoryId" +PRICE_LEVEL = "priceLevel" # This is the price level of the merchant like is this a expensive place or cheap place like that. + + + + + diff --git a/recommendation-model/utils/dataset.py b/recommendation-model/utils/dataset.py new file mode 100644 index 0000000..16e290f --- /dev/null +++ b/recommendation-model/utils/dataset.py @@ -0,0 +1,234 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +import re +import shutil +import warnings +import pandas as pd +from pandas import DataFrame + +# The below data are which have been uploaded by the user while filling out form preference. +from ..configs.constants import ( + DEFAULT_USER_COL, + DEFAULT_USER_BUDGET, + DEFAULT_PURCHASED_COUNT, + DEFAULT_NATIONALITY , + DEFAULT_ITEM_COL, + DEFAULT_ITEM_CATEGORY, + DEFAULT_RETAIL_PRICE, + MERCHANT_PRODUCT_PRICE, + DISCOUNTED_PRICE, + MERCHANT_CATEGORY_ID, + PRICE_LEVEL +) + + +import pymongo +myclient = pymongo.MongoClient("mongodb://localhost:27017/") + +# Load the fcgl database which has to be loaded and +""" +FCGL-DATABASE : +COLLECTIONS : USER DATA + PRODUCT DATA + MERCHANT DATA + + +""" +mydb = myclient["fcgl-database"] +training_collection = mydb["training_data"] +# training_collection.insert(dict) -- this will add the data to the colleciton. + +ERROR_HEADER = "Header error. At least user and item id should be provided." +WARNING_FCGL_HEADER = "The dataset has more than the required data so only few columns will be used #TODO select those few." +""" +try: + from pyspark.sql.types import ( + StructType, + StructField, + IntegerType, + FloatType, + DoubleType, + LongType, + StringType, + ) + from pyspark.sql.functions import concat_ws, col +except ImportError: + pass # so the environment without spark doesn't break +""" + +# The data points on which the recommendation system will be trained. +DEFAULT_HEADER = ( + DEFAULT_USER_COL, + DEFAULT_USER_BUDGET, + DEFAULT_PURCHASED_COUNT, + DEFAULT_NATIONALITY, + DEFAULT_ITEM_COL, + DEFAULT_ITEM_CATEGORY, + DEFAULT_RETAIL_PRICE, + MERCHANT_PRODUCT_PRICE, + DISCOUNTED_PRICE, + MERCHANT_CATEGORY_ID, + PRICE_LEVEL +) + + + +def load_pandas_df( + header=None): + """ + Loads the Mongodb dataset as pd.DataFrame. 
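As a minimal sketch of how a document could be written into the training_data collection using the shared column constants above (values are illustrative; the connection string matches the local client created in this module). Note that the category column is exported as DEFAULT_CATEGORY ("itemCategory") in configs/constants.py.

    import pymongo

    # Assumes configs/constants.py is importable from this package.
    from configs.constants import (
        DEFAULT_USER_COL, DEFAULT_ITEM_COL, DEFAULT_RETAIL_PRICE, MERCHANT_ID, PRICE_LEVEL)

    client = pymongo.MongoClient("mongodb://localhost:27017/")
    training = client["fcgl-database"]["training_data"]

    # Illustrative record; every field name comes from the constants module so the
    # API layer and the model code write the same schema.
    training.insert_one({
        DEFAULT_USER_COL: 1,        # "userId"
        DEFAULT_ITEM_COL: 42,       # "itemId"
        DEFAULT_RETAIL_PRICE: 1.5,  # "itemPrice"
        MERCHANT_ID: 245,           # "merchantId"
        PRICE_LEVEL: 1,             # "priceLevel"
    })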
+ To load customer information only, you can use load_item_df function. + + Args: + size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m"). + header (list or tuple or None): Rating dataset header. + local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file. + If None, all the intermediate files will be stored in a temporary directory and removed after use. + title_col (str): Movie title column name. If None, the column will not be loaded. + genres_col (str): Genres column name. Genres are '|' separated string. + If None, the column will not be loaded. + year_col (str): Movie release year column name. If None, the column will not be loaded. + + Returns: + pd.DataFrame: Movie rating dataset. + + + **Examples** + + .. code-block:: python + + # To load just user-id, item-id, and ratings from MovieLens-1M dataset, + df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating')) + + # To load rating's timestamp together, + df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp')) + + # To load movie's title, genres, and released year info along with the ratings data, + df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp'), + title_col='Title', + genres_col='Genres', + year_col='Year' + ) + """ + if header is None: + header = DEFAULT_HEADER + elif len(header) < 2: + raise ValueError(ERROR_HEADER) + elif len(header) > 4: + warnings.warn(WARNING_FCGL_HEADER) + # header = header[:4] + + fcgl_column = header[1] + # collection_name : Collection name that is used to train the vowpal wabbit model. + # collection_name : Since this is what helps in recommnding we will push in the intial user data we get from the user while he/she + # signups for the application. + + + df = DataFrame( list(mydb.training_data.find({})) ) + + return df + + +def load_item_df( + movie_col=DEFAULT_ITEM_COL, + title_col=None, + genres_col=None, + year_col=None): + """Loads Movie info. + + Args: + movie_col (str): Movie id column name. + title_col (str): Movie title column name. If None, the column will not be loaded. + genres_col (str): Genres column name. Genres are '|' separated string. + If None, the column will not be loaded. + year_col (str): Movie release year column name. If None, the column will not be loaded. + + Returns: + pd.DataFrame: Movie information data, such as title, genres, and release year. 
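A short sketch of pulling the training frame with load_pandas_df once the collection is populated (assumption: the stored documents follow the DEFAULT_HEADER fields; Mongo's own _id is dropped because it is not a model feature):

    # Load everything from the training_data collection into a DataFrame.
    df = load_pandas_df()

    # The raw documents carry Mongo's ObjectId _id; keep only the feature columns.
    df = df.drop(columns=["_id"], errors="ignore")
    print(df.shape)
    print(df.columns.tolist())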
+ """ + size = size.lower() + if size not in DATA_FORMAT: + raise ValueError(ERROR_MOVIE_LENS_SIZE) + + with download_path(local_cache_path) as path: + filepath = os.path.join(path, "ml-{}.zip".format(size)) + _, item_datapath = _maybe_download_and_extract(size, filepath) + item_df = _load_item_df( + size, item_datapath, movie_col, title_col, genres_col, year_col + ) + + return item_df + + +def _load_item_df(size, item_datapath, movie_col, title_col, genres_col, year_col): + """Loads Movie info""" + if title_col is None and genres_col is None and year_col is None: + return None + + item_header = [movie_col] + usecols = [0] + + # Year is parsed from title + if title_col is not None or year_col is not None: + item_header.append("title_year") + usecols.append(1) + + genres_header_100k = None + if genres_col is not None: + # 100k data's movie genres are encoded as a binary array (the last 19 fields) + # For details, see http://files.grouplens.org/datasets/movielens/ml-100k-README.txt + if size == "100k": + genres_header_100k = [*(str(i) for i in range(19))] + item_header.extend(genres_header_100k) + usecols.extend([*range(5, 24)]) # genres columns + else: + item_header.append(genres_col) + usecols.append(2) # genres column + + item_df = pd.read_csv( + item_datapath, + sep=DATA_FORMAT[size].item_separator, + engine="python", + names=item_header, + usecols=usecols, + header=0 if DATA_FORMAT[size].item_has_header else None, + encoding="ISO-8859-1", + ) + + # Convert 100k data's format: '0|0|1|...' to 'Action|Romance|..." + if genres_header_100k is not None: + item_df[genres_col] = item_df[genres_header_100k].values.tolist() + item_df[genres_col] = item_df[genres_col].map( + lambda l: "|".join([GENRES[i] for i, v in enumerate(l) if v == 1]) + ) + + item_df.drop(genres_header_100k, axis=1, inplace=True) + + # Parse year from movie title. Note, MovieLens title format is "title (year)" + # Note, there are very few records that are missing the year info. + if year_col is not None: + + def parse_year(t): + parsed = re.split("[()]", t) + if len(parsed) > 2 and parsed[-2].isdecimal(): + return parsed[-2] + else: + return None + + item_df[year_col] = item_df["title_year"].map(parse_year) + if title_col is None: + item_df.drop("title_year", axis=1, inplace=True) + + if title_col is not None: + item_df.rename(columns={"title_year": title_col}, inplace=True) + + return item_df + + +def load_spark_df( + spark): + raise NotImplementedError + + diff --git a/recommendation-model/utils/evaluation.py b/recommendation-model/utils/evaluation.py new file mode 100644 index 0000000..349ada6 --- /dev/null +++ b/recommendation-model/utils/evaluation.py @@ -0,0 +1,677 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +from functools import wraps +from sklearn.metrics import ( + mean_squared_error, + mean_absolute_error, + r2_score, + explained_variance_score, + roc_auc_score, + log_loss, +) + +from reco_utils.common.constants import ( + DEFAULT_USER_COL, + DEFAULT_ITEM_COL, + DEFAULT_RATING_COL, + DEFAULT_PREDICTION_COL, + DEFAULT_K, + DEFAULT_THRESHOLD, +) +from reco_utils.dataset.pandas_df_utils import ( + has_columns, + has_same_base_dtype, + lru_cache_df, +) + + +def check_column_dtypes(func): + """Checks columns of DataFrame inputs + + This includes the checks on: + 1. whether the input columns exist in the input DataFrames + 2. whether the data types of col_user as well as col_item are matched in the two input DataFrames. 
+ + Args: + func (function): function that will be wrapped + """ + + @wraps(func) + def check_column_dtypes_wrapper( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, + *args, + **kwargs + ): + """Check columns of DataFrame inputs + + Args: + rating_true (pd.DataFrame): True data + rating_pred (pd.DataFrame): Predicted data + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + """ + + if not has_columns(rating_true, [col_user, col_item, col_rating]): + raise ValueError("Missing columns in true rating DataFrame") + if not has_columns(rating_pred, [col_user, col_item, col_prediction]): + raise ValueError("Missing columns in predicted rating DataFrame") + if not has_same_base_dtype( + rating_true, rating_pred, columns=[col_user, col_item] + ): + raise ValueError("Columns in provided DataFrames are not the same datatype") + + return func( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + *args, + **kwargs + ) + + return check_column_dtypes_wrapper + + +@check_column_dtypes +@lru_cache_df(maxsize=1) +def merge_rating_true_pred( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Join truth and prediction data frames on userID and itemID and return the true + and predicted rated with the correct index. + + Args: + rating_true (pd.DataFrame): True data + rating_pred (pd.DataFrame): Predicted data + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + np.array: Array with the true ratings + np.array: Array with the predicted ratings + + """ + + # pd.merge will apply suffixes to columns which have the same name across both dataframes + suffixes = ["_true", "_pred"] + rating_true_pred = pd.merge( + rating_true, rating_pred, on=[col_user, col_item], suffixes=suffixes + ) + if col_rating in rating_pred.columns: + col_rating = col_rating + suffixes[0] + if col_prediction in rating_true.columns: + col_prediction = col_prediction + suffixes[1] + return rating_true_pred[col_rating], rating_true_pred[col_prediction] + + +def rmse( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate Root Mean Squared Error + + Args: + rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs + rating_pred (pd.DataFrame): Predicted data. 
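In practice the column checks above turn schema mistakes into an immediate ValueError instead of a silent bad join; a toy illustration, assuming the reco_utils default column names userID/itemID/rating/prediction:

    import pandas as pd

    truth = pd.DataFrame({"userID": [1], "itemID": [10], "rating": [4]})
    preds = pd.DataFrame({"userID": [1], "itemID": [10]})  # 'prediction' column missing

    # Any metric built on the wrapped merge helper (e.g. rmse) raises
    # "Missing columns in predicted rating DataFrame" before computing anything.
    rmse(truth, preds)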
There should be no duplicate (userID, itemID) pairs + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: Root mean squared error + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return np.sqrt(mean_squared_error(y_true, y_pred)) + + +def mae( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate Mean Absolute Error. + + Args: + rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs + rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: Mean Absolute Error. + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return mean_absolute_error(y_true, y_pred) + + +def rsquared( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate R squared + + Args: + rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs + rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: R squared (min=0, max=1). + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return r2_score(y_true, y_pred) + + +def exp_var( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate explained variance. + + Args: + rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs + rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: Explained variance (min=0, max=1). + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return explained_variance_score(y_true, y_pred) + + +def auc( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate the Area-Under-Curve metric for implicit feedback typed + recommender, where rating is binary and prediction is float number ranging + from 0 to 1. 
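A quick sanity check of the rating metrics defined above on a two-row toy set (same default column-name assumption as before):

    import pandas as pd

    truth = pd.DataFrame({"userID": [1, 1], "itemID": [10, 11], "rating": [4, 2]})
    preds = pd.DataFrame({"userID": [1, 1], "itemID": [10, 11], "prediction": [3.5, 2.5]})

    print(rmse(truth, preds))  # 0.5
    print(mae(truth, preds))   # 0.5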
+ + https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve + + Note: + The evaluation does not require a leave-one-out scenario. + This metric does not calculate group-based AUC which considers the AUC scores + averaged across users. It is also not limited to k. Instead, it calculates the + scores on the entire prediction results regardless the users. + + Args: + rating_true (pd.DataFrame): True data + rating_pred (pd.DataFrame): Predicted data + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: auc_score (min=0, max=1) + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return roc_auc_score(y_true, y_pred) + + +def logloss( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, +): + """Calculate the logloss metric for implicit feedback typed + recommender, where rating is binary and prediction is float number ranging + from 0 to 1. + + https://en.wikipedia.org/wiki/Loss_functions_for_classification#Cross_entropy_loss_(Log_Loss) + + Args: + rating_true (pd.DataFrame): True data + rating_pred (pd.DataFrame): Predicted data + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + + Returns: + float: log_loss_score (min=-inf, max=inf) + """ + + y_true, y_pred = merge_rating_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + ) + return log_loss(y_true, y_pred) + + +@check_column_dtypes +@lru_cache_df(maxsize=1) +def merge_ranking_true_pred( + rating_true, + rating_pred, + col_user, + col_item, + col_rating, + col_prediction, + relevancy_method, + k=DEFAULT_K, + threshold=DEFAULT_THRESHOLD, +): + """Filter truth and prediction data frames on common users + + Args: + rating_true (pd.DataFrame): True DataFrame + rating_pred (pd.DataFrame): Predicted DataFrame + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] + k (int): number of top k items per user (optional) + threshold (float): threshold of top items per user (optional) + + Returns: + pd.DataFrame, pd.DataFrame, int: DataFrame of recommendation hits, sorted by `col_user` and `rank` + DataFrmae of hit counts vs actual relevant items per user number of unique user ids + """ + + # Make sure the prediction and true data frames have the same set of users + common_users = set(rating_true[col_user]).intersection(set(rating_pred[col_user])) + rating_true_common = rating_true[rating_true[col_user].isin(common_users)] + rating_pred_common = rating_pred[rating_pred[col_user].isin(common_users)] + n_users = len(common_users) + + # Return hit items in prediction data frame with ranking information. This is used for calculating NDCG and MAP. + # Use first to generate unique ranking values for each item. 
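As a quick toy check of the implicit-feedback metrics above (auc and logloss expect binary ratings and scores in [0, 1]):

    import pandas as pd

    truth = pd.DataFrame({"userID": [1, 1, 2, 2], "itemID": [1, 2, 1, 2], "rating": [1, 0, 1, 0]})
    preds = pd.DataFrame({"userID": [1, 1, 2, 2], "itemID": [1, 2, 1, 2],
                          "prediction": [0.9, 0.3, 0.8, 0.4]})

    print(auc(truth, preds))      # 1.0 - every positive outscores every negative
    print(logloss(truth, preds))  # ~0.30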
This is to align with the implementation in + # Spark evaluation metrics, where index of each recommended items (the indices are unique to items) is used + # to calculate penalized precision of the ordered items. + if relevancy_method == "top_k": + top_k = k + elif relevancy_method == "by_threshold": + top_k = threshold + else: + raise NotImplementedError("Invalid relevancy_method") + df_hit = get_top_k_items( + dataframe=rating_pred_common, + col_user=col_user, + col_rating=col_prediction, + k=top_k, + ) + df_hit = pd.merge(df_hit, rating_true_common, on=[col_user, col_item])[ + [col_user, col_item, "rank"] + ] + + # count the number of hits vs actual relevant items per user + df_hit_count = pd.merge( + df_hit.groupby(col_user, as_index=False)[col_user].agg({"hit": "count"}), + rating_true_common.groupby(col_user, as_index=False)[col_user].agg( + {"actual": "count"} + ), + on=col_user, + ) + + return df_hit, df_hit_count, n_users + + +def precision_at_k( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, + relevancy_method="top_k", + k=DEFAULT_K, + threshold=DEFAULT_THRESHOLD, +): + """Precision at K. + + Note: + We use the same formula to calculate precision@k as that in Spark. + More details can be found at + http://spark.apache.org/docs/2.1.1/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics.precisionAt + In particular, the maximum achievable precision may be < 1, if the number of items for a + user in rating_pred is less than k. + + Args: + rating_true (pd.DataFrame): True DataFrame + rating_pred (pd.DataFrame): Predicted DataFrame + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] + k (int): number of top k items per user + threshold (float): threshold of top items per user (optional) + + Returns: + float: precision at k (min=0, max=1) + """ + + df_hit, df_hit_count, n_users = merge_ranking_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + relevancy_method=relevancy_method, + k=k, + threshold=threshold, + ) + + if df_hit.shape[0] == 0: + return 0.0 + + return (df_hit_count["hit"] / k).sum() / n_users + + +def recall_at_k( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, + relevancy_method="top_k", + k=DEFAULT_K, + threshold=DEFAULT_THRESHOLD, +): + """Recall at K. + + Args: + rating_true (pd.DataFrame): True DataFrame + rating_pred (pd.DataFrame): Predicted DataFrame + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] + k (int): number of top k items per user + threshold (float): threshold of top items per user (optional) + + Returns: + float: recall at k (min=0, max=1). The maximum value is 1 even when fewer than + k items exist for a user in rating_true. 
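For example, with one user, two relevant items, and a top-3 recommendation list that hits both of them, recall@3 is 1.0 while precision@3 is only 2/3 (a toy check, default column names assumed):

    import pandas as pd

    truth = pd.DataFrame({"userID": [1, 1], "itemID": [10, 11], "rating": [1, 1]})
    preds = pd.DataFrame({"userID": [1, 1, 1], "itemID": [10, 11, 12],
                          "prediction": [0.9, 0.8, 0.7]})

    print(precision_at_k(truth, preds, k=3))  # 2 hits / 3 recommended ~= 0.667
    print(recall_at_k(truth, preds, k=3))     # 2 hits / 2 relevant    = 1.0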
+ """ + + df_hit, df_hit_count, n_users = merge_ranking_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + relevancy_method=relevancy_method, + k=k, + threshold=threshold, + ) + + if df_hit.shape[0] == 0: + return 0.0 + + return (df_hit_count["hit"] / df_hit_count["actual"]).sum() / n_users + + +def ndcg_at_k( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, + relevancy_method="top_k", + k=DEFAULT_K, + threshold=DEFAULT_THRESHOLD, +): + """Normalized Discounted Cumulative Gain (nDCG). + + Info: https://en.wikipedia.org/wiki/Discounted_cumulative_gain + + Args: + rating_true (pd.DataFrame): True DataFrame + rating_pred (pd.DataFrame): Predicted DataFrame + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] + k (int): number of top k items per user + threshold (float): threshold of top items per user (optional) + + Returns: + float: nDCG at k (min=0, max=1). + """ + + df_hit, df_hit_count, n_users = merge_ranking_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + relevancy_method=relevancy_method, + k=k, + threshold=threshold, + ) + + if df_hit.shape[0] == 0: + return 0.0 + + # calculate discounted gain for hit items + df_dcg = df_hit.copy() + # relevance in this case is always 1 + df_dcg["dcg"] = 1 / np.log1p(df_dcg["rank"]) + # sum up discount gained to get discount cumulative gain + df_dcg = df_dcg.groupby(col_user, as_index=False, sort=False).agg({"dcg": "sum"}) + # calculate ideal discounted cumulative gain + df_ndcg = pd.merge(df_dcg, df_hit_count, on=[col_user]) + df_ndcg["idcg"] = df_ndcg["actual"].apply( + lambda x: sum(1 / np.log1p(range(1, min(x, k) + 1))) + ) + + # DCG over IDCG is the normalized DCG + return (df_ndcg["dcg"] / df_ndcg["idcg"]).sum() / n_users + + +def map_at_k( + rating_true, + rating_pred, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_prediction=DEFAULT_PREDICTION_COL, + relevancy_method="top_k", + k=DEFAULT_K, + threshold=DEFAULT_THRESHOLD, +): + """Mean Average Precision at k + + The implementation of MAP is referenced from Spark MLlib evaluation metrics. + https://spark.apache.org/docs/2.3.0/mllib-evaluation-metrics.html#ranking-systems + + A good reference can be found at: + http://web.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf + + Note: + 1. The evaluation function is named as 'MAP is at k' because the evaluation class takes top k items for + the prediction items. The naming is different from Spark. + + 2. The MAP is meant to calculate Avg. Precision for the relevant items, so it is normalized by the number of + relevant items in the ground truth data, instead of k. 
+ + Args: + rating_true (pd.DataFrame): True DataFrame + rating_pred (pd.DataFrame): Predicted DataFrame + col_user (str): column name for user + col_item (str): column name for item + col_rating (str): column name for rating + col_prediction (str): column name for prediction + relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] + k (int): number of top k items per user + threshold (float): threshold of top items per user (optional) + + Returns: + float: MAP at k (min=0, max=1). + """ + + df_hit, df_hit_count, n_users = merge_ranking_true_pred( + rating_true=rating_true, + rating_pred=rating_pred, + col_user=col_user, + col_item=col_item, + col_rating=col_rating, + col_prediction=col_prediction, + relevancy_method=relevancy_method, + k=k, + threshold=threshold, + ) + + if df_hit.shape[0] == 0: + return 0.0 + + # calculate reciprocal rank of items for each user and sum them up + df_hit_sorted = df_hit.copy() + df_hit_sorted["rr"] = (df_hit_sorted.groupby(col_user).cumcount() + 1) / df_hit_sorted["rank"] + df_hit_sorted = df_hit_sorted.groupby(col_user).agg({"rr": "sum"}).reset_index() + + df_merge = pd.merge(df_hit_sorted, df_hit_count, on=col_user) + return (df_merge["rr"] / df_merge["actual"]).sum() / n_users + + +def get_top_k_items( + dataframe, col_user=DEFAULT_USER_COL, col_rating=DEFAULT_RATING_COL, k=DEFAULT_K +): + """Get the input customer-item-rating tuple in the format of Pandas + DataFrame, output a Pandas DataFrame in the dense format of top k items + for each user. + + Note: + If it is implicit rating, just append a column of constants to be + ratings. + + Args: + dataframe (pandas.DataFrame): DataFrame of rating data (in the format + customerID-itemID-rating) + col_user (str): column name for user + col_rating (str): column name for rating + k (int): number of items for each user + + Returns: + pd.DataFrame: DataFrame of top k items for each user, sorted by `col_user` and `rank` + """ + # Sort dataframe by col_user and (top k) col_rating + top_k_items = ( + dataframe.groupby(col_user, as_index=False) + .apply(lambda x: x.nlargest(k, col_rating)) + .reset_index(drop=True) + ) + # Add ranks + top_k_items["rank"] = top_k_items.groupby(col_user, sort=False).cumcount() + 1 + return top_k_items + + +"""Function name and function mapper. +Useful when we have to serialize evaluation metric names +and call the functions based on deserialized names""" +metrics = { + rmse.__name__: rmse, + mae.__name__: mae, + rsquared.__name__: rsquared, + exp_var.__name__: exp_var, + precision_at_k.__name__: precision_at_k, + recall_at_k.__name__: recall_at_k, + ndcg_at_k.__name__: ndcg_at_k, + map_at_k.__name__: map_at_k, +} diff --git a/recommendation-model/utils/python_splitters.py b/recommendation-model/utils/python_splitters.py new file mode 100644 index 0000000..a6ce3be --- /dev/null +++ b/recommendation-model/utils/python_splitters.py @@ -0,0 +1,280 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split as sk_split + +from reco_utils.common.constants import ( + DEFAULT_ITEM_COL, + DEFAULT_USER_COL, + DEFAULT_TIMESTAMP_COL, +) +from reco_utils.dataset.split_utils import ( + process_split_ratio, + min_rating_filter_pandas, + split_pandas_data_with_ratios, +) + + +def python_random_split(data, ratio=0.75, seed=42): + """Pandas random splitter. + + The splitter randomly splits the input data. 
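Typical use of the random splitter, either as a single train/test cut or as a multi-way split (list ratios are normalized when they do not sum to 1); df here stands for the ratings frame loaded earlier:

    # 75/25 hold-out split, then a 70/15/15 three-way split of the same frame.
    train, test = python_random_split(df, ratio=0.75, seed=42)
    train, valid, test = python_random_split(df, ratio=[0.7, 0.15, 0.15], seed=42)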
+ + Args: + data (pd.DataFrame): Pandas DataFrame to be split. + ratio (float or list): Ratio for splitting data. If it is a single float number + it splits data into two halves and the ratio argument indicates the ratio + of training data set; if it is a list of float numbers, the splitter splits + data into several portions corresponding to the split ratios. If a list is + provided and the ratios are not summed to 1, they will be normalized. + seed (int): Seed. + + Returns: + list: Splits of the input data as pd.DataFrame. + """ + multi_split, ratio = process_split_ratio(ratio) + + if multi_split: + splits = split_pandas_data_with_ratios(data, ratio, shuffle=True, seed=seed) + splits_new = [x.drop("split_index", axis=1) for x in splits] + + return splits_new + else: + return sk_split(data, test_size=None, train_size=ratio, random_state=seed) + + +def _do_stratification( + data, + ratio=0.75, + min_rating=1, + filter_by="user", + is_random=True, + seed=42, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_timestamp=DEFAULT_TIMESTAMP_COL, +): + # A few preliminary checks. + if not (filter_by == "user" or filter_by == "item"): + raise ValueError("filter_by should be either 'user' or 'item'.") + + if min_rating < 1: + raise ValueError("min_rating should be integer and larger than or equal to 1.") + + if col_user not in data.columns: + raise ValueError("Schema of data not valid. Missing User Col") + + if col_item not in data.columns: + raise ValueError("Schema of data not valid. Missing Item Col") + + if not is_random: + if col_timestamp not in data.columns: + raise ValueError("Schema of data not valid. Missing Timestamp Col") + + multi_split, ratio = process_split_ratio(ratio) + + split_by_column = col_user if filter_by == "user" else col_item + + ratio = ratio if multi_split else [ratio, 1 - ratio] + + if min_rating > 1: + data = min_rating_filter_pandas( + data, + min_rating=min_rating, + filter_by=filter_by, + col_user=col_user, + col_item=col_item, + ) + + # Split by each group and aggregate splits together. + splits = [] + + # If it is for chronological splitting, the split will be performed in a random way. + df_grouped = ( + data.sort_values(col_timestamp).groupby(split_by_column) + if is_random is False + else data.groupby(split_by_column) + ) + + for name, group in df_grouped: + group_splits = split_pandas_data_with_ratios( + df_grouped.get_group(name), ratio, shuffle=is_random, seed=seed + ) + + # Concatenate the list of split dataframes. + concat_group_splits = pd.concat(group_splits) + + splits.append(concat_group_splits) + + # Concatenate splits for all the groups together. + splits_all = pd.concat(splits) + + # Take split by split_index + splits_list = [ + splits_all[splits_all["split_index"] == x].drop("split_index", axis=1) + for x in range(len(ratio)) + ] + + return splits_list + + +def python_chrono_split( + data, + ratio=0.75, + min_rating=1, + filter_by="user", + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_timestamp=DEFAULT_TIMESTAMP_COL, +): + """Pandas chronological splitter. + + This function splits data in a chronological manner. That is, for each user / item, the + split function takes proportions of ratings which is specified by the split ratio(s). + The split is stratified. + + Args: + data (pd.DataFrame): Pandas DataFrame to be split. + ratio (float or list): Ratio for splitting data. 
If it is a single float number + it splits data into two halves and the ratio argument indicates the ratio of + training data set; if it is a list of float numbers, the splitter splits + data into several portions corresponding to the split ratios. If a list is + provided and the ratios are not summed to 1, they will be normalized. + seed (int): Seed. + min_rating (int): minimum number of ratings for user or item. + filter_by (str): either "user" or "item", depending on which of the two is to + filter with min_rating. + col_user (str): column name of user IDs. + col_item (str): column name of item IDs. + col_timestamp (str): column name of timestamps. + + Returns: + list: Splits of the input data as pd.DataFrame. + """ + return _do_stratification( + data, + ratio=ratio, + min_rating=min_rating, + filter_by=filter_by, + col_user=col_user, + col_item=col_item, + col_timestamp=col_timestamp, + is_random=False, + ) + + +def python_stratified_split( + data, + ratio=0.75, + min_rating=1, + filter_by="user", + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + seed=42, +): + """Pandas stratified splitter. + + For each user / item, the split function takes proportions of ratings which is + specified by the split ratio(s). The split is stratified. + + Args: + data (pd.DataFrame): Pandas DataFrame to be split. + ratio (float or list): Ratio for splitting data. If it is a single float number + it splits data into two halves and the ratio argument indicates the ratio of + training data set; if it is a list of float numbers, the splitter splits + data into several portions corresponding to the split ratios. If a list is + provided and the ratios are not summed to 1, they will be normalized. + seed (int): Seed. + min_rating (int): minimum number of ratings for user or item. + filter_by (str): either "user" or "item", depending on which of the two is to + filter with min_rating. + col_user (str): column name of user IDs. + col_item (str): column name of item IDs. + + Returns: + list: Splits of the input data as pd.DataFrame. + """ + return _do_stratification( + data, + ratio=ratio, + min_rating=min_rating, + filter_by=filter_by, + col_user=col_user, + col_item=col_item, + is_random=True, + seed=seed, + ) + + +def numpy_stratified_split(X, ratio=0.75, seed=42): + """Split the user/item affinity matrix (sparse matrix) into train and test set matrices while maintaining + local (i.e. per user) ratios. + + Main points : + + 1. In a typical recommender problem, different users rate a different number of items, + and therefore the user/affinity matrix has a sparse structure with variable number + of zeroes (unrated items) per row (user). Cutting a total amount of ratings will + result in a non-homogeneous distribution between train and test set, i.e. some test + users may have many ratings while other very little if none. + + 2. In an unsupervised learning problem, no explicit answer is given. For this reason + the split needs to be implemented in a different way then in supervised learningself. + In the latter, one typically split the dataset by rows (by examples), ending up with + the same number of features but different number of examples in the train/test setself. + This scheme does not work in the unsupervised case, as part of the rated items needs to + be used as a test set for fixed number of users. + + Solution: + + 1. Instead of cutting a total percentage, for each user we cut a relative ratio of the rated + items. 
For example, if user1 has rated 4 items and user2 10, cutting 25% will correspond to + 1 and 2.6 ratings in the test set, approximated as 1 and 3 according to the round() function. + In this way, the 0.75 ratio is satisfied both locally and globally, preserving the original + distribution of ratings across the train and test set. + + 2. It is easy (and fast) to satisfy this requirements by creating the test via element subtraction + from the original dataset X. We first create two copies of X; for each user we select a random + sample of local size ratio (point 1) and erase the remaining ratings, obtaining in this way the + train set matrix Xtst. The train set matrix is obtained in the opposite way. + + Args: + X (np.array, int): a sparse matrix to be split + ratio (float): fraction of the entire dataset to constitute the train set + seed (int): random seed + + Returns: + np.array, np.array: Xtr is the train set user/item affinity matrix. Xtst is the test set user/item affinity + matrix. + """ + + np.random.seed(seed) # set the random seed + test_cut = int((1 - ratio) * 100) # percentage of ratings to go in the test set + + # initialize train and test set matrices + Xtr = X.copy() + Xtst = X.copy() + + # find the number of rated movies per user + rated = np.sum(Xtr != 0, axis=1) + + # for each user, cut down a test_size% for the test set + tst = np.around((rated * test_cut) / 100).astype(int) + + for u in range(X.shape[0]): + # For each user obtain the index of rated movies + idx = np.asarray(np.where(Xtr[u] != 0))[0].tolist() + + # extract a random subset of size n from the set of rated movies without repetition + idx_tst = np.random.choice(idx, tst[u], replace=False) + idx_train = list(set(idx).difference(set(idx_tst))) + + # change the selected rated movies to unrated in the train set + Xtr[u, idx_tst] = 0 + # set the movies that appear already in the train set as 0 + Xtst[u, idx_train] = 0 + + del idx, idx_train, idx_tst + + return Xtr, Xtst diff --git a/recommendation-model/vowpal_wabbit_deep_dive.py b/recommendation-model/vowpal_wabbit_deep_dive.py new file mode 100644 index 0000000..4d4ddc7 --- /dev/null +++ b/recommendation-model/vowpal_wabbit_deep_dive.py @@ -0,0 +1,371 @@ + +import sys +sys.path.append('../..') + +import os +from subprocess import run +from tempfile import TemporaryDirectory +from time import process_time + +import pandas as pd +import papermill as pm + +from reco_utils.common.notebook_utils import is_jupyter +from reco_utils.dataset.movielens import load_pandas_df +from reco_utils.dataset.python_splitters import python_random_split +from reco_utils.evaluation.python_evaluation import (rmse, mae, exp_var, rsquared, get_top_k_items, + map_at_k, ndcg_at_k, precision_at_k, recall_at_k) + +print("System version: {}".format(sys.version)) +print("Pandas version: {}".format(pd.__version__)) + + +def to_vw(df, output, logistic=False): + """Convert Pandas DataFrame to vw input format + Args: + df (pd.DataFrame): input DataFrame + output (str): path to output file + logistic (bool): flag to convert label to logistic value + """ + with open(output, 'w') as f: + tmp = df.reset_index() + + # we need to reset the rating type to an integer to simplify the vw formatting + tmp['rating'] = tmp['rating'].astype('int64') + + # convert rating to binary value + if logistic: + tmp['rating'] = tmp['rating'].apply(lambda x: 1 if x >= 3 else -1) + + # convert each row to VW input format (https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) + # [label] [tag]|[user namespace] 
[user id feature] |[item namespace] [movie id feature]
+        # label is the true rating, tag is a unique id for the example just used to link predictions to truth
+        # user and item namespaces separate the features to support interaction features through command line options
+        for _, row in tmp.iterrows():
+            f.write('{rating:d} {index:d}|user {userID:d} |item {itemID:d}\n'.format_map(row))
+
+
+def run_vw(train_params, test_params, test_data, prediction_path, logistic=False):
+    """Convenience function to train, test, and show metrics of interest
+    Args:
+        train_params (str): vw training parameters
+        test_params (str): vw testing parameters
+        test_data (pd.DataFrame): test data
+        prediction_path (str): path to vw prediction output
+        logistic (bool): flag to convert label to logistic value
+    Returns:
+        (dict): metrics and timing information
+    """
+
+    # train model
+    train_start = process_time()
+    run(train_params.split(' '), check=True)
+    train_stop = process_time()
+
+    # test model
+    test_start = process_time()
+    run(test_params.split(' '), check=True)
+    test_stop = process_time()
+
+    # read in predictions
+    pred_df = pd.read_csv(prediction_path, delim_whitespace=True, names=['prediction'], index_col=1).join(test_data)
+    pred_df.drop("rating", axis=1, inplace=True)
+
+    test_df = test_data.copy()
+    if logistic:
+        # make the true label binary so that the metrics are captured correctly
+        test_df['rating'] = test_df['rating'].apply(lambda x: 1 if x >= 3 else -1)
+    else:
+        # ensure results are integers in correct range
+        pred_df['prediction'] = pred_df['prediction'].apply(lambda x: int(max(1, min(5, round(x)))))
+
+    # calculate metrics
+    result = dict()
+    result['RMSE'] = rmse(test_df, pred_df)
+    result['MAE'] = mae(test_df, pred_df)
+    result['R2'] = rsquared(test_df, pred_df)
+    result['Explained Variance'] = exp_var(test_df, pred_df)
+    result['Train Time (ms)'] = (train_stop - train_start) * 1000
+    result['Test Time (ms)'] = (test_stop - test_start) * 1000
+
+    return result
+
+# create temp directory to maintain data files
+tmpdir = TemporaryDirectory()
+
+model_path = os.path.join(tmpdir.name, 'vw.model')
+saved_model_path = os.path.join(tmpdir.name, 'vw_saved.model')
+train_path = os.path.join(tmpdir.name, 'train.dat')
+test_path = os.path.join(tmpdir.name, 'test.dat')
+train_logistic_path = os.path.join(tmpdir.name, 'train_logistic.dat')
+test_logistic_path = os.path.join(tmpdir.name, 'test_logistic.dat')
+prediction_path = os.path.join(tmpdir.name, 'prediction.dat')
+all_test_path = os.path.join(tmpdir.name, 'new_test.dat')
+all_prediction_path = os.path.join(tmpdir.name, 'new_prediction.dat')
+
+
+# # 1. Load & Transform Data
+
+# Select MovieLens data size: 100k, 1m, 10m, or 20m
+MOVIELENS_DATA_SIZE = '100k'
+TOP_K = 10
+
+# load movielens data
+df = load_pandas_df(MOVIELENS_DATA_SIZE)
+
+# split data into train and test sets, taking 75% of the data as train and 25% as test
+train, test = python_random_split(df, 0.75)
+
+# save train and test data in vw format
+to_vw(df=train, output=train_path)
+to_vw(df=test, output=test_path)
+
+# save data for logistic regression (requires adjusting the label)
+to_vw(df=train, output=train_logistic_path, logistic=True)
+to_vw(df=test, output=test_logistic_path, logistic=True)
+
+
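+
+# As a quick sanity check, print the first few lines of the generated training file to confirm the VW input
+# format produced by to_vw, e.g. a line such as "3 0|user 196 |item 242" (label, tag, then the user and item
+# namespaces); the exact values depend on the random split.
+with open(train_path, 'r') as f:
+    for _ in range(3):
+        print(f.readline().strip())
+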
+# # 2. Regression Based Recommendations
+#
+# When considering different approaches for solving a problem with machine learning it is helpful to generate a baseline approach to understand how more complex solutions perform across dimensions of performance, time, and resource (memory or cpu) usage.
+#
+# Regression based approaches are some of the simplest and fastest baselines to consider for many ML problems.
+
+# ## 2.1 Linear Regression
+#
+# As the data provides a numerical rating between 1-5, fitting those values with a linear regression model is an easy approach. This model is trained on examples of ratings as the target variable and corresponding user ids and movie ids as independent features.
+#
+# By passing each user-item rating in as an example the model will begin to learn weights based on average ratings for each user as well as average ratings per item.
+#
+# This however can generate predicted ratings which are no longer integers, so some additional adjustments should be made at prediction time to convert them back to the integer scale of 1 through 5 if necessary. Here, this is done in the run_vw function.
+
+"""
+Quick description of command line parameters used
+    Other optional parameters can be found here: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Command-Line-Arguments
+    VW uses linear regression by default, so no extra command line options
+    -f : indicates where the final model file will reside after training
+    -d : indicates which data file to use for training or testing
+    --quiet: this runs vw in quiet mode silencing stdout (for debugging it's helpful to not use quiet mode)
+    -i : indicates where to load the model file previously created during training
+    -t: this executes inference only (no learned updates to the model)
+    -p : indicates where to store prediction output
+"""
+train_params = 'vw -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
+test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
+
+result = run_vw(train_params=train_params,
+                test_params=test_params,
+                test_data=test,
+                prediction_path=prediction_path)
+
+comparison = pd.DataFrame(result, index=['Linear Regression'])
+comparison
+
+
+# ## 2.2 Linear Regression with Interaction Features
+#
+# Previously we treated the user features and item features independently, but taking into account interactions between features can provide a mechanism to learn more fine-grained preferences of the users.
+#
+# To generate interaction features use the quadratic command line argument and specify the namespaces that should be combined: '-q ui' combines the user and item namespaces based on the first letter of each.
+#
+# Currently the userIDs and itemIDs used are integers which means the feature ID is used directly, for instance when user ID 123 rates movie 456, the training example puts a 1 in the values for features 123 and 456. However when interaction is specified (or if a feature is a string) the resulting interaction feature is hashed into the available feature space. Feature hashing is a way to take a very sparse high dimensional feature space and reduce it into a lower dimensional space. This allows for reduced memory while retaining fast computation of feature and model weights.
+#
+# The caveat with feature hashing is that it can lead to hash collisions, where separate features are mapped to the same location. In this case it can be beneficial to increase the size of the space to support interactions between features of high cardinality. The available feature space is dictated by the --bit_precision (-b) argument, where the total space available for all features in the model is 2^N.
+#
+
+"""
+Quick description of command line parameters used
+    -b : sets the memory size to 2^N entries
+    -q : create quadratic feature interactions between features in namespaces starting with 'a' and 'b'
+"""
+# save this model and its results for later use during the top-k analysis below
+train_params = 'vw -b 26 -q ui -f {model} -d {data} --quiet'.format(model=saved_model_path, data=train_path)
+test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=saved_model_path, data=test_path, pred=prediction_path)
+
+result = run_vw(train_params=train_params,
+                test_params=test_params,
+                test_data=test,
+                prediction_path=prediction_path)
+saved_result = result
+
+comparison = comparison.append(pd.DataFrame(result, index=['Linear Regression w/ Interaction']))
+comparison
+
+
+# ## 2.3 Multinomial Logistic Regression
+#
+# An alternative to linear regression is to leverage multinomial logistic regression, or multiclass classification, which treats each rating value as a distinct class.
+#
+# This avoids any non-integer results, but also reduces the training data for each class, which could lead to poorer performance if the counts of different rating levels are skewed.
+#
+# Basic multiclass logistic regression can be accomplished using the One Against All approach specified by the '--oaa N' option, where N is the number of classes, and providing the logistic option for the loss function to be used.
+
+"""
+Quick description of command line parameters used
+    --loss_function logistic: sets the model loss function for logistic regression
+    --oaa : trains N separate models using One-Against-All approach (all models are captured in the single model file)
+            This expects the labels to be contiguous integers starting at 1
+    --link logistic: converts the predicted output from logit to probability
+The predicted output is the model (label) with the largest likelihood
+"""
+train_params = 'vw --loss_function logistic --oaa 5 -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
+test_params = 'vw --link logistic -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
+
+result = run_vw(train_params=train_params,
+                test_params=test_params,
+                test_data=test,
+                prediction_path=prediction_path)
+
+comparison = comparison.append(pd.DataFrame(result, index=['Multinomial Regression']))
+comparison
+
+
+# ## 2.4 Logistic Regression
+#
+# Additionally, one might simply be interested in whether the user likes or dislikes an item, so we can adjust the input data to represent a binary outcome, where ratings of 1-2 are treated as dislikes (negative examples) and ratings of 3-5 as likes (positive examples).
+#
+# This framing allows for a simple logistic regression model to be applied. To perform logistic regression the loss_function parameter is changed to 'logistic' and the target label is switched to {-1, 1}. Also, be sure to set '--link logistic' during prediction to convert the logit output back to a probability value.
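+
+# Illustrative helper (an addition for clarity; the function name and the 0.5 cut-off are assumed choices, not
+# something VW prescribes): it shows how the probabilities produced with '--link logistic' could be mapped back
+# to the -1/1 labels used for training if a hard like/dislike decision were needed.
+def probability_to_label(prediction_file, threshold=0.5):
+    probs = pd.read_csv(prediction_file, delim_whitespace=True, names=['prediction'], index_col=1)
+    return probs['prediction'].apply(lambda p: 1 if p >= threshold else -1)
+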
+
+
+train_params = 'vw --loss_function logistic -f {model} -d {data} --quiet'.format(model=model_path, data=train_logistic_path)
+test_params = 'vw --link logistic -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_logistic_path, pred=prediction_path)
+
+result = run_vw(train_params=train_params,
+                test_params=test_params,
+                test_data=test,
+                prediction_path=prediction_path,
+                logistic=True)
+
+comparison = comparison.append(pd.DataFrame(result, index=['Logistic Regression']))
+comparison
+
+
+# # 3. Matrix Factorization Based Recommendations
+#
+# All of the above approaches train a regression model, but VW also supports matrix factorization with two different approaches.
+#
+# As opposed to learning direct weights for specific users, items and interactions when training a regression model, matrix factorization attempts to learn latent factors that determine how a user rates an item. An example of how this might work is if you could represent user preference and item categorization by genre. Given a smaller set of genres we can associate how much each item belongs to each genre class, and we can set weights for a user's preference for each genre. Both sets of weights could be represented as vectors where the inner product would be the user-item rating. Matrix factorization approaches learn low rank matrices for latent features of users and items such that those matrices can be combined to approximate the original user-item matrix.
+#
+# ## 3.1. Singular Value Decomposition Based Matrix Factorization
+#
+# The first approach performs matrix factorization based on Singular Value Decomposition (SVD) to learn a low rank approximation for the user-item rating matrix. It is called using the '--rank' command line argument.
+#
+# See the [Matrix Factorization Example](https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Matrix-factorization-example) for more detail.
+
+"""
+Quick description of command line parameters used
+    --rank : sets the number of latent factors in the reduced matrix
+"""
+train_params = 'vw --rank 5 -q ui -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
+test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
+
+result = run_vw(train_params=train_params,
+                test_params=test_params,
+                test_data=test,
+                prediction_path=prediction_path)
+
+comparison = comparison.append(pd.DataFrame(result, index=['Matrix Factorization (Rank)']))
+comparison
+
+
+# ## 3.2. Factorization Machine Based Matrix Factorization
+#
+# An alternative approach based on [Rendle's factorization machines](https://cseweb.ucsd.edu/classes/fa17/cse291-b/reading/Rendle2010FM.pdf) is called using '--lrq' (low rank quadratic). More LRQ details in this [demo](https://github.com/VowpalWabbit/vowpal_wabbit/tree/master/demo/movielens).
+#
+# This learns two lower rank matrices which are multiplied to generate an approximation of the user-item rating matrix. Compressing the matrix in this way leads to learning generalizable factors which avoids some of the limitations of using regression models with extremely sparse interaction features. This can lead to better convergence and smaller on-disk models.
+#
+# An additional term to improve performance is --lrqdropout which applies dropout to columns during training. This however tends to increase the optimal rank size. Other parameters such as L2 regularization can help avoid overfitting.
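+
+# A toy illustration of the latent factor idea described at the start of section 3 (the numbers below are made
+# up, purely to show the mechanics): a predicted rating is approximated by the inner product of a user factor
+# vector and an item factor vector.
+import numpy as np
+user_factors = np.array([0.9, 0.1, 0.4])    # e.g. a user's affinity for three hypothetical genres
+item_factors = np.array([4.0, 1.0, 2.0])    # how strongly the item expresses each of those genres
+print(np.dot(user_factors, item_factors))   # 4.5, an approximate predicted rating
+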
+""" +Quick description of command line parameters used + --lrq : learns approximations of rank N for the quadratic interaction between namespaces starting with 'a' and 'b' + --lrqdroupout: performs dropout during training to improve generalization +""" +train_params = 'vw --lrq ui7 -f {model} -d {data} --quiet'.format(model=model_path, data=train_path) +test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path) + +result = run_vw(train_params=train_params, + test_params=test_params, + test_data=test, + prediction_path=prediction_path) + +comparison = comparison.append(pd.DataFrame(result, index=['Matrix Factorization (LRQ)'])) +comparison + + +# # 4. Conclusion +# +# The table above shows a few of the approaches in the VW library that can be used for recommendation prediction. The relative performance can change when applied to different datasets and properly tuned, but it is useful to note the rapid speed at which all approaches are able to train (75,000 examples) and test (25,000 examples). + +# # 5. Scoring + +# After training a model with any of the above approaches, the model can be used to score potential user-pairs in offline batch mode, or in a real-time scoring mode. The example below shows how to leverage the utilities in the reco_utils directory to generate Top-K recommendations from offline scored output. + +# First construct a test set of all items (except those seen during training) for each user +users = df[['userID']].drop_duplicates() +users['key'] = 1 + +items = df[['itemID']].drop_duplicates() +items['key'] = 1 + +all_pairs = pd.merge(users, items, on='key').drop(columns=['key']) + +# now combine with training data and keep only entries that were note in training +merged = pd.merge(train[['userID', 'itemID', 'rating']], all_pairs, on=["userID", "itemID"], how="outer") +all_user_items = merged[merged['rating'].isnull()].fillna(0).astype('int64') + +# save in vw format (this can take a while) +to_vw(df=all_user_items, output=all_test_path) + + +# In[8]: + +# run the saved model (linear regression with interactions) on the new dataset +test_start = process_time() +test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=saved_model_path, data=all_test_path, pred=prediction_path) +run(test_params.split(' '), check=True) +test_stop = process_time() +test_time = test_stop - test_start + +# load predictions and get top-k from previous saved results +pred_data = pd.read_csv(prediction_path, delim_whitespace=True, names=['prediction'], index_col=1).join(all_user_items) +top_k = get_top_k_items(pred_data, col_rating='prediction', k=TOP_K)[['prediction', 'userID', 'itemID']] +top_k.head() + + + +# get ranking metrics +args = [test, top_k] +kwargs = dict(col_user='userID', col_item='itemID', col_rating='rating', col_prediction='prediction', + relevancy_method='top_k', k=TOP_K) + +rank_metrics = {'MAP': map_at_k(*args, **kwargs), + 'NDCG': ndcg_at_k(*args, **kwargs), + 'Precision': precision_at_k(*args, **kwargs), + 'Recall': recall_at_k(*args, **kwargs)} + + +# final results +all_results = ['{k}: {v}'.format(k=k, v=v) for k, v in saved_result.items()] +all_results += ['{k}: {v}'.format(k=k, v=v) for k, v in rank_metrics.items()] +print('\n'.join(all_results)) + + +# # 6. 
Cleanup + +# record results for testing +if is_jupyter(): + pm.record('rmse', saved_result['RMSE']) + pm.record('mae', saved_result['MAE']) + pm.record('rsquared', saved_result['R2']) + pm.record('exp_var', saved_result['Explained Variance']) + pm.record("train_time", saved_result['Train Time (ms)']) + pm.record("test_time", test_time) + pm.record('map', rank_metrics['MAP']) + pm.record('ndcg', rank_metrics['NDCG']) + pm.record('precision', rank_metrics['Precision']) + pm.record('recall', rank_metrics['Recall']) + + +tmpdir.cleanup() + + From 5367ed09da5c5b1af172b300267874158beb69e7 Mon Sep 17 00:00:00 2001 From: sayoojbk Date: Mon, 16 Dec 2019 19:46:14 +0530 Subject: [PATCH 4/7] added a recommendation service based on the frequently purhcased items. --- app-requirements.md | 14 + .../user_recommendation_controller.py | 2 +- app/main/dataModel/category.py | 4 +- app/main/dataModel/merchant_product.py | 2 +- app/main/dataModel/user.py | 8 +- app/main/dataModel/user_purchase.py | 6 +- app/main/database.py | 3 +- app/main/model/vowpal_wabbit.py | 266 ++++++++++++++++++ .../service/UserRecommendationsService.py | 24 +- app/main/util/data_cleaning.py | 92 ++++++ links.md | 7 + 11 files changed, 415 insertions(+), 13 deletions(-) create mode 100644 app-requirements.md create mode 100644 app/main/model/vowpal_wabbit.py create mode 100644 app/main/util/data_cleaning.py create mode 100644 links.md diff --git a/app-requirements.md b/app-requirements.md new file mode 100644 index 0000000..5f2534d --- /dev/null +++ b/app-requirements.md @@ -0,0 +1,14 @@ +- The /toggle endpoint updates the users recommendation based on the most popular items in the city. +If you look at the /toggle endpoint. That's what runs the process that generates the user recommendations. + +- There is a boolean switch that changes the process that is ran (popular/machine learning). +We need to have your recommendation algorithm implementation ran everytime that endpoint is called. + + +### So we need the following: + +1. Functions to query all the data you need for your algorithm (in order for this to work in production we need to be careful with memory. Does your algorithm algorithm work in chunks? Can it work by processing 10,000 rows of data at a time. Or does it need all the data at once? Another solution would be to use a cluster computing framework, which might be the better way to go about it) + +2. Function that takes in the queried data and begins the recommendation algorithm you've made + +3. Function that populates the UserRecommendation table and associates the UserRecommendation ID with it's corresponding Users. (when I was doing research it mentioned that some users are likely to get very similar recommendations. So in order to save data for production they would give the same recommendations to three users that are very similar. So in the implementation the UserRecommendation ID can be associated with more than 1 user. This doesn't have to be the case if you don't want, we can have a OnetoOne relationship with UserRecommendation and the User). How the popular recommendation currently works is: it creates one UserRecommendation object, and it gives that ID to every User in our database. 
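+
+As a rough sketch of point 1 (chunked querying), something like the helper below could stream purchases out of MongoDB in fixed-size batches of 10,000 rows instead of loading everything at once. The collection name, helper name, and connection details here are placeholders, not final decisions:
+
+```python
+import pandas as pd
+import pymongo
+
+def iter_purchase_chunks(uri="mongodb://localhost:27017/", db_name="recommendation",
+                         collection="user_purchases", chunk_size=10000):
+    # Yield the collection as successive DataFrames of at most chunk_size rows.
+    cursor = pymongo.MongoClient(uri)[db_name][collection].find({}, batch_size=chunk_size)
+    chunk = []
+    for doc in cursor:
+        chunk.append(doc)
+        if len(chunk) == chunk_size:
+            yield pd.DataFrame(chunk)
+            chunk = []
+    if chunk:
+        yield pd.DataFrame(chunk)
+```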
\ No newline at end of file diff --git a/app/main/controller/user_recommendation_controller.py b/app/main/controller/user_recommendation_controller.py index b63a563..9b6aa53 100644 --- a/app/main/controller/user_recommendation_controller.py +++ b/app/main/controller/user_recommendation_controller.py @@ -19,7 +19,7 @@ def get(self): @api.route('/v1') class Recommendation(Resource): - + @api.doc('Gets a specific user\'s product recommendations') def get(self): user_id = int(request.args.get('userId', None)) diff --git a/app/main/dataModel/category.py b/app/main/dataModel/category.py index 9160015..3a49999 100644 --- a/app/main/dataModel/category.py +++ b/app/main/dataModel/category.py @@ -18,15 +18,17 @@ class Category(object): COLLECTION = "categories" def __init__(self, id, name): - self.id = id; + self.id = id self.name = name self.added_on = time.time() self.last_updated = self.added_on + # adds the categorical info if not found in the database. def insert(self): if not DB.find_one(Category.COLLECTION, {"_id": self.id}): DB.insert(collection=Category.COLLECTION, data=self.json()) + # return the info in json format. def json(self): return { '_id': self.id, diff --git a/app/main/dataModel/merchant_product.py b/app/main/dataModel/merchant_product.py index eb7c6d2..c5934d0 100644 --- a/app/main/dataModel/merchant_product.py +++ b/app/main/dataModel/merchant_product.py @@ -21,7 +21,7 @@ class Merchant_Product(object): COLLECTION = "merchant_products" def __init__(self, id, merchant_id, product_id, price, currency, discounted_price): - self.id = id; + self.id = id self.merchant_id = merchant_id self.product_id = product_id self.price = price diff --git a/app/main/dataModel/user.py b/app/main/dataModel/user.py index 0775fd3..98232d7 100644 --- a/app/main/dataModel/user.py +++ b/app/main/dataModel/user.py @@ -45,9 +45,15 @@ def json(self): 'recommendation_id': self.recommendation_id, 'city_id': self.city_id } + + # I dont see why there is no need for insert of the user data? or was it missed ? + def insert(self): + if not DB.find_one(User.COLLECTION, {"_id": self.id}): + DB.insert(collection=User.COLLECTION, data=self.json()) + # to get the id of the user ?. TO do find the use of this function. nOT SURE WHAT TO RETURN HERE. def get_id(self): - self.getId() + self.id def set_id(self, id): self.id = id diff --git a/app/main/dataModel/user_purchase.py b/app/main/dataModel/user_purchase.py index e9045fc..96e5c04 100644 --- a/app/main/dataModel/user_purchase.py +++ b/app/main/dataModel/user_purchase.py @@ -35,9 +35,9 @@ def insert(self): def json(self): return { '_id': self.id, - 'user_id': self.name, - 'product_id': self.retail_price, - 'purchased_count': self.categories, + 'user_id': self.user_id, + 'product_id': self.product_id, + 'purchased_count': self.purchased_count, 'added_on': self.added_on, 'last_updated': self.last_updated } diff --git a/app/main/database.py b/app/main/database.py index e39fdd2..cd17e82 100644 --- a/app/main/database.py +++ b/app/main/database.py @@ -9,7 +9,8 @@ class DB(object): DB_NAME = "recommendation" @staticmethod - def init(): + def __init__(): + # connecting to the database. client = pymongo.MongoClient(DB.URI) DB.DATABASE = client[DB.DB_NAME] diff --git a/app/main/model/vowpal_wabbit.py b/app/main/model/vowpal_wabbit.py new file mode 100644 index 0000000..4abf601 --- /dev/null +++ b/app/main/model/vowpal_wabbit.py @@ -0,0 +1,266 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+ +""" +This file provides a wrapper to run Vowpal Wabbit from the command line through python. +It is not recommended to use this approach in production, there are python bindings that can be installed from the +repository or pip or the command line can be used. This is merely to demonstrate vw usage in the example notebooks. +""" + +import os +from subprocess import run +from tempfile import TemporaryDirectory +import pandas as pd + +from reco_utils.common.constants import ( + DEFAULT_USER_COL, + DEFAULT_ITEM_COL, + DEFAULT_RATING_COL, + DEFAULT_TIMESTAMP_COL, + DEFAULT_PREDICTION_COL, +) + + +class VW: + """Vowpal Wabbit Class""" + + def __init__( + self, + col_user=DEFAULT_USER_COL, + col_item=DEFAULT_ITEM_COL, + col_rating=DEFAULT_RATING_COL, + col_timestamp=DEFAULT_TIMESTAMP_COL, + col_prediction=DEFAULT_PREDICTION_COL, + **kwargs, + ): + """Initialize model parameters + + Args: + col_user (str): user column name + col_item (str): item column name + col_rating (str): rating column name + col_timestamp (str): timestamp column name + col_prediction (str): prediction column name + """ + + # create temporary files + self.tempdir = TemporaryDirectory() + self.train_file = os.path.join(self.tempdir.name, "train.dat") + self.test_file = os.path.join(self.tempdir.name, "test.dat") + self.model_file = os.path.join(self.tempdir.name, "vw.model") + self.prediction_file = os.path.join(self.tempdir.name, "prediction.dat") + + # set DataFrame columns + self.col_user = col_user + self.col_item = col_item + self.col_rating = col_rating + self.col_timestamp = col_timestamp + self.col_prediction = col_prediction + + self.logistic = "logistic" in kwargs.values() + self.train_cmd = self.parse_train_params(params=kwargs) + self.test_cmd = self.parse_test_params(params=kwargs) + + @staticmethod + def to_vw_cmd(params): + """Convert dictionary of parameters to vw command line. 
+ + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + cmd = ["vw"] + for k, v in params.items(): + if v is False: + # don't add parameters with a value == False + continue + + # add the correct hyphen to the parameter + cmd.append(f"-{k}" if len(k) == 1 else f"--{k}") + if v is not True: + # don't add an argument for parameters with value == True + cmd.append("{}".format(v)) + + return cmd + + def parse_train_params(self, params): + """Parse input hyper-parameters to build vw train commands + + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + # make a copy of the original hyper parameters + train_params = params.copy() + + # remove options that are handled internally, not supported, or test only parameters + invalid = [ + "data", + "final_regressor", + "invert_hash", + "readable_model", + "t", + "testonly", + "i", + "initial_regressor", + "link", + ] + + for option in invalid: + if option in train_params: + del train_params[option] + + train_params.update( + { + "d": self.train_file, + "f": self.model_file, + "quiet": params.get("quiet", True), + } + ) + return self.to_vw_cmd(params=train_params) + + def parse_test_params(self, params): + """Parse input hyper-parameters to build vw test commands + + Args: + params (dict): key = parameter, value = value (use True if parameter is just a flag) + + Returns: + list[str]: vw command line parameters as list of strings + """ + + # make a copy of the original hyper parameters + test_params = params.copy() + + # remove options that are handled internally, ot supported or train only parameters + invalid = [ + "data", + "f", + "final_regressor", + "initial_regressor", + "test_only", + "invert_hash", + "readable_model", + "b", + "bit_precision", + "holdout_off", + "c", + "cache", + "k", + "kill_cache", + "l", + "learning_rate", + "l1", + "l2", + "initial_t", + "power_t", + "decay_learning_rate", + "q", + "quadratic", + "cubic", + "i", + "interactions", + "rank", + "lrq", + "lrqdropout", + "oaa", + ] + for option in invalid: + if option in test_params: + del test_params[option] + + test_params.update( + { + "d": self.test_file, + "i": self.model_file, + "quiet": params.get("quiet", True), + "p": self.prediction_file, + "t": True, + } + ) + return self.to_vw_cmd(params=test_params) + + def to_vw_file(self, df, train=True): + """Convert Pandas DataFrame to vw input format file + + Args: + df (pd.DataFrame): input DataFrame + train (bool): flag for train mode (or test mode if False) + """ + + output = self.train_file if train else self.test_file + with open(output, "w") as f: + # extract columns and create a new dataframe + tmp = df[[self.col_rating, self.col_user, self.col_item]].reset_index() + + if train: + # we need to reset the rating type to an integer to simplify the vw formatting + tmp[self.col_rating] = tmp[self.col_rating].astype("int64") + + # convert rating to binary value + if self.logistic: + max_value = tmp[self.col_rating].max() + tmp[self.col_rating] = tmp[self.col_rating].apply( + lambda x: 2 * round(x / max_value) - 1 + ) + else: + tmp[self.col_rating] = "" + + # convert each row to VW input format (https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) + # [label] [tag]|[user namespace] [user id feature] |[item namespace] [movie id feature] + # label is the true rating, tag is a 
unique id for the example just used to link predictions to truth + # user and item namespaces separate features to support interaction features through command line options + for _, row in tmp.iterrows(): + f.write( + "{rating} {index}|user {userID} |item {itemID}\n".format( + rating=row[self.col_rating], + index=row["index"], + userID=row[self.col_user], + itemID=row[self.col_item], + ) + ) + + def fit(self, df): + """Train model + + Args: + df (pd.DataFrame): input training data + """ + + # write dataframe to disk in vw format + self.to_vw_file(df=df) + + # train model + run(self.train_cmd, check=True) + + def predict(self, df): + """Predict results + + Args: + df (pd.DataFrame): input test data + """ + + # write dataframe to disk in vw format + self.to_vw_file(df=df, train=False) + + # generate predictions + run(self.test_cmd, check=True) + + # read predictions + return df.join( + pd.read_csv( + self.prediction_file, + delim_whitespace=True, + names=[self.col_prediction], + index_col=1, + ) + ) + + def __del__(self): + self.tempdir.cleanup() diff --git a/app/main/service/UserRecommendationsService.py b/app/main/service/UserRecommendationsService.py index 80a2c2a..e3a4824 100644 --- a/app/main/service/UserRecommendationsService.py +++ b/app/main/service/UserRecommendationsService.py @@ -4,11 +4,14 @@ from ..repository.UserRecommendationRepository import UserRecommendationRepository from ..repository.UserRepository import UserRepository -class UserRecommendationService(object): +from ..util.data_cleaning import DataProcessing +import pandas as pd +class UserRecommendationService(object): - def __init__(self, recommendation_active = False): + def __init__(self, recommendation_active = False , process='purchase_count'): self.recommendation_active = recommendation_active + self.process = process def update_user_recommendations(self): @@ -18,11 +21,12 @@ def update_user_recommendations(self): self._generic_process() + ''' Queries for the most popular items in a given city. Inserts one entry to the user_recommendation database collection per city. Updates every User's recommendation_id to that of the inserted user_recommendation entry - + This is the generic recommnedation which is based on the popilar items in the given city. ''' def _generic_process(self): #TODO: Change logic so that the entire process is not commited unless everything is ran correctly (in case of errors) @@ -33,13 +37,23 @@ def _generic_process(self): # list_of_city_users = self._get_all_city_users(city) self._set_user_recommendation(city) + ''' - Calls a function that begins the recommendation algorithm process + Calls a function that begins the recommendation algorithm process Should insert new recommendations in the user_recommendation database collection and update a User's recommendation_id. 
     '''
     def _machine_learning_process(self):
-        return None
+        purchase_count_explicit, purchase_count_implicit, users_explicit, users_implicit = DataProcessing().start()
+
+        if self.process == 'purchase_count':
+            purchase_count = pd.DataFrame(purchase_count_explicit.groupby(['product_id'])['purchased_count'].sum())
+            top10 = purchase_count.sort_values('purchased_count', ascending=False).head(10)
+
+            UserRecommendationRepository.bulk_delete()
+            UserRecommendationRepository.insert(top10)
+
+        return UserRecommendationRepository
 
 
     '''
diff --git a/app/main/util/data_cleaning.py b/app/main/util/data_cleaning.py
new file mode 100644
index 0000000..7d5f3d8
--- /dev/null
+++ b/app/main/util/data_cleaning.py
@@ -0,0 +1,92 @@
+from ..dataModel.user_recommendation import UserRecommendation
+from ..dataModel.category import Category
+from ..dataModel.merchant_category import Merchant_Category
+from ..dataModel.merchant_product import Merchant_Product
+from ..dataModel.merchant import Merchant
+from ..dataModel.product import Product
+from ..dataModel.user_purchase import UserPurchase
+from ..dataModel.user import User
+
+from ..database import DB
+
+import numpy as np
+import pandas as pd
+
+def read_mongo(db, collection, no_id=True):
+    """ Read from Mongo and store into a DataFrame """
+
+    # Connect to MongoDB - this goes through the DB class, which holds the database connection and its helper functions.
+
+    # Make a query to the specific DB and Collection
+    data = db.find_all(collection)
+    # Construct the DataFrame from the returned documents
+    df = pd.DataFrame(list(data))
+
+
+    # Delete the _id from the dataframe as it is of no use for the other collections.
+    # The user_id is the only thing required; the id generated in each of the other collections is of no use.
+    if no_id:
+        del df['_id']
+    # need to take care of the added_on and last_updated fields also.
+
+    return df
+
+
+class DataProcessing(object):
+    def __init__(self):
+
+        self.user_df = read_mongo(DB, User.COLLECTION, no_id=False)
+        self.user_purchase_df = read_mongo(DB, UserPurchase.COLLECTION, no_id=True)  # Not sure about no_id here if there are multiple entries for the same user
+        self.product_df = read_mongo(DB, Product.COLLECTION, no_id=False)
+        self.merchant_df = read_mongo(DB, Merchant.COLLECTION, no_id=False)
+        self.merchant_prod_df = read_mongo(DB, Merchant_Product.COLLECTION, no_id=True)
+        self.merchant_cat_df = read_mongo(DB, Merchant_Category.COLLECTION, no_id=True)
+        self.category_df = read_mongo(DB, Category.COLLECTION, no_id=False)
+        # the user_recommendation dataframe could be used for something like the toggling task.
+        self.user_rec_df = read_mongo(DB, UserRecommendation.COLLECTION, no_id=True)
+
+
+    def start(self):
+
+        """
+        The added_on and last_updated fields don't make any useful contribution to the model; at most they
+        could be used while recommending to confirm that an item is still available at the recommended shop,
+        so they are dropped here.
+        """
+        self.user_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.user_purchase_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.product_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.merchant_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.merchant_prod_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.merchant_cat_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+        self.category_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+
+        # self.user_rec_df.drop(columns=['added_on', 'last_updated'], inplace=True)
+
+        """
+        Age column has NaNs and some very high values. Ages below 5 and above 90 do not make much sense,
+        and hence these are replaced with NaNs.
+        All the NaNs are then replaced with the mean age, and the data type is set as int.
+        """
+
+        self.user_df.loc[(self.user_df.age > 90) | (self.user_df.age < 5), 'age'] = np.nan
+        self.user_df.age = self.user_df.age.fillna(self.user_df.age.mean())
+        self.user_df.age = self.user_df.age.astype(np.int32)
+
+
+        # we need to make sure the items that the user gives are also present in our database.
+
+        """
+        The explicit purchased_count values (1–10) and the implicit ones (0) have to be segregated now
+        """
+        purchase_count_explicit = self.user_purchase_df[self.user_purchase_df.purchased_count != 0]
+        purchase_count_implicit = self.user_purchase_df[self.user_purchase_df.purchased_count == 0]
+
+        users_explicit = self.user_df[self.user_df['_id'].isin(purchase_count_explicit.user_id)]
+        users_implicit = self.user_df[self.user_df['_id'].isin(purchase_count_implicit.user_id)]
+
+
+        return purchase_count_explicit, purchase_count_implicit, \
+               users_explicit, users_implicit
+
+
diff --git a/links.md b/links.md
new file mode 100644
index 0000000..1ce5ebb
--- /dev/null
+++ b/links.md
@@ -0,0 +1,7 @@
+- https://www.mikulskibartosz.name/a-comprehensive-guide-to-putting-a-machine-learning-model-in-production/
+- https://auth0.com/blog/developing-restful-apis-with-python-and-flask/
+- https://www.kdnuggets.com/2019/10/easily-deploy-machine-learning-models-using-flask.html
+- https://blog.hyperiondev.com/index.php/2018/02/01/deploy-machine-learning-model-flask-api/
+- https://scotch.io/tutorials/build-a-restful-api-with-flask-the-tdd-way
+- https://www.retentionscience.com/blog/scalingrecommendations/
+- https://getstream.io/blog/recommendations-activity-streams-vowpal-wabbit/

From 3df222d79db07df5d2de31dfbb8e1df99ac287af Mon Sep 17 00:00:00 2001
From: sayoojbk
Date: Mon, 16 Dec 2019 19:47:07 +0530
Subject: [PATCH 5/7] removed unwanted links.md file

---
 links.md | 7 -------
 1 file changed, 7 deletions(-)
 delete mode 100644 links.md

diff --git a/links.md b/links.md
deleted file mode 100644
index 1ce5ebb..0000000
--- a/links.md
+++ /dev/null
@@ -1,7 +0,0 @@
-- https://www.mikulskibartosz.name/a-comprehensive-guide-to-putting-a-machine-learning-model-in-production/
-- https://auth0.com/blog/developing-restful-apis-with-python-and-flask/
-- https://www.kdnuggets.com/2019/10/easily-deploy-machine-learning-models-using-flask.html
-- https://blog.hyperiondev.com/index.php/2018/02/01/deploy-machine-learning-model-flask-api/
-- https://scotch.io/tutorials/build-a-restful-api-with-flask-the-tdd-way
-- https://www.retentionscience.com/blog/scalingrecommendations/
-- https://getstream.io/blog/recommendations-activity-streams-vowpal-wabbit/

From 5ae3eba0afa736084cc8cf973669bb890612c785 Mon Sep 17 00:00:00 2001
From: sayoojbk
Date: Mon, 16 Dec 2019 19:55:37 +0530
Subject: [PATCH 6/7] 
deleted some unnecessarily committed files. --- .vscode/settings.json | 3 - recommendation-model/configs/constants.py | 41 -- recommendation-model/utils/dataset.py | 234 ------ recommendation-model/utils/evaluation.py | 677 ------------------ .../utils/python_splitters.py | 280 -------- .../vowpal_wabbit_deep_dive.py | 371 ---------- 6 files changed, 1606 deletions(-) delete mode 100644 .vscode/settings.json delete mode 100644 recommendation-model/configs/constants.py delete mode 100644 recommendation-model/utils/dataset.py delete mode 100644 recommendation-model/utils/evaluation.py delete mode 100644 recommendation-model/utils/python_splitters.py delete mode 100644 recommendation-model/vowpal_wabbit_deep_dive.py diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 3c3a83b..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "python.pythonPath": "/home/sayooj/anaconda3/bin/python" -} \ No newline at end of file diff --git a/recommendation-model/configs/constants.py b/recommendation-model/configs/constants.py deleted file mode 100644 index 10e64eb..0000000 --- a/recommendation-model/configs/constants.py +++ /dev/null @@ -1,41 +0,0 @@ -# ------------------------------------------------------------------------------------------------------ -# Licensed under MIT License -# Written by sayooj_bk -# ------------------------------------------------------------------------------------------------------- - -# Default column names - -# This is the user meta-data which user provides. -DEFAULT_USER_COL = 'userId' -DEFAULT_USER_BUDGET = "userBudget" -DEFAULT_PURCHASED_COUNT = "purchase_count" -DEFAULT_NATIONALITY = "nationality" - - -# The product data -DEFAULT_RETAIL_PRICE = "itemPrice" -DEFAULT_ITEM_COL = "itemId" # This is what item user prefers from the list of items available. -DEFAULT_CATEGORY = "itemCategory" - - - -# The merchant data scheme -MERCHANT_ID = "merchantId" -MERCHANT_NAME = "merchantName" -LOCATION = "location" - - - -# Merchant product data -# - MERCHANT_ID , DEFAULT_ITEM_COL, DEFAULT_RETAIL_PRICE , DISCOUNTED_PRICE -MERCHANT_PRODUCT_PRICE = "merchantProductPrice" -DISCOUNTED_PRICE = "discountedPrice" - -# Merchant category data -MERCHANT_CATEGORY_ID = "merchantCategoryId" -PRICE_LEVEL = "priceLevel" # This is the price level of the merchant like is this a expensive place or cheap place like that. - - - - - diff --git a/recommendation-model/utils/dataset.py b/recommendation-model/utils/dataset.py deleted file mode 100644 index 16e290f..0000000 --- a/recommendation-model/utils/dataset.py +++ /dev/null @@ -1,234 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -import os -import re -import shutil -import warnings -import pandas as pd -from pandas import DataFrame - -# The below data are which have been uploaded by the user while filling out form preference. 
-from ..configs.constants import ( - DEFAULT_USER_COL, - DEFAULT_USER_BUDGET, - DEFAULT_PURCHASED_COUNT, - DEFAULT_NATIONALITY , - DEFAULT_ITEM_COL, - DEFAULT_ITEM_CATEGORY, - DEFAULT_RETAIL_PRICE, - MERCHANT_PRODUCT_PRICE, - DISCOUNTED_PRICE, - MERCHANT_CATEGORY_ID, - PRICE_LEVEL -) - - -import pymongo -myclient = pymongo.MongoClient("mongodb://localhost:27017/") - -# Load the fcgl database which has to be loaded and -""" -FCGL-DATABASE : -COLLECTIONS : USER DATA - PRODUCT DATA - MERCHANT DATA - - -""" -mydb = myclient["fcgl-database"] -training_collection = mydb["training_data"] -# training_collection.insert(dict) -- this will add the data to the colleciton. - -ERROR_HEADER = "Header error. At least user and item id should be provided." -WARNING_FCGL_HEADER = "The dataset has more than the required data so only few columns will be used #TODO select those few." -""" -try: - from pyspark.sql.types import ( - StructType, - StructField, - IntegerType, - FloatType, - DoubleType, - LongType, - StringType, - ) - from pyspark.sql.functions import concat_ws, col -except ImportError: - pass # so the environment without spark doesn't break -""" - -# The data points on which the recommendation system will be trained. -DEFAULT_HEADER = ( - DEFAULT_USER_COL, - DEFAULT_USER_BUDGET, - DEFAULT_PURCHASED_COUNT, - DEFAULT_NATIONALITY, - DEFAULT_ITEM_COL, - DEFAULT_ITEM_CATEGORY, - DEFAULT_RETAIL_PRICE, - MERCHANT_PRODUCT_PRICE, - DISCOUNTED_PRICE, - MERCHANT_CATEGORY_ID, - PRICE_LEVEL -) - - - -def load_pandas_df( - header=None): - """ - Loads the Mongodb dataset as pd.DataFrame. - To load customer information only, you can use load_item_df function. - - Args: - size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m"). - header (list or tuple or None): Rating dataset header. - local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file. - If None, all the intermediate files will be stored in a temporary directory and removed after use. - title_col (str): Movie title column name. If None, the column will not be loaded. - genres_col (str): Genres column name. Genres are '|' separated string. - If None, the column will not be loaded. - year_col (str): Movie release year column name. If None, the column will not be loaded. - - Returns: - pd.DataFrame: Movie rating dataset. - - - **Examples** - - .. code-block:: python - - # To load just user-id, item-id, and ratings from MovieLens-1M dataset, - df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating')) - - # To load rating's timestamp together, - df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp')) - - # To load movie's title, genres, and released year info along with the ratings data, - df = load_pandas_df('1m', ('UserId', 'ItemId', 'Rating', 'Timestamp'), - title_col='Title', - genres_col='Genres', - year_col='Year' - ) - """ - if header is None: - header = DEFAULT_HEADER - elif len(header) < 2: - raise ValueError(ERROR_HEADER) - elif len(header) > 4: - warnings.warn(WARNING_FCGL_HEADER) - # header = header[:4] - - fcgl_column = header[1] - # collection_name : Collection name that is used to train the vowpal wabbit model. - # collection_name : Since this is what helps in recommnding we will push in the intial user data we get from the user while he/she - # signups for the application. - - - df = DataFrame( list(mydb.training_data.find({})) ) - - return df - - -def load_item_df( - movie_col=DEFAULT_ITEM_COL, - title_col=None, - genres_col=None, - year_col=None): - """Loads Movie info. 
- - Args: - movie_col (str): Movie id column name. - title_col (str): Movie title column name. If None, the column will not be loaded. - genres_col (str): Genres column name. Genres are '|' separated string. - If None, the column will not be loaded. - year_col (str): Movie release year column name. If None, the column will not be loaded. - - Returns: - pd.DataFrame: Movie information data, such as title, genres, and release year. - """ - size = size.lower() - if size not in DATA_FORMAT: - raise ValueError(ERROR_MOVIE_LENS_SIZE) - - with download_path(local_cache_path) as path: - filepath = os.path.join(path, "ml-{}.zip".format(size)) - _, item_datapath = _maybe_download_and_extract(size, filepath) - item_df = _load_item_df( - size, item_datapath, movie_col, title_col, genres_col, year_col - ) - - return item_df - - -def _load_item_df(size, item_datapath, movie_col, title_col, genres_col, year_col): - """Loads Movie info""" - if title_col is None and genres_col is None and year_col is None: - return None - - item_header = [movie_col] - usecols = [0] - - # Year is parsed from title - if title_col is not None or year_col is not None: - item_header.append("title_year") - usecols.append(1) - - genres_header_100k = None - if genres_col is not None: - # 100k data's movie genres are encoded as a binary array (the last 19 fields) - # For details, see http://files.grouplens.org/datasets/movielens/ml-100k-README.txt - if size == "100k": - genres_header_100k = [*(str(i) for i in range(19))] - item_header.extend(genres_header_100k) - usecols.extend([*range(5, 24)]) # genres columns - else: - item_header.append(genres_col) - usecols.append(2) # genres column - - item_df = pd.read_csv( - item_datapath, - sep=DATA_FORMAT[size].item_separator, - engine="python", - names=item_header, - usecols=usecols, - header=0 if DATA_FORMAT[size].item_has_header else None, - encoding="ISO-8859-1", - ) - - # Convert 100k data's format: '0|0|1|...' to 'Action|Romance|..." - if genres_header_100k is not None: - item_df[genres_col] = item_df[genres_header_100k].values.tolist() - item_df[genres_col] = item_df[genres_col].map( - lambda l: "|".join([GENRES[i] for i, v in enumerate(l) if v == 1]) - ) - - item_df.drop(genres_header_100k, axis=1, inplace=True) - - # Parse year from movie title. Note, MovieLens title format is "title (year)" - # Note, there are very few records that are missing the year info. - if year_col is not None: - - def parse_year(t): - parsed = re.split("[()]", t) - if len(parsed) > 2 and parsed[-2].isdecimal(): - return parsed[-2] - else: - return None - - item_df[year_col] = item_df["title_year"].map(parse_year) - if title_col is None: - item_df.drop("title_year", axis=1, inplace=True) - - if title_col is not None: - item_df.rename(columns={"title_year": title_col}, inplace=True) - - return item_df - - -def load_spark_df( - spark): - raise NotImplementedError - - diff --git a/recommendation-model/utils/evaluation.py b/recommendation-model/utils/evaluation.py deleted file mode 100644 index 349ada6..0000000 --- a/recommendation-model/utils/evaluation.py +++ /dev/null @@ -1,677 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. 
- -import numpy as np -import pandas as pd -from functools import wraps -from sklearn.metrics import ( - mean_squared_error, - mean_absolute_error, - r2_score, - explained_variance_score, - roc_auc_score, - log_loss, -) - -from reco_utils.common.constants import ( - DEFAULT_USER_COL, - DEFAULT_ITEM_COL, - DEFAULT_RATING_COL, - DEFAULT_PREDICTION_COL, - DEFAULT_K, - DEFAULT_THRESHOLD, -) -from reco_utils.dataset.pandas_df_utils import ( - has_columns, - has_same_base_dtype, - lru_cache_df, -) - - -def check_column_dtypes(func): - """Checks columns of DataFrame inputs - - This includes the checks on: - 1. whether the input columns exist in the input DataFrames - 2. whether the data types of col_user as well as col_item are matched in the two input DataFrames. - - Args: - func (function): function that will be wrapped - """ - - @wraps(func) - def check_column_dtypes_wrapper( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, - *args, - **kwargs - ): - """Check columns of DataFrame inputs - - Args: - rating_true (pd.DataFrame): True data - rating_pred (pd.DataFrame): Predicted data - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - """ - - if not has_columns(rating_true, [col_user, col_item, col_rating]): - raise ValueError("Missing columns in true rating DataFrame") - if not has_columns(rating_pred, [col_user, col_item, col_prediction]): - raise ValueError("Missing columns in predicted rating DataFrame") - if not has_same_base_dtype( - rating_true, rating_pred, columns=[col_user, col_item] - ): - raise ValueError("Columns in provided DataFrames are not the same datatype") - - return func( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - *args, - **kwargs - ) - - return check_column_dtypes_wrapper - - -@check_column_dtypes -@lru_cache_df(maxsize=1) -def merge_rating_true_pred( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Join truth and prediction data frames on userID and itemID and return the true - and predicted rated with the correct index. 
- - Args: - rating_true (pd.DataFrame): True data - rating_pred (pd.DataFrame): Predicted data - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - np.array: Array with the true ratings - np.array: Array with the predicted ratings - - """ - - # pd.merge will apply suffixes to columns which have the same name across both dataframes - suffixes = ["_true", "_pred"] - rating_true_pred = pd.merge( - rating_true, rating_pred, on=[col_user, col_item], suffixes=suffixes - ) - if col_rating in rating_pred.columns: - col_rating = col_rating + suffixes[0] - if col_prediction in rating_true.columns: - col_prediction = col_prediction + suffixes[1] - return rating_true_pred[col_rating], rating_true_pred[col_prediction] - - -def rmse( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate Root Mean Squared Error - - Args: - rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs - rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: Root mean squared error - """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return np.sqrt(mean_squared_error(y_true, y_pred)) - - -def mae( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate Mean Absolute Error. - - Args: - rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs - rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: Mean Absolute Error. - """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return mean_absolute_error(y_true, y_pred) - - -def rsquared( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate R squared - - Args: - rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs - rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: R squared (min=0, max=1). 
- """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return r2_score(y_true, y_pred) - - -def exp_var( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate explained variance. - - Args: - rating_true (pd.DataFrame): True data. There should be no duplicate (userID, itemID) pairs - rating_pred (pd.DataFrame): Predicted data. There should be no duplicate (userID, itemID) pairs - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: Explained variance (min=0, max=1). - """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return explained_variance_score(y_true, y_pred) - - -def auc( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate the Area-Under-Curve metric for implicit feedback typed - recommender, where rating is binary and prediction is float number ranging - from 0 to 1. - - https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve - - Note: - The evaluation does not require a leave-one-out scenario. - This metric does not calculate group-based AUC which considers the AUC scores - averaged across users. It is also not limited to k. Instead, it calculates the - scores on the entire prediction results regardless the users. - - Args: - rating_true (pd.DataFrame): True data - rating_pred (pd.DataFrame): Predicted data - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: auc_score (min=0, max=1) - """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return roc_auc_score(y_true, y_pred) - - -def logloss( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, -): - """Calculate the logloss metric for implicit feedback typed - recommender, where rating is binary and prediction is float number ranging - from 0 to 1. 
- - https://en.wikipedia.org/wiki/Loss_functions_for_classification#Cross_entropy_loss_(Log_Loss) - - Args: - rating_true (pd.DataFrame): True data - rating_pred (pd.DataFrame): Predicted data - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - - Returns: - float: log_loss_score (min=-inf, max=inf) - """ - - y_true, y_pred = merge_rating_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - ) - return log_loss(y_true, y_pred) - - -@check_column_dtypes -@lru_cache_df(maxsize=1) -def merge_ranking_true_pred( - rating_true, - rating_pred, - col_user, - col_item, - col_rating, - col_prediction, - relevancy_method, - k=DEFAULT_K, - threshold=DEFAULT_THRESHOLD, -): - """Filter truth and prediction data frames on common users - - Args: - rating_true (pd.DataFrame): True DataFrame - rating_pred (pd.DataFrame): Predicted DataFrame - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] - k (int): number of top k items per user (optional) - threshold (float): threshold of top items per user (optional) - - Returns: - pd.DataFrame, pd.DataFrame, int: DataFrame of recommendation hits, sorted by `col_user` and `rank` - DataFrmae of hit counts vs actual relevant items per user number of unique user ids - """ - - # Make sure the prediction and true data frames have the same set of users - common_users = set(rating_true[col_user]).intersection(set(rating_pred[col_user])) - rating_true_common = rating_true[rating_true[col_user].isin(common_users)] - rating_pred_common = rating_pred[rating_pred[col_user].isin(common_users)] - n_users = len(common_users) - - # Return hit items in prediction data frame with ranking information. This is used for calculating NDCG and MAP. - # Use first to generate unique ranking values for each item. This is to align with the implementation in - # Spark evaluation metrics, where index of each recommended items (the indices are unique to items) is used - # to calculate penalized precision of the ordered items. - if relevancy_method == "top_k": - top_k = k - elif relevancy_method == "by_threshold": - top_k = threshold - else: - raise NotImplementedError("Invalid relevancy_method") - df_hit = get_top_k_items( - dataframe=rating_pred_common, - col_user=col_user, - col_rating=col_prediction, - k=top_k, - ) - df_hit = pd.merge(df_hit, rating_true_common, on=[col_user, col_item])[ - [col_user, col_item, "rank"] - ] - - # count the number of hits vs actual relevant items per user - df_hit_count = pd.merge( - df_hit.groupby(col_user, as_index=False)[col_user].agg({"hit": "count"}), - rating_true_common.groupby(col_user, as_index=False)[col_user].agg( - {"actual": "count"} - ), - on=col_user, - ) - - return df_hit, df_hit_count, n_users - - -def precision_at_k( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, - relevancy_method="top_k", - k=DEFAULT_K, - threshold=DEFAULT_THRESHOLD, -): - """Precision at K. - - Note: - We use the same formula to calculate precision@k as that in Spark. 
- More details can be found at - http://spark.apache.org/docs/2.1.1/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics.precisionAt - In particular, the maximum achievable precision may be < 1, if the number of items for a - user in rating_pred is less than k. - - Args: - rating_true (pd.DataFrame): True DataFrame - rating_pred (pd.DataFrame): Predicted DataFrame - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] - k (int): number of top k items per user - threshold (float): threshold of top items per user (optional) - - Returns: - float: precision at k (min=0, max=1) - """ - - df_hit, df_hit_count, n_users = merge_ranking_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - relevancy_method=relevancy_method, - k=k, - threshold=threshold, - ) - - if df_hit.shape[0] == 0: - return 0.0 - - return (df_hit_count["hit"] / k).sum() / n_users - - -def recall_at_k( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, - relevancy_method="top_k", - k=DEFAULT_K, - threshold=DEFAULT_THRESHOLD, -): - """Recall at K. - - Args: - rating_true (pd.DataFrame): True DataFrame - rating_pred (pd.DataFrame): Predicted DataFrame - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] - k (int): number of top k items per user - threshold (float): threshold of top items per user (optional) - - Returns: - float: recall at k (min=0, max=1). The maximum value is 1 even when fewer than - k items exist for a user in rating_true. - """ - - df_hit, df_hit_count, n_users = merge_ranking_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - relevancy_method=relevancy_method, - k=k, - threshold=threshold, - ) - - if df_hit.shape[0] == 0: - return 0.0 - - return (df_hit_count["hit"] / df_hit_count["actual"]).sum() / n_users - - -def ndcg_at_k( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, - relevancy_method="top_k", - k=DEFAULT_K, - threshold=DEFAULT_THRESHOLD, -): - """Normalized Discounted Cumulative Gain (nDCG). - - Info: https://en.wikipedia.org/wiki/Discounted_cumulative_gain - - Args: - rating_true (pd.DataFrame): True DataFrame - rating_pred (pd.DataFrame): Predicted DataFrame - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] - k (int): number of top k items per user - threshold (float): threshold of top items per user (optional) - - Returns: - float: nDCG at k (min=0, max=1). 
- """ - - df_hit, df_hit_count, n_users = merge_ranking_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - relevancy_method=relevancy_method, - k=k, - threshold=threshold, - ) - - if df_hit.shape[0] == 0: - return 0.0 - - # calculate discounted gain for hit items - df_dcg = df_hit.copy() - # relevance in this case is always 1 - df_dcg["dcg"] = 1 / np.log1p(df_dcg["rank"]) - # sum up discount gained to get discount cumulative gain - df_dcg = df_dcg.groupby(col_user, as_index=False, sort=False).agg({"dcg": "sum"}) - # calculate ideal discounted cumulative gain - df_ndcg = pd.merge(df_dcg, df_hit_count, on=[col_user]) - df_ndcg["idcg"] = df_ndcg["actual"].apply( - lambda x: sum(1 / np.log1p(range(1, min(x, k) + 1))) - ) - - # DCG over IDCG is the normalized DCG - return (df_ndcg["dcg"] / df_ndcg["idcg"]).sum() / n_users - - -def map_at_k( - rating_true, - rating_pred, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_rating=DEFAULT_RATING_COL, - col_prediction=DEFAULT_PREDICTION_COL, - relevancy_method="top_k", - k=DEFAULT_K, - threshold=DEFAULT_THRESHOLD, -): - """Mean Average Precision at k - - The implementation of MAP is referenced from Spark MLlib evaluation metrics. - https://spark.apache.org/docs/2.3.0/mllib-evaluation-metrics.html#ranking-systems - - A good reference can be found at: - http://web.stanford.edu/class/cs276/handouts/EvaluationNew-handout-6-per.pdf - - Note: - 1. The evaluation function is named as 'MAP is at k' because the evaluation class takes top k items for - the prediction items. The naming is different from Spark. - - 2. The MAP is meant to calculate Avg. Precision for the relevant items, so it is normalized by the number of - relevant items in the ground truth data, instead of k. - - Args: - rating_true (pd.DataFrame): True DataFrame - rating_pred (pd.DataFrame): Predicted DataFrame - col_user (str): column name for user - col_item (str): column name for item - col_rating (str): column name for rating - col_prediction (str): column name for prediction - relevancy_method (str): method for determining relevancy ['top_k', 'by_threshold'] - k (int): number of top k items per user - threshold (float): threshold of top items per user (optional) - - Returns: - float: MAP at k (min=0, max=1). - """ - - df_hit, df_hit_count, n_users = merge_ranking_true_pred( - rating_true=rating_true, - rating_pred=rating_pred, - col_user=col_user, - col_item=col_item, - col_rating=col_rating, - col_prediction=col_prediction, - relevancy_method=relevancy_method, - k=k, - threshold=threshold, - ) - - if df_hit.shape[0] == 0: - return 0.0 - - # calculate reciprocal rank of items for each user and sum them up - df_hit_sorted = df_hit.copy() - df_hit_sorted["rr"] = (df_hit_sorted.groupby(col_user).cumcount() + 1) / df_hit_sorted["rank"] - df_hit_sorted = df_hit_sorted.groupby(col_user).agg({"rr": "sum"}).reset_index() - - df_merge = pd.merge(df_hit_sorted, df_hit_count, on=col_user) - return (df_merge["rr"] / df_merge["actual"]).sum() / n_users - - -def get_top_k_items( - dataframe, col_user=DEFAULT_USER_COL, col_rating=DEFAULT_RATING_COL, k=DEFAULT_K -): - """Get the input customer-item-rating tuple in the format of Pandas - DataFrame, output a Pandas DataFrame in the dense format of top k items - for each user. - - Note: - If it is implicit rating, just append a column of constants to be - ratings. 
- - Args: - dataframe (pandas.DataFrame): DataFrame of rating data (in the format - customerID-itemID-rating) - col_user (str): column name for user - col_rating (str): column name for rating - k (int): number of items for each user - - Returns: - pd.DataFrame: DataFrame of top k items for each user, sorted by `col_user` and `rank` - """ - # Sort dataframe by col_user and (top k) col_rating - top_k_items = ( - dataframe.groupby(col_user, as_index=False) - .apply(lambda x: x.nlargest(k, col_rating)) - .reset_index(drop=True) - ) - # Add ranks - top_k_items["rank"] = top_k_items.groupby(col_user, sort=False).cumcount() + 1 - return top_k_items - - -"""Function name and function mapper. -Useful when we have to serialize evaluation metric names -and call the functions based on deserialized names""" -metrics = { - rmse.__name__: rmse, - mae.__name__: mae, - rsquared.__name__: rsquared, - exp_var.__name__: exp_var, - precision_at_k.__name__: precision_at_k, - recall_at_k.__name__: recall_at_k, - ndcg_at_k.__name__: ndcg_at_k, - map_at_k.__name__: map_at_k, -} diff --git a/recommendation-model/utils/python_splitters.py b/recommendation-model/utils/python_splitters.py deleted file mode 100644 index a6ce3be..0000000 --- a/recommendation-model/utils/python_splitters.py +++ /dev/null @@ -1,280 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import numpy as np -import pandas as pd -from sklearn.model_selection import train_test_split as sk_split - -from reco_utils.common.constants import ( - DEFAULT_ITEM_COL, - DEFAULT_USER_COL, - DEFAULT_TIMESTAMP_COL, -) -from reco_utils.dataset.split_utils import ( - process_split_ratio, - min_rating_filter_pandas, - split_pandas_data_with_ratios, -) - - -def python_random_split(data, ratio=0.75, seed=42): - """Pandas random splitter. - - The splitter randomly splits the input data. - - Args: - data (pd.DataFrame): Pandas DataFrame to be split. - ratio (float or list): Ratio for splitting data. If it is a single float number - it splits data into two halves and the ratio argument indicates the ratio - of training data set; if it is a list of float numbers, the splitter splits - data into several portions corresponding to the split ratios. If a list is - provided and the ratios are not summed to 1, they will be normalized. - seed (int): Seed. - - Returns: - list: Splits of the input data as pd.DataFrame. - """ - multi_split, ratio = process_split_ratio(ratio) - - if multi_split: - splits = split_pandas_data_with_ratios(data, ratio, shuffle=True, seed=seed) - splits_new = [x.drop("split_index", axis=1) for x in splits] - - return splits_new - else: - return sk_split(data, test_size=None, train_size=ratio, random_state=seed) - - -def _do_stratification( - data, - ratio=0.75, - min_rating=1, - filter_by="user", - is_random=True, - seed=42, - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_timestamp=DEFAULT_TIMESTAMP_COL, -): - # A few preliminary checks. - if not (filter_by == "user" or filter_by == "item"): - raise ValueError("filter_by should be either 'user' or 'item'.") - - if min_rating < 1: - raise ValueError("min_rating should be integer and larger than or equal to 1.") - - if col_user not in data.columns: - raise ValueError("Schema of data not valid. Missing User Col") - - if col_item not in data.columns: - raise ValueError("Schema of data not valid. Missing Item Col") - - if not is_random: - if col_timestamp not in data.columns: - raise ValueError("Schema of data not valid. 
Missing Timestamp Col") - - multi_split, ratio = process_split_ratio(ratio) - - split_by_column = col_user if filter_by == "user" else col_item - - ratio = ratio if multi_split else [ratio, 1 - ratio] - - if min_rating > 1: - data = min_rating_filter_pandas( - data, - min_rating=min_rating, - filter_by=filter_by, - col_user=col_user, - col_item=col_item, - ) - - # Split by each group and aggregate splits together. - splits = [] - - # If it is for chronological splitting, the split will be performed in a random way. - df_grouped = ( - data.sort_values(col_timestamp).groupby(split_by_column) - if is_random is False - else data.groupby(split_by_column) - ) - - for name, group in df_grouped: - group_splits = split_pandas_data_with_ratios( - df_grouped.get_group(name), ratio, shuffle=is_random, seed=seed - ) - - # Concatenate the list of split dataframes. - concat_group_splits = pd.concat(group_splits) - - splits.append(concat_group_splits) - - # Concatenate splits for all the groups together. - splits_all = pd.concat(splits) - - # Take split by split_index - splits_list = [ - splits_all[splits_all["split_index"] == x].drop("split_index", axis=1) - for x in range(len(ratio)) - ] - - return splits_list - - -def python_chrono_split( - data, - ratio=0.75, - min_rating=1, - filter_by="user", - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - col_timestamp=DEFAULT_TIMESTAMP_COL, -): - """Pandas chronological splitter. - - This function splits data in a chronological manner. That is, for each user / item, the - split function takes proportions of ratings which is specified by the split ratio(s). - The split is stratified. - - Args: - data (pd.DataFrame): Pandas DataFrame to be split. - ratio (float or list): Ratio for splitting data. If it is a single float number - it splits data into two halves and the ratio argument indicates the ratio of - training data set; if it is a list of float numbers, the splitter splits - data into several portions corresponding to the split ratios. If a list is - provided and the ratios are not summed to 1, they will be normalized. - seed (int): Seed. - min_rating (int): minimum number of ratings for user or item. - filter_by (str): either "user" or "item", depending on which of the two is to - filter with min_rating. - col_user (str): column name of user IDs. - col_item (str): column name of item IDs. - col_timestamp (str): column name of timestamps. - - Returns: - list: Splits of the input data as pd.DataFrame. - """ - return _do_stratification( - data, - ratio=ratio, - min_rating=min_rating, - filter_by=filter_by, - col_user=col_user, - col_item=col_item, - col_timestamp=col_timestamp, - is_random=False, - ) - - -def python_stratified_split( - data, - ratio=0.75, - min_rating=1, - filter_by="user", - col_user=DEFAULT_USER_COL, - col_item=DEFAULT_ITEM_COL, - seed=42, -): - """Pandas stratified splitter. - - For each user / item, the split function takes proportions of ratings which is - specified by the split ratio(s). The split is stratified. - - Args: - data (pd.DataFrame): Pandas DataFrame to be split. - ratio (float or list): Ratio for splitting data. If it is a single float number - it splits data into two halves and the ratio argument indicates the ratio of - training data set; if it is a list of float numbers, the splitter splits - data into several portions corresponding to the split ratios. If a list is - provided and the ratios are not summed to 1, they will be normalized. - seed (int): Seed. 
-        min_rating (int): minimum number of ratings for user or item.
-        filter_by (str): either "user" or "item", depending on which of the two is to
-            filter with min_rating.
-        col_user (str): column name of user IDs.
-        col_item (str): column name of item IDs.
-
-    Returns:
-        list: Splits of the input data as pd.DataFrame.
-    """
-    return _do_stratification(
-        data,
-        ratio=ratio,
-        min_rating=min_rating,
-        filter_by=filter_by,
-        col_user=col_user,
-        col_item=col_item,
-        is_random=True,
-        seed=seed,
-    )
-
-
-def numpy_stratified_split(X, ratio=0.75, seed=42):
-    """Split the user/item affinity matrix (sparse matrix) into train and test set matrices while maintaining
-    local (i.e. per user) ratios.
-
-    Main points:
-
-    1. In a typical recommender problem, different users rate a different number of items,
-    and therefore the user/item affinity matrix has a sparse structure with a variable number
-    of zeroes (unrated items) per row (user). Cutting a total amount of ratings will
-    result in a non-homogeneous distribution between train and test set, i.e. some test
-    users may have many ratings while others have very few, if any.
-
-    2. In an unsupervised learning problem, no explicit answer is given. For this reason
-    the split needs to be implemented in a different way than in supervised learning.
-    In the latter, one typically splits the dataset by rows (by examples), ending up with
-    the same number of features but a different number of examples in the train/test sets.
-    This scheme does not work in the unsupervised case, as part of the rated items needs to
-    be used as a test set for a fixed number of users.
-
-    Solution:
-
-    1. Instead of cutting a total percentage, for each user we cut a relative ratio of the rated
-    items. For example, if user1 has rated 4 items and user2 10, cutting 25% will correspond to
-    1 and 2.6 ratings in the test set, approximated as 1 and 3 according to the round() function.
-    In this way, the 0.75 ratio is satisfied both locally and globally, preserving the original
-    distribution of ratings across the train and test set.
-
-    2. It is easy (and fast) to satisfy these requirements by creating the test set via element subtraction
-    from the original dataset X. We first create two copies of X; for each user we select a random
-    sample of local size ratio (point 1) and erase the remaining ratings, obtaining in this way the
-    test set matrix Xtst. The train set matrix Xtr is obtained in the opposite way.
-
-    Args:
-        X (np.array, int): a sparse matrix to be split
-        ratio (float): fraction of the entire dataset to constitute the train set
-        seed (int): random seed
-
-    Returns:
-        np.array, np.array: Xtr is the train set user/item affinity matrix. Xtst is the test set user/item affinity
-        matrix.
- """ - - np.random.seed(seed) # set the random seed - test_cut = int((1 - ratio) * 100) # percentage of ratings to go in the test set - - # initialize train and test set matrices - Xtr = X.copy() - Xtst = X.copy() - - # find the number of rated movies per user - rated = np.sum(Xtr != 0, axis=1) - - # for each user, cut down a test_size% for the test set - tst = np.around((rated * test_cut) / 100).astype(int) - - for u in range(X.shape[0]): - # For each user obtain the index of rated movies - idx = np.asarray(np.where(Xtr[u] != 0))[0].tolist() - - # extract a random subset of size n from the set of rated movies without repetition - idx_tst = np.random.choice(idx, tst[u], replace=False) - idx_train = list(set(idx).difference(set(idx_tst))) - - # change the selected rated movies to unrated in the train set - Xtr[u, idx_tst] = 0 - # set the movies that appear already in the train set as 0 - Xtst[u, idx_train] = 0 - - del idx, idx_train, idx_tst - - return Xtr, Xtst diff --git a/recommendation-model/vowpal_wabbit_deep_dive.py b/recommendation-model/vowpal_wabbit_deep_dive.py deleted file mode 100644 index 4d4ddc7..0000000 --- a/recommendation-model/vowpal_wabbit_deep_dive.py +++ /dev/null @@ -1,371 +0,0 @@ - -import sys -sys.path.append('../..') - -import os -from subprocess import run -from tempfile import TemporaryDirectory -from time import process_time - -import pandas as pd -import papermill as pm - -from reco_utils.common.notebook_utils import is_jupyter -from reco_utils.dataset.movielens import load_pandas_df -from reco_utils.dataset.python_splitters import python_random_split -from reco_utils.evaluation.python_evaluation import (rmse, mae, exp_var, rsquared, get_top_k_items, - map_at_k, ndcg_at_k, precision_at_k, recall_at_k) - -print("System version: {}".format(sys.version)) -print("Pandas version: {}".format(pd.__version__)) - - -def to_vw(df, output, logistic=False): - """Convert Pandas DataFrame to vw input format - Args: - df (pd.DataFrame): input DataFrame - output (str): path to output file - logistic (bool): flag to convert label to logistic value - """ - with open(output, 'w') as f: - tmp = df.reset_index() - - # we need to reset the rating type to an integer to simplify the vw formatting - tmp['rating'] = tmp['rating'].astype('int64') - - # convert rating to binary value - if logistic: - tmp['rating'] = tmp['rating'].apply(lambda x: 1 if x >= 3 else -1) - - # convert each row to VW input format (https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Input-format) - # [label] [tag]|[user namespace] [user id feature] |[item namespace] [movie id feature] - # label is the true rating, tag is a unique id for the example just used to link predictions to truth - # user and item namespaces separate the features to support interaction features through command line options - for _, row in tmp.iterrows(): - f.write('{rating:d} {index:d}|user {userID:d} |item {itemID:d}\n'.format_map(row)) - - -def run_vw(train_params, test_params, test_data, prediction_path, logistic=False): - """Convenience function to train, test, and show metrics of interest - Args: - train_params (str): vw training parameters - test_params (str): vw testing parameters - test_data (pd.dataFrame): test data - prediction_path (str): path to vw prediction output - logistic (bool): flag to convert label to logistic value - Returns: - (dict): metrics and timing information - """ - - # train model - train_start = process_time() - run(train_params.split(' '), check=True) - train_stop = process_time() - - # test model - 
test_start = process_time()
-    run(test_params.split(' '), check=True)
-    test_stop = process_time()
-
-    # read in predictions
-    pred_df = pd.read_csv(prediction_path, delim_whitespace=True, names=['prediction'], index_col=1).join(test_data)
-    pred_df.drop("rating", axis=1, inplace=True)
-
-    test_df = test_data.copy()
-    if logistic:
-        # make the true label binary so that the metrics are captured correctly
-        test_df['rating'] = test_df['rating'].apply(lambda x: 1 if x >= 3 else -1)
-    else:
-        # ensure results are integers in correct range
-        pred_df['prediction'] = pred_df['prediction'].apply(lambda x: int(max(1, min(5, round(x)))))
-
-    # calculate metrics
-    result = dict()
-    result['RMSE'] = rmse(test_df, pred_df)
-    result['MAE'] = mae(test_df, pred_df)
-    result['R2'] = rsquared(test_df, pred_df)
-    result['Explained Variance'] = exp_var(test_df, pred_df)
-    result['Train Time (ms)'] = (train_stop - train_start) * 1000
-    result['Test Time (ms)'] = (test_stop - test_start) * 1000
-
-    return result
-
-# create temp directory to maintain data files
-tmpdir = TemporaryDirectory()
-
-model_path = os.path.join(tmpdir.name, 'vw.model')
-saved_model_path = os.path.join(tmpdir.name, 'vw_saved.model')
-train_path = os.path.join(tmpdir.name, 'train.dat')
-test_path = os.path.join(tmpdir.name, 'test.dat')
-train_logistic_path = os.path.join(tmpdir.name, 'train_logistic.dat')
-test_logistic_path = os.path.join(tmpdir.name, 'test_logistic.dat')
-prediction_path = os.path.join(tmpdir.name, 'prediction.dat')
-all_test_path = os.path.join(tmpdir.name, 'new_test.dat')
-all_prediction_path = os.path.join(tmpdir.name, 'new_prediction.dat')
-
-
-# # 1. Load & Transform Data
-
-# Select MovieLens data size: 100k, 1m, 10m, or 20m
-MOVIELENS_DATA_SIZE = '100k'
-TOP_K = 10
-
-# load movielens data
-df = load_pandas_df(MOVIELENS_DATA_SIZE)
-
-# split data to train and test sets, default values take 75% of each user's ratings as train, and 25% as test
-train, test = python_random_split(df, 0.75)
-
-# save train and test data in vw format
-to_vw(df=train, output=train_path)
-to_vw(df=test, output=test_path)
-
-# save data for logistic regression (requires adjusting the label)
-to_vw(df=train, output=train_logistic_path, logistic=True)
-to_vw(df=test, output=test_logistic_path, logistic=True)
-
-
-# # 2. Regression Based Recommendations
-#
-# When considering different approaches for solving a problem with machine learning, it is helpful to generate a baseline approach to understand how more complex solutions perform across dimensions of performance, time, and resource (memory or CPU) usage.
-#
-# Regression based approaches are some of the simplest and fastest baselines to consider for many ML problems.
-
-# ## 2.1 Linear Regression
-#
-# As the data provides a numerical rating between 1-5, fitting those values with a linear regression model is an easy approach. This model is trained on examples of ratings as the target variable and corresponding user ids and movie ids as independent features.
-#
-# By passing each user-item rating in as an example, the model will begin to learn weights based on average ratings for each user as well as average ratings per item.
-#
-# This, however, can generate predicted ratings which are no longer integers, so some additional adjustments should be made at prediction time to convert them back to the integer scale of 1 through 5 if necessary. Here, this is done in the run_vw function.
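As a rough sketch of the rounding step described above (illustrative only; it assumes a pandas DataFrame with a column named 'prediction' and mirrors the clamping already performed inside run_vw):

import pandas as pd

def clamp_to_rating_scale(pred_df, col='prediction'):
    # Round continuous predictions to the nearest integer and clip them into the
    # valid 1-5 rating range; the column name 'prediction' is an assumption here.
    pred_df[col] = pred_df[col].round().clip(lower=1, upper=5).astype(int)
    return pred_df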
-
-"""
-Quick description of command line parameters used
-  Other optional parameters can be found here: https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Command-Line-Arguments
-  VW uses linear regression by default, so no extra command line options
-  -f : indicates where the final model file will reside after training
-  -d : indicates which data file to use for training or testing
-  --quiet: this runs vw in quiet mode silencing stdout (for debugging it's helpful to not use quiet mode)
-  -i : indicates where to load the model file previously created during training
-  -t: this executes inference only (no learned updates to the model)
-  -p : indicates where to store prediction output
-"""
-train_params = 'vw -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
-# save these results for later use during top-k analysis
-test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path)
-
-comparison = pd.DataFrame(result, index=['Linear Regression'])
-comparison
-
-
-# ## 2.2 Linear Regression with Interaction Features
-#
-# Previously we treated the user features and item features independently, but taking into account interactions between features can provide a mechanism to learn more fine-grained preferences of the users.
-#
-# To generate interaction features, use the quadratic command line argument and specify the namespaces that should be combined: '-q ui' combines the user and item namespaces based on the first letter of each.
-#
-# Currently the userIDs and itemIDs used are integers, which means the feature ID is used directly: for instance, when user ID 123 rates movie 456, the training example puts a 1 in the values for features 123 and 456. However, when interaction is specified (or if a feature is a string) the resulting interaction feature is hashed into the available feature space. Feature hashing is a way to take a very sparse high dimensional feature space and reduce it into a lower dimensional space. This allows for reduced memory while retaining fast computation of feature and model weights.
-#
-# The caveat with feature hashing is that it can lead to hash collisions, where separate features are mapped to the same location. In this case it can be beneficial to increase the size of the space to support interactions between features of high cardinality. The available feature space is dictated by the --bit_precision (-b) argument, where the total available space for all features in the model is 2^N entries.
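To make the size of the hashed feature space concrete, here is a purely illustrative calculation (the bit values below are arbitrary examples, not tuning recommendations):

# -b N gives the model 2**N feature-table slots, so a larger N leaves more room
# for hashed interaction features and makes collisions rarer.
for bits in (18, 26, 30):
    print('-b {}: {:,} available feature slots'.format(bits, 2 ** bits))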
-#
-
-"""
-Quick description of command line parameters used
-  -b : sets the memory size to 2^N entries
-  -q : create quadratic feature interactions between features in namespaces starting with 'a' and 'b'
-"""
-train_params = 'vw -b 26 -q ui -f {model} -d {data} --quiet'.format(model=saved_model_path, data=train_path)
-test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=saved_model_path, data=test_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path)
-saved_result = result
-
-comparison = comparison.append(pd.DataFrame(result, index=['Linear Regression w/ Interaction']))
-comparison
-
-
-# ## 2.3 Multinomial Logistic Regression
-#
-# An alternative to linear regression is to leverage multinomial logistic regression, or multiclass classification, which treats each rating value as a distinct class.
-#
-# This avoids any non-integer results, but also reduces the training data for each class, which could lead to poorer performance if the counts of different rating levels are skewed.
-#
-# Basic multiclass logistic regression can be accomplished using the One Against All approach specified by the '--oaa N' option, where N is the number of classes, and providing the logistic option for the loss function to be used.
-
-"""
-Quick description of command line parameters used
-  --loss_function logistic: sets the model loss function for logistic regression
-  --oaa : trains N separate models using One-Against-All approach (all models are captured in the single model file)
-          This expects the labels to be contiguous integers starting at 1
-  --link logistic: converts the predicted output from logit to probability
-The predicted output is the model (label) with the largest likelihood
-"""
-train_params = 'vw --loss_function logistic --oaa 5 -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
-test_params = 'vw --link logistic -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path)
-
-comparison = comparison.append(pd.DataFrame(result, index=['Multinomial Regression']))
-comparison
-
-
-# ## 2.4 Logistic Regression
-#
-# Additionally, one might simply be interested in whether the user likes or dislikes an item, and we can adjust the input data to represent a binary outcome, where ratings in [1, 3) are dislikes (negative results) and [3, 5] are likes (positive results).
-#
-# This framing allows for a simple logistic regression model to be applied. To perform logistic regression, the loss_function parameter is changed to 'logistic' and the target label is switched to {-1, 1}. Also, be sure to set '--link logistic' during prediction to convert the logit output back to a probability value.
-
-
-train_params = 'vw --loss_function logistic -f {model} -d {data} --quiet'.format(model=model_path, data=train_logistic_path)
-test_params = 'vw --link logistic -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_logistic_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path,
-                logistic=True)
-
-comparison = comparison.append(pd.DataFrame(result, index=['Logistic Regression']))
-comparison
-
-
-# # 3. Matrix Factorization Based Recommendations
-#
-# All of the above approaches train a regression model, but VW also supports matrix factorization with two different approaches.
-#
-# As opposed to learning direct weights for specific users, items and interactions when training a regression model, matrix factorization attempts to learn latent factors that determine how a user rates an item. An example of how this might work is if you could represent user preference and item categorization by genre. Given a smaller set of genres, we can associate how much each item belongs to each genre class, and we can set weights for a user's preference for each genre. Both sets of weights could be represented as vectors, where the inner product would be the user-item rating. Matrix factorization approaches learn low rank matrices for latent features of users and items such that those matrices can be combined to approximate the original user-item matrix.
-#
-# ## 3.1. Singular Value Decomposition Based Matrix Factorization
-#
-# The first approach performs matrix factorization based on Singular Value Decomposition (SVD) to learn a low rank approximation for the user-item rating matrix. It is called using the '--rank' command line argument.
-#
-# See the [Matrix Factorization Example](https://github.com/VowpalWabbit/vowpal_wabbit/wiki/Matrix-factorization-example) for more detail.
-
-"""
-Quick description of command line parameters used
-  --rank : sets the number of latent factors in the reduced matrix
-"""
-train_params = 'vw --rank 5 -q ui -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
-test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path)
-
-comparison = comparison.append(pd.DataFrame(result, index=['Matrix Factorization (Rank)']))
-comparison
-
-
-# ## 3.2. Factorization Machine Based Matrix Factorization
-#
-# An alternative approach based on [Rendle's factorization machines](https://cseweb.ucsd.edu/classes/fa17/cse291-b/reading/Rendle2010FM.pdf) is called using '--lrq' (low rank quadratic). More LRQ details in this [demo](https://github.com/VowpalWabbit/vowpal_wabbit/tree/master/demo/movielens).
-#
-# This learns two lower rank matrices which are multiplied to generate an approximation of the user-item rating matrix. Compressing the matrix in this way leads to learning generalizable factors, which avoids some of the limitations of using regression models with extremely sparse interaction features. This can lead to better convergence and smaller on-disk models.
-#
-# An additional term to improve performance is --lrqdropout which will drop out columns during training. This, however, tends to increase the optimal rank size. Other parameters such as L2 regularization can help avoid overfitting.
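Before the LRQ run that follows, here is a toy sketch of the low-rank idea that both '--rank' and '--lrq' exploit (made-up rank and random vectors, not anything VW actually learns or exposes):

import numpy as np

rank = 5                          # hypothetical number of latent factors
rng = np.random.default_rng(42)   # arbitrary seed for the illustration
user_factors = rng.random(rank)   # stand-in for a learned user vector
item_factors = rng.random(rank)   # stand-in for a learned item vector
# The predicted rating is approximated by the inner product of the two vectors.
predicted_rating = float(user_factors @ item_factors)
print(predicted_rating)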
-
-"""
-Quick description of command line parameters used
-  --lrq : learns approximations of rank N for the quadratic interaction between namespaces starting with 'a' and 'b'
-  --lrqdropout: performs dropout during training to improve generalization
-"""
-train_params = 'vw --lrq ui7 -f {model} -d {data} --quiet'.format(model=model_path, data=train_path)
-test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=model_path, data=test_path, pred=prediction_path)
-
-result = run_vw(train_params=train_params,
-                test_params=test_params,
-                test_data=test,
-                prediction_path=prediction_path)
-
-comparison = comparison.append(pd.DataFrame(result, index=['Matrix Factorization (LRQ)']))
-comparison
-
-
-# # 4. Conclusion
-#
-# The table above shows a few of the approaches in the VW library that can be used for recommendation prediction. The relative performance can change when applied to different datasets and properly tuned, but it is useful to note the rapid speed at which all approaches are able to train (75,000 examples) and test (25,000 examples).
-
-# # 5. Scoring
-
-# After training a model with any of the above approaches, the model can be used to score potential user-item pairs in offline batch mode, or in a real-time scoring mode. The example below shows how to leverage the utilities in the reco_utils directory to generate Top-K recommendations from offline scored output.
-
-# First construct a test set of all items (except those seen during training) for each user
-users = df[['userID']].drop_duplicates()
-users['key'] = 1
-
-items = df[['itemID']].drop_duplicates()
-items['key'] = 1
-
-all_pairs = pd.merge(users, items, on='key').drop(columns=['key'])
-
-# now combine with training data and keep only entries that were not in training
-merged = pd.merge(train[['userID', 'itemID', 'rating']], all_pairs, on=["userID", "itemID"], how="outer")
-all_user_items = merged[merged['rating'].isnull()].fillna(0).astype('int64')
-
-# save in vw format (this can take a while)
-to_vw(df=all_user_items, output=all_test_path)
-
-
-# run the saved model (linear regression with interactions) on the new dataset
-test_start = process_time()
-test_params = 'vw -i {model} -d {data} -t -p {pred} --quiet'.format(model=saved_model_path, data=all_test_path, pred=prediction_path)
-run(test_params.split(' '), check=True)
-test_stop = process_time()
-test_time = test_stop - test_start
-
-# load predictions and get top-k from previously saved results
-pred_data = pd.read_csv(prediction_path, delim_whitespace=True, names=['prediction'], index_col=1).join(all_user_items)
-top_k = get_top_k_items(pred_data, col_rating='prediction', k=TOP_K)[['prediction', 'userID', 'itemID']]
-top_k.head()
-
-
-# get ranking metrics
-args = [test, top_k]
-kwargs = dict(col_user='userID', col_item='itemID', col_rating='rating', col_prediction='prediction',
-              relevancy_method='top_k', k=TOP_K)
-
-rank_metrics = {'MAP': map_at_k(*args, **kwargs),
-                'NDCG': ndcg_at_k(*args, **kwargs),
-                'Precision': precision_at_k(*args, **kwargs),
-                'Recall': recall_at_k(*args, **kwargs)}
-
-
-# final results
-all_results = ['{k}: {v}'.format(k=k, v=v) for k, v in saved_result.items()]
-all_results += ['{k}: {v}'.format(k=k, v=v) for k, v in rank_metrics.items()]
-print('\n'.join(all_results))
-
-
-# # 6.
Cleanup - -# record results for testing -if is_jupyter(): - pm.record('rmse', saved_result['RMSE']) - pm.record('mae', saved_result['MAE']) - pm.record('rsquared', saved_result['R2']) - pm.record('exp_var', saved_result['Explained Variance']) - pm.record("train_time", saved_result['Train Time (ms)']) - pm.record("test_time", test_time) - pm.record('map', rank_metrics['MAP']) - pm.record('ndcg', rank_metrics['NDCG']) - pm.record('precision', rank_metrics['Precision']) - pm.record('recall', rank_metrics['Recall']) - - -tmpdir.cleanup() - - From 977ba82cca54975cde86de85689592ae9b6bf38d Mon Sep 17 00:00:00 2001 From: sayoojbk Date: Wed, 25 Dec 2019 16:22:44 +0530 Subject: [PATCH 7/7] made changes to build fail issues. --- app/main/__init__.py | 2 +- app/main/dataModel/user.py | 2 +- app/main/util/data_cleaning.py | 2 +- requirements.txt | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/app/main/__init__.py b/app/main/__init__.py index 2acb0e9..d104d1d 100644 --- a/app/main/__init__.py +++ b/app/main/__init__.py @@ -9,7 +9,7 @@ def create_app(config_name): app = Flask(__name__) - DB.init() + DB.__init__() app.config.from_object(config_by_name[config_name]) flask_bcrypt.init_app(app) return app diff --git a/app/main/dataModel/user.py b/app/main/dataModel/user.py index 98232d7..c5a770a 100644 --- a/app/main/dataModel/user.py +++ b/app/main/dataModel/user.py @@ -53,7 +53,7 @@ def insert(self): # to get the id of the user ?. TO do find the use of this function. nOT SURE WHAT TO RETURN HERE. def get_id(self): - self.id + return self.id def set_id(self, id): self.id = id diff --git a/app/main/util/data_cleaning.py b/app/main/util/data_cleaning.py index 7d5f3d8..d154eab 100644 --- a/app/main/util/data_cleaning.py +++ b/app/main/util/data_cleaning.py @@ -5,7 +5,7 @@ from ..dataModel.merchant import Merchant from ..dataModel.product import Product from ..dataModel.user_purchase import UserPurchase -from ..datkaModel.user import User +from ..dataModel.user import User from ..database import DB diff --git a/requirements.txt b/requirements.txt index 1763521..67141a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,4 +21,6 @@ python-editor==1.0.4 pytz==2019.1 six==1.12.0 Werkzeug==0.15.5 -pymongo==3.5.1 \ No newline at end of file +pymongo==3.5.1 +numpy==1.18.0 +pandas==0.25.3 \ No newline at end of file