Adding NDSH Q13 impl #706

Open
nirandaperera wants to merge 9 commits into rapidsai:main from nirandaperera:q-13

@nirandaperera
Contributor

Depends on #663

@copy-pr-bot

copy-pr-bot bot commented Dec 3, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@nirandaperera nirandaperera added the labels improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) Dec 3, 2025
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
@nirandaperera nirandaperera marked this pull request as ready for review December 4, 2025 21:22
@nirandaperera nirandaperera requested review from a team as code owners December 4, 2025 21:22
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
EXCLUDE_FROM_ALL
)

add_executable(q13 "q13.cpp")
Contributor

You should just need to add q13 to line 41 (set(RAPIDSMPFNDSH_QUERIES ...)).

"Mismatching sequence numbers"
);

// use right as build table
Contributor

Is there a reason for this? Or just an implementation detail for now?

ctx->comm()->logger().print("Left shuffle join");
co_await ctx->executor()->schedule();
CudaEvent build_event;
while (true) {
Contributor

Suggested change
- while (true) {
+ while (!ch_out->is_shutdown()) {

);
build_event.record(build_stream);

// drop key columns from build table.
Contributor

inner_join_shuffle has a keep_keys argument we might want to copy here.
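
A rough sketch of what a keep_keys-style switch could look like when deciding which build-side columns survive the join. Everything here is hypothetical: the KeepKeys enum, the toy Table alias, and select_build_columns are illustrative names, and the actual keep_keys type and semantics in inner_join_shuffle may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical flag; the real keep_keys argument may have another type.
enum class KeepKeys : bool { NO = false, YES = true };

// Toy table for illustration: a vector of integer columns.
using Table = std::vector<std::vector<int>>;

// Returns the build-side columns to carry into the join output.
// With KeepKeys::NO, every column listed in key_indices is dropped.
Table select_build_columns(
    Table const& build,
    std::vector<std::size_t> const& key_indices,
    KeepKeys keep_keys
) {
    if (keep_keys == KeepKeys::YES) {
        return build;
    }
    Table out;
    for (std::size_t i = 0; i < build.size(); ++i) {
        bool is_key = false;
        for (auto k : key_indices) {
            if (k == i) {
                is_key = true;
                break;
            }
        }
        if (!is_key) {
            out.push_back(build[i]);
        }
    }
    return out;
}
```

Making the behaviour an explicit argument would let callers that still need the key columns downstream opt in, instead of the drop being hard-coded in the join node.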


namespace {

std::string get_table_path(
Contributor

This is in rapidsmpf::ndsh::detail::get_table_path now.

) {
rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out};
auto mr = ctx->br()->device_mr();
while (true) {
Contributor

Suggested change
- while (true) {
+ while (!ch_out->is_shutdown()) {
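
To illustrate why the loop condition matters, here is a minimal synchronous sketch, assuming a mock channel rather than the real coroutine-based rapidsmpf channel. MockChannel, receive, and forward_all are hypothetical names; only is_shutdown comes from the suggestion above.

```cpp
#include <cassert>
#include <optional>
#include <queue>

// Hypothetical, synchronous stand-in for a streaming channel.
struct MockChannel {
    std::queue<int> messages;
    bool shutdown = false;

    bool is_shutdown() const { return shutdown; }

    // Pops the next message, or returns nullopt when the channel is empty.
    std::optional<int> receive() {
        if (messages.empty()) {
            return std::nullopt;
        }
        int m = messages.front();
        messages.pop();
        return m;
    }
};

// Forwards messages from `in` to `out` until the input is exhausted
// or the output has been shut down. Returns the number forwarded.
int forward_all(MockChannel& in, MockChannel& out) {
    int forwarded = 0;
    while (!out.is_shutdown()) {
        auto msg = in.receive();
        if (!msg) {
            break;
        }
        out.messages.push(*msg);
        ++forwarded;
    }
    return forwarded;
}
```

With the condition in place, a consumer that has already shut down the output stops the producer before it pulls and processes another chunk, which a bare while (true) would not do.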

auto table = chunk.table_view();
auto o_comment = table.column(0);
// Match rows that contain "special.*requests" and negate to get rows that don't
auto regex_program = cudf::strings::regex_program::create("special.*requests");
Contributor

Can this be done outside the while loop?
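
A minimal sketch of the hoisting being asked about, using std::regex as a stand-in for cudf::strings::regex_program so the example stays self-contained. The Chunk alias and filter_chunks are hypothetical; only the pattern "special.*requests" and the negated match come from the diff.

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the o_comment values of one chunk.
using Chunk = std::vector<std::string>;

// For each chunk, keeps the rows that do NOT contain "special.*requests".
std::vector<Chunk> filter_chunks(std::vector<Chunk> const& chunks) {
    // Compiled once, before the loop: the pattern is the same for every chunk.
    std::regex const pattern{"special.*requests"};

    std::vector<Chunk> out;
    for (auto const& chunk : chunks) {
        Chunk kept;
        for (auto const& comment : chunk) {
            // regex_search matches anywhere in the string; negate to drop matches.
            if (!std::regex_search(comment, pattern)) {
                kept.push_back(comment);
            }
        }
        out.push_back(std::move(kept));
    }
    return out;
}
```

Since the pattern does not depend on the chunk, building it before the loop avoids recompiling it per iteration; whether the same holds for the cudf regex program object is the question the comment raises.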

co_await ch_out->drain(ctx->executor());
}

rapidsmpf::streaming::Node chunkwise_groupby_agg(
Contributor

We have a rapidsmpf::ndsh::chunkwise_group_by now. The signature is a bit different, but I think it'd be nice to try to reuse that.

co_await ch_out->drain(ctx->executor());
}

rapidsmpf::streaming::Node groupby_and_sort(
Contributor

We also have a chunkwise_sort_by, though I haven't checked whether it could be used here.

co_await ch_out->drain(ctx->executor());
}

rapidsmpf::streaming::Node write_parquet(
Contributor

Hopefully rapidsmpf::ndsh::write_parquet could be used here.
