Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 144 additions & 51 deletions gpu_bdb/bdb_tools/q28_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,33 @@
import cupy as cp
import cupy

import pandas as pd
import cudf

import dask
import dask_cudf

from cuml.feature_extraction.text import HashingVectorizer
from cuml.dask.naive_bayes import MultinomialNB as DistMNB
from cuml.dask.common import to_dask_cudf
from cuml.dask.common.input_utils import DistributedDataHandler

from sklearn.feature_extraction.text import HashingVectorizer as SKHashingVectorizer
from sklearn.naive_bayes import MultinomialNB as MultNB

from dask_ml.wrappers import ParallelPostFit

import scipy

from distributed import wait

from uuid import uuid1

from bdb_tools.readers import build_reader

from cuml.dask.common.part_utils import _extract_partitions

N_FEATURES = 2 ** 23 # Spark is doing 2^20
ngram_range = (1, 2)
preprocessor = lambda s:s.str.lower()
norm = None
alternate_sign = False

Expand All @@ -45,6 +54,7 @@ def read_tables(config, c=None):
data_format=config["file_format"],
basepath=config["data_dir"],
split_row_groups=True,
backend=config["backend"]
)

columns = [
Expand All @@ -60,18 +70,33 @@ def read_tables(config, c=None):
return pr_df


def gpu_hashing_vectorizer(x):
vec = HashingVectorizer(n_features=N_FEATURES,
alternate_sign=alternate_sign,
ngram_range=ngram_range,
norm=norm,
preprocessor=preprocessor
)
def hashing_vectorizer(x):

if isinstance(x, cudf.Series):
vectorizer = HashingVectorizer
preprocessor = lambda s:s.str.lower()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is common for both so should be outside the if statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They need to use different hashing vectorizers (HashingVectorizer vs SKHashingVectorizer) and the lambda functions need to be slightly different (s:s.str.lower() vs s:s.lower())

else:
vectorizer = SKHashingVectorizer
preprocessor = lambda s:s.lower()

vec = vectorizer(
n_features=N_FEATURES,
alternate_sign=alternate_sign,
ngram_range=ngram_range,
norm=norm,
preprocessor=preprocessor
)

return vec.fit_transform(x)


def map_labels(ser):
output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))

if isinstance(ser, cudf.Series):
output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))
else:
output_ser = pd.Series(2, index=ser.index, dtype=np.int32)
Comment on lines +96 to +98
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have similar behavior here. We should not use cudf specific API when not needed.

Lets use cupy and numpy arrray here.e

pd.Series(np.full(shape=len(ser), fill_value=2, dtype=np.int32))
cudf.Series(cp.full(shape=len(ser), fill_value=2,  dtype=cp.int32))))


zero_flag = (ser==1) | (ser==2)
output_ser.loc[zero_flag]=0

Expand All @@ -80,13 +105,22 @@ def map_labels(ser):

return output_ser


def build_features(t):

if isinstance(t, dask_cudf.DataFrame):
meta_arr = dask.array.from_array(
cp.sparse.csr_matrix(cp.zeros(1, dtype=np.float32))
)
else:
meta_arr = dask.array.from_array(
scipy.sparse.csr_matrix(np.zeros(1, dtype=np.float32))
)

X = t["pr_review_content"]
X = X.map_partitions(
gpu_hashing_vectorizer,
meta=dask.array.from_array(
cupy.sparse.csr_matrix(cupy.zeros(1, dtype=cp.float32))
),
hashing_vectorizer,
meta=meta_arr,
)

X = X.astype(np.float32).persist()
Expand All @@ -97,22 +131,39 @@ def build_features(t):

def build_labels(reviews_df):
y = reviews_df["pr_review_rating"].map_partitions(map_labels)
y = y.map_partitions(lambda x: cupy.asarray(x, cupy.int32)).persist()

if isinstance(reviews_df, dask_cudf.DataFrame):
y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist()
y._meta = cp.array(y._meta)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically the map_partitions call creates a dask array with a cupy chunktype but for some reason the metadata shows a numpy chunktype. This issue has a deeper explanation: rapidsai/cudf#4309

else:
y = y.map_partitions(lambda x: np.asarray(x, np.int32)).persist()

y.compute_chunk_sizes()

return y


def categoricalize(num_sr):
return num_sr.astype("str").str.replace(["0", "1", "2"], ["NEG", "NEUT", "POS"])

return (num_sr
.astype(str)
.str.replace("0", "NEG")
.str.replace("1", "NEUT")
.str.replace("2", "POS")
)


