-
Notifications
You must be signed in to change notification settings - Fork 43
Add cpu backend to q28 #257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,24 +18,33 @@ | |||||||||
| import cupy as cp | ||||||||||
| import cupy | ||||||||||
|
|
||||||||||
| import pandas as pd | ||||||||||
| import cudf | ||||||||||
|
|
||||||||||
| import dask | ||||||||||
| import dask_cudf | ||||||||||
|
|
||||||||||
| from cuml.feature_extraction.text import HashingVectorizer | ||||||||||
| from cuml.dask.naive_bayes import MultinomialNB as DistMNB | ||||||||||
| from cuml.dask.common import to_dask_cudf | ||||||||||
| from cuml.dask.common.input_utils import DistributedDataHandler | ||||||||||
|
|
||||||||||
| from sklearn.feature_extraction.text import HashingVectorizer as SKHashingVectorizer | ||||||||||
| from sklearn.naive_bayes import MultinomialNB as MultNB | ||||||||||
|
|
||||||||||
| from dask_ml.wrappers import ParallelPostFit | ||||||||||
|
|
||||||||||
| import scipy | ||||||||||
|
|
||||||||||
| from distributed import wait | ||||||||||
|
|
||||||||||
| from uuid import uuid1 | ||||||||||
|
|
||||||||||
| from bdb_tools.readers import build_reader | ||||||||||
|
|
||||||||||
| from cuml.dask.common.part_utils import _extract_partitions | ||||||||||
|
|
||||||||||
| N_FEATURES = 2 ** 23 # Spark is doing 2^20 | ||||||||||
| ngram_range = (1, 2) | ||||||||||
| preprocessor = lambda s:s.str.lower() | ||||||||||
| norm = None | ||||||||||
| alternate_sign = False | ||||||||||
|
|
||||||||||
|
|
@@ -45,6 +54,7 @@ def read_tables(config, c=None): | |||||||||
| data_format=config["file_format"], | ||||||||||
| basepath=config["data_dir"], | ||||||||||
| split_row_groups=True, | ||||||||||
| backend=config["backend"] | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| columns = [ | ||||||||||
|
|
@@ -60,18 +70,33 @@ def read_tables(config, c=None): | |||||||||
| return pr_df | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def gpu_hashing_vectorizer(x): | ||||||||||
| vec = HashingVectorizer(n_features=N_FEATURES, | ||||||||||
| alternate_sign=alternate_sign, | ||||||||||
| ngram_range=ngram_range, | ||||||||||
| norm=norm, | ||||||||||
| preprocessor=preprocessor | ||||||||||
| ) | ||||||||||
| def hashing_vectorizer(x): | ||||||||||
|
|
||||||||||
| if isinstance(x, cudf.Series): | ||||||||||
| vectorizer = HashingVectorizer | ||||||||||
| preprocessor = lambda s:s.str.lower() | ||||||||||
| else: | ||||||||||
| vectorizer = SKHashingVectorizer | ||||||||||
| preprocessor = lambda s:s.lower() | ||||||||||
|
|
||||||||||
| vec = vectorizer( | ||||||||||
| n_features=N_FEATURES, | ||||||||||
| alternate_sign=alternate_sign, | ||||||||||
| ngram_range=ngram_range, | ||||||||||
| norm=norm, | ||||||||||
| preprocessor=preprocessor | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| return vec.fit_transform(x) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def map_labels(ser): | ||||||||||
| output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32)) | ||||||||||
|
|
||||||||||
| if isinstance(ser, cudf.Series): | ||||||||||
| output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32)) | ||||||||||
| else: | ||||||||||
| output_ser = pd.Series(2, index=ser.index, dtype=np.int32) | ||||||||||
|
Comment on lines
+96
to
+98
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should have similar behavior here. We should not use a cudf-specific API when it is not needed. Let's use cupy and numpy arrays here, e.g. `pd.Series(np.full(shape=len(ser), fill_value=2, dtype=np.int32))` and `cudf.Series(cp.full(shape=len(ser), fill_value=2, dtype=cp.int32))`. |
||||||||||
|
|
||||||||||
| zero_flag = (ser==1) | (ser==2) | ||||||||||
| output_ser.loc[zero_flag]=0 | ||||||||||
|
|
||||||||||
|
|
@@ -80,13 +105,22 @@ def map_labels(ser): | |||||||||
|
|
||||||||||
| return output_ser | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def build_features(t): | ||||||||||
|
|
||||||||||
| if isinstance(t, dask_cudf.DataFrame): | ||||||||||
| meta_arr = dask.array.from_array( | ||||||||||
| cp.sparse.csr_matrix(cp.zeros(1, dtype=np.float32)) | ||||||||||
| ) | ||||||||||
| else: | ||||||||||
| meta_arr = dask.array.from_array( | ||||||||||
| scipy.sparse.csr_matrix(np.zeros(1, dtype=np.float32)) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| X = t["pr_review_content"] | ||||||||||
| X = X.map_partitions( | ||||||||||
| gpu_hashing_vectorizer, | ||||||||||
| meta=dask.array.from_array( | ||||||||||
| cupy.sparse.csr_matrix(cupy.zeros(1, dtype=cp.float32)) | ||||||||||
| ), | ||||||||||
| hashing_vectorizer, | ||||||||||
| meta=meta_arr, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| X = X.astype(np.float32).persist() | ||||||||||
|
|
@@ -97,22 +131,39 @@ def build_features(t): | |||||||||
|
|
||||||||||
| def build_labels(reviews_df): | ||||||||||
| y = reviews_df["pr_review_rating"].map_partitions(map_labels) | ||||||||||
| y = y.map_partitions(lambda x: cupy.asarray(x, cupy.int32)).persist() | ||||||||||
|
|
||||||||||
| if isinstance(reviews_df, dask_cudf.DataFrame): | ||||||||||
| y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist() | ||||||||||
| y._meta = cp.array(y._meta) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to do this ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically the |
||||||||||
| else: | ||||||||||
| y = y.map_partitions(lambda x: np.asarray(x, np.int32)).persist() | ||||||||||
|
|
||||||||||
| y.compute_chunk_sizes() | ||||||||||
|
|
||||||||||
| return y | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def categoricalize(num_sr): | ||||||||||
| return num_sr.astype("str").str.replace(["0", "1", "2"], ["NEG", "NEUT", "POS"]) | ||||||||||
|
|
||||||||||
| return (num_sr | ||||||||||
| .astype(str) | ||||||||||
| .str.replace("0", "NEG") | ||||||||||
| .str.replace("1", "NEUT") | ||||||||||
| .str.replace("2", "POS") | ||||||||||
| ) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def sum_tp_fp(y_y_pred, nclasses): | ||||||||||
|
|
||||||||||
| y, y_pred = y_y_pred | ||||||||||
| res = cp.zeros((nclasses, 2), order="F") | ||||||||||
|
|
||||||||||
| res = np.zeros((nclasses, 2), order="F", like=y) | ||||||||||
|
|
||||||||||
| for i in range(nclasses): | ||||||||||
| pos_pred_ix = cp.where(y_pred == i)[0] | ||||||||||
| if isinstance(y, cp.ndarray): | ||||||||||
| pos_pred_ix = cp.where(y_pred == i)[0] | ||||||||||
| else: | ||||||||||
| pos_pred_ix = np.where(y_pred == i)[0] | ||||||||||
|
|
||||||||||
| # short circuit | ||||||||||
| if len(pos_pred_ix) == 0: | ||||||||||
|
|
@@ -123,36 +174,40 @@ def sum_tp_fp(y_y_pred, nclasses): | |||||||||
| fp_sum = (y_pred[pos_pred_ix] != y[pos_pred_ix]).sum() | ||||||||||
| res[i][0] = tp_sum | ||||||||||
| res[i][1] = fp_sum | ||||||||||
|
|
||||||||||
| return res | ||||||||||
|
|
||||||||||
| def precision_score(client, y, y_pred, average="binary"): | ||||||||||
|
|
||||||||||
| nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute())) | ||||||||||
| if isinstance(y._meta, cp.ndarray): | ||||||||||
| nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute())) | ||||||||||
| else: | ||||||||||
| nclasses = len(np.unique(y.map_blocks(lambda x: np.unique(x)).compute())) | ||||||||||
|
|
||||||||||
| if average == "binary" and nclasses > 2: | ||||||||||
| raise ValueError | ||||||||||
|
|
||||||||||
| if nclasses < 2: | ||||||||||
| raise ValueError("Single class precision is not yet supported") | ||||||||||
|
|
||||||||||
| ddh = DistributedDataHandler.create([y, y_pred]) | ||||||||||
| gpu_futures = client.sync(_extract_partitions, [y, y_pred], client) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are they called |
||||||||||
|
|
||||||||||
| precision_scores = client.compute( | ||||||||||
| [ | ||||||||||
| client.submit(sum_tp_fp, part, nclasses, workers=[worker]) | ||||||||||
| for worker, part in ddh.gpu_futures | ||||||||||
| for worker, part in gpu_futures | ||||||||||
| ], | ||||||||||
| sync=True, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| res = cp.zeros((nclasses, 2), order="F") | ||||||||||
| res = np.zeros((nclasses, 2), order="F", like=y._meta) | ||||||||||
|
|
||||||||||
| for i in precision_scores: | ||||||||||
| res += i | ||||||||||
|
|
||||||||||
| if average == "binary" or average == "macro": | ||||||||||
| prec = np.zeros(nclasses, like=y._meta) | ||||||||||
|
|
||||||||||
| prec = cp.zeros(nclasses) | ||||||||||
| for i in range(nclasses): | ||||||||||
| tp_sum, fp_sum = res[i] | ||||||||||
| prec[i] = (tp_sum / (tp_sum + fp_sum)).item() | ||||||||||
|
|
@@ -162,8 +217,12 @@ def precision_score(client, y, y_pred, average="binary"): | |||||||||
| else: | ||||||||||
| return prec.mean().item() | ||||||||||
| else: | ||||||||||
| global_tp = cp.sum(res[:, 0]) | ||||||||||
| global_fp = cp.sum(res[:, 1]) | ||||||||||
| if isinstance(y._meta, cp.ndarray): | ||||||||||
| global_tp = cp.sum(res[:, 0]) | ||||||||||
| global_fp = cp.sum(res[:, 1]) | ||||||||||
|
Comment on lines
+221
to
+222
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| else: | ||||||||||
| global_tp = np.sum(res[:, 0]) | ||||||||||
| global_fp = np.sum(res[:, 1]) | ||||||||||
|
|
||||||||||
| return global_tp / (global_tp + global_fp).item() | ||||||||||
|
|
||||||||||
|
|
@@ -178,42 +237,64 @@ def local_cm(y_y_pred, unique_labels, sample_weight): | |||||||||
| # Assume labels are monotonically increasing for now. | ||||||||||
|
|
||||||||||
| # intersect y_pred, y_true with labels, eliminate items not in labels | ||||||||||
| ind = cp.logical_and(y_pred < n_labels, y_true < n_labels) | ||||||||||
| if isinstance(y_true, cp.ndarray): | ||||||||||
| ind = cp.logical_and(y_pred < n_labels, y_true < n_labels) | ||||||||||
| else: | ||||||||||
| ind = np.logical_and(y_pred < n_labels, y_true < n_labels) | ||||||||||
|
|
||||||||||
| y_pred = y_pred[ind] | ||||||||||
| y_true = y_true[ind] | ||||||||||
|
|
||||||||||
| if sample_weight is None: | ||||||||||
| sample_weight = cp.ones(y_true.shape[0], dtype=np.int64) | ||||||||||
| if isinstance(y_true, cp.ndarray): | ||||||||||
| if sample_weight is None: | ||||||||||
| sample_weight = cp.ones(y_true.shape[0], dtype=np.int64) | ||||||||||
| else: | ||||||||||
| sample_weight = cp.asarray(sample_weight) | ||||||||||
| else: | ||||||||||
| sample_weight = cp.asarray(sample_weight) | ||||||||||
| if sample_weight is None: | ||||||||||
| sample_weight = np.ones(y_true.shape[0], dtype=np.int64) | ||||||||||
| else: | ||||||||||
| sample_weight = np.asarray(sample_weight) | ||||||||||
|
|
||||||||||
| sample_weight = sample_weight[ind] | ||||||||||
|
|
||||||||||
| cm = cp.sparse.coo_matrix( | ||||||||||
| (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=cp.float32, | ||||||||||
| ).toarray() | ||||||||||
| if isinstance(y_true, cp.ndarray): | ||||||||||
| cm = cupy.sparse.coo_matrix( | ||||||||||
| (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32, | ||||||||||
| ).toarray() | ||||||||||
|
|
||||||||||
| return cp.nan_to_num(cm) | ||||||||||
| else: | ||||||||||
| cm = scipy.sparse.coo_matrix( | ||||||||||
| (sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32, | ||||||||||
| ).toarray() | ||||||||||
|
|
||||||||||
| return cp.nan_to_num(cm) | ||||||||||
| return np.nan_to_num(cm) | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None): | ||||||||||
|
|
||||||||||
| unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute()) | ||||||||||
| if isinstance(y_true._meta, cp.ndarray): | ||||||||||
| unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute()) | ||||||||||
| else: | ||||||||||
| unique_classes = np.unique(y_true.map_blocks(lambda x: np.unique(x)).compute()) | ||||||||||
|
|
||||||||||
| nclasses = len(unique_classes) | ||||||||||
|
|
||||||||||
| ddh = DistributedDataHandler.create([y_true, y_pred]) | ||||||||||
| gpu_futures = client.sync(_extract_partitions, [y_true, y_pred], client) | ||||||||||
|
|
||||||||||
| cms = client.compute( | ||||||||||
| [ | ||||||||||
| client.submit( | ||||||||||
| local_cm, part, unique_classes, sample_weight, workers=[worker] | ||||||||||
| ) | ||||||||||
| for worker, part in ddh.gpu_futures | ||||||||||
| for worker, part in gpu_futures | ||||||||||
| ], | ||||||||||
| sync=True, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| cm = cp.zeros((nclasses, nclasses)) | ||||||||||
| cm = np.zeros((nclasses, nclasses), like=y_true._meta) | ||||||||||
|
|
||||||||||
| for i in cms: | ||||||||||
| cm += i | ||||||||||
|
|
||||||||||
|
|
@@ -224,20 +305,26 @@ def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None) | |||||||||
| cm = cm / cm.sum(axis=0, keepdims=True) | ||||||||||
| elif normalize == "all": | ||||||||||
| cm = cm / cm.sum() | ||||||||||
| cm = cp.nan_to_num(cm) | ||||||||||
|
|
||||||||||
| if isinstance(y_true._meta, cp.ndarray): | ||||||||||
| cm = cp.nan_to_num(cm) | ||||||||||
| else: | ||||||||||
| cm = np.nan_to_num(cm) | ||||||||||
|
|
||||||||||
| return cm | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def accuracy_score(client, y, y_hat): | ||||||||||
|
|
||||||||||
| ddh = DistributedDataHandler.create([y_hat, y]) | ||||||||||
| gpu_futures = client.sync(_extract_partitions, [y_hat, y], client) | ||||||||||
|
|
||||||||||
| def _count_accurate_predictions(y_hat_y): | ||||||||||
| y_hat, y = y_hat_y | ||||||||||
| y_hat = cp.asarray(y_hat, dtype=y_hat.dtype) | ||||||||||
| y = cp.asarray(y, dtype=y.dtype) | ||||||||||
| return y.shape[0] - cp.count_nonzero(y - y_hat) | ||||||||||
|
|
||||||||||
| if isinstance(y, cp.ndarray): | ||||||||||
| return y.shape[0] - cp.count_nonzero(y - y_hat) | ||||||||||
| else: | ||||||||||
| return y.shape[0] - np.count_nonzero(y - y_hat) | ||||||||||
|
|
||||||||||
| key = uuid1() | ||||||||||
|
|
||||||||||
|
|
@@ -249,12 +336,12 @@ def _count_accurate_predictions(y_hat_y): | |||||||||
| workers=[worker_future[0]], | ||||||||||
| key="%s-%s" % (key, idx), | ||||||||||
| ) | ||||||||||
| for idx, worker_future in enumerate(ddh.gpu_futures) | ||||||||||
| for idx, worker_future in enumerate(gpu_futures) | ||||||||||
| ], | ||||||||||
| sync=True, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| return sum(futures) / y.shape[0] | ||||||||||
| return sum(futures) / y.shape[0] | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def post_etl_processing(client, train_data, test_data): | ||||||||||
|
|
@@ -267,27 +354,33 @@ def post_etl_processing(client, train_data, test_data): | |||||||||
| y_test = build_labels(test_data) | ||||||||||
|
|
||||||||||
| # Perform ML | ||||||||||
| model = DistMNB(client=client, alpha=0.001) | ||||||||||
| model.fit(X_train, y_train) | ||||||||||
| if isinstance(y_train._meta, cp.ndarray): | ||||||||||
| model = DistMNB(client=client, alpha=0.001) | ||||||||||
| model.fit(X_train, y_train) | ||||||||||
| else: | ||||||||||
| model = ParallelPostFit(estimator=MultNB(alpha=0.001)) | ||||||||||
| model.fit(X_train.compute(), y_train.compute()) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are going to train this on the client process, is this intentional ? We should not do anything on the client process.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep that was a placeholder I meant to replace. Do you have any suggestion for the best way to parallelize SKlearn here? Unfortunately the dask-ml naive bayes model is incompatible with the sparse matrices we use in this query.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should still train on worker processes. We can probably do something like below. est_model = MultNB(alpha=0.001)
X_d = X_train.repartition(npartitions=1).to_delayed()
y_d = y_train.repartition(npartitions=1).to_delayed()
delayed_model = [delayed(est_model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
del est_model |
||||||||||
|
|
||||||||||
| ### this regression seems to be coming from here | ||||||||||
| y_hat = model.predict(X_test).persist() | ||||||||||
|
|
||||||||||
| # Compute distributed performance metrics | ||||||||||
| acc = accuracy_score(client, y_test, y_hat) | ||||||||||
|
|
||||||||||
| print("Accuracy: " + str(acc)) | ||||||||||
| prec = precision_score(client, y_test, y_hat, average="macro") | ||||||||||
|
|
||||||||||
| prec = precision_score(client, y_test, y_hat, average="macro") | ||||||||||
| print("Precision: " + str(prec)) | ||||||||||
| cmat = confusion_matrix(client, y_test, y_hat) | ||||||||||
|
|
||||||||||
| cmat = confusion_matrix(client, y_test, y_hat) | ||||||||||
| print("Confusion Matrix: " + str(cmat)) | ||||||||||
|
|
||||||||||
| # Place results back in original Dataframe | ||||||||||
| ddh = DistributedDataHandler.create(y_hat) | ||||||||||
| gpu_futures = client.sync(_extract_partitions, y_hat, client) | ||||||||||
|
|
||||||||||
| ser_type = cudf.Series if isinstance(y_test._meta, cp.ndarray) else pd.Series | ||||||||||
|
|
||||||||||
| test_preds = to_dask_cudf( | ||||||||||
| [client.submit(cudf.Series, part) for w, part in ddh.gpu_futures] | ||||||||||
| [client.submit(ser_type, part) for w, part in gpu_futures] | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| test_preds = test_preds.map_partitions(categoricalize) | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is common for both so should be outside the if statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They need to use different hashing vectorizers (`HashingVectorizer` vs `SKHashingVectorizer`) and the lambda preprocessors need to be slightly different (`s: s.str.lower()` vs `s: s.lower()`).