diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 898eacd18644..5d954d5db9f3 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -466,7 +466,7 @@ object VeloxConfig extends ConfigRegistry { "Whether to apply dynamic filters pushed down from hash probe in the ValueStream" + " (shuffle reader) operator to filter rows before they reach the hash join.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COLUMNAR_VELOX_FILE_HANDLE_CACHE_ENABLED = buildStaticConf("spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled") diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 58dd301b6968..7b3de5cc15a9 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -128,6 +128,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS memory/MemoryManager.cc memory/ArrowMemoryPool.cc memory/ColumnarBatch.cc + shuffle/BlockStatistics.cc shuffle/Dictionary.cc shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c8cd3adef457..c56368f8aa41 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -962,6 +962,14 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition numSubDirs, enableDictionary); + // Reuse the dynamic filter config to also enable block statistics collection, + // since stats are only useful when dynamic filter pushdown is active. 
+  const auto& confMap = ctx->getConfMap();
+  auto it = confMap.find("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled");
+  if (it != confMap.end() && it->second == "true") {
+    partitionWriterOptions->blockStatisticsEnabled = true;
+  }
+
   auto partitionWriter = std::make_shared<LocalPartitionWriter>(
       numPartitions,
       createCompressionCodec(
diff --git a/cpp/core/shuffle/BlockStatistics.cc b/cpp/core/shuffle/BlockStatistics.cc
new file mode 100644
index 000000000000..49b36f96eaba
--- /dev/null
+++ b/cpp/core/shuffle/BlockStatistics.cc
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/BlockStatistics.h"
+
+#include <arrow/buffer.h>
+#include <arrow/util/bit_util.h>
+#include <cstring>
+
+namespace gluten {
+namespace {
+
+// Returns true if the row at the given index is valid (non-null).
+inline bool isRowValid(const std::shared_ptr<arrow::Buffer>& validityBuffer, uint32_t row) {
+  if (!validityBuffer) {
+    return true; // No validity buffer means all rows are valid.
+  }
+  return arrow::bit_util::GetBit(validityBuffer->data(), row);
+}
+
+// Returns true if the column has any null rows.
+bool hasAnyNull(const std::shared_ptr<arrow::Buffer>& validityBuffer, uint32_t numRows) {
+  if (!validityBuffer || numRows == 0) {
+    return false;
+  }
+  // Check each bit — return early on first null found.
+  for (uint32_t i = 0; i < numRows; ++i) {
+    if (!arrow::bit_util::GetBit(validityBuffer->data(), i)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+template <typename T>
+void writeBytes(uint8_t*& dst, T value) {
+  memcpy(dst, &value, sizeof(T));
+  dst += sizeof(T);
+}
+
+template <typename T>
+T readBytes(const uint8_t*& src) {
+  T value;
+  memcpy(&value, src, sizeof(T));
+  src += sizeof(T);
+  return value;
+}
+
+template <typename T>
+void scanColumnMinMax(
+    const std::shared_ptr<arrow::Buffer>& validityBuffer,
+    const std::shared_ptr<arrow::Buffer>& valueBuffer,
+    uint32_t numRows,
+    ColumnStatistics& stats) {
+  if (!valueBuffer || valueBuffer->size() == 0 || numRows == 0) {
+    return;
+  }
+
+  const auto* values = reinterpret_cast<const T*>(valueBuffer->data());
+  bool foundAny = false;
+  T minVal{};
+  T maxVal{};
+
+  for (uint32_t i = 0; i < numRows; ++i) {
+    if (!isRowValid(validityBuffer, i)) {
+      continue;
+    }
+    T val = values[i];
+    if (!foundAny) {
+      minVal = val;
+      maxVal = val;
+      foundAny = true;
+    } else {
+      if (val < minVal) {
+        minVal = val;
+      }
+      if (val > maxVal) {
+        maxVal = val;
+      }
+    }
+  }
+
+  if (foundAny) {
+    stats.hasStats = true;
+    stats.setMin(minVal);
+    stats.setMax(maxVal);
+  }
+}
+
+} // namespace
+
+void ColumnStatistics::merge(const ColumnStatistics& other) {
+  hasNull = hasNull || other.hasNull;
+  if (!other.hasStats) {
+    return;
+  }
+  if (!hasStats) {
+    hasStats = true;
+    memcpy(minBytes, other.minBytes, 8);
+    memcpy(maxBytes, other.maxBytes, 8);
+    return;
+  }
+  // Both have stats — merge based on type.
+  switch (static_cast<arrow::Type::type>(typeId)) {
+    case arrow::Type::INT8:
+      mergeTyped<int8_t>(other);
+      break;
+    case arrow::Type::INT16:
+      mergeTyped<int16_t>(other);
+      break;
+    case arrow::Type::INT32:
+    case arrow::Type::DATE32:
+      mergeTyped<int32_t>(other);
+      break;
+    case arrow::Type::INT64:
+    case arrow::Type::DATE64:
+    case arrow::Type::TIMESTAMP:
+      mergeTyped<int64_t>(other);
+      break;
+    case arrow::Type::FLOAT:
+      mergeTyped<float>(other);
+      break;
+    case arrow::Type::DOUBLE:
+      mergeTyped<double>(other);
+      break;
+    default:
+      break;
+  }
+}
+
+arrow::Status BlockStatistics::serialize(arrow::io::OutputStream* out, int64_t payloadSize) const {
+  uint32_t size = serializedSize();
+  std::vector<uint8_t> buffer(size);
+  uint8_t* ptr = buffer.data();
+
+  writeBytes(ptr, kVersion);
+  writeBytes(ptr, static_cast<uint16_t>(columnStats.size()));
+  writeBytes(ptr, payloadSize);
+
+  for (const auto& col : columnStats) {
+    col.serialize(ptr);
+  }
+
+  return out->Write(buffer.data(), size);
+}
+
+arrow::Result<std::pair<BlockStatistics, int64_t>> BlockStatistics::deserialize(arrow::io::InputStream* in) {
+  // Read version.
+  uint8_t version;
+  ARROW_ASSIGN_OR_RAISE(auto bytesRead, in->Read(sizeof(version), &version));
+  if (bytesRead != sizeof(version) || version != kVersion) {
+    return arrow::Status::Invalid("Unsupported BlockStatistics version: ", static_cast<int32_t>(version));
+  }
+
+  // Read numColumns.
+  uint16_t numColumns;
+  ARROW_ASSIGN_OR_RAISE(bytesRead, in->Read(sizeof(numColumns), &numColumns));
+  if (bytesRead != sizeof(numColumns)) {
+    return arrow::Status::IOError("Unexpected end of stream reading BlockStatistics numColumns");
+  }
+
+  // Read payloadSize.
+ int64_t payloadSize; + ARROW_ASSIGN_OR_RAISE(bytesRead, in->Read(sizeof(payloadSize), &payloadSize)); + if (bytesRead != sizeof(payloadSize)) { + return arrow::Status::IOError("Unexpected end of stream reading BlockStatistics payloadSize"); + } + + BlockStatistics stats; + stats.columnStats.reserve(numColumns); + + for (uint16_t i = 0; i < numColumns; ++i) { + uint8_t buf[ColumnStatistics::kSerializedSize]; + ARROW_ASSIGN_OR_RAISE(bytesRead, in->Read(sizeof(buf), buf)); + if (bytesRead != sizeof(buf)) { + return arrow::Status::IOError("Unexpected end of stream reading BlockStatistics column ", i); + } + const uint8_t* ptr = buf; + stats.columnStats.push_back(ColumnStatistics::deserialize(ptr)); + } + + return std::make_pair(std::move(stats), payloadSize); +} + +void BlockStatistics::merge(const BlockStatistics& other) { + for (size_t i = 0; i < columnStats.size() && i < other.columnStats.size(); ++i) { + columnStats[i].merge(other.columnStats[i]); + } +} + +BlockStatistics computeBlockStatistics( + const std::shared_ptr& schema, + const std::vector>& buffers, + uint32_t numRows, + bool hasComplexType) { + BlockStatistics result; + if (numRows == 0 || buffers.empty()) { + return result; + } + + uint32_t bufIdx = 0; + auto numFields = schema->num_fields(); + + for (int fieldIdx = 0; fieldIdx < numFields; ++fieldIdx) { + auto typeId = schema->field(fieldIdx)->type()->id(); + + switch (typeId) { + case arrow::Type::BINARY: + case arrow::Type::STRING: + case arrow::Type::LARGE_BINARY: + case arrow::Type::LARGE_STRING: { + if (bufIdx + 3 > buffers.size()) { + break; + } + auto validityBuf = buffers[bufIdx++]; // validity + bufIdx++; // length (skip) + bufIdx++; // value (skip) + + ColumnStatistics col{}; + col.columnIndex = static_cast(fieldIdx); + col.typeId = static_cast(typeId); + col.hasNull = hasAnyNull(validityBuf, numRows); + col.hasStats = false; // String stats not supported yet. 
+ result.columnStats.push_back(col); + break; + } + case arrow::Type::STRUCT: + case arrow::Type::MAP: + case arrow::Type::LIST: + case arrow::Type::LARGE_LIST: + // Complex types are skipped in assembleBuffers() per-field loop. + // Their buffer is appended at the end. No stats for them. + break; + case arrow::Type::NA: + // Null type has no buffers. + break; + case arrow::Type::BOOL: { + if (bufIdx + 2 > buffers.size()) { + break; + } + auto validityBuf = buffers[bufIdx++]; // validity + bufIdx++; // value (bit-packed, skip for stats) + + ColumnStatistics col{}; + col.columnIndex = static_cast(fieldIdx); + col.typeId = static_cast(typeId); + col.hasNull = hasAnyNull(validityBuf, numRows); + col.hasStats = false; // Bool stats not useful. + result.columnStats.push_back(col); + break; + } + default: { + // Fixed-width numeric types. + if (bufIdx + 2 > buffers.size()) { + break; + } + auto validityBuf = buffers[bufIdx++]; // validity + auto valueBuf = buffers[bufIdx++]; // value + + ColumnStatistics col{}; + col.columnIndex = static_cast(fieldIdx); + col.typeId = static_cast(typeId); + col.hasNull = hasAnyNull(validityBuf, numRows); + col.hasStats = false; + + switch (typeId) { + case arrow::Type::INT8: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + case arrow::Type::INT16: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + case arrow::Type::INT32: + case arrow::Type::DATE32: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + case arrow::Type::INT64: + case arrow::Type::DATE64: + case arrow::Type::TIMESTAMP: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + case arrow::Type::FLOAT: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + case arrow::Type::DOUBLE: + scanColumnMinMax(validityBuf, valueBuf, numRows, col); + break; + default: + // Unsupported type for min/max stats. 
+ break; + } + + result.columnStats.push_back(col); + break; + } + } + } + + return result; +} + +} // namespace gluten diff --git a/cpp/core/shuffle/BlockStatistics.h b/cpp/core/shuffle/BlockStatistics.h new file mode 100644 index 000000000000..0a301d45a85d --- /dev/null +++ b/cpp/core/shuffle/BlockStatistics.h @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace arrow { +class Buffer; +class Schema; +} // namespace arrow + +namespace gluten { + +/// Per-column min/max statistics for a shuffle block. +struct ColumnStatistics { + uint16_t columnIndex; // Index in the schema. + uint8_t typeId; // Arrow type ID. + bool hasNull; // Whether the column contains nulls in this block. + bool hasStats; // Whether min/max are valid (false for unsupported types). + + // Raw bytes for min/max. Interpretation depends on typeId. 
+ uint8_t minBytes[8]{}; + uint8_t maxBytes[8]{}; + + static constexpr uint32_t kSerializedSize = sizeof(uint16_t) + // columnIndex + sizeof(uint8_t) + // typeId + sizeof(uint8_t) + // flags (hasNull | hasStats) + 8 + // min + 8; // max + + template + void setMin(T value) { + static_assert(sizeof(T) <= 8); + memset(minBytes, 0, 8); + memcpy(minBytes, &value, sizeof(T)); + } + + template + void setMax(T value) { + static_assert(sizeof(T) <= 8); + memset(maxBytes, 0, 8); + memcpy(maxBytes, &value, sizeof(T)); + } + + template + T getMin() const { + T value{}; + memcpy(&value, minBytes, sizeof(T)); + return value; + } + + template + T getMax() const { + T value{}; + memcpy(&value, maxBytes, sizeof(T)); + return value; + } + + void serialize(uint8_t*& dst) const { + memcpy(dst, &columnIndex, sizeof(columnIndex)); + dst += sizeof(columnIndex); + memcpy(dst, &typeId, sizeof(typeId)); + dst += sizeof(typeId); + uint8_t flags = (hasNull ? 1u : 0u) | (hasStats ? 2u : 0u); + memcpy(dst, &flags, sizeof(flags)); + dst += sizeof(flags); + memcpy(dst, minBytes, 8); + dst += 8; + memcpy(dst, maxBytes, 8); + dst += 8; + } + + static ColumnStatistics deserialize(const uint8_t*& src) { + ColumnStatistics stats{}; + memcpy(&stats.columnIndex, src, sizeof(stats.columnIndex)); + src += sizeof(stats.columnIndex); + memcpy(&stats.typeId, src, sizeof(stats.typeId)); + src += sizeof(stats.typeId); + uint8_t flags; + memcpy(&flags, src, sizeof(flags)); + src += sizeof(flags); + stats.hasNull = (flags & 1u) != 0; + stats.hasStats = (flags & 2u) != 0; + memcpy(stats.minBytes, src, 8); + src += 8; + memcpy(stats.maxBytes, src, 8); + src += 8; + return stats; + } + + /// Merge another ColumnStatistics into this one (for merging payloads). 
+  void merge(const ColumnStatistics& other);
+
+ private:
+  template <typename T>
+  void mergeTyped(const ColumnStatistics& other) {
+    auto myMin = getMin<T>();
+    auto otherMin = other.getMin<T>();
+    if (otherMin < myMin) {
+      setMin(otherMin);
+    }
+    auto myMax = getMax<T>();
+    auto otherMax = other.getMax<T>();
+    if (otherMax > myMax) {
+      setMax(otherMax);
+    }
+  }
+};
+
+/// Block-level statistics containing per-column min/max for a shuffle block.
+struct BlockStatistics {
+  static constexpr uint8_t kVersion = 1;
+
+  std::vector<ColumnStatistics> columnStats;
+
+  /// Byte size of the serialized stats header (excluding the BlockType byte).
+  uint32_t serializedSize() const {
+    return sizeof(uint8_t) + // version
+        sizeof(uint16_t) + // numColumns
+        sizeof(int64_t) + // payloadSize
+        static_cast<uint32_t>(columnStats.size()) * ColumnStatistics::kSerializedSize;
+  }
+
+  /// Serialize to output stream. payloadSize is the byte size of the
+  /// following payload block (BlockType byte + serialized payload data).
+  arrow::Status serialize(arrow::io::OutputStream* out, int64_t payloadSize) const;
+
+  /// Deserialize from input stream. Returns (stats, payloadSize).
+  static arrow::Result<std::pair<BlockStatistics, int64_t>> deserialize(arrow::io::InputStream* in);
+
+  /// Merge another BlockStatistics into this one.
+  void merge(const BlockStatistics& other);
+};
+
+/// Compute block-level statistics from assembled Arrow buffers.
+/// The buffer layout must match the assembleBuffers() output:
+/// for each field in schema order, fixed-width fields produce
+/// [validity, value], binary fields produce [validity, length, value],
+/// null/complex types are skipped (complex buffer appended at end).
+BlockStatistics computeBlockStatistics( + const std::shared_ptr& schema, + const std::vector>& buffers, + uint32_t numRows, + bool hasComplexType); + +} // namespace gluten diff --git a/cpp/core/shuffle/Dictionary.h b/cpp/core/shuffle/Dictionary.h index 246406d5aed4..d3f70174c8e6 100644 --- a/cpp/core/shuffle/Dictionary.h +++ b/cpp/core/shuffle/Dictionary.h @@ -23,7 +23,13 @@ namespace gluten { -enum class BlockType : uint8_t { kEndOfStream = 0, kPlainPayload = 1, kDictionary = 2, kDictionaryPayload = 3 }; +enum class BlockType : uint8_t { + kEndOfStream = 0, + kPlainPayload = 1, + kDictionary = 2, + kDictionaryPayload = 3, + kStatisticsPayload = 4 +}; class ShuffleDictionaryStorage { public: diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 948a2b0e05ef..b4fed3fce337 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -297,12 +297,14 @@ class LocalPartitionWriter::PayloadCache { arrow::util::Codec* codec, int32_t compressionThreshold, bool enableDictionary, + bool blockStatisticsEnabled, arrow::MemoryPool* pool, MemoryManager* memoryManager) : numPartitions_(numPartitions), codec_(codec), compressionThreshold_(compressionThreshold), enableDictionary_(enableDictionary), + blockStatisticsEnabled_(blockStatisticsEnabled), pool_(pool), memoryManager_(memoryManager) {} @@ -346,6 +348,15 @@ class LocalPartitionWriter::PayloadCache { // Write the cached payload to disk. uint8_t blockType = static_cast(hasDictionaries ? BlockType::kDictionaryPayload : BlockType::kPlainPayload); + + // Write statistics block before non-dictionary payloads if enabled. 
+    if (blockStatisticsEnabled_ && !hasDictionaries && payload->hasBlockStats()) {
+      static constexpr uint8_t kStatsBlockType = static_cast<uint8_t>(BlockType::kStatisticsPayload);
+      RETURN_NOT_OK(os->Write(&kStatsBlockType, sizeof(kStatsBlockType)));
+      int64_t payloadSize = sizeof(blockType) + payload->serializedSize();
+      RETURN_NOT_OK(payload->blockStats()->serialize(os, payloadSize));
+    }
+
     RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
     RETURN_NOT_OK(payload->serialize(os));
@@ -396,6 +407,15 @@
     // Spill the cached payload to disk.
     uint8_t blockType = static_cast<uint8_t>(hasDictionaries ? BlockType::kDictionaryPayload : BlockType::kPlainPayload);
+
+    // Write statistics block before non-dictionary payloads if enabled.
+    if (blockStatisticsEnabled_ && !hasDictionaries && payload->hasBlockStats()) {
+      static constexpr uint8_t kStatsBlockType = static_cast<uint8_t>(BlockType::kStatisticsPayload);
+      RETURN_NOT_OK(os->Write(&kStatsBlockType, sizeof(kStatsBlockType)));
+      int64_t payloadSize = sizeof(blockType) + payload->serializedSize();
+      RETURN_NOT_OK(payload->blockStats()->serialize(os.get(), payloadSize));
+    }
+
     RETURN_NOT_OK(os->Write(&blockType, sizeof(blockType)));
     RETURN_NOT_OK(payload->serialize(os.get()));
@@ -483,6 +503,7 @@ class LocalPartitionWriter::PayloadCache {
   arrow::util::Codec* codec_;
   int32_t compressionThreshold_;
   bool enableDictionary_;
+  bool blockStatisticsEnabled_;
   arrow::MemoryPool* pool_;
   MemoryManager* memoryManager_;
@@ -695,6 +716,7 @@ arrow::Status LocalPartitionWriter::finishMerger() {
         codec_.get(),
         options_->compressionThreshold,
         options_->enableDictionary,
+        options_->blockStatisticsEnabled,
         payloadPool_.get(),
         memoryManager_);
   }
@@ -744,6 +766,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
         codec_.get(),
         options_->compressionThreshold,
         options_->enableDictionary,
+        options_->blockStatisticsEnabled,
         payloadPool_.get(),
         memoryManager_);
   }
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h index 113e5a3cfd2f..ae63e912341c 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -81,6 +81,10 @@ class LocalPartitionWriter : public PartitionWriter { // 3. After stop() called, arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override; + bool blockStatisticsEnabled() const override { + return options_ && options_->blockStatisticsEnabled; + } + protected: class LocalSpiller; diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 2139c6e9d724..9966c2d4aa0e 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -175,6 +175,7 @@ struct LocalPartitionWriterOptions { int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories bool enableDictionary = kDefaultEnableDictionary; + bool blockStatisticsEnabled = false; LocalPartitionWriterOptions() = default; diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index ebb86004cb88..a841085676f2 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -67,6 +67,11 @@ class PartitionWriter : public Reclaimable { virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr blockPayload, bool stop, int64_t& evictBytes) = 0; + /// Returns true if block-level statistics should be computed for payloads. 
+ virtual bool blockStatisticsEnabled() const { + return false; + } + uint64_t cachedPayloadSize() { return payloadPool_->bytes_allocated(); } diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 27e7d9c85cbb..b5246c725b6b 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -277,6 +277,30 @@ arrow::Status BlockPayload::serialize(arrow::io::OutputStream* outputStream) { return arrow::Status::OK(); } +int64_t BlockPayload::serializedSize() const { + switch (type_) { + case Type::kUncompressed: { + int64_t size = sizeof(Type) + sizeof(uint32_t) + sizeof(uint32_t); // type + numRows + numBuffers + for (const auto& buffer : buffers_) { + size += sizeof(int64_t); // buffer size field + if (buffer && buffer->size() > 0) { + size += buffer->size(); + } + } + return size; + } + case Type::kCompressed: { + int64_t size = sizeof(Type) + sizeof(uint32_t) + sizeof(uint32_t); // type + numRows + numBuffers + if (!buffers_.empty() && buffers_[0]) { + size += buffers_[0]->size(); + } + return size; + } + default: + return 0; + } +} + arrow::Result> BlockPayload::readBufferAt(uint32_t pos) { if (type_ == Type::kCompressed) { return arrow::Status::Invalid("Cannot read buffer from compressed BlockPayload."); @@ -413,12 +437,27 @@ arrow::Result> InMemoryPayload::merge( } } } - return std::make_unique(mergedRows, isValidityBuffer, source->schema(), std::move(merged)); + auto result = std::make_unique(mergedRows, isValidityBuffer, source->schema(), std::move(merged)); + // Merge block statistics if both payloads have them. 
+ if (source->hasBlockStats() && append->hasBlockStats()) { + auto mergedStats = *source->blockStats_; + mergedStats.merge(*append->blockStats_); + result->setBlockStats(std::move(mergedStats)); + } else if (source->hasBlockStats()) { + result->setBlockStats(*source->blockStats_); + } else if (append->hasBlockStats()) { + result->setBlockStats(*append->blockStats_); + } + return result; } arrow::Result> InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) { - return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); + auto result = BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); + if (result.ok() && blockStats_.has_value()) { + (*result)->setBlockStats(std::move(*blockStats_)); + } + return result; } arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) { diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index dfe98cafd563..62086ba01621 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -21,6 +21,7 @@ #include #include +#include "shuffle/BlockStatistics.h" #include "shuffle/Dictionary.h" #include "shuffle/Options.h" #include "shuffle/Utils.h" @@ -61,12 +62,25 @@ class Payload { std::string toString() const; + void setBlockStats(BlockStatistics stats) { + blockStats_ = std::move(stats); + } + + const std::optional& blockStats() const { + return blockStats_; + } + + bool hasBlockStats() const { + return blockStats_.has_value(); + } + protected: Type type_; uint32_t numRows_; const std::vector* isValidityBuffer_; int64_t compressTime_{0}; int64_t writeTime_{0}; + std::optional blockStats_; }; // A block represents data to be cached in-memory. 
@@ -95,6 +109,10 @@ class BlockPayload final : public Payload { arrow::Status serialize(arrow::io::OutputStream* outputStream) override; + /// Returns the number of bytes that serialize() would write. + /// Only valid for kUncompressed and kCompressed payloads. + int64_t serializedSize() const; + arrow::Result> readBufferAt(uint32_t pos); int64_t rawSize() override; @@ -149,6 +167,10 @@ class InMemoryPayload final : public Payload { std::shared_ptr schema() const; + const std::vector>& getBuffers() const { + return buffers_; + } + arrow::Status createDictionaries(const std::shared_ptr& dictionaryWriter); private: diff --git a/cpp/core/tests/BlockStatisticsTest.cc b/cpp/core/tests/BlockStatisticsTest.cc new file mode 100644 index 000000000000..bc772f43708f --- /dev/null +++ b/cpp/core/tests/BlockStatisticsTest.cc @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "shuffle/BlockStatistics.h" +#include "shuffle/Payload.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace gluten { + +class BlockStatisticsTest : public ::testing::Test { + protected: + // Build a validity buffer where bits at the given indices are unset (null). 
+ std::shared_ptr makeValidityBuffer(uint32_t numRows, const std::vector& nullIndices) { + auto byteCount = arrow::bit_util::BytesForBits(numRows); + auto buf = arrow::AllocateBuffer(byteCount).ValueOrDie(); + // Start with all valid. + memset(buf->mutable_data(), 0xFF, byteCount); + for (auto idx : nullIndices) { + arrow::bit_util::ClearBit(buf->mutable_data(), idx); + } + return buf; + } + + // Build a value buffer from a vector of typed values. + template + std::shared_ptr makeValueBuffer(const std::vector& values) { + auto byteSize = static_cast(values.size() * sizeof(T)); + auto buf = arrow::AllocateBuffer(byteSize).ValueOrDie(); + memcpy(buf->mutable_data(), values.data(), byteSize); + return buf; + } +}; + +TEST_F(BlockStatisticsTest, ColumnStatisticsSetGetInt64) { + ColumnStatistics col{}; + col.setMin(-42); + col.setMax(100); + ASSERT_EQ(col.getMin(), -42); + ASSERT_EQ(col.getMax(), 100); +} + +TEST_F(BlockStatisticsTest, ColumnStatisticsSetGetFloat) { + ColumnStatistics col{}; + col.setMin(-1.5f); + col.setMax(3.14f); + ASSERT_FLOAT_EQ(col.getMin(), -1.5f); + ASSERT_FLOAT_EQ(col.getMax(), 3.14f); +} + +TEST_F(BlockStatisticsTest, ColumnStatisticsSetGetDouble) { + ColumnStatistics col{}; + col.setMin(-99.99); + col.setMax(1e18); + ASSERT_DOUBLE_EQ(col.getMin(), -99.99); + ASSERT_DOUBLE_EQ(col.getMax(), 1e18); +} + +TEST_F(BlockStatisticsTest, ColumnStatisticsSerializeDeserialize) { + ColumnStatistics original{}; + original.columnIndex = 7; + original.typeId = static_cast(arrow::Type::INT64); + original.hasNull = true; + original.hasStats = true; + original.setMin(-1000); + original.setMax(2000); + + uint8_t buf[ColumnStatistics::kSerializedSize]; + uint8_t* ptr = buf; + original.serialize(ptr); + ASSERT_EQ(ptr - buf, ColumnStatistics::kSerializedSize); + + const uint8_t* readPtr = buf; + auto restored = ColumnStatistics::deserialize(readPtr); + ASSERT_EQ(readPtr - buf, ColumnStatistics::kSerializedSize); + + ASSERT_EQ(restored.columnIndex, 7); + 
+  ASSERT_EQ(restored.typeId, static_cast<int32_t>(arrow::Type::INT64));
+  ASSERT_TRUE(restored.hasNull);
+  ASSERT_TRUE(restored.hasStats);
+  ASSERT_EQ(restored.getMin(), -1000);
+  ASSERT_EQ(restored.getMax(), 2000);
+}
+
+TEST_F(BlockStatisticsTest, ColumnStatisticsMergeInt64) {
+  ColumnStatistics a{};
+  a.typeId = static_cast<int32_t>(arrow::Type::INT64);
+  a.hasNull = false;
+  a.hasStats = true;
+  a.setMin(10);
+  a.setMax(50);
+
+  ColumnStatistics b{};
+  b.typeId = static_cast<int32_t>(arrow::Type::INT64);
+  b.hasNull = true;
+  b.hasStats = true;
+  b.setMin(5);
+  b.setMax(30);
+
+  // Merge takes the union: min of mins, max of maxes, OR of hasNull.
+  a.merge(b);
+  ASSERT_TRUE(a.hasNull);
+  ASSERT_TRUE(a.hasStats);
+  ASSERT_EQ(a.getMin(), 5);
+  ASSERT_EQ(a.getMax(), 50);
+}
+
+TEST_F(BlockStatisticsTest, ColumnStatisticsMergeDouble) {
+  ColumnStatistics a{};
+  a.typeId = static_cast<int32_t>(arrow::Type::DOUBLE);
+  a.hasNull = false;
+  a.hasStats = true;
+  a.setMin(1.5);
+  a.setMax(9.9);
+
+  ColumnStatistics b{};
+  b.typeId = static_cast<int32_t>(arrow::Type::DOUBLE);
+  b.hasNull = false;
+  b.hasStats = true;
+  b.setMin(-0.5);
+  b.setMax(5.0);
+
+  a.merge(b);
+  ASSERT_FALSE(a.hasNull);
+  ASSERT_DOUBLE_EQ(a.getMin(), -0.5);
+  ASSERT_DOUBLE_EQ(a.getMax(), 9.9);
+}
+
+TEST_F(BlockStatisticsTest, ColumnStatisticsMergeOneEmpty) {
+  ColumnStatistics a{};
+  a.typeId = static_cast<int32_t>(arrow::Type::INT32);
+  a.hasNull = false;
+  a.hasStats = false; // no stats yet
+
+  ColumnStatistics b{};
+  b.typeId = static_cast<int32_t>(arrow::Type::INT32);
+  b.hasNull = true;
+  b.hasStats = true;
+  b.setMin(100);
+  b.setMax(200);
+
+  // Merging into an empty side adopts the other side's stats wholesale.
+  a.merge(b);
+  ASSERT_TRUE(a.hasNull);
+  ASSERT_TRUE(a.hasStats);
+  ASSERT_EQ(a.getMin(), 100);
+  ASSERT_EQ(a.getMax(), 200);
+}
+
+TEST_F(BlockStatisticsTest, BlockStatisticsSerializeDeserialize) {
+  BlockStatistics original;
+
+  ColumnStatistics c0{};
+  c0.columnIndex = 0;
+  c0.typeId = static_cast<int32_t>(arrow::Type::INT32);
+  c0.hasNull = false;
+  c0.hasStats = true;
+  c0.setMin(-5);
+  c0.setMax(42);
+
+  ColumnStatistics c1{};
+  c1.columnIndex = 1;
+  c1.typeId = static_cast<int32_t>(arrow::Type::DOUBLE);
+  c1.hasNull = true;
+  c1.hasStats = true;
+  c1.setMin(-1.0);
+  c1.setMax(99.5);
+
+  original.columnStats.push_back(c0);
+  original.columnStats.push_back(c1);
+
+  const int64_t fakePayloadSize = 12345;
+
+  // Serialize to an in-memory stream.
+  auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
+  ASSERT_TRUE(original.serialize(sink.get(), fakePayloadSize).ok());
+  auto serialized = sink->Finish().ValueOrDie();
+
+  // NOTE(review): cast target assumed to be int64_t to match serializedSize() — confirm.
+  ASSERT_EQ(static_cast<int64_t>(serialized->size()), original.serializedSize());
+
+  // Deserialize.
+  auto source = std::make_shared<arrow::io::BufferReader>(serialized);
+  auto result = BlockStatistics::deserialize(source.get());
+  ASSERT_TRUE(result.ok());
+
+  auto& [restored, payloadSize] = result.ValueOrDie();
+  ASSERT_EQ(payloadSize, fakePayloadSize);
+  ASSERT_EQ(restored.columnStats.size(), 2u);
+
+  ASSERT_EQ(restored.columnStats[0].columnIndex, 0);
+  ASSERT_EQ(restored.columnStats[0].getMin(), -5);
+  ASSERT_EQ(restored.columnStats[0].getMax(), 42);
+  ASSERT_FALSE(restored.columnStats[0].hasNull);
+
+  ASSERT_EQ(restored.columnStats[1].columnIndex, 1);
+  ASSERT_DOUBLE_EQ(restored.columnStats[1].getMin(), -1.0);
+  ASSERT_DOUBLE_EQ(restored.columnStats[1].getMax(), 99.5);
+  ASSERT_TRUE(restored.columnStats[1].hasNull);
+}
+
+TEST_F(BlockStatisticsTest, BlockStatisticsSerializeEmpty) {
+  BlockStatistics empty;
+
+  auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
+  ASSERT_TRUE(empty.serialize(sink.get(), 0).ok());
+  auto serialized = sink->Finish().ValueOrDie();
+
+  auto source = std::make_shared<arrow::io::BufferReader>(serialized);
+  auto result = BlockStatistics::deserialize(source.get());
+  ASSERT_TRUE(result.ok());
+
+  auto& [restored, payloadSize] = result.ValueOrDie();
+  ASSERT_EQ(payloadSize, 0);
+  ASSERT_TRUE(restored.columnStats.empty());
+}
+
+TEST_F(BlockStatisticsTest, BlockStatisticsMerge) {
+  BlockStatistics a;
+  {
+    ColumnStatistics c{};
+    c.columnIndex = 0;
+    c.typeId = static_cast<int32_t>(arrow::Type::INT64);
+    c.hasNull = false;
+    c.hasStats = true;
+    c.setMin(10);
+    c.setMax(20);
+    a.columnStats.push_back(c);
+  }
+
+  BlockStatistics b;
+  {
+    ColumnStatistics c{};
+    c.columnIndex = 0;
+    c.typeId = static_cast<int32_t>(arrow::Type::INT64);
+    c.hasNull = true;
+    c.hasStats = true;
+    c.setMin(5);
+    c.setMax(15);
+    b.columnStats.push_back(c);
+  }
+
+  a.merge(b);
+  ASSERT_EQ(a.columnStats.size(), 1u);
+  ASSERT_TRUE(a.columnStats[0].hasNull);
+  ASSERT_EQ(a.columnStats[0].getMin(), 5);
+  ASSERT_EQ(a.columnStats[0].getMax(), 20);
+}
+
+TEST_F(BlockStatisticsTest, ComputeInt32Column) {
+  // Schema: single INT32 column.
+  auto schema = arrow::schema({arrow::field("id", arrow::int32())});
+
+  uint32_t numRows = 5;
+  std::vector<int32_t> values = {10, -3, 42, 7, 0};
+
+  // Buffers: [validity(nullptr = all valid), value]
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr); // all valid
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, /*hasComplexType=*/false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  auto& col = stats.columnStats[0];
+  ASSERT_EQ(col.columnIndex, 0);
+  ASSERT_FALSE(col.hasNull);
+  ASSERT_TRUE(col.hasStats);
+  ASSERT_EQ(col.getMin(), -3);
+  ASSERT_EQ(col.getMax(), 42);
+}
+
+TEST_F(BlockStatisticsTest, ComputeInt64ColumnWithNulls) {
+  auto schema = arrow::schema({arrow::field("id", arrow::int64())});
+
+  uint32_t numRows = 4;
+  std::vector<int64_t> values = {100, 200, 50, 300};
+
+  // Row 2 is null.
+  auto validity = makeValidityBuffer(numRows, {2});
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(validity);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  auto& col = stats.columnStats[0];
+  ASSERT_TRUE(col.hasNull);
+  ASSERT_TRUE(col.hasStats);
+  // Row 2 (value 50) is null and should be skipped.
+  ASSERT_EQ(col.getMin(), 100);
+  ASSERT_EQ(col.getMax(), 300);
+}
+
+TEST_F(BlockStatisticsTest, ComputeDoubleColumn) {
+  auto schema = arrow::schema({arrow::field("val", arrow::float64())});
+
+  uint32_t numRows = 3;
+  std::vector<double> values = {-1.5, 0.0, 99.9};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr); // all valid
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_DOUBLE_EQ(stats.columnStats[0].getMin(), -1.5);
+  ASSERT_DOUBLE_EQ(stats.columnStats[0].getMax(), 99.9);
+}
+
+TEST_F(BlockStatisticsTest, ComputeFloatColumn) {
+  auto schema = arrow::schema({arrow::field("val", arrow::float32())});
+
+  uint32_t numRows = 4;
+  std::vector<float> values = {2.5f, -0.1f, 7.0f, 3.0f};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_FLOAT_EQ(stats.columnStats[0].getMin(), -0.1f);
+  ASSERT_FLOAT_EQ(stats.columnStats[0].getMax(), 7.0f);
+}
+
+TEST_F(BlockStatisticsTest, ComputeMultipleColumns) {
+  // Schema: INT32, DOUBLE
+  auto schema = arrow::schema({arrow::field("a", arrow::int32()), arrow::field("b", arrow::float64())});
+
+  uint32_t numRows = 3;
+  std::vector<int32_t> ints = {5, -10, 20};
+  std::vector<double> doubles = {1.0, 2.0, -3.0};
+
+  // Buffer layout: [a_validity, a_value, b_validity, b_value]
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr); // a validity
+  buffers.push_back(makeValueBuffer(ints)); // a value
+  buffers.push_back(nullptr); // b validity
+  buffers.push_back(makeValueBuffer(doubles)); // b value
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 2u);
+
+  // Column a (INT32)
+  ASSERT_EQ(stats.columnStats[0].columnIndex, 0);
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_EQ(stats.columnStats[0].getMin(), -10);
+  ASSERT_EQ(stats.columnStats[0].getMax(), 20);
+
+  // Column b (DOUBLE)
+  ASSERT_EQ(stats.columnStats[1].columnIndex, 1);
+  ASSERT_TRUE(stats.columnStats[1].hasStats);
+  ASSERT_DOUBLE_EQ(stats.columnStats[1].getMin(), -3.0);
+  ASSERT_DOUBLE_EQ(stats.columnStats[1].getMax(), 2.0);
+}
+
+TEST_F(BlockStatisticsTest, ComputeWithStringColumn) {
+  // Schema: INT32, STRING
+  // Strings produce 3 buffers (validity, length, value) but no min/max stats.
+  auto schema = arrow::schema({arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())});
+
+  uint32_t numRows = 2;
+  std::vector<int32_t> ints = {1, 2};
+
+  // Buffer layout: [id_validity, id_value, name_validity, name_length, name_value]
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr); // id validity
+  buffers.push_back(makeValueBuffer(ints)); // id value
+  buffers.push_back(nullptr); // name validity
+  buffers.push_back(arrow::AllocateBuffer(numRows * sizeof(uint32_t)).ValueOrDie()); // name length
+  buffers.push_back(arrow::AllocateBuffer(0).ValueOrDie()); // name value (empty)
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 2u);
+
+  // Column 0 (INT32) — has stats.
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_EQ(stats.columnStats[0].getMin(), 1);
+  ASSERT_EQ(stats.columnStats[0].getMax(), 2);
+
+  // Column 1 (STRING) — no min/max stats, but tracks nullability.
+  ASSERT_FALSE(stats.columnStats[1].hasStats);
+  ASSERT_FALSE(stats.columnStats[1].hasNull);
+}
+
+TEST_F(BlockStatisticsTest, ComputeAllNullColumn) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int64())});
+
+  uint32_t numRows = 3;
+  std::vector<int64_t> values = {0, 0, 0}; // values don't matter, all null
+  auto validity = makeValidityBuffer(numRows, {0, 1, 2}); // all null
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(validity);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  ASSERT_TRUE(stats.columnStats[0].hasNull);
+  ASSERT_FALSE(stats.columnStats[0].hasStats); // No non-null values → no min/max.
+}
+
+TEST_F(BlockStatisticsTest, ComputeEmptyBlock) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int32())});
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+
+  auto stats = computeBlockStatistics(schema, buffers, /*numRows=*/0, false);
+  ASSERT_TRUE(stats.columnStats.empty());
+}
+
+TEST_F(BlockStatisticsTest, ComputeSingleRow) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int32())});
+
+  uint32_t numRows = 1;
+  std::vector<int32_t> values = {77};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_EQ(stats.columnStats[0].getMin(), 77);
+  ASSERT_EQ(stats.columnStats[0].getMax(), 77);
+}
+
+TEST_F(BlockStatisticsTest, ComputeNegativeValues) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int32())});
+
+  uint32_t numRows = 4;
+  std::vector<int32_t> values = {-100, -50, -200, -1};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats[0].getMin(), -200);
+  ASSERT_EQ(stats.columnStats[0].getMax(), -1);
+}
+
+TEST_F(BlockStatisticsTest, ComputeInt8Column) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int8())});
+
+  uint32_t numRows = 3;
+  std::vector<int8_t> values = {-128, 0, 127};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats.size(), 1u);
+  ASSERT_TRUE(stats.columnStats[0].hasStats);
+  ASSERT_EQ(stats.columnStats[0].getMin(), -128);
+  ASSERT_EQ(stats.columnStats[0].getMax(), 127);
+}
+
+TEST_F(BlockStatisticsTest, ComputeInt16Column) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int16())});
+
+  uint32_t numRows = 3;
+  std::vector<int16_t> values = {-1000, 500, 32000};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  auto stats = computeBlockStatistics(schema, buffers, numRows, false);
+
+  ASSERT_EQ(stats.columnStats[0].getMin(), -1000);
+  ASSERT_EQ(stats.columnStats[0].getMax(), 32000);
+}
+
+TEST_F(BlockStatisticsTest, InMemoryPayloadCarriesStats) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int32())});
+
+  uint32_t numRows = 3;
+  std::vector<int32_t> values = {1, 2, 3};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr);
+  buffers.push_back(makeValueBuffer(values));
+
+  std::vector<bool> isValidityBuffer = {true, false};
+
+  auto payload = std::make_unique<InMemoryPayload>(
+      numRows, &isValidityBuffer, schema, std::move(buffers), /*hasComplexType=*/false);
+
+  ASSERT_FALSE(payload->hasBlockStats());
+
+  // Compute and set stats.
+  auto stats = computeBlockStatistics(schema, payload->getBuffers(), numRows, false);
+  payload->setBlockStats(std::move(stats));
+
+  ASSERT_TRUE(payload->hasBlockStats());
+  ASSERT_EQ(payload->blockStats()->columnStats.size(), 1u);
+  ASSERT_EQ(payload->blockStats()->columnStats[0].getMin(), 1);
+  ASSERT_EQ(payload->blockStats()->columnStats[0].getMax(), 3);
+
+  // Convert to BlockPayload — stats should survive.
+  auto pool = arrow::default_memory_pool();
+  auto blockResult = payload->toBlockPayload(Payload::kUncompressed, pool, nullptr);
+  ASSERT_TRUE(blockResult.ok());
+  auto blockPayload = std::move(blockResult).ValueOrDie();
+
+  ASSERT_TRUE(blockPayload->hasBlockStats());
+  ASSERT_EQ(blockPayload->blockStats()->columnStats[0].getMin(), 1);
+  ASSERT_EQ(blockPayload->blockStats()->columnStats[0].getMax(), 3);
+}
+
+TEST_F(BlockStatisticsTest, InMemoryPayloadMergePreservesStats) {
+  auto schema = arrow::schema({arrow::field("x", arrow::int64())});
+  std::vector<bool> isValidityBuffer = {true, false};
+  auto pool = arrow::default_memory_pool();
+
+  // Payload A: values [10, 20]
+  {
+    uint32_t numRows = 2;
+    std::vector<int64_t> values = {10, 20};
+    std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+    buffers.push_back(nullptr);
+    buffers.push_back(makeValueBuffer(values));
+    auto a = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer, schema, std::move(buffers));
+    auto statsA = computeBlockStatistics(schema, a->getBuffers(), numRows, false);
+    a->setBlockStats(std::move(statsA));
+
+    // Payload B: values [5, 15]
+    std::vector<int64_t> valuesB = {5, 15};
+    std::vector<std::shared_ptr<arrow::Buffer>> buffersB;
+    buffersB.push_back(nullptr);
+    buffersB.push_back(makeValueBuffer(valuesB));
+    auto b = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer, schema, std::move(buffersB));
+    auto statsB = computeBlockStatistics(schema, b->getBuffers(), numRows, false);
+    b->setBlockStats(std::move(statsB));
+
+    // Merge.
+    auto merged = InMemoryPayload::merge(std::move(a), std::move(b), pool);
+    ASSERT_TRUE(merged.ok());
+    auto mergedPayload = std::move(merged).ValueOrDie();
+
+    ASSERT_TRUE(mergedPayload->hasBlockStats());
+    ASSERT_EQ(mergedPayload->blockStats()->columnStats.size(), 1u);
+    ASSERT_EQ(mergedPayload->blockStats()->columnStats[0].getMin(), 5);
+    ASSERT_EQ(mergedPayload->blockStats()->columnStats[0].getMax(), 20);
+  }
+}
+
+TEST_F(BlockStatisticsTest, BlockPayloadSerializedSize) {
+  auto pool = arrow::default_memory_pool();
+  std::vector<bool> isValidityBuffer = {true, false};
+
+  uint32_t numRows = 2;
+  std::vector<int32_t> values = {1, 2};
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  buffers.push_back(nullptr); // validity
+  buffers.push_back(makeValueBuffer(values)); // value
+
+  auto result = BlockPayload::fromBuffers(Payload::kUncompressed, numRows, std::move(buffers), &isValidityBuffer, pool, nullptr);
+  ASSERT_TRUE(result.ok());
+  auto payload = std::move(result).ValueOrDie();
+
+  int64_t expectedSize = payload->serializedSize();
+  ASSERT_GT(expectedSize, 0);
+
+  // Serialize and verify the actual size matches.
+  auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
+  ASSERT_TRUE(payload->serialize(sink.get()).ok());
+  auto written = sink->Finish().ValueOrDie();
+
+  ASSERT_EQ(written->size(), expectedSize);
+}
+
+} // namespace gluten
diff --git a/cpp/core/tests/CMakeLists.txt b/cpp/core/tests/CMakeLists.txt
index ac3c719db262..9a56ffe0f366 100644
--- a/cpp/core/tests/CMakeLists.txt
+++ b/cpp/core/tests/CMakeLists.txt
@@ -15,3 +15,4 @@
 add_test_case(round_robin_partitioner_test SOURCES RoundRobinPartitionerTest.cc)
 add_test_case(object_store_test SOURCES ObjectStoreTest.cc)
