Conversation
VibhuJawa left a comment:
Thanks for working on this, have requested changes.
```python
if isinstance(x, cudf.Series):
    vectorizer = HashingVectorizer
    preprocessor = lambda s: s.str.lower()
```
This is common to both, so it should be outside the if statement.
They need to use different hashing vectorizers (HashingVectorizer vs. SKHashingVectorizer), and the lambda functions need to be slightly different (s: s.str.lower() vs. s: s.lower()).
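For illustration, a minimal sketch of why these assignments stay inside the branch (assuming, per the aliases above, that HashingVectorizer is cuML's and SKHashingVectorizer is scikit-learn's):

```python
if isinstance(x, cudf.Series):
    # GPU path: cuML vectorizer; cuDF strings are lowercased via the .str accessor
    vectorizer = HashingVectorizer
    preprocessor = lambda s: s.str.lower()
else:
    # CPU path: scikit-learn vectorizer; plain Python strings lowercase directly
    vectorizer = SKHashingVectorizer
    preprocessor = lambda s: s.lower()
```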
```python
    output_ser = cudf.Series(cudf.core.column.full(size=len(ser), fill_value=2, dtype=np.int32))
else:
    output_ser = pd.Series(2, index=ser.index, dtype=np.int32)
```
We should have similar behavior here; we should not use cuDF-specific APIs when not needed. Let's use CuPy and NumPy arrays here:

```python
pd.Series(np.full(shape=len(ser), fill_value=2, dtype=np.int32))
cudf.Series(cp.full(shape=len(ser), fill_value=2, dtype=cp.int32))
```
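Put together, the two branches could then mirror each other. A sketch, keeping the index handling from the original pandas line:

```python
import cupy as cp
import numpy as np

if isinstance(ser, cudf.Series):
    output_ser = cudf.Series(cp.full(shape=len(ser), fill_value=2, dtype=cp.int32))
else:
    output_ser = pd.Series(np.full(shape=len(ser), fill_value=2, dtype=np.int32), index=ser.index)
```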
```python
if isinstance(reviews_df, dask_cudf.DataFrame):
    y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist()
    y._meta = cp.array(y._meta)
```
Basically, the map_partitions call creates a dask array with a CuPy chunktype, but for some reason the metadata shows a NumPy chunktype. There is a deeper explanation in this issue: rapidsai/cudf#4309.
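A sketch of the symptom and the workaround (y and the lambda are the ones from the diff above):

```python
y = y.map_partitions(lambda x: cp.asarray(x, np.int32)).persist()
type(y._meta)                # may report numpy.ndarray despite the CuPy chunks
y._meta = cp.array(y._meta)  # workaround: coerce the meta to a CuPy array
type(y._meta)                # now cupy.ndarray, matching the actual chunktype
```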
```python
raise ValueError("Single class precision is not yet supported")
```
```python
ddh = DistributedDataHandler.create([y, y_pred])
gpu_futures = client.sync(_extract_partitions, [y, y_pred], client)
```
Why are they called gpu_futures? Won't this be for both CPU and GPU?
```python
global_tp = cp.sum(res[:, 0])
global_fp = cp.sum(res[:, 1])
```
Suggested change:

```diff
-global_tp = cp.sum(res[:, 0])
-global_fp = cp.sum(res[:, 1])
+global_tp = res[:, 0].sum()
+global_fp = res[:, 1].sum()
```
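The method form also keeps the reduction device-agnostic: .sum() dispatches on the array's own type, so the same line works for NumPy results on the CPU path and CuPy results on the GPU path.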
```python
    model.fit(X_train, y_train)
else:
    model = ParallelPostFit(estimator=MultNB(alpha=0.001))
    model.fit(X_train.compute(), y_train.compute())
```
You are going to train this on the client process; is this intentional? We should not do anything on the client process.
Yep, that was a placeholder I meant to replace. Do you have any suggestions for the best way to parallelize sklearn here? Unfortunately, the dask-ml naive bayes model is incompatible with the sparse matrices we use in this query.
We should still train on worker processes. We can probably do something like below:

```python
from dask import delayed

est_model = MultNB(alpha=0.001)

# Collapse to a single partition so one fit call sees the full training set,
# then run that fit as a delayed task on a worker rather than on the client.
X_d = X_train.repartition(npartitions=1).to_delayed()
y_d = y_train.repartition(npartitions=1).to_delayed()
delayed_model = [delayed(est_model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
del est_model
```
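For what it's worth, this works because sklearn's fit returns the estimator itself, so computing the single delayed result hands back a model fitted entirely on a worker; repartition(npartitions=1) ensures that one fit call sees all of the training data, and ParallelPostFit then only parallelizes prediction.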