diff --git a/velox/experimental/cudf/exec/CMakeLists.txt b/velox/experimental/cudf/exec/CMakeLists.txt index fdf57f066298..3fcb3919acbb 100644 --- a/velox/experimental/cudf/exec/CMakeLists.txt +++ b/velox/experimental/cudf/exec/CMakeLists.txt @@ -18,6 +18,7 @@ add_library( CudfConversion.cpp CudfFilterProject.cpp CudfHashAggregation.cpp + DecimalAggregationKernels.cu CudfHashJoin.cpp CudfLimit.cpp CudfLocalPartition.cpp @@ -32,6 +33,8 @@ add_library( VeloxCudfInterop.cpp ) +set_target_properties(velox_cudf_exec PROPERTIES CUDA_STANDARD 20 CUDA_STANDARD_REQUIRED ON) + target_link_libraries( velox_cudf_exec PUBLIC cudf::cudf diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index c5847fe3f742..18e0e1e19d5c 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -19,6 +19,7 @@ #include "velox/experimental/cudf/exec/CudfFilterProject.h" #include "velox/experimental/cudf/exec/CudfHashAggregation.h" #include "velox/experimental/cudf/exec/GpuResources.h" +#include "velox/experimental/cudf/exec/DecimalAggregationKernels.h" #include "velox/experimental/cudf/exec/Utilities.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" @@ -39,6 +40,7 @@ #include #include #include +#include #include #include @@ -67,7 +69,8 @@ using facebook::velox::cudf_velox::get_temp_mr; \ void addGroupbyRequest( \ cudf::table_view const& tbl, \ - std::vector& requests) override { \ + std::vector& requests, \ + rmm::cuda_stream_view stream) override { \ VELOX_CHECK( \ constant == nullptr, \ #Name "Aggregator does not yet support constant input"); \ @@ -82,10 +85,9 @@ using facebook::velox::cudf_velox::get_temp_mr; std::vector& results, \ rmm::cuda_stream_view stream) override { \ auto col = std::move(results[output_idx].results[0]); \ - const auto cudfType = \ - cudf::data_type(cudf_velox::veloxToCudfTypeId(resultType)); \ - if (col->type() != cudfType) { \ - col = cudf::cast(*col, cudfType, stream, get_output_mr()); \ + auto const cudfResType = cudf_velox::veloxToCudfDataType(resultType); \ + if (col->type() != cudfResType) { \ + col = cudf::cast(*col, cudfResType, stream, get_output_mr()); \ } \ return col; \ } \ @@ -96,16 +98,12 @@ using facebook::velox::cudf_velox::get_temp_mr; rmm::cuda_stream_view stream) override { \ auto const aggRequest = \ cudf::make_##name##_aggregation(); \ - auto const cudfOutputType = \ - cudf::data_type(cudf_velox::veloxToCudfTypeId(outputType)); \ + auto const cudfOutType = cudf_velox::veloxToCudfDataType(outputType); \ auto const resultScalar = cudf::reduce( \ - input.column(inputIndex), \ - *aggRequest, \ - cudfOutputType, \ - stream, \ + input.column(inputIndex), *aggRequest, cudfOutType, stream, \ get_temp_mr()); \ - return cudf::make_column_from_scalar( \ - *resultScalar, 1, stream, get_output_mr()); \ + return cudf::make_column_from_scalar(*resultScalar, 1, stream, \ + get_output_mr()); \ } \ \ private: \ @@ -116,6 +114,257 @@ DEFINE_SIMPLE_AGGREGATOR(Sum, sum, SUM) DEFINE_SIMPLE_AGGREGATOR(Min, min, MIN) DEFINE_SIMPLE_AGGREGATOR(Max, max, MAX) +struct DecimalSumOrAvgAggregator : cudf_velox::CudfHashAggregation::Aggregator { + DecimalSumOrAvgAggregator( + core::AggregationNode::Step step, + uint32_t inputIndex, + VectorPtr constant, + bool isGlobal, + const TypePtr& resultType, + const bool isAvg) + : Aggregator( + step, + cudf::aggregation::SUM, + inputIndex, + constant, + isGlobal, + resultType), + isAvg_(isAvg) {} + + void addGroupbyRequest( + cudf::table_view const& tbl, + std::vector& requests, + rmm::cuda_stream_view stream) override { + if (step == core::AggregationNode::Step::kIntermediate && + tbl.column(inputIndex).type().id() == cudf::type_id::STRING) { + auto scale = resultType->isDecimal() + ? getDecimalPrecisionScale(*resultType).second + : 0; + auto decoded = cudf_velox::deserializeDecimalSumStateWithCount( + tbl.column(inputIndex), scale, stream, cudf_velox::get_output_mr()); + decodedSum_ = std::move(decoded.sum); + decodedCount_ = std::move(decoded.count); + + sumIdx_ = requests.size(); + auto& sumRequest = requests.emplace_back(); + sumRequest.values = decodedSum_->view(); + sumRequest.aggregations.push_back( + cudf::make_sum_aggregation()); + + countIdx_ = requests.size(); + auto& countRequest = requests.emplace_back(); + countRequest.values = decodedCount_->view(); + countRequest.aggregations.push_back( + cudf::make_sum_aggregation()); + return; + } + + if (step == core::AggregationNode::Step::kFinal && + tbl.column(inputIndex).type().id() == cudf::type_id::STRING) { + auto scale = getDecimalPrecisionScale(*resultType).second; + if (isAvg_) { + auto decoded = cudf_velox::deserializeDecimalSumStateWithCount( + tbl.column(inputIndex), scale, stream, cudf_velox::get_output_mr()); + decodedSum_ = std::move(decoded.sum); + decodedCount_ = std::move(decoded.count); + + sumIdx_ = requests.size(); + auto& sumRequest = requests.emplace_back(); + sumRequest.values = decodedSum_->view(); + sumRequest.aggregations.push_back( + cudf::make_sum_aggregation()); + + countIdx_ = requests.size(); + auto& countRequest = requests.emplace_back(); + countRequest.values = decodedCount_->view(); + countRequest.aggregations.push_back( + cudf::make_sum_aggregation()); + return; + } else { + auto& request = requests.emplace_back(); + sumIdx_ = requests.size() - 1; + decodedSum_ = cudf_velox::deserializeDecimalSumState( + tbl.column(inputIndex), scale, stream, cudf_velox::get_output_mr()); + request.values = decodedSum_->view(); + request.aggregations.push_back( + cudf::make_sum_aggregation()); + return; + } + } else { + auto& request = requests.emplace_back(); + sumIdx_ = requests.size() - 1; + request.values = tbl.column(inputIndex); + request.aggregations.push_back( + cudf::make_sum_aggregation()); + if (step == core::AggregationNode::Step::kPartial || + (step == core::AggregationNode::Step::kSingle && isAvg_)) { + request.aggregations.push_back( + cudf::make_count_aggregation( + cudf::null_policy::EXCLUDE)); + } + return; + } + } + + std::unique_ptr makeOutputColumn( + std::vector& results, + rmm::cuda_stream_view stream) override { + auto col = std::move(results[sumIdx_].results[0]); + if (isAvg_ && step == core::AggregationNode::Step::kSingle) { + auto count = std::move(results[sumIdx_].results[1]); + return computeAvgColumn(std::move(col), std::move(count), stream); + } + if (step == core::AggregationNode::Step::kPartial) { + auto count = std::move(results[sumIdx_].results[1]); + if (count->type().id() != cudf::type_id::INT64) { + count = + cudf::cast(*count, cudf::data_type{cudf::type_id::INT64}, stream, cudf_velox::get_output_mr()); + } + return cudf_velox::serializeDecimalSumState( + col->view(), count->view(), stream, cudf_velox::get_output_mr()); + } + if (step == core::AggregationNode::Step::kIntermediate) { + auto count = std::move(results[countIdx_].results[0]); + if (count->type().id() != cudf::type_id::INT64) { + count = + cudf::cast(*count, cudf::data_type{cudf::type_id::INT64}, stream, cudf_velox::get_output_mr()); + } + return cudf_velox::serializeDecimalSumState( + col->view(), count->view(), stream, cudf_velox::get_output_mr()); + } + if (isAvg_ && step == core::AggregationNode::Step::kFinal) { + auto count = std::move(results[countIdx_].results[0]); + return computeAvgColumn(std::move(col), std::move(count), stream); + } + auto const cudfResType = cudf_velox::veloxToCudfDataType(resultType); + if (col->type() != cudfResType) { + col = cudf::cast(*col, cudfResType, stream, cudf_velox::get_output_mr()); + } + return col; + } + + std::unique_ptr doReduce( + cudf::table_view const& input, + TypePtr const& outputType, + rmm::cuda_stream_view stream) override { + if (step == core::AggregationNode::Step::kSingle && isAvg_) { + auto const sumAgg = + cudf::make_sum_aggregation(); + cudf::column_view inputCol = input.column(inputIndex); + auto sumScalar = cudf::reduce(inputCol, *sumAgg, inputCol.type(), stream, cudf_velox::get_temp_mr()); + auto countAgg = cudf::make_count_aggregation( + cudf::null_policy::EXCLUDE); + auto countScalar = cudf::reduce( + inputCol, *countAgg, cudf::data_type{cudf::type_id::INT64}, stream, cudf_velox::get_temp_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, cudf_velox::get_output_mr()); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, cudf_velox::get_output_mr()); + return computeAvgColumn(std::move(sumCol), std::move(countCol), stream); + } + auto const aggRequest = + cudf::make_sum_aggregation(); + cudf::column_view inputCol = input.column(inputIndex); + if (step == core::AggregationNode::Step::kPartial) { + auto sumScalar = + cudf::reduce(inputCol, *aggRequest, inputCol.type(), stream, cudf_velox::get_temp_mr()); + auto countAgg = cudf::make_count_aggregation( + cudf::null_policy::EXCLUDE); + auto countScalar = cudf::reduce( + inputCol, *countAgg, cudf::data_type{cudf::type_id::INT64}, stream, cudf_velox::get_temp_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, cudf_velox::get_output_mr()); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, cudf_velox::get_output_mr()); + return cudf_velox::serializeDecimalSumState( + sumCol->view(), countCol->view(), stream, cudf_velox::get_output_mr()); + } + if (step == core::AggregationNode::Step::kIntermediate && + inputCol.type().id() == cudf::type_id::STRING) { + auto scale = outputType->isDecimal() + ? getDecimalPrecisionScale(*outputType).second + : 0; + auto decoded = cudf_velox::deserializeDecimalSumStateWithCount( + inputCol, scale, stream, cudf_velox::get_output_mr()); + auto sumScalar = cudf::reduce( + decoded.sum->view(), *aggRequest, decoded.sum->view().type(), stream, cudf_velox::get_temp_mr()); + auto countScalar = cudf::reduce( + decoded.count->view(), + *aggRequest, + cudf::data_type{cudf::type_id::INT64}, + stream, cudf_velox::get_temp_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, cudf_velox::get_output_mr()); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, cudf_velox::get_output_mr()); + return cudf_velox::serializeDecimalSumState( + sumCol->view(), countCol->view(), stream, cudf_velox::get_output_mr()); + } + if (step == core::AggregationNode::Step::kFinal && + inputCol.type().id() == cudf::type_id::STRING) { + auto scale = getDecimalPrecisionScale(*outputType).second; + if (isAvg_) { + // AVG + // deserialize the results (sum and count) + auto sumAndCount = cudf_velox::deserializeDecimalSumStateWithCount( + inputCol, scale, stream, cudf_velox::get_output_mr()); + // reduce the two results to get final sum and count scalars + auto sumScalar = cudf::reduce( + sumAndCount.sum->view(), + *aggRequest, + sumAndCount.sum->view().type(), + stream, cudf_velox::get_temp_mr()); + auto countScalar = cudf::reduce( + sumAndCount.count->view(), + *aggRequest, + cudf::data_type{cudf::type_id::INT64}, + stream, cudf_velox::get_temp_mr()); + // convert to columns in order to perform division, as we cannot divide + // scalars directly + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, cudf_velox::get_output_mr()); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, cudf_velox::get_output_mr()); + return computeAvgColumn(std::move(sumCol), std::move(countCol), stream); + } else { + // SUM + decodedSum_ = + cudf_velox::deserializeDecimalSumState(inputCol, scale, stream, cudf_velox::get_output_mr()); + inputCol = decodedSum_->view(); + // @TODO does this need to drop through to the code below + // or can we just do that stuff here, and not need decodedSum_ or + // decodedCount_ why we do have those anyway if they're only set in + // addGroupbyRequest() and either overwritten or not even used here? + // what does the final cudf::reduce() below actually do? + } + } + auto const cudfOutType = cudf_velox::veloxToCudfDataType(outputType); + std::unique_ptr castedInput; + if (outputType->isDecimal() && inputCol.type() != cudfOutType) { + castedInput = cudf::cast(inputCol, cudfOutType, stream, cudf_velox::get_output_mr()); + inputCol = castedInput->view(); + } + auto const resultScalar = + cudf::reduce(inputCol, *aggRequest, cudfOutType, stream, cudf_velox::get_temp_mr()); + return cudf::make_column_from_scalar(*resultScalar, 1, stream, cudf_velox::get_output_mr()); + } + + private: + std::unique_ptr computeAvgColumn( + std::unique_ptr sum, + std::unique_ptr count, + rmm::cuda_stream_view stream) const { + if (count->type().id() != cudf::type_id::INT64) { + count = cudf::cast(*count, cudf::data_type{cudf::type_id::INT64}, stream, cudf_velox::get_output_mr()); + } + auto avgCol = + cudf_velox::computeDecimalAverage(sum->view(), count->view(), stream, cudf_velox::get_output_mr()); + auto const cudfOutType = cudf_velox::veloxToCudfDataType(resultType); + if (avgCol->type() != cudfOutType) { + avgCol = cudf::cast(avgCol->view(), cudfOutType, stream, cudf_velox::get_output_mr()); + } + return avgCol; + } + + uint32_t sumIdx_{0}; + uint32_t countIdx_{0}; + const bool isAvg_{false}; + std::unique_ptr decodedSum_; + std::unique_ptr decodedCount_; +}; + struct CountAggregator : cudf_velox::CudfHashAggregation::Aggregator { CountAggregator( core::AggregationNode::Step step, @@ -133,7 +382,8 @@ struct CountAggregator : cudf_velox::CudfHashAggregation::Aggregator { void addGroupbyRequest( cudf::table_view const& tbl, - std::vector& requests) override { + std::vector& requests, + rmm::cuda_stream_view stream) override { auto& request = requests.emplace_back(); outputIdx_ = requests.size() - 1; request.values = tbl.column(constant == nullptr ? inputIndex : 0); @@ -187,8 +437,7 @@ struct CountAggregator : cudf_velox::CudfHashAggregation::Aggregator { rmm::cuda_stream_view stream) override { // cudf produces int32 for count(0) but velox expects int64 auto col = std::move(results[outputIdx_].results[0]); - const auto cudfOutputType = - cudf::data_type(cudf_velox::veloxToCudfTypeId(resultType)); + const auto cudfOutputType = cudf_velox::veloxToCudfDataType(resultType); if (col->type() != cudfOutputType) { col = cudf::cast(*col, cudfOutputType, stream, get_output_mr()); } @@ -216,7 +465,8 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { void addGroupbyRequest( cudf::table_view const& tbl, - std::vector& requests) override { + std::vector& requests, + rmm::cuda_stream_view stream) override { switch (step) { case core::AggregationNode::Step::kSingle: { auto& request = requests.emplace_back(); @@ -274,13 +524,12 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { auto count = std::move(results[sumIdx_].results[1]); auto const size = sum->size(); - auto const cudfSumType = cudf::data_type( - cudf_velox::veloxToCudfTypeId(outputType->childAt(0))); - auto const cudfCountType = cudf::data_type( - cudf_velox::veloxToCudfTypeId(outputType->childAt(1))); - if (sum->type() != cudf::data_type(cudfSumType)) { - sum = cudf::cast( - *sum, cudf::data_type(cudfSumType), stream, get_output_mr()); + auto const cudfSumType = + cudf_velox::veloxToCudfDataType(outputType->childAt(0)); + auto const cudfCountType = + cudf_velox::veloxToCudfDataType(outputType->childAt(1)); + if (sum->type() != cudfSumType) { + sum = cudf::cast(*sum, cudfSumType, stream, get_output_mr()); } if (count->type() != cudf::data_type(cudfCountType)) { count = cudf::cast( @@ -312,13 +561,12 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { auto count = std::move(results[countIdx_].results[0]); auto size = sum->size(); - auto const cudfSumType = cudf::data_type( - cudf_velox::veloxToCudfTypeId(outputType->childAt(0))); - auto const cudfCountType = cudf::data_type( - cudf_velox::veloxToCudfTypeId(outputType->childAt(1))); - if (sum->type() != cudf::data_type(cudfSumType)) { - sum = cudf::cast( - *sum, cudf::data_type(cudfSumType), stream, get_output_mr()); + auto const cudfSumType = + cudf_velox::veloxToCudfDataType(outputType->childAt(0)); + auto const cudfCountType = + cudf_velox::veloxToCudfDataType(outputType->childAt(1)); + if (sum->type() != cudfSumType) { + sum = cudf::cast(*sum, cudfSumType, stream, get_output_mr()); } if (count->type() != cudf::data_type(cudfCountType)) { count = cudf::cast( @@ -344,7 +592,7 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { *sum, *count, cudf::binary_operator::DIV, - cudf::data_type(cudf_velox::veloxToCudfTypeId(resultType)), + cudf_velox::veloxToCudfDataType(resultType), stream, get_output_mr()); return avg; @@ -362,8 +610,7 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { case core::AggregationNode::Step::kSingle: { auto const aggRequest = cudf::make_mean_aggregation(); - auto const cudfOutputType = - cudf::data_type(cudf_velox::veloxToCudfTypeId(outputType)); + auto const cudfOutputType = cudf_velox::veloxToCudfDataType(outputType); auto const resultScalar = cudf::reduce( input.column(inputIndex), *aggRequest, @@ -376,12 +623,7 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { case core::AggregationNode::Step::kPartial: { VELOX_CHECK(outputType->isRow()); auto const& rowType = outputType->asRow(); - auto const sumType = rowType.childAt(0); - auto const countType = rowType.childAt(1); - auto const cudfSumType = - cudf::data_type(cudf_velox::veloxToCudfTypeId(sumType)); - auto const cudfCountType = - cudf::data_type(cudf_velox::veloxToCudfTypeId(countType)); + auto const cudfSumType = cudf_velox::veloxToCudfDataType(rowType.childAt(0)); // sum auto const aggRequest = @@ -440,8 +682,7 @@ struct MeanAggregator : cudf_velox::CudfHashAggregation::Aggregator { countCol, *countAggRequest, countCol.type(), stream, get_temp_mr()); // divide the sums by the counts - auto const cudfOutputType = - cudf::data_type(cudf_velox::veloxToCudfTypeId(outputType)); + auto const cudfOutputType = cudf_velox::veloxToCudfDataType(outputType); return cudf::binary_operation( *sumResultCol, *countResultScalar, @@ -487,7 +728,8 @@ struct ApproxDistinctAggregator : cudf_velox::CudfHashAggregation::Aggregator { void addGroupbyRequest( cudf::table_view const& tbl, - std::vector& requests) override { + std::vector& requests, + rmm::cuda_stream_view stream) override { VELOX_UNSUPPORTED( "approx_distinct is not supported as a group aggregation"); } @@ -682,9 +924,16 @@ std::unique_ptr createAggregator( uint32_t inputIndex, VectorPtr constant, bool isGlobal, - const TypePtr& resultType) { + const TypePtr& resultType, + const std::vector& rawInputTypes = {}) { auto prefix = cudf_velox::CudfConfig::getInstance().functionNamePrefix; if (kind.rfind(prefix + "sum", 0) == 0) { + bool isDecimalInput = + rawInputTypes.size() == 1 && rawInputTypes[0]->isDecimal(); + if (isDecimalInput) { + return std::make_unique( + step, inputIndex, constant, isGlobal, resultType, false); + } return std::make_unique( step, inputIndex, constant, isGlobal, resultType); } else if (kind.rfind(prefix + "count", 0) == 0) { @@ -697,6 +946,12 @@ std::unique_ptr createAggregator( return std::make_unique( step, inputIndex, constant, isGlobal, resultType); } else if (kind.rfind(prefix + "avg", 0) == 0) { + bool isDecimalInput = + rawInputTypes.size() == 1 && rawInputTypes[0]->isDecimal(); + if (isDecimalInput) { + return std::make_unique( + step, inputIndex, constant, isGlobal, resultType, true); + } return std::make_unique( step, inputIndex, constant, isGlobal, resultType); } else if (kind.rfind(prefix + "approx_distinct", 0) == 0) { @@ -811,7 +1066,13 @@ auto toAggregators( : outputType->childAt(numKeys + i); aggregators.push_back(createAggregator( - companionStep, kind, inputIndex, constant, isGlobal, resultType)); + companionStep, + kind, + inputIndex, + constant, + isGlobal, + resultType, + aggregate.rawInputTypes)); } return aggregators; } @@ -838,7 +1099,13 @@ auto toIntermediateAggregators( const auto resultType = exec::resolveIntermediateType(originalName, aggregate.rawInputTypes); aggregators.push_back(createAggregator( - step, kind, inputIndex, constant, isGlobal, resultType)); + step, + kind, + inputIndex, + constant, + isGlobal, + resultType, + aggregate.rawInputTypes)); } else { // Final step aggregator will not use the intermediate aggregator. aggregators.push_back(nullptr); @@ -1069,7 +1336,7 @@ CudfVectorPtr CudfHashAggregation::doGroupByAggregation( std::vector requests; for (auto& aggregator : aggregators) { - aggregator->addGroupbyRequest(tableView, requests); + aggregator->addGroupbyRequest(tableView, requests, stream); } auto [groupKeys, results] = @@ -1294,6 +1561,38 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .argumentType("double") .build()}; + // Decimal sum signatures. + auto decimalSumSingle = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_precision") + .integerVariable("a_scale") + .returnType("decimal(38, a_scale)") + .argumentType("decimal(a_precision, a_scale)") + .build()}; + auto decimalSumPartial = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_precision") + .integerVariable("a_scale") + .returnType("varbinary") + .argumentType("decimal(a_precision, a_scale)") + .build()}; + auto decimalSumFinal = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_scale") + .returnType("decimal(38, a_scale)") + .argumentType("varbinary") + .build()}; + auto decimalSumIntermediate = + std::vector{FunctionSignatureBuilder() + .returnType("varbinary") + .argumentType("varbinary") + .build()}; + + sumSingleSignatures.insert( + sumSingleSignatures.end(), + decimalSumSingle.begin(), + decimalSumSingle.end()); + registerAggregationFunctionForStep( prefix + "sum", core::AggregationNode::Step::kSingle, @@ -1324,6 +1623,12 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .returnType("double") .argumentType("double") .build()}; + + sumPartialSignatures.insert( + sumPartialSignatures.end(), + decimalSumPartial.begin(), + decimalSumPartial.end()); + registerAggregationFunctionForStep( prefix + "sum", core::AggregationNode::Step::kPartial, @@ -1338,14 +1643,24 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .returnType("double") .argumentType("double") .build()}; + + auto sumFinalSignatures = sumFinalIntermediateSignatures; + sumFinalSignatures.insert( + sumFinalSignatures.end(), decimalSumFinal.begin(), decimalSumFinal.end()); + registerAggregationFunctionForStep( - prefix + "sum", - core::AggregationNode::Step::kFinal, - sumFinalIntermediateSignatures); + prefix + "sum", core::AggregationNode::Step::kFinal, sumFinalSignatures); + + auto sumIntermediateSignatures = sumFinalIntermediateSignatures; + sumIntermediateSignatures.insert( + sumIntermediateSignatures.end(), + decimalSumIntermediate.begin(), + decimalSumIntermediate.end()); + registerAggregationFunctionForStep( prefix + "sum", core::AggregationNode::Step::kIntermediate, - sumFinalIntermediateSignatures); + sumIntermediateSignatures); // Register count function (split by aggregation step) auto countSinglePartialSignatures = std::vector{ @@ -1431,6 +1746,12 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { FunctionSignatureBuilder() .returnType("double") .argumentType("double") + .build(), + FunctionSignatureBuilder() + .integerVariable("p") + .integerVariable("s") + .returnType("decimal(p,s)") + .argumentType("decimal(p,s)") .build()}; registerAggregationFunctionForStep( @@ -1480,6 +1801,40 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .returnType("double") .argumentType("double") .build()}; + + // Decimal avg signatures. + auto decimalAvgSingle = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_precision") + .integerVariable("a_scale") + .returnType("decimal(a_precision, a_scale)") + .argumentType("decimal(a_precision, a_scale)") + .build()}; + auto decimalAvgPartial = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_precision") + .integerVariable("a_scale") + .returnType("varbinary") + .argumentType("decimal(a_precision, a_scale)") + .build()}; + auto decimalAvgFinal = std::vector{ + FunctionSignatureBuilder() + .integerVariable("a_precision") + .integerVariable("a_scale") + .returnType("decimal(a_precision, a_scale)") + .argumentType("varbinary") + .build()}; + auto decimalAvgIntermediate = + std::vector{FunctionSignatureBuilder() + .returnType("varbinary") + .argumentType("varbinary") + .build()}; + + avgSingleSignatures.insert( + avgSingleSignatures.end(), + decimalAvgSingle.begin(), + decimalAvgSingle.end()); + registerAggregationFunctionForStep( prefix + "avg", core::AggregationNode::Step::kSingle, @@ -1507,17 +1862,28 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .returnType("row(double,bigint)") .argumentType("double") .build()}; + + avgPartialSignatures.insert( + avgPartialSignatures.end(), + decimalAvgPartial.begin(), + decimalAvgPartial.end()); + registerAggregationFunctionForStep( prefix + "avg", core::AggregationNode::Step::kPartial, avgPartialSignatures); // Final step: avg(row(double, bigint)) -> double - auto avgFinalSignatures = std::vector{ + auto avgFinalIntermediateSignatures = std::vector{ FunctionSignatureBuilder() .returnType("double") .argumentType("row(double,bigint)") .build()}; + + auto avgFinalSignatures = avgFinalIntermediateSignatures; + avgFinalSignatures.insert( + avgFinalSignatures.end(), decimalAvgFinal.begin(), decimalAvgFinal.end()); + registerAggregationFunctionForStep( prefix + "avg", core::AggregationNode::Step::kFinal, avgFinalSignatures); @@ -1528,6 +1894,16 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { .returnType("row(double,bigint)") .argumentType("row(double,bigint)") .build()}; + + // WHY DOES SUM NOT HAVE THE EQUIVALENT OF THE ABOVE? + // THE ABOVE THEN CLASHES WITH BELOW + // @mattgara HELP! :) + // auto avgIntermediateSignatures = avgFinalIntermediateSignatures; + avgIntermediateSignatures.insert( + avgIntermediateSignatures.end(), + decimalAvgIntermediate.begin(), + decimalAvgIntermediate.end()); + registerAggregationFunctionForStep( prefix + "avg", core::AggregationNode::Step::kIntermediate, diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.h b/velox/experimental/cudf/exec/CudfHashAggregation.h index b92bd63bdb41..4a5ccb8da718 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.h +++ b/velox/experimental/cudf/exec/CudfHashAggregation.h @@ -42,7 +42,8 @@ class CudfHashAggregation : public exec::Operator, public NvtxHelper { virtual void addGroupbyRequest( cudf::table_view const& tbl, - std::vector& requests) = 0; + std::vector& requests, + rmm::cuda_stream_view stream) = 0; virtual std::unique_ptr doReduce( cudf::table_view const& input, diff --git a/velox/experimental/cudf/exec/DecimalAggregationKernels.cu b/velox/experimental/cudf/exec/DecimalAggregationKernels.cu new file mode 100644 index 000000000000..38533e6c9cc7 --- /dev/null +++ b/velox/experimental/cudf/exec/DecimalAggregationKernels.cu @@ -0,0 +1,432 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/experimental/cudf/exec/DecimalAggregationKernels.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace facebook::velox::cudf_velox { +namespace { + +constexpr int32_t kStateSize = 32; + +struct DecimalSumStateDevice { + int64_t count; + int64_t overflow; + uint64_t lower; + int64_t upper; +}; + +static_assert(sizeof(DecimalSumStateDevice) == kStateSize); + +__device__ __forceinline__ void +splitToWords(int64_t value, int64_t& upper, uint64_t& lower) { + lower = static_cast(value); + upper = value < 0 ? -1 : 0; +} + +__device__ __forceinline__ void +splitToWords(__int128_t value, int64_t& upper, uint64_t& lower) { + lower = static_cast(value); + upper = static_cast(value >> 64); +} + +template +__global__ void fillOffsetsKernel(OffsetT* offsets, int32_t numRows) { + int32_t idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx <= numRows) { + int64_t offset = static_cast(idx) * kStateSize; + offsets[idx] = static_cast(offset); + } +} + +template +__global__ void packStateKernel( + const SumT* sums, + const int64_t* counts, + const OffsetT* offsets, + uint8_t* chars, + int32_t numRows) { + int32_t idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx >= numRows) { + return; + } + int64_t offset = static_cast(offsets[idx]); + auto* state = reinterpret_cast(chars + offset); + int64_t upper; + uint64_t lower; + splitToWords(sums[idx], upper, lower); + state->count = counts[idx]; + state->overflow = 0; + state->lower = lower; + state->upper = upper; +} + +template +__global__ void unpackStateKernel( + const OffsetT* offsets, + const uint8_t* chars, + __int128_t* sums, + int64_t* counts, + int32_t numRows) { + int32_t idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx >= numRows) { + return; + } + int64_t offset = static_cast(offsets[idx]); + auto* state = reinterpret_cast(chars + offset); + counts[idx] = state->count; + sums[idx] = (static_cast<__int128_t>(state->upper) << 64) | state->lower; +} + +template +__global__ void avgRoundKernel( + const SumT* sums, + const int64_t* counts, + SumT* out, + int32_t numRows) { + int32_t idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx >= numRows) { + return; + } + auto count = counts[idx]; + if (count == 0) { + out[idx] = SumT{0}; + return; + } + auto sum = sums[idx]; + SumT absSum = sum < 0 ? -sum : sum; + SumT half = static_cast(count / 2); + SumT rounded = (absSum + half) / static_cast(count); + out[idx] = sum < 0 ? -rounded : rounded; +} + +struct StateValidPredicate { + cudf::column_device_view sum; + cudf::column_device_view count; + + __device__ bool operator()(cudf::size_type idx) const { + if (sum.is_null(idx) || count.is_null(idx)) { + return false; + } + return count.element(idx) != 0; + } +}; + +std::pair buildStateValidityMask( + const cudf::column_view& sumCol, + const cudf::column_view& countCol, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + auto numRows = sumCol.size(); + if (numRows == 0) { + return {rmm::device_buffer{}, 0}; + } + auto sumDeviceView = cudf::column_device_view::create(sumCol, stream); + auto countDeviceView = cudf::column_device_view::create(countCol, stream); + StateValidPredicate pred{*sumDeviceView, *countDeviceView}; + auto begin = thrust::make_counting_iterator(0); + auto end = begin + numRows; + return cudf::detail::valid_if( + begin, end, pred, stream, mr); +} + +} // namespace + +DecimalSumStateColumns deserializeDecimalSumStateWithCount( + const cudf::column_view& stateCol, + int32_t scale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + CUDF_EXPECTS( + stateCol.type().id() == cudf::type_id::STRING, + "Decimal sum state requires STRING/VARBINARY column"); + auto numRows = stateCol.size(); + if (numRows == 0) { + DecimalSumStateColumns empty; + empty.sum = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::DECIMAL128, -scale}, + 0, + cudf::mask_state::UNALLOCATED, + stream); + empty.count = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::INT64}, + 0, + cudf::mask_state::UNALLOCATED, + stream); + return empty; + } + + // For fully-null state columns there is nothing to deserialize. Avoid + // launching unpack kernels over string payload buffers that may be empty. + if (stateCol.nullable() && stateCol.null_count() == numRows) { + DecimalSumStateColumns allNull; + allNull.sum = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::DECIMAL128, -scale}, + numRows, + cudf::mask_state::ALL_NULL, + stream); + allNull.count = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::INT64}, + numRows, + cudf::mask_state::ALL_NULL, + stream); + return allNull; + } + + cudf::strings_column_view strings(stateCol); + numRows = strings.size(); + + auto offsetsView = strings.offsets(); + auto offsetsType = offsetsView.type().id(); + auto charsPtr = reinterpret_cast(strings.chars_begin(stream)); + + auto sumCol = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::DECIMAL128, -scale}, + numRows, + cudf::mask_state::UNALLOCATED, + stream); + auto countCol = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::INT64}, + numRows, + cudf::mask_state::UNALLOCATED, + stream); + + auto sumView = sumCol->mutable_view(); + auto countView = countCol->mutable_view(); + + if (numRows > 0) { + int32_t blockSize = 256; + int32_t gridSize = (numRows + blockSize - 1) / blockSize; + if (offsetsType == cudf::type_id::INT64) { + auto offsetsCol = offsetsView.data(); + unpackStateKernel<<>>( + offsetsCol, + charsPtr, + sumView.data<__int128_t>(), + countView.data(), + numRows); + } else { + CUDF_EXPECTS( + offsetsType == cudf::type_id::INT32, + "Decimal sum state requires INT32 or INT64 offsets"); + auto offsetsCol = offsetsView.data(); + unpackStateKernel<<>>( + offsetsCol, + charsPtr, + sumView.data<__int128_t>(), + countView.data(), + numRows); + } + CUDF_CUDA_TRY(cudaGetLastError()); + } + + if (stateCol.nullable()) { + auto nullMask = cudf::detail::copy_bitmask( + stateCol, stream, mr); + auto nullCount = stateCol.null_count(); + sumCol->set_null_mask(std::move(nullMask), nullCount); + auto countMask = cudf::detail::copy_bitmask( + stateCol, stream, mr); + countCol->set_null_mask(std::move(countMask), nullCount); + } + + DecimalSumStateColumns result; + result.sum = std::move(sumCol); + result.count = std::move(countCol); + return result; +} + +std::unique_ptr deserializeDecimalSumState( + const cudf::column_view& stateCol, + int32_t scale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + auto decoded = deserializeDecimalSumStateWithCount(stateCol, scale, stream, mr); + return std::move(decoded.sum); +} + +std::unique_ptr serializeDecimalSumState( + const cudf::column_view& sumCol, + const cudf::column_view& countCol, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + CUDF_EXPECTS( + countCol.type().id() == cudf::type_id::INT64, + "Decimal sum state requires INT64 count column"); + auto numRows = sumCol.size(); + CUDF_EXPECTS( + numRows == countCol.size(), + "Decimal sum state requires sum and count to be same size"); + CUDF_EXPECTS( + numRows <= std::numeric_limits::max(), + "Too many rows to serialize decimal sum state"); + + auto const charsBytes = static_cast(numRows) * kStateSize; + auto const threshold = cudf::strings::get_offset64_threshold(); + auto const useLargeOffsets = charsBytes >= threshold; + CUDF_EXPECTS( + !useLargeOffsets || cudf::strings::is_large_strings_enabled(), + "Size of output exceeds the column size limit", + std::overflow_error); + + auto const offsetsType = + useLargeOffsets ? cudf::type_id::INT64 : cudf::type_id::INT32; + auto offsetsCol = cudf::make_fixed_width_column( + cudf::data_type{offsetsType}, + numRows + 1, + cudf::mask_state::UNALLOCATED, + stream); + auto offsetsView = offsetsCol->mutable_view(); + + rmm::device_buffer charsBuf( + static_cast(numRows) * kStateSize, stream); + + int32_t blockSize = 256; + int32_t offsetGridSize = (numRows + 1 + blockSize - 1) / blockSize; + if (useLargeOffsets) { + fillOffsetsKernel + <<>>( + offsetsView.data(), numRows); + } else { + fillOffsetsKernel + <<>>( + offsetsView.data(), numRows); + } + CUDF_CUDA_TRY(cudaGetLastError()); + + if (numRows > 0) { + int32_t gridSize = (numRows + blockSize - 1) / blockSize; + auto charsPtr = reinterpret_cast(charsBuf.data()); + if (useLargeOffsets) { + auto offsetsPtr = offsetsView.data(); + if (sumCol.type().id() == cudf::type_id::DECIMAL64) { + packStateKernel + <<>>( + sumCol.data(), + countCol.data(), + offsetsPtr, + charsPtr, + numRows); + } else { + CUDF_EXPECTS( + sumCol.type().id() == cudf::type_id::DECIMAL128, + "Unsupported decimal sum column type"); + packStateKernel<__int128_t, int64_t> + <<>>( + sumCol.data<__int128_t>(), + countCol.data(), + offsetsPtr, + charsPtr, + numRows); + } + } else { + auto offsetsPtr = offsetsView.data(); + if (sumCol.type().id() == cudf::type_id::DECIMAL64) { + packStateKernel + <<>>( + sumCol.data(), + countCol.data(), + offsetsPtr, + charsPtr, + numRows); + } else { + CUDF_EXPECTS( + sumCol.type().id() == cudf::type_id::DECIMAL128, + "Unsupported decimal sum column type"); + packStateKernel<__int128_t, int32_t> + <<>>( + sumCol.data<__int128_t>(), + countCol.data(), + offsetsPtr, + charsPtr, + numRows); + } + } + CUDF_CUDA_TRY(cudaGetLastError()); + } + + auto [nullMask, nullCount] = buildStateValidityMask(sumCol, countCol, stream, mr); + return cudf::make_strings_column( + static_cast(numRows), + std::move(offsetsCol), + std::move(charsBuf), + nullCount, + std::move(nullMask)); +} + +std::unique_ptr computeDecimalAverage( + const cudf::column_view& sumCol, + const cudf::column_view& countCol, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + CUDF_EXPECTS( + countCol.type().id() == cudf::type_id::INT64, + "Decimal average requires INT64 count column"); + CUDF_EXPECTS( + sumCol.type().id() == cudf::type_id::DECIMAL64 || + sumCol.type().id() == cudf::type_id::DECIMAL128, + "Decimal average requires DECIMAL64 or DECIMAL128 sum column"); + CUDF_EXPECTS( + sumCol.size() == countCol.size(), + "Decimal average requires sum and count to be same size"); + + auto numRows = sumCol.size(); + auto out = cudf::make_fixed_width_column( + sumCol.type(), numRows, cudf::mask_state::UNALLOCATED, stream); + + if (numRows > 0) { + int32_t blockSize = 256; + int32_t gridSize = (numRows + blockSize - 1) / blockSize; + if (sumCol.type().id() == cudf::type_id::DECIMAL64) { + avgRoundKernel<<>>( + sumCol.data(), + countCol.data(), + out->mutable_view().data(), + numRows); + } else { + avgRoundKernel<<>>( + sumCol.data<__int128_t>(), + countCol.data(), + out->mutable_view().data<__int128_t>(), + numRows); + } + CUDF_CUDA_TRY(cudaGetLastError()); + } + + auto [nullMask, nullCount] = buildStateValidityMask(sumCol, countCol, stream, mr); + if (nullCount > 0) { + out->set_null_mask(std::move(nullMask), nullCount); + } else if (nullMask.size() > 0) { + out->set_null_mask(std::move(nullMask), 0); + } + return out; +} + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/DecimalAggregationKernels.h b/velox/experimental/cudf/exec/DecimalAggregationKernels.h new file mode 100644 index 000000000000..620194249f21 --- /dev/null +++ b/velox/experimental/cudf/exec/DecimalAggregationKernels.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include + +#include + +namespace facebook::velox::cudf_velox { + +struct DecimalSumStateColumns { + std::unique_ptr sum; + std::unique_ptr count; +}; + +DecimalSumStateColumns deserializeDecimalSumStateWithCount( + const cudf::column_view& stateCol, + int32_t scale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +std::unique_ptr deserializeDecimalSumState( + const cudf::column_view& stateCol, + int32_t scale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +std::unique_ptr serializeDecimalSumState( + const cudf::column_view& sumCol, + const cudf::column_view& countCol, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +std::unique_ptr computeDecimalAverage( + const cudf::column_view& sumCol, + const cudf::column_view& countCol, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/VeloxCudfInterop.cpp b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp index 775dab190f58..2c418e2390d3 100644 --- a/velox/experimental/cudf/exec/VeloxCudfInterop.cpp +++ b/velox/experimental/cudf/exec/VeloxCudfInterop.cpp @@ -39,12 +39,6 @@ namespace facebook::velox::cudf_velox { -cudf::type_id veloxToCudfTypeId(const TypePtr& type) { - // Legacy helper retained for compatibility. Note: returning cudf::type_id - // discards decimal scale; prefer veloxToCudfDataType when scale matters. - return veloxToCudfDataType(type).id(); -} - cudf::data_type veloxToCudfDataType(const TypePtr& type) { switch (type->kind()) { case TypeKind::BOOLEAN: diff --git a/velox/experimental/cudf/exec/VeloxCudfInterop.h b/velox/experimental/cudf/exec/VeloxCudfInterop.h index 81b32754d0f1..00a0aa11b75f 100644 --- a/velox/experimental/cudf/exec/VeloxCudfInterop.h +++ b/velox/experimental/cudf/exec/VeloxCudfInterop.h @@ -26,8 +26,6 @@ namespace facebook::velox::cudf_velox { -cudf::type_id veloxToCudfTypeId(const TypePtr& type); - cudf::data_type veloxToCudfDataType(const TypePtr& type); namespace with_arrow { diff --git a/velox/experimental/cudf/tests/CMakeLists.txt b/velox/experimental/cudf/tests/CMakeLists.txt index d6336e3a2f32..9f233411bae9 100644 --- a/velox/experimental/cudf/tests/CMakeLists.txt +++ b/velox/experimental/cudf/tests/CMakeLists.txt @@ -17,6 +17,7 @@ add_executable(velox_cudf_assign_unique_id_test Main.cpp AssignUniqueIdTest.cpp) add_executable(velox_cudf_config_test Main.cpp ConfigTest.cpp) add_executable(velox_cudf_expression_selection_test Main.cpp ExpressionEvaluatorSelectionTest.cpp) add_executable(velox_cudf_decimal_expression_test Main.cpp DecimalExpressionTest.cpp) +add_executable(velox_cudf_decimal_aggregation_test Main.cpp DecimalAggregationTest.cpp) add_executable(velox_cudf_filter_project_test Main.cpp FilterProjectTest.cpp) add_executable(velox_cudf_hash_join_test HashJoinTest.cpp Main.cpp) add_executable(velox_cudf_limit_test Main.cpp LimitTest.cpp) @@ -70,6 +71,12 @@ add_test( WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} ) +add_test( + NAME velox_cudf_decimal_aggregation_test + COMMAND velox_cudf_decimal_aggregation_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} +) + add_test( NAME velox_cudf_filter_project_test COMMAND velox_cudf_filter_project_test @@ -165,6 +172,7 @@ set_tests_properties( PROPERTIES LABELS cuda_driver TIMEOUT 3000 ) set_tests_properties(velox_cudf_decimal_expression_test PROPERTIES LABELS cuda_driver TIMEOUT 3000) +set_tests_properties(velox_cudf_decimal_aggregation_test PROPERTIES LABELS cuda_driver TIMEOUT 3000) set_tests_properties(velox_cudf_filter_project_test PROPERTIES LABELS cuda_driver TIMEOUT 3000) set_tests_properties(velox_cudf_hash_join_test PROPERTIES LABELS cuda_driver TIMEOUT 3000) set_tests_properties(velox_cudf_limit_test PROPERTIES LABELS cuda_driver TIMEOUT 3000) @@ -234,6 +242,17 @@ target_link_libraries( gtest_main ) +target_link_libraries( + velox_cudf_decimal_aggregation_test + velox_cudf_exec + velox_exec + velox_exec_test_lib + velox_functions_test_lib + velox_test_util + gtest + gtest_main +) + target_link_libraries( velox_cudf_filter_project_test velox_cudf_exec diff --git a/velox/experimental/cudf/tests/DecimalAggregationTest.cpp b/velox/experimental/cudf/tests/DecimalAggregationTest.cpp new file mode 100644 index 000000000000..b7320b9fd725 --- /dev/null +++ b/velox/experimental/cudf/tests/DecimalAggregationTest.cpp @@ -0,0 +1,1405 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/exec/DecimalAggregationKernels.h" +#include "velox/experimental/cudf/exec/ToCudf.h" +#include "velox/experimental/cudf/exec/VeloxCudfInterop.h" + +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/file/FileSystems.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/parse/TypeResolver.h" +#include "velox/type/DecimalUtil.h" + +#include +#include +#include +#include + +#include + +#include +#include +#include + +namespace facebook::velox::cudf_velox { +namespace { + +int64_t computeAvgRaw(const std::vector& values) { + int128_t sum = 0; + for (auto value : values) { + sum += value; + } + int128_t avg = 0; + facebook::velox::DecimalUtil::computeAverage(avg, sum, values.size(), 0); + return static_cast(avg); +} + +constexpr int kBitsPerWord = 8 * sizeof(cudf::bitmask_type); + +std::pair makeNullMask( + const std::vector& valid, + rmm::cuda_stream_view stream) { + auto numBits = static_cast(valid.size()); + if (numBits == 0) { + return {rmm::device_buffer{}, 0}; + } + auto maskBytes = cudf::bitmask_allocation_size_bytes(numBits); + auto numWords = maskBytes / sizeof(cudf::bitmask_type); + std::vector host(numWords, 0); + cudf::size_type nullCount = 0; + for (cudf::size_type i = 0; i < numBits; ++i) { + if (valid[i]) { + auto word = i / kBitsPerWord; + auto bit = i % kBitsPerWord; + host[word] |= (cudf::bitmask_type{1} << bit); + } else { + ++nullCount; + } + } + rmm::device_buffer mask(maskBytes, stream); + if (!host.empty()) { + auto status = cudaMemcpyAsync( + mask.data(), + host.data(), + host.size() * sizeof(cudf::bitmask_type), + cudaMemcpyHostToDevice, + stream.value()); + VELOX_CHECK_EQ(0, static_cast(status)); + stream.synchronize(); + } + return {std::move(mask), nullCount}; +} + +class ScopedEnvVar { + public: + ScopedEnvVar(const char* key, const char* value) : key_(key) { + const char* existing = std::getenv(key); + if (existing) { + oldValue_ = std::string(existing); + } + if (value) { + setenv(key, value, 1); + } else { + unsetenv(key); + } + } + + ~ScopedEnvVar() { + if (oldValue_) { + setenv(key_.c_str(), oldValue_->c_str(), 1); + } else { + unsetenv(key_.c_str()); + } + } + + private: + std::string key_; + std::optional oldValue_; +}; + +template +std::unique_ptr makeFixedWidthColumn( + cudf::data_type type, + const std::vector& values, + const std::vector* valid, + rmm::cuda_stream_view stream) { + auto col = cudf::make_fixed_width_column( + type, + static_cast(values.size()), + cudf::mask_state::UNALLOCATED, + stream); + if (!values.empty()) { + auto status = cudaMemcpyAsync( + col->mutable_view().data(), + values.data(), + values.size() * sizeof(T), + cudaMemcpyHostToDevice, + stream.value()); + VELOX_CHECK_EQ(0, static_cast(status)); + stream.synchronize(); + } + if (valid) { + auto [mask, nullCount] = makeNullMask(*valid, stream); + col->set_null_mask(std::move(mask), nullCount); + } + return col; +} + +template +std::unique_ptr makeDecimalColumn( + const std::vector& values, + int32_t scale, + const std::vector* valid, + rmm::cuda_stream_view stream) { + cudf::type_id typeId = std::is_same_v ? cudf::type_id::DECIMAL64 + : cudf::type_id::DECIMAL128; + cudf::data_type type{typeId, -scale}; + return makeFixedWidthColumn(type, values, valid, stream); +} + +std::unique_ptr makeInt64Column( + const std::vector& values, + const std::vector* valid, + rmm::cuda_stream_view stream) { + return makeFixedWidthColumn( + cudf::data_type{cudf::type_id::INT64}, values, valid, stream); +} + +template +std::vector copyColumnData( + const cudf::column_view& view, + rmm::cuda_stream_view stream) { + std::vector host(view.size()); + if (view.size() == 0) { + return host; + } + auto status = cudaMemcpyAsync( + host.data(), + view.data(), + view.size() * sizeof(T), + cudaMemcpyDeviceToHost, + stream.value()); + VELOX_CHECK_EQ(0, static_cast(status)); + stream.synchronize(); + return host; +} + +std::vector copyNullMask( + const cudf::column_view& view, + rmm::cuda_stream_view stream) { + auto numWords = cudf::num_bitmask_words(view.size()); + std::vector host(numWords, 0); + if (!view.nullable() || numWords == 0) { + return host; + } + auto status = cudaMemcpyAsync( + host.data(), + view.null_mask(), + host.size() * sizeof(cudf::bitmask_type), + cudaMemcpyDeviceToHost, + stream.value()); + VELOX_CHECK_EQ(0, static_cast(status)); + stream.synchronize(); + return host; +} + +bool isValidAt(const std::vector& mask, size_t idx) { + if (mask.empty()) { + return true; + } + auto word = idx / kBitsPerWord; + auto bit = idx % kBitsPerWord; + return (mask[word] >> bit) & 1; +} + +class CudfDecimalTest : public exec::test::OperatorTestBase { + protected: + void SetUp() override { + exec::test::OperatorTestBase::SetUp(); + filesystems::registerLocalFileSystem(); + parse::registerTypeResolver(); + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + CudfConfig::getInstance().allowCpuFallback = false; + // Ensure a CUDA device is selected and initialized (RMM asserts otherwise). + int deviceCount = 0; + auto status = cudaGetDeviceCount(&deviceCount); + if (status != cudaSuccess) { + GTEST_SKIP() << "cudaGetDeviceCount failed: " << static_cast(status) + << " (" << cudaGetErrorString(status) << ")"; + } + if (deviceCount == 0) { + GTEST_SKIP() << "No CUDA devices visible (check CUDA_VISIBLE_DEVICES)"; + } + VELOX_CHECK_EQ(0, static_cast(cudaSetDevice(0))); + VELOX_CHECK_EQ(0, static_cast(cudaFree(nullptr))); + registerCudf(); + } + + void TearDown() override { + unregisterCudf(); + exec::test::OperatorTestBase::TearDown(); + } +}; + +TEST_F(CudfDecimalTest, DISABLED_decimalAvgAndSumTimesDouble) { + auto rowType = ROW({ + {"l_quantity", DECIMAL(15, 2)}, + }); + + // Values chosen to keep the AVG and SUM exact in double. + auto input = makeRowVector( + {"l_quantity"}, + {makeFlatVector( + {125, 250, 375, 400}, // 1.25, 2.50, 3.75, 4.00 + DECIMAL(15, 2))}); + + std::vector vectors = {input}; + createDuckDbTable(vectors); + + // Force CPU-only path to validate this fails without cuDF involvement. + unregisterCudf(); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .project({"l_quantity * 2.0 AS qty2"}) + .singleAggregation( + {}, {"avg(qty2) AS avg_qty", "sum(qty2) AS sum_qty"}) + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults( + "SELECT avg(l_quantity * 2.0) AS avg_qty, " + "sum(l_quantity * 2.0) AS sum_qty " + "FROM tmp"); +} + +TEST_F(CudfDecimalTest, decimalAvgDecimalInput) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeFlatVector( + {100, 200, 300, 400}, // 1.00, 2.00, 3.00, 4.00 + DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .singleAggregation({}, {"avg(d) AS avg_d"}) + .planNode(); + + auto expected = makeRowVector( + {"avg_d"}, {makeFlatVector({250}, DECIMAL(12, 2))}); // 2.50 + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgDecimalInputRounds) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + // Sum = 1.60, count = 7 => 0.22857..., rounds to 0.23 at scale 2. + std::vector rawValues = {100, 10, 10, 10, 10, 10, 10}; + auto input = makeRowVector( + {"d"}, {makeFlatVector(rawValues, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .singleAggregation({}, {"avg(d) AS avg_d"}) + .planNode(); + + auto expected = makeRowVector( + {"avg_d"}, + {makeFlatVector({computeAvgRaw(rawValues)}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgPartialFinalVarbinaryRounds) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + std::vector keys = {1, 1, 1, 1, 1, 1, 1, 2, 2, 3, 3}; + std::vector values = {100, 10, 10, 10, 10, 10, 10, 100, 1, -100, -1}; + + auto input = makeRowVector( + {"k", "d"}, + { + makeFlatVector(keys), + makeFlatVector(values, DECIMAL(12, 2)), + }); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"avg(d) AS a"}) + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + std::vector>> groups = { + {1, {100, 10, 10, 10, 10, 10, 10}}, + {2, {100, 1}}, + {3, {-100, -1}}, + }; + + auto expected = makeRowVector( + {"k", "a"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector( + {computeAvgRaw(groups[0].second), + computeAvgRaw(groups[1].second), + computeAvgRaw(groups[2].second)}, + DECIMAL(12, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgIntermediateVarbinaryRounds) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 2, 3}), + makeFlatVector({100, 10, 100, -100}, DECIMAL(12, 2)), + }); + auto input2 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 1, 1, 1, 2, 3}), + makeFlatVector({10, 10, 10, 10, 10, 1, -1}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input1, input2}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"avg(d) AS a"}) + .intermediateAggregation() + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + std::vector>> groups = { + {1, {100, 10, 10, 10, 10, 10, 10}}, + {2, {100, 1}}, + {3, {-100, -1}}, + }; + + auto expected = makeRowVector( + {"k", "a"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector( + {computeAvgRaw(groups[0].second), + computeAvgRaw(groups[1].second), + computeAvgRaw(groups[2].second)}, + DECIMAL(12, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalPartialFinalVarbinaryRounds) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"d"}, {makeFlatVector({100, 10, 10}, DECIMAL(12, 2))}); + auto input2 = makeRowVector( + {"d"}, {makeFlatVector({10, 10, 10, 10}, DECIMAL(12, 2))}); + + std::vector vectors = {input1, input2}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"avg(d) AS a"}) + .finalAggregation() + .planNode(); + + std::vector allValues = {100, 10, 10, 10, 10, 10, 10}; + auto expected = makeRowVector( + {"a"}, + {makeFlatVector({computeAvgRaw(allValues)}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalIntermediateVarbinaryRounds) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"d"}, {makeFlatVector({100, 10, 10}, DECIMAL(12, 2))}); + auto input2 = makeRowVector( + {"d"}, {makeFlatVector({10, 10, 10, 10}, DECIMAL(12, 2))}); + + std::vector vectors = {input1, input2}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"avg(d) AS a"}) + .intermediateAggregation() + .finalAggregation() + .planNode(); + + std::vector allValues = {100, 10, 10, 10, 10, 10, 10}; + auto expected = makeRowVector( + {"a"}, + {makeFlatVector({computeAvgRaw(allValues)}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalSingleRounds) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeFlatVector({100, 10, 10, 10, 10, 10, 10}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .singleAggregation({}, {"avg(d) AS a"}) + .planNode(); + + std::vector allValues = {100, 10, 10, 10, 10, 10, 10}; + auto expected = makeRowVector( + {"a"}, + {makeFlatVector({computeAvgRaw(allValues)}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalSingleAllNulls) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, std::nullopt}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .singleAggregation({}, {"avg(d) AS a"}) + .planNode(); + + auto expected = makeRowVector( + {"a"}, {makeNullableFlatVector({std::nullopt}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalPartialFinalVarbinaryAllNulls) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, std::nullopt}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"avg(d) AS a"}) + .finalAggregation() + .planNode(); + + auto expected = makeRowVector( + {"a"}, {makeNullableFlatVector({std::nullopt}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgGlobalIntermediateVarbinaryAllNulls) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, std::nullopt}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"avg(d) AS a"}) + .intermediateAggregation() + .finalAggregation() + .planNode(); + + auto expected = makeRowVector( + {"a"}, {makeNullableFlatVector({std::nullopt}, DECIMAL(12, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgPartialFinalVarbinaryNullGroup) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 2, 2, 3, 3}), + makeNullableFlatVector( + {100, 200, std::nullopt, std::nullopt, 400, std::nullopt}, + DECIMAL(12, 2)), + }); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"avg(d) AS a"}) + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + auto expected = makeRowVector( + {"k", "a"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {150, std::nullopt, 400}, DECIMAL(12, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalAvgIntermediateVarbinaryNullGroup) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {100, std::nullopt, 400}, DECIMAL(12, 2)), + }); + auto input2 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {200, std::nullopt, std::nullopt}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input1, input2}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"avg(d) AS a"}) + .intermediateAggregation() + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + auto expected = makeRowVector( + {"k", "a"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {150, std::nullopt, 400}, DECIMAL(12, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalSumPartialFinalVarbinary) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 2, 2, 2}), + makeFlatVector( + {12345, -2500, 10000, 200, -300}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"sum(d) AS s"}) + .finalAggregation() + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT k, sum(d) AS s FROM tmp GROUP BY k"); +} + +TEST_F(CudfDecimalTest, decimalPartialSumVarbinaryToVeloxRoundTrip) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, {makeFlatVector({100, 200, 300}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(d) AS s"}) + .planNode(); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + VELOX_CHECK_NOT_NULL(result); + ASSERT_GT(result->size(), 0); + ASSERT_EQ(result->childAt(0)->type()->kind(), TypeKind::VARBINARY); +} + +TEST_F(CudfDecimalTest, decimalSumPartialFinalEmptyInput) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 2, 3}), + makeFlatVector({100, 200, 300}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .filter("k < 0") + .partialAggregation({"k"}, {"sum(d) AS s"}) + .finalAggregation() + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT k, sum(d) AS s FROM tmp WHERE k < 0 GROUP BY k"); +} + +TEST_F(CudfDecimalTest, decimalSumIntermediateVarbinary) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 2}), + makeFlatVector({12345, -2500, 10000}, DECIMAL(12, 2)), + }); + auto input2 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({2, 3}), + makeFlatVector({200, -300}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input1, input2}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"sum(d) AS s"}) + .intermediateAggregation() + .finalAggregation() + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT k, sum(d) AS s FROM tmp GROUP BY k"); +} + +TEST_F(CudfDecimalTest, decimalSumGlobalPartialFinalVarbinary) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"d"}, {makeFlatVector({12345, -2500, 10000}, DECIMAL(12, 2))}); + auto input2 = makeRowVector( + {"d"}, {makeFlatVector({200, -300}, DECIMAL(12, 2))}); + + std::vector vectors = {input1, input2}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(d) AS s"}) + .finalAggregation() + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT sum(d) AS s FROM tmp"); +} + +TEST_F(CudfDecimalTest, decimalSumGlobalIntermediateVarbinary) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"d"}, {makeFlatVector({12345, -2500, 10000}, DECIMAL(12, 2))}); + auto input2 = makeRowVector( + {"d"}, {makeFlatVector({200, -300}, DECIMAL(12, 2))}); + + std::vector vectors = {input1, input2}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(d) AS s"}) + .intermediateAggregation() + .finalAggregation() + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT sum(d) AS s FROM tmp"); +} + +TEST_F(CudfDecimalTest, decimalSumGlobalSingle) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeFlatVector( + {12345, -2500, 10000, 200, -300}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + createDuckDbTable(vectors); + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .singleAggregation({}, {"sum(d) AS s"}) + .planNode(); + + facebook::velox::exec::test::AssertQueryBuilder(plan, duckDbQueryRunner_) + .assertResults("SELECT sum(d) AS s FROM tmp"); +} + +TEST_F(CudfDecimalTest, decimalSumPartialFinalVarbinaryNullGroup) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 1, 2, 2, 3, 3}), + makeNullableFlatVector( + {100, 200, std::nullopt, std::nullopt, 400, std::nullopt}, + DECIMAL(12, 2)), + }); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"sum(d) AS s"}) + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + auto expected = makeRowVector( + {"k", "s"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {static_cast(300), + std::nullopt, + static_cast(400)}, + DECIMAL(38, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalSumIntermediateVarbinaryNullGroup) { + auto rowType = ROW({ + {"k", INTEGER()}, + {"d", DECIMAL(12, 2)}, + }); + + auto input1 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {100, std::nullopt, 400}, DECIMAL(12, 2)), + }); + auto input2 = makeRowVector( + {"k", "d"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {200, std::nullopt, std::nullopt}, DECIMAL(12, 2)), + }); + + std::vector vectors = {input1, input2}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({"k"}, {"sum(d) AS s"}) + .intermediateAggregation() + .finalAggregation() + .orderBy({"k"}, false) + .planNode(); + + auto expected = makeRowVector( + {"k", "s"}, + { + makeFlatVector({1, 2, 3}), + makeNullableFlatVector( + {static_cast(300), + std::nullopt, + static_cast(400)}, + DECIMAL(38, 2)), + }); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalSumGlobalPartialFinalVarbinaryAllNulls) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, std::nullopt}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(d) AS s"}) + .finalAggregation() + .planNode(); + + auto expected = makeRowVector( + {"s"}, + {makeNullableFlatVector({std::nullopt}, DECIMAL(38, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalSumGlobalIntermediateVarbinaryAllNulls) { + auto rowType = ROW({ + {"d", DECIMAL(12, 2)}, + }); + + auto input = makeRowVector( + {"d"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, std::nullopt}, DECIMAL(12, 2))}); + + std::vector vectors = {input}; + + auto plan = exec::test::PlanBuilder() + .values(vectors) + .partialAggregation({}, {"sum(d) AS s"}) + .intermediateAggregation() + .finalAggregation() + .planNode(); + + auto expected = makeRowVector( + {"s"}, + {makeNullableFlatVector({std::nullopt}, DECIMAL(38, 2))}); + + auto result = + facebook::velox::exec::test::AssertQueryBuilder(plan).copyResults(pool()); + facebook::velox::test::assertEqualVectors(expected, result); +} + +TEST_F(CudfDecimalTest, decimalDeserializeSumStateDecimal64) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector sums = {100, -200, 300}; + std::vector counts = {1, 2, 0}; + std::vector sumValid = {true, false, true}; + std::vector countValid = {true, true, true}; + + auto sumCol = makeDecimalColumn(sums, 2, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + auto sumOnly = deserializeDecimalSumState(stateCol->view(), 2, stream, mr); + auto stateMask = copyNullMask(stateCol->view(), stream); + auto sumMask = copyNullMask(sumOnly->view(), stream); + EXPECT_EQ(stateMask, sumMask); + + auto outSum = copyColumnData<__int128_t>(sumOnly->view(), stream); + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(sumMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outSum[i], static_cast<__int128_t>(sums[i])); + } + } +} + +TEST_F(CudfDecimalTest, decimalDeserializeSumStateDecimal128) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector<__int128_t> sums = { + static_cast<__int128_t>(123450), + static_cast<__int128_t>(-25000), + static_cast<__int128_t>(100000), + }; + std::vector counts = {2, 1, 0}; + std::vector sumValid = {true, true, true}; + std::vector countValid = {true, false, true}; + + auto sumCol = makeDecimalColumn<__int128_t>(sums, 3, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + auto sumOnly = deserializeDecimalSumState(stateCol->view(), 3, stream, mr); + auto stateMask = copyNullMask(stateCol->view(), stream); + auto sumMask = copyNullMask(sumOnly->view(), stream); + EXPECT_EQ(stateMask, sumMask); + + auto outSum = copyColumnData<__int128_t>(sumOnly->view(), stream); + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(sumMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outSum[i], sums[i]); + } + } +} + +TEST_F(CudfDecimalTest, decimalDeserializeSumStateAllNull) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + constexpr cudf::size_type numRows = 4; + + auto offsetsCol = cudf::make_fixed_width_column( + cudf::data_type{cudf::type_id::INT32}, + numRows + 1, + cudf::mask_state::UNALLOCATED, + stream); + auto* offsetsPtr = offsetsCol->mutable_view().data(); + auto status = cudaMemsetAsync( + offsetsPtr, + 0, + static_cast(numRows + 1) * sizeof(int32_t), + stream.value()); + VELOX_CHECK_EQ(0, static_cast(status)); + stream.synchronize(); + + std::vector valid(numRows, false); + auto [nullMask, nullCount] = makeNullMask(valid, stream); + rmm::device_buffer charsBuf(0, stream); + auto stateCol = cudf::make_strings_column( + numRows, + std::move(offsetsCol), + std::move(charsBuf), + nullCount, + std::move(nullMask)); + + auto decoded = + deserializeDecimalSumStateWithCount(stateCol->view(), 2, stream, mr); + auto outSumView = decoded.sum->view(); + auto outCountView = decoded.count->view(); + + EXPECT_EQ(outSumView.size(), numRows); + EXPECT_EQ(outCountView.size(), numRows); + EXPECT_EQ(outSumView.null_count(), numRows); + EXPECT_EQ(outCountView.null_count(), numRows); + + auto outSumMask = copyNullMask(outSumView, stream); + auto outCountMask = copyNullMask(outCountView, stream); + for (size_t i = 0; i < static_cast(numRows); ++i) { + EXPECT_FALSE(isValidAt(outSumMask, i)); + EXPECT_FALSE(isValidAt(outCountMask, i)); + } +} + +TEST_F(CudfDecimalTest, decimalSerializeSumStateUsesInt64OffsetsWhenEnabled) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + ScopedEnvVar enableLargeStrings("LIBCUDF_LARGE_STRINGS_ENABLED", "1"); + ScopedEnvVar threshold("LIBCUDF_LARGE_STRINGS_THRESHOLD", "1"); + + std::vector sums = {100, -200}; + std::vector counts = {1, 1}; + + auto sumCol = makeDecimalColumn(sums, 2, nullptr, stream); + auto countCol = makeInt64Column(counts, nullptr, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + + cudf::strings_column_view strings(stateCol->view()); + EXPECT_EQ(strings.offsets().type().id(), cudf::type_id::INT64); +} + +TEST_F(CudfDecimalTest, decimalSumStateRoundTripUsesInt64Offsets) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + ScopedEnvVar enableLargeStrings("LIBCUDF_LARGE_STRINGS_ENABLED", "1"); + ScopedEnvVar threshold("LIBCUDF_LARGE_STRINGS_THRESHOLD", "1"); + + std::vector sums = {100, -200, 300, 400}; + std::vector counts = {1, 0, 2, 3}; + std::vector sumValid = {true, true, false, true}; + std::vector countValid = {true, true, true, false}; + + auto sumCol = makeDecimalColumn(sums, 2, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + + cudf::strings_column_view strings(stateCol->view()); + EXPECT_EQ(strings.offsets().type().id(), cudf::type_id::INT64); + + auto decoded = + deserializeDecimalSumStateWithCount(stateCol->view(), 2, stream, mr); + auto outSumView = decoded.sum->view(); + auto outCountView = decoded.count->view(); + auto outSum = copyColumnData<__int128_t>(outSumView, stream); + auto outCount = copyColumnData(outCountView, stream); + auto outSumMask = copyNullMask(outSumView, stream); + auto outCountMask = copyNullMask(outCountView, stream); + + EXPECT_EQ(outSumView.size(), sums.size()); + EXPECT_EQ(outCountView.size(), counts.size()); + EXPECT_EQ(outSumMask, outCountMask); + + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(outSumMask, i), expectedValid); + EXPECT_EQ(isValidAt(outCountMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outSum[i], static_cast<__int128_t>(sums[i])); + EXPECT_EQ(outCount[i], counts[i]); + } + } +} + +TEST_F(CudfDecimalTest, decimalComputeAverageDecimal64) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector sums = {100, 105, 250, -125}; + std::vector counts = {4, 2, 0, 2}; + std::vector sumValid = {true, true, true, true}; + std::vector countValid = {true, false, true, true}; + + auto sumCol = makeDecimalColumn(sums, 2, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto avgCol = computeDecimalAverage(sumCol->view(), countCol->view(), stream, mr); + + auto avgMask = copyNullMask(avgCol->view(), stream); + auto outAvg = copyColumnData(avgCol->view(), stream); + + auto avgUnscaled = [](int128_t sum, int64_t count) { + __int128_t out = 0; + facebook::velox::DecimalUtil:: + divideWithRoundUp<__int128_t, __int128_t, int64_t>( + out, sum, count, false, 0, 0); + return static_cast(out); + }; + + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(avgMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outAvg[i], avgUnscaled(sums[i], counts[i])); + } + } +} + +TEST_F(CudfDecimalTest, decimalComputeAverageDecimal128) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector<__int128_t> sums = { + static_cast<__int128_t>(123450), + static_cast<__int128_t>(-25000), + static_cast<__int128_t>(100000), + }; + std::vector counts = {3, 2, 0}; + std::vector sumValid = {true, true, true}; + std::vector countValid = {true, true, true}; + + auto sumCol = makeDecimalColumn<__int128_t>(sums, 3, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto avgCol = computeDecimalAverage(sumCol->view(), countCol->view(), stream, mr); + + auto avgMask = copyNullMask(avgCol->view(), stream); + auto outAvg = copyColumnData<__int128_t>(avgCol->view(), stream); + + auto avgUnscaled = [](int128_t sum, int64_t count) { + __int128_t out = 0; + facebook::velox::DecimalUtil:: + divideWithRoundUp<__int128_t, __int128_t, int64_t>( + out, sum, count, false, 0, 0); + return out; + }; + + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(avgMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outAvg[i], avgUnscaled(sums[i], counts[i])); + } + } +} + +TEST_F(CudfDecimalTest, decimalSumStateRoundTripDecimal64) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector sums = {100, -200, 300, 400}; + std::vector counts = {1, 0, 2, 3}; + std::vector sumValid = {true, true, false, true}; + std::vector countValid = {true, true, true, false}; + + auto sumCol = makeDecimalColumn(sums, 2, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + auto stateMask = copyNullMask(stateCol->view(), stream); + + auto decoded = + deserializeDecimalSumStateWithCount(stateCol->view(), 2, stream, mr); + auto outSumView = decoded.sum->view(); + auto outCountView = decoded.count->view(); + auto outSum = copyColumnData<__int128_t>(outSumView, stream); + auto outCount = copyColumnData(outCountView, stream); + auto outSumMask = copyNullMask(outSumView, stream); + auto outCountMask = copyNullMask(outCountView, stream); + + EXPECT_EQ(stateMask, outSumMask); + EXPECT_EQ(stateMask, outCountMask); + + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(stateMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outSum[i], static_cast<__int128_t>(sums[i])); + EXPECT_EQ(outCount[i], counts[i]); + } + } +} + +TEST_F(CudfDecimalTest, decimalSumStateRoundTripDecimal128) { + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + std::vector<__int128_t> sums = { + static_cast<__int128_t>(123450), + static_cast<__int128_t>(-25000), + static_cast<__int128_t>(100000), + }; + std::vector counts = {2, 1, 0}; + std::vector sumValid = {true, false, true}; + std::vector countValid = {true, true, true}; + + auto sumCol = makeDecimalColumn<__int128_t>(sums, 3, &sumValid, stream); + auto countCol = makeInt64Column(counts, &countValid, stream); + auto stateCol = + serializeDecimalSumState(sumCol->view(), countCol->view(), stream, mr); + auto stateMask = copyNullMask(stateCol->view(), stream); + + auto decoded = + deserializeDecimalSumStateWithCount(stateCol->view(), 3, stream, mr); + auto outSumView = decoded.sum->view(); + auto outCountView = decoded.count->view(); + auto outSum = copyColumnData<__int128_t>(outSumView, stream); + auto outCount = copyColumnData(outCountView, stream); + auto outSumMask = copyNullMask(outSumView, stream); + auto outCountMask = copyNullMask(outCountView, stream); + + EXPECT_EQ(stateMask, outSumMask); + EXPECT_EQ(stateMask, outCountMask); + + for (size_t i = 0; i < sums.size(); ++i) { + bool expectedValid = sumValid[i] && countValid[i] && counts[i] != 0; + EXPECT_EQ(isValidAt(stateMask, i), expectedValid); + if (expectedValid) { + EXPECT_EQ(outSum[i], sums[i]); + EXPECT_EQ(outCount[i], counts[i]); + } + } +} + +TEST_F(CudfDecimalTest, cudfVarbinaryArrowRoundTrip) { + auto input = makeRowVector( + {"bin"}, + {makeNullableFlatVector( + {std::string("abc"), std::nullopt, std::string("xyz")}, + VARBINARY())}); + + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + auto cudfTable = with_arrow::toCudfTable(input, pool(), stream, mr); + auto roundTrip = + with_arrow::toVeloxColumn(cudfTable->view(), pool(), "rt_", stream, mr); + + ASSERT_EQ(roundTrip->childAt(0)->type()->kind(), TypeKind::VARCHAR); + VELOX_ASSERT_THROW( + roundTrip->setType(ROW({{"rt_0", VARBINARY()}})), + "Cannot change vector type"); + + auto expected = makeRowVector( + {"rt_0"}, + {makeNullableFlatVector( + {std::string("abc"), std::nullopt, std::string("xyz")}, VARCHAR())}); + + facebook::velox::test::assertEqualVectors(expected, roundTrip); +} + +TEST_F(CudfDecimalTest, cudfVarbinaryArrowRoundTripWithExpectedType) { + auto input = makeRowVector( + {"bin"}, + {makeNullableFlatVector( + {std::string("abc"), std::nullopt, std::string("xyz")}, + VARBINARY())}); + + auto expectedType = ROW({{"bin", VARBINARY()}}); + + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + auto cudfTable = with_arrow::toCudfTable(input, pool(), stream, mr); + auto roundTrip = with_arrow::toVeloxColumn( + cudfTable->view(), pool(), expectedType, "rt_", stream, mr); + + ASSERT_EQ(roundTrip->childAt(0)->type()->kind(), TypeKind::VARBINARY); + + auto expected = makeRowVector( + {"rt_0"}, + {makeNullableFlatVector( + {std::string("abc"), std::nullopt, std::string("xyz")}, + VARBINARY())}); + + facebook::velox::test::assertEqualVectors(expected, roundTrip); +} + +TEST_F(CudfDecimalTest, cudfVarbinaryRowTypeMismatch) { + auto input = makeRowVector( + {"l_returnflag", + "l_linestatus", + "avg_51", + "avg_52", + "avg_53", + "count_54", + "sum_47", + "sum_48", + "sum_49", + "sum_50"}, + {makeFlatVector({"A", "B"}, VARCHAR()), + makeFlatVector({"F", "O"}, VARCHAR()), + makeFlatVector({"x", "y"}, VARBINARY()), + makeFlatVector({"p", "q"}, VARBINARY()), + makeFlatVector({"m", "n"}, VARBINARY()), + makeFlatVector({10, 20}, BIGINT()), + makeFlatVector({"u", "v"}, VARBINARY()), + makeFlatVector({"r", "s"}, VARBINARY()), + makeFlatVector({"t", "w"}, VARBINARY()), + makeFlatVector({"c", "d"}, VARBINARY())}); + + auto expectedType = ROW({ + {"l_returnflag", VARCHAR()}, + {"l_linestatus", VARCHAR()}, + {"avg_51", VARBINARY()}, + {"avg_52", VARBINARY()}, + {"avg_53", VARBINARY()}, + {"count_54", BIGINT()}, + {"sum_47", VARBINARY()}, + {"sum_48", VARBINARY()}, + {"sum_49", VARBINARY()}, + {"sum_50", VARBINARY()}, + }); + + auto stream = cudf::get_default_stream(); + auto mr = cudf::get_current_device_resource_ref(); + auto cudfTable = with_arrow::toCudfTable(input, pool(), stream, mr); + auto roundTrip = + with_arrow::toVeloxColumn(cudfTable->view(), pool(), "rt_", stream, mr); + + ASSERT_EQ(roundTrip->childAt(2)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(3)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(4)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(6)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(7)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(8)->type()->kind(), TypeKind::VARCHAR); + ASSERT_EQ(roundTrip->childAt(9)->type()->kind(), TypeKind::VARCHAR); + + VELOX_ASSERT_THROW( + roundTrip->setType(expectedType), "Cannot change vector type"); +} + +} // namespace +} // namespace facebook::velox::cudf_velox