Conversation
|
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
| EXCLUDE_FROM_ALL | ||
| ) | ||
|
|
||
| add_executable(q13 "q13.cpp") |
There was a problem hiding this comment.
You should just need to add q13 to line 41 (set(RAPIDSMPFNDSH_QUERIES)
| "Mismatching sequence numbers" | ||
| ); | ||
|
|
||
| // use right as build table |
There was a problem hiding this comment.
Is there a reason for this? Or just an implementation detail for now?
| ctx->comm()->logger().print("Left shuffle join"); | ||
| co_await ctx->executor()->schedule(); | ||
| CudaEvent build_event; | ||
| while (true) { |
There was a problem hiding this comment.
| while (true) { | |
| while (!ch_out->is-shutdown()) { |
| ); | ||
| build_event.record(build_stream); | ||
|
|
||
| // drop key columns from build table. |
There was a problem hiding this comment.
inner_join_shuffle has a keep_keys argument we might want to copy here.
|
|
||
| namespace { | ||
|
|
||
| std::string get_table_path( |
There was a problem hiding this comment.
this is in rapidsmpf::ndsh::detail::get_table_path now
| ) { | ||
| rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; | ||
| auto mr = ctx->br()->device_mr(); | ||
| while (true) { |
There was a problem hiding this comment.
| while (true) { | |
| while (!ch_out->is_shutdown()) { |
| auto table = chunk.table_view(); | ||
| auto o_comment = table.column(0); | ||
| // Match rows that contain "special.*requests" and negate to get rows that don't | ||
| auto regex_program = cudf::strings::regex_program::create("special.*requests"); |
There was a problem hiding this comment.
Can this be done outside the while loop?
| co_await ch_out->drain(ctx->executor()); | ||
| } | ||
|
|
||
| rapidsmpf::streaming::Node chunkwise_groupby_agg( |
There was a problem hiding this comment.
We have a rapidsmpf::ndsh::chunkwise_group_by now. The signature is a bit different, but I think it'd be nice to try to reuse that.
| co_await ch_out->drain(ctx->executor()); | ||
| } | ||
|
|
||
| rapidsmpf::streaming::Node groupby_and_sort( |
There was a problem hiding this comment.
We also have a chunkwise_sort_by, though I haven't looked at all whether that could be used here.
| co_await ch_out->drain(ctx->executor()); | ||
| } | ||
|
|
||
| rapidsmpf::streaming::Node write_parquet( |
There was a problem hiding this comment.
Hopefully rapidsmpf::ndsh::write_parquet could be used here.
Depends on #663