+add_test_case(block_statistics_test SOURCES BlockStatisticsTest.cc)
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 33a66f46490b..3cec65b16c97 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -84,7 +84,7 @@ const std::string kHashProbeBloomFilterPushdownMaxSize =
 const std::string kValueStreamDynamicFilterEnabled =
     "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled";
-const bool kValueStreamDynamicFilterEnabledDefault = false;
+const bool kValueStreamDynamicFilterEnabledDefault = true;
 const std::string kShowTaskMetricsWhenFinished = "spark.gluten.sql.columnar.backend.velox.showTaskMetricsWhenFinished";
 const bool kShowTaskMetricsWhenFinishedDefault = false;
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e8b455e448f5..0be31e9f597d 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -18,6 +18,7 @@
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "memory/ArrowMemory.h"
 #include "memory/VeloxColumnarBatch.h"
+#include "shuffle/BlockStatistics.h"
 #include "shuffle/Utils.h"
 #include "utils/Common.h"
 #include "utils/Macros.h"
@@ -975,6 +976,12 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
   if (!buffers.empty()) {
     auto payload = std::make_unique<InMemoryPayload>(
         numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_);
+    if (partitionWriter_->blockStatisticsEnabled()) {
+      // Compute and attach per-column min/max statistics before the buffers
+      // are moved into the partition writer pipeline.
+      auto stats = computeBlockStatistics(schema_, payload->getBuffers(), numRows, hasComplexType_);
+      payload->setBlockStats(std::move(stats));
+    }
     RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, writtenBytes_));
   }
   return arrow::Status::OK();