def sum_tp_fp(y_y_pred, nclasses):

y, y_pred = y_y_pred
res = cp.zeros((nclasses, 2), order="F")

res = np.zeros((nclasses, 2), order="F", like=y)

for i in range(nclasses):
pos_pred_ix = cp.where(y_pred == i)[0]
if isinstance(y, cp.ndarray):
pos_pred_ix = cp.where(y_pred == i)[0]
else:
pos_pred_ix = np.where(y_pred == i)[0]

# short circuit
if len(pos_pred_ix) == 0:
Expand All @@ -123,36 +174,40 @@ def sum_tp_fp(y_y_pred, nclasses):
fp_sum = (y_pred[pos_pred_ix] != y[pos_pred_ix]).sum()
res[i][0] = tp_sum
res[i][1] = fp_sum

return res

def precision_score(client, y, y_pred, average="binary"):

nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute()))
if isinstance(y._meta, cp.ndarray):
nclasses = len(cp.unique(y.map_blocks(lambda x: cp.unique(x)).compute()))
else:
nclasses = len(np.unique(y.map_blocks(lambda x: np.unique(x)).compute()))

if average == "binary" and nclasses > 2:
raise ValueError

if nclasses < 2:
raise ValueError("Single class precision is not yet supported")

ddh = DistributedDataHandler.create([y, y_pred])
gpu_futures = client.sync(_extract_partitions, [y, y_pred], client)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are they called gpu_futures , wont this be for both cpu and gpu ?


precision_scores = client.compute(
[
client.submit(sum_tp_fp, part, nclasses, workers=[worker])
for worker, part in ddh.gpu_futures
for worker, part in gpu_futures
],
sync=True,
)

res = cp.zeros((nclasses, 2), order="F")
res = np.zeros((nclasses, 2), order="F", like=y._meta)

for i in precision_scores:
res += i

if average == "binary" or average == "macro":
prec = np.zeros(nclasses, like=y._meta)

prec = cp.zeros(nclasses)
for i in range(nclasses):
tp_sum, fp_sum = res[i]
prec[i] = (tp_sum / (tp_sum + fp_sum)).item()
Expand All @@ -162,8 +217,12 @@ def precision_score(client, y, y_pred, average="binary"):
else:
return prec.mean().item()
else:
global_tp = cp.sum(res[:, 0])
global_fp = cp.sum(res[:, 1])
if isinstance(y._meta, cp.ndarray):
global_tp = cp.sum(res[:, 0])
global_fp = cp.sum(res[:, 1])
Comment on lines +221 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
global_tp = cp.sum(res[:, 0])
global_fp = cp.sum(res[:, 1])
global_tp = res[:, 0].sum()
global_fp = res[:, 1].sum()

else:
global_tp = np.sum(res[:, 0])
global_fp = np.sum(res[:, 1])

return global_tp / (global_tp + global_fp).item()

Expand All @@ -178,42 +237,64 @@ def local_cm(y_y_pred, unique_labels, sample_weight):
# Assume labels are monotonically increasing for now.

# intersect y_pred, y_true with labels, eliminate items not in labels
ind = cp.logical_and(y_pred < n_labels, y_true < n_labels)
if isinstance(y_true, cp.ndarray):
ind = cp.logical_and(y_pred < n_labels, y_true < n_labels)
else:
ind = np.logical_and(y_pred < n_labels, y_true < n_labels)

y_pred = y_pred[ind]
y_true = y_true[ind]

if sample_weight is None:
sample_weight = cp.ones(y_true.shape[0], dtype=np.int64)
if isinstance(y_true, cp.ndarray):
if sample_weight is None:
sample_weight = cp.ones(y_true.shape[0], dtype=np.int64)
else:
sample_weight = cp.asarray(sample_weight)
else:
sample_weight = cp.asarray(sample_weight)
if sample_weight is None:
sample_weight = np.ones(y_true.shape[0], dtype=np.int64)
else:
sample_weight = np.asarray(sample_weight)

sample_weight = sample_weight[ind]

cm = cp.sparse.coo_matrix(
(sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=cp.float32,
).toarray()
if isinstance(y_true, cp.ndarray):
cm = cupy.sparse.coo_matrix(
(sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32,
).toarray()

return cp.nan_to_num(cm)
else:
cm = scipy.sparse.coo_matrix(
(sample_weight, (y_true, y_pred)), shape=(n_labels, n_labels), dtype=np.float32,
).toarray()

return cp.nan_to_num(cm)
return np.nan_to_num(cm)


def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None):

unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute())
if isinstance(y_true._meta, cp.ndarray):
unique_classes = cp.unique(y_true.map_blocks(lambda x: cp.unique(x)).compute())
else:
unique_classes = np.unique(y_true.map_blocks(lambda x: np.unique(x)).compute())

