Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -751,4 +751,40 @@ object VeloxConfig extends ConfigRegistry {
.doc("Maps table field names to file field names using names, not indices for Parquet files.")
.booleanConf
.createWithDefault(true)

val ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.target-file-size-bytes")
.doc("Target file size in bytes for Iceberg write operations.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("512MB")

val ICEBERG_WRITE_PARQUET_COMPRESSION_CODEC =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-codec")
.doc("Compression codec to use for Iceberg Parquet write operations.")
.stringConf
.createWithDefault("zstd")

val ICEBERG_WRITE_PARQUET_COMPRESSION_LEVEL =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-level")
.doc("Compression level for Iceberg Parquet write operations.")
.intConf
.createOptional

val ICEBERG_WRITE_PARQUET_ROW_GROUP_SIZE_BYTES =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.row-group-size-bytes")
.doc("Row group size in bytes for Iceberg Parquet write operations.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("128MB")

val ICEBERG_WRITE_PARQUET_PAGE_SIZE_BYTES =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-size-bytes")
.doc("Page size in bytes for Iceberg Parquet write operations.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1MB")

val ICEBERG_WRITE_PARQUET_PAGE_ROW_LIMIT =
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-row-limit")
.doc("Maximum number of rows per page for Iceberg Parquet write operations.")
.intConf
.createWithDefault(20000)
}
16 changes: 15 additions & 1 deletion cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,23 @@ const std::string kMemoryPoolCapacityTransferAcrossTasks =
// Read-side schema resolution toggles: resolve table fields by name
// (not ordinal position) for ORC / Parquet scans.
const std::string kOrcUseColumnNames = "spark.gluten.sql.columnar.backend.velox.orcUseColumnNames";
const std::string kParquetUseColumnNames = "spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames";

// write files
// write files
const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";

// Iceberg write configs.
// These C++ key strings must stay byte-identical to the Scala-side entries
// in VeloxConfig.scala (ICEBERG_WRITE_* definitions) — they are matched by
// exact string lookup at runtime.
const std::string kIcebergWriteTargetFileSizeBytes =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.target-file-size-bytes";
const std::string kIcebergWriteParquetCompressionCodec =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-codec";
const std::string kIcebergWriteParquetCompressionLevel =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-level";
const std::string kIcebergWriteParquetRowGroupSizeBytes =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.row-group-size-bytes";
const std::string kIcebergWriteParquetPageSizeBytes =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-size-bytes";
const std::string kIcebergWriteParquetPageRowLimit =
    "spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-row-limit";

// glog VLOG verbosity: key plus its default and the clamp ceiling (0..99).
const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel";
const uint32_t kGlogVerboseLevelDefault = 0;
const uint32_t kGlogVerboseLevelMaximum = 99;
Expand Down
13 changes: 13 additions & 0 deletions cpp/velox/utils/ConfigExtractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,19 @@ std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorSessionC
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";

// Forward Gluten's Iceberg write settings into Velox Hive-connector /
// Parquet-writer session properties. Values are passed through as raw
// strings; byte-size suffixes ("512MB") are presumably parsed on the
// Velox side — confirm.
// NOTE(review): conf->get<std::string>(key) may return an optional type;
// if so these assignments need .value() — verify this compiles as intended.
if (conf->isValueExists(kIcebergWriteTargetFileSizeBytes)) {
configs[facebook::velox::connector::hive::HiveConfig::kMaxTargetFileSizeSession] =
conf->get<std::string>(kIcebergWriteTargetFileSizeBytes);
}
if (conf->isValueExists(kIcebergWriteParquetPageSizeBytes)) {
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWritePageSize] =
conf->get<std::string>(kIcebergWriteParquetPageSizeBytes);
}
// NOTE(review): row-group-size-bytes is a byte count, but it is mapped to
// kParquetSessionWriteBatchSize, which reads like a row-count knob — confirm
// the semantics of this Velox session property before relying on it.
if (conf->isValueExists(kIcebergWriteParquetRowGroupSizeBytes)) {
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWriteBatchSize] =
conf->get<std::string>(kIcebergWriteParquetRowGroupSizeBytes);
}

// NOTE(review): compression-codec, compression-level and page-row-limit are
// declared in VeloxConfig.h but not forwarded here — presumably wired
// elsewhere; verify they are not silently dropped.
overwriteVeloxConf(conf.get(), configs, kDynamicBackendConfPrefix);
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
}
Expand Down
Loading
Loading