diff --git a/gpu_bdb/bdb_tools/q28_utils.py b/gpu_bdb/bdb_tools/q28_utils.py
index c594dae9..67e52e69 100644
--- a/gpu_bdb/bdb_tools/q28_utils.py
+++ b/gpu_bdb/bdb_tools/q28_utils.py
@@ -18,14 +18,22 @@
 import cupy as cp
 import cupy
+import pandas as pd
 import cudf
 import dask
+import dask_cudf
 from cuml.feature_extraction.text import HashingVectorizer
 from cuml.dask.naive_bayes import MultinomialNB as DistMNB
 from cuml.dask.common import to_dask_cudf
-from cuml.dask.common.input_utils import DistributedDataHandler
+
+from sklearn.feature_extraction.text import HashingVectorizer as SKHashingVectorizer
+from sklearn.naive_bayes import MultinomialNB as MultNB
+
+from dask_ml.wrappers import ParallelPostFit
+
+import scipy
 
 from distributed import wait
@@ -33,9 +41,10 @@
 from bdb_tools.readers import build_reader
 
+from cuml.dask.common.part_utils import _extract_partitions
+
 N_FEATURES = 2 ** 23  # Spark is doing 2^20
 ngram_range = (1, 2)
-preprocessor = lambda s:s.str.lower()
 norm = None
 alternate_sign = False
@@ -45,6 +54,7 @@ def read_tables(config, c=None):
         data_format=config["file_format"],
         basepath=config["data_dir"],
         split_row_groups=True,
+        backend=config["backend"]
     )
 
     columns = [
@@ -60,18 +70,33 @@
     return pr_df
 
-def gpu_hashing_vectorizer(x):
-    vec = HashingVectorizer(n_features=N_FEATURES,
-                            alternate_sign=alternate_sign,
-                            ngram_range=ngram_range,
-                            norm=norm,
-                            preprocessor=preprocessor
-                            )
+def hashing_vectorizer(x):
+
+    # cudf partitions go through cuML's GPU vectorizer; anything else falls
+    # back to scikit-learn. The preprocessors differ because cudf lower-cases
+    # a whole Series at once while sklearn lower-cases one document at a time.
+    if isinstance(x, cudf.Series):
+        vectorizer = HashingVectorizer
+        preprocessor = lambda s: s.str.lower()
+    else:
+        vectorizer = SKHashingVectorizer
+        preprocessor = lambda s: s.lower()
+
+    vec = vectorizer(
+        n_features=N_FEATURES,
+        alternate_sign=alternate_sign,
+        ngram_range=ngram_range,
+        norm=norm,
+        preprocessor=preprocessor
+    )
+
     return vec.fit_transform(x)
 
 
 def map_labels(ser):
-    output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))
+
+    if isinstance(ser, cudf.Series):
+        output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))
+    else:
+        output_ser = pd.Series(2, index=ser.index, dtype=np.int32)
+
     zero_flag = (ser==1) | (ser==2)
     output_ser.loc[zero_flag]=0
@@ -80,13 +105,22 @@
     return output_ser
 
+
 def build_features(t):
+
+    if isinstance(t, dask_cudf.DataFrame):
+        meta_arr = dask.array.from_array(
+            cp.sparse.csr_matrix(cp.zeros(1, dtype=np.float32))
+        )
+    else:
+        meta_arr = dask.array.from_array(
+            scipy.sparse.csr_matrix(np.zeros(1, dtype=np.float32))
+        )
+
     X = t["pr_review_content"]
 
     X = X.map_partitions(
-        gpu_hashing_vectorizer,
-        meta=dask.array.from_array(
-            cupy.sparse.csr_matrix(cupy.zeros(1, dtype=cp.float32))
-        ),
+        hashing_vectorizer,
+        meta=meta_arr,
     )
 
     X = X.astype(np.float32).persist()
@@ -97,22 +131,39 @@
 def build_labels(reviews_df):
     y = reviews_df["pr_review_rating"].map_partitions(map_labels)
-    y = y.map_partitions(lambda x: cupy.asarray(x, cupy.int32)).persist()
+
+    if isinstance(reviews_df, dask_cudf.DataFrame):
+        y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist()
+        y._meta = cp.array(y._meta)
+    else:
+        y = y.map_partitions(lambda x: np.asarray(x, np.int32)).persist()
+        y.compute_chunk_sizes()
 
     return y
 
+
 def categoricalize(num_sr):
-    return num_sr.astype("str").str.replace(["0", "1", "2"], ["NEG", "NEUT", "POS"])
+
+    # pandas' str.replace does not accept the list form that cudf supports,
+    # so the single-pattern replaces are chained for both backends.
+    return (num_sr
+            .astype(str)
+            .str.replace("0", "NEG")
+            .str.replace("1", "NEUT")
+            .str.replace("2", "POS")
+    )
 
 
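+# Per-partition helper for the distributed precision computation below:
+# y_y_pred is a (labels, predictions) tuple, and the returned (nclasses, 2)
+# array carries [true positives, false positives] for each class.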
 def sum_tp_fp(y_y_pred, nclasses):
     y, y_pred = y_y_pred
-    res = cp.zeros((nclasses, 2), order="F")
+
+    # np.zeros with like= dispatches on the array type (NEP 35): a cupy
+    # array comes back for cupy labels and a numpy array otherwise.
+    res = np.zeros((nclasses, 2), order="F", like=y)
 
     for i in range(nclasses):
-        pos_pred_ix = cp.where(y_pred == i)[0]
+        if isinstance(y, cp.ndarray):
+            pos_pred_ix = cp.where(y_pred == i)[0]
+        else:
+            pos_pred_ix = np.where(y_pred == i)[0]
 
         # short circuit
         if len(pos_pred_ix) == 0:
@@ -123,11 +174,15 @@ def sum_tp_fp(y_y_pred, nclasses):
         fp_sum = (y_pred[pos_pred_ix] != y[pos_pred_ix]).sum()
         res[i][0] = tp_sum
         res[i][1] = fp_sum
+
     return res
 
 
 def precision_score(client, y, y_pred, average="binary"):
-    nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute()))
+    if isinstance(y._meta, cp.ndarray):
+        nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute()))
+    else:
+        nclasses = len(np.unique(y.map_blocks(lambda x: np.unique(x)).compute()))
 
     if average == "binary" and nclasses > 2:
         raise ValueError
@@ -135,24 +190,24 @@
     if nclasses < 2:
         raise ValueError("Single class precision is not yet supported")
 
-    ddh = DistributedDataHandler.create([y, y_pred])
+    gpu_futures = client.sync(_extract_partitions, [y, y_pred], client)
 
     precision_scores = client.compute(
         [
             client.submit(sum_tp_fp, part, nclasses, workers=[worker])
-            for worker, part in ddh.gpu_futures
+            for worker, part in gpu_futures
         ],
         sync=True,
     )
 
-    res = cp.zeros((nclasses, 2), order="F")
+    res = np.zeros((nclasses, 2), order="F", like=y._meta)
     for i in precision_scores:
        res += i
 
     if average == "binary" or average == "macro":
-        prec = cp.zeros(nclasses)
+        prec = np.zeros(nclasses, like=y._meta)
        for i in range(nclasses):
            tp_sum, fp_sum = res[i]
            prec[i] = (tp_sum / (tp_sum + fp_sum)).item()
@@ -162,8 +217,12 @@
        else:
            return prec.mean().item()
     else:
-        global_tp = cp.sum(res[:, 0])
-        global_fp = cp.sum(res[:, 1])
+        if isinstance(y._meta, cp.ndarray):
+            global_tp = cp.sum(res[:, 0])
+            global_fp = cp.sum(res[:, 1])
+        else:
+            global_tp = np.sum(res[:, 0])
+            global_fp = np.sum(res[:, 1])
 
         return global_tp / (global_tp + global_fp).item()
@@ -178,42 +237,64 @@ def local_cm(y_y_pred, unique_labels, sample_weight):
 
     # Assume labels are monotonically increasing for now.
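+    # (coo_matrix sums duplicate (y_true, y_pred) coordinate pairs, so the
+    # sparse constructor below yields the weighted confusion matrix directly.)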
     # intersect y_pred, y_true with labels, eliminate items not in labels
-    ind = cp.logical_and(y_pred < n_labels, y_true < n_labels)
+    if isinstance(y_true, cp.ndarray):
+        ind = cp.logical_and(y_pred < n_labels, y_true < n_labels)
+    else:
+        ind = np.logical_and(y_pred < n_labels, y_true < n_labels)
+
     y_pred = y_pred[ind]
     y_true = y_true[ind]
 
-    if sample_weight is None:
-        sample_weight = cp.ones(y_true.shape[0], dtype=np.int64)
+    if isinstance(y_true, cp.ndarray):
+        if sample_weight is None:
+            sample_weight = cp.ones(y_true.shape[0], dtype=np.int64)
+        else:
+            sample_weight = cp.asarray(sample_weight)
     else:
-        sample_weight = cp.asarray(sample_weight)
+        if sample_weight is None:
+            sample_weight = np.ones(y_true.shape[0], dtype=np.int64)
+        else:
+            sample_weight = np.asarray(sample_weight)
 
     sample_weight = sample_weight[ind]
 
-    cm = cp.sparse.coo_matrix(
-        (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=cp.float32,
-    ).toarray()
+    if isinstance(y_true, cp.ndarray):
+        cm = cupy.sparse.coo_matrix(
+            (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32,
+        ).toarray()
+
+        return cp.nan_to_num(cm)
+    else:
+        cm = scipy.sparse.coo_matrix(
+            (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32,
+        ).toarray()
 
-    return cp.nan_to_num(cm)
+        return np.nan_to_num(cm)
 
 
 def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None):
-    unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute())
+    if isinstance(y_true._meta, cp.ndarray):
+        unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute())
+    else:
+        unique_classes = np.unique(y_true.map_blocks(lambda x: np.unique(x)).compute())
+
     nclasses = len(unique_classes)
 
-    ddh = DistributedDataHandler.create([y_true, y_pred])
+    gpu_futures = client.sync(_extract_partitions, [y_true, y_pred], client)
 
     cms = client.compute(
         [
             client.submit(
                 local_cm, part, unique_classes, sample_weight, workers=[worker]
             )
-            for worker, part in ddh.gpu_futures
+            for worker, part in gpu_futures
         ],
         sync=True,
     )
 
-    cm = cp.zeros((nclasses, nclasses))
+    cm = np.zeros((nclasses, nclasses), like=y_true._meta)
+
     for i in cms:
         cm += i
@@ -224,20 +305,26 @@ def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None)
         cm = cm / cm.sum(axis=0, keepdims=True)
     elif normalize == "all":
         cm = cm / cm.sum()
-    cm = cp.nan_to_num(cm)
+
+    if isinstance(y_true._meta, cp.ndarray):
+        cm = cp.nan_to_num(cm)
+    else:
+        cm = np.nan_to_num(cm)
 
     return cm
 
 
 def accuracy_score(client, y, y_hat):
-    ddh = DistributedDataHandler.create([y_hat, y])
+    gpu_futures = client.sync(_extract_partitions, [y_hat, y], client)
 
     def _count_accurate_predictions(y_hat_y):
         y_hat, y = y_hat_y
-        y_hat = cp.asarray(y_hat, dtype=y_hat.dtype)
-        y = cp.asarray(y, dtype=y.dtype)
-        return y.shape[0] - cp.count_nonzero(y - y_hat)
+
+        if isinstance(y, cp.ndarray):
+            return y.shape[0] - cp.count_nonzero(y - y_hat)
+        else:
+            return y.shape[0] - np.count_nonzero(y - y_hat)
 
     key = uuid1()
@@ -249,12 +336,12 @@ def _count_accurate_predictions(y_hat_y):
                 workers=[worker_future[0]],
                 key="%s-%s" % (key, idx),
             )
-            for idx, worker_future in enumerate(ddh.gpu_futures)
+            for idx, worker_future in enumerate(gpu_futures)
         ],
         sync=True,
     )
 
-    return sum(futures) / y.shape[0]
+    return sum(futures) / y.shape[0]
 
 
 def post_etl_processing(client, train_data, test_data):
@@ -267,27 +354,33 @@ def post_etl_processing(client, train_data, test_data):
     X_train = build_features(train_data)
     X_test = build_features(test_data)
 
     y_train = build_labels(train_data)
     y_test = build_labels(test_data)
 
     # Perform ML
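+    # Dispatch on the label meta: cupy-backed labels take cuML's distributed
+    # MultinomialNB; otherwise sklearn's MultinomialNB is wrapped in dask_ml's
+    # ParallelPostFit, and fit() collects the training data via compute().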
-    model = DistMNB(client=client, alpha=0.001)
-    model.fit(X_train, y_train)
+    if isinstance(y_train._meta, cp.ndarray):
+        model = DistMNB(client=client, alpha=0.001)
+        model.fit(X_train, y_train)
+    else:
+        model = ParallelPostFit(estimator=MultNB(alpha=0.001))
+        model.fit(X_train.compute(), y_train.compute())  ### this regression seems to be coming from here
 
     y_hat = model.predict(X_test).persist()
 
     # Compute distributed performance metrics
     acc = accuracy_score(client, y_test, y_hat)
-    print("Accuracy: " + str(acc))
-    prec = precision_score(client, y_test, y_hat, average="macro")
+    prec = precision_score(client, y_test, y_hat, average="macro")
     print("Precision: " + str(prec))
 
-    cmat = confusion_matrix(client, y_test, y_hat)
+    cmat = confusion_matrix(client, y_test, y_hat)
     print("Confusion Matrix: " + str(cmat))
 
     # Place results back in original Dataframe
-    ddh = DistributedDataHandler.create(y_hat)
+    gpu_futures = client.sync(_extract_partitions, y_hat, client)
+
+    ser_type = cudf.Series if isinstance(y_test._meta, cp.ndarray) else pd.Series
+
     test_preds = to_dask_cudf(
-        [client.submit(cudf.Series, part) for w, part in ddh.gpu_futures]
+        [client.submit(ser_type, part) for w, part in gpu_futures]
     )
 
     test_preds = test_preds.map_partitions(categoricalize)
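+    # The prediction partitions come back as plain arrays, so they are wrapped
+    # in the matching Series type (cudf on GPU, pandas on CPU) before
+    # categoricalize maps the class ids back to NEG/NEUT/POS strings.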