nclasses = len(unique_classes)

ddh = DistributedDataHandler.create([y_true, y_pred])
gpu_futures = client.sync(_extract_partitions, [y_true, y_pred], client)

cms = client.compute(
[
client.submit(
local_cm, part, unique_classes, sample_weight, workers=[worker]
)
for worker, part in ddh.gpu_futures
for worker, part in gpu_futures
],
sync=True,
)

cm = cp.zeros((nclasses, nclasses))
cm = np.zeros((nclasses, nclasses), like=y_true._meta)

for i in cms:
cm += i

Expand All @@ -224,20 +305,26 @@ def confusion_matrix(client, y_true, y_pred, normalize=None, sample_weight=None)
cm = cm / cm.sum(axis=0, keepdims=True)
elif normalize == "all":
cm = cm / cm.sum()
cm = cp.nan_to_num(cm)

if isinstance(y_true._meta, cp.ndarray):
cm = cp.nan_to_num(cm)
else:
cm = np.nan_to_num(cm)

return cm


def accuracy_score(client, y, y_hat):

ddh = DistributedDataHandler.create([y_hat, y])
gpu_futures = client.sync(_extract_partitions, [y_hat, y], client)

def _count_accurate_predictions(y_hat_y):
y_hat, y = y_hat_y
y_hat = cp.asarray(y_hat, dtype=y_hat.dtype)
y = cp.asarray(y, dtype=y.dtype)
return y.shape[0] - cp.count_nonzero(y - y_hat)

if isinstance(y, cp.ndarray):
return y.shape[0] - cp.count_nonzero(y - y_hat)
else:
return y.shape[0] - np.count_nonzero(y - y_hat)

key = uuid1()

Expand All @@ -249,12 +336,12 @@ def _count_accurate_predictions(y_hat_y):
workers=[worker_future[0]],
key="%s-%s" % (key, idx),
)
for idx, worker_future in enumerate(ddh.gpu_futures)
for idx, worker_future in enumerate(gpu_futures)
],
sync=True,
)

return sum(futures) / y.shape[0]
return sum(futures) / y.shape[0]


def post_etl_processing(client, train_data, test_data):
Expand All @@ -267,27 +354,33 @@ def post_etl_processing(client, train_data, test_data):
y_test = build_labels(test_data)

# Perform ML
model = DistMNB(client=client, alpha=0.001)
model.fit(X_train, y_train)
if isinstance(y_train._meta, cp.ndarray):
model = DistMNB(client=client, alpha=0.001)
model.fit(X_train, y_train)
else:
model = ParallelPostFit(estimator=MultNB(alpha=0.001))
model.fit(X_train.compute(), y_train.compute())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are going to train this on the client process, is this intentional ? We should not do anything on the client process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that was a placeholder I meant to replace. Do you have any suggestion for the best way to parallelize SKlearn here? Unfortunately the dask-ml naive bayes model is incompatible with the sparse matrices we use in this query.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still train on worker processes. We can probably do something like below.

est_model = MultNB(alpha=0.001)
X_d = X_train.repartition(npartitions=1).to_delayed()
y_d =  y_train .repartition(npartitions=1).to_delayed()

delayed_model = [delayed(est_model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
del est_model


### this regression seems to be coming from here
y_hat = model.predict(X_test).persist()

# Compute distributed performance metrics
acc = accuracy_score(client, y_test, y_hat)

print("Accuracy: " + str(acc))
prec = precision_score(client, y_test, y_hat, average="macro")

prec = precision_score(client, y_test, y_hat, average="macro")
print("Precision: " + str(prec))
cmat = confusion_matrix(client, y_test, y_hat)

cmat = confusion_matrix(client, y_test, y_hat)
print("Confusion Matrix: " + str(cmat))

# Place results back in original Dataframe
ddh = DistributedDataHandler.create(y_hat)
gpu_futures = client.sync(_extract_partitions, y_hat, client)

ser_type = cudf.Series if isinstance(y_test._meta, cp.ndarray) else pd.Series

test_preds = to_dask_cudf(
[client.submit(cudf.Series, part) for w, part in ddh.gpu_futures]
[client.submit(ser_type, part) for w, part in gpu_futures]
)

test_preds = test_preds.map_partitions(categoricalize)
Expand Down