diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index e5a10481b0a..1d146d16fbc 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -1258,6 +1258,20 @@ std::pair> DAGExpressionAnalyzer::buildExtraCastsAfter has_cast = true; } + if (may_need_add_cast_column[i] && table_scan_columns[i].id == MutSup::extra_commit_ts_col_id) + { + const auto & expected_type = getDataTypeByColumnInfoForComputingLayer(table_scan_columns[i]); + const auto & actual_type = actions->getSampleBlock().getByName(casted_name).type; + if (!expected_type->equals(*actual_type)) + { + casted_name = appendCast(expected_type, actions, casted_name); + // We will replace the source_columns[i] with the casted column later + // so we need to update the type of the source_column[i] + source_columns[i].type = actions->getSampleBlock().getByName(casted_name).type; + has_cast = true; + } + } + casted_columns.emplace_back(std::move(casted_name)); } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index da3b0245256..6fe7435a243 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1617,6 +1617,8 @@ std::pair> DAGStorageInterpreter::getColumnsForTableSc name = handle_column_name; else if (cid == MutSup::extra_table_id_col_id) name = MutSup::extra_table_id_column_name; + else if (cid == MutSup::extra_commit_ts_col_id) + name = MutSup::version_column_name; else if (cid == DM::VectorIndexStreamCtx::VIRTUAL_DISTANCE_CD.id) name = DM::VectorIndexStreamCtx::VIRTUAL_DISTANCE_CD.name; #if ENABLE_CLARA diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index 86844cb7710..206fc0a01ff 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -82,6 +82,11 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str case MutSup::extra_table_id_col_id: names_and_types.emplace_back(MutSup::extra_table_id_column_name, MutSup::getExtraTableIdColumnType()); break; + case MutSup::extra_commit_ts_col_id: + names_and_types.emplace_back( + MutSup::version_column_name, + getDataTypeByColumnInfoForComputingLayer(column_info)); + break; default: names_and_types.emplace_back( column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name, @@ -130,6 +135,8 @@ std::tupleemplace_back(DM::ColumnDefine{ column_info.id, diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp index bb6232dd6ab..a056c78cbba 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp @@ -73,6 +73,10 @@ RemoteRequest RemoteRequest::build( ci.tp = TiDB::TypeLongLong; schema.emplace_back(std::make_pair(MutSup::extra_table_id_column_name, std::move(ci))); } + else if (col_id == MutSup::extra_commit_ts_col_id) + { + schema.emplace_back(std::make_pair(MutSup::version_column_name, col)); + } else { // https://github.com/pingcap/tiflash/issues/8601 diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp index 7ef83e48603..2288a87633c 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp @@ -61,6 +61,11 @@ PushDownExecutorPtr PushDownExecutor::build( for (const auto & column : columns_to_read) columns_to_read_map.emplace(column.id, column); + // TiDB may request a hidden commit_ts column in TableScan with a special ColumnID. + // In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping. + if (const auto it = columns_to_read_map.find(MutSup::version_col_id); it != columns_to_read_map.end()) + columns_to_read_map.try_emplace(MutSup::extra_commit_ts_col_id, it->second); + // Get the columns of the filter, is a subset of columns_to_read std::unordered_set filter_col_id_set; for (const auto & expr : pushed_down_filters) diff --git a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp index a3f3c44ebaf..1e57a9f8de8 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp +++ b/dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp @@ -29,9 +29,9 @@ #include #include #include +#include #include - namespace DB::DM { @@ -70,10 +70,15 @@ RSOperatorPtr RSOperator::build( FilterParser::ColumnIDToAttrMap column_id_to_attr; for (const auto & col_info : scan_column_infos) { + ColumnID col_id = col_info.id; + // TiDB may request a hidden commit_ts column in TableScan with a special ColumnID. + // In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping. + if (unlikely(col_id == MutSup::extra_commit_ts_col_id)) + col_id = MutSup::version_col_id; auto iter = std::find_if( table_column_defines.cbegin(), table_column_defines.cend(), - [col_id = col_info.id](const ColumnDefine & cd) { return cd.id == col_id; }); + [col_id](const ColumnDefine & cd) { return cd.id == col_id; }); if (iter == table_column_defines.cend()) { // Some columns may not be in the table schema, such as extra table id column. @@ -81,7 +86,12 @@ RSOperatorPtr RSOperator::build( continue; } const auto & cd = *iter; - column_id_to_attr[cd.id] = Attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type}; + Attr attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type}; + column_id_to_attr[cd.id] = attr; + // When commit_ts (extra_commit_ts_col_id) is aliased to version_col_id, also register under + // col_info.id so that FilterParser lookups by column id from expressions find the Attr. + if (unlikely(col_info.id != cd.id)) + column_id_to_attr[col_info.id] = attr; } auto rs_operator = FilterParser::parseDAGQuery(*dag_query, scan_column_infos, column_id_to_attr, tracing_logger); diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index d259ec8b622..b434c9446f5 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -433,6 +433,10 @@ std::optional FilterParser::createAttr( return std::nullopt; } auto col_id = cop::getColumnIDForColumnExpr(expr, scan_column_infos); + // TiDB may request a hidden commit_ts column in TableScan with a special ColumnID. + // In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping. + if (unlikely(col_id == MutSup::extra_commit_ts_col_id)) + col_id = MutSup::version_col_id; auto it = std::find_if( // table_column_defines.cbegin(), table_column_defines.cend(), diff --git a/dbms/src/Storages/MutableSupport.h b/dbms/src/Storages/MutableSupport.h index 5a098ccc313..ffb03c8992b 100644 --- a/dbms/src/Storages/MutableSupport.h +++ b/dbms/src/Storages/MutableSupport.h @@ -43,6 +43,7 @@ class MutSup : public ext::Singleton inline static constexpr ColumnID extra_handle_id = -1; inline static constexpr ColumnID extra_table_id_col_id = -3; + inline static constexpr ColumnID extra_commit_ts_col_id = -5; inline static constexpr ColumnID version_col_id = -1024; inline static constexpr ColumnID delmark_col_id = -1025; inline static constexpr ColumnID invalid_col_id = -10000; diff --git a/dbms/src/Storages/tests/gtest_hidden_commit_ts_column.cpp b/dbms/src/Storages/tests/gtest_hidden_commit_ts_column.cpp new file mode 100644 index 00000000000..cb2c153f9c8 --- /dev/null +++ b/dbms/src/Storages/tests/gtest_hidden_commit_ts_column.cpp @@ -0,0 +1,249 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::tests +{ +class HiddenCommitTSColumnTest : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + try + { + registerFunctions(); + } + catch (DB::Exception &) + { + // Maybe another test has already registed, ignore exception here. + } + } + +protected: + LoggerPtr log = Logger::get(); + ContextPtr ctx = TiFlashTestEnv::getContext(); +}; + +TEST_F(HiddenCommitTSColumnTest, PushDownFilterAliasAndCast) +try +{ + // TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in + // `_INTERNAL_VERSION` with ColumnID=VersionColumnID. When TiDB column type differs (e.g. Nullable(Int64)), + // TiFlash should add a cast. + + TiDB::ColumnInfo commit_ts_ci; + commit_ts_ci.id = MutSup::extra_commit_ts_col_id; + commit_ts_ci.name = "commit_ts"; + commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 + commit_ts_ci.flag = 0; // Nullable(Int64) + + TiDB::ColumnInfos table_scan_column_info{commit_ts_ci}; + + // Use a single ColumnRef as filter condition: "where commit_ts". + // This is enough to trigger: + // 1) filter column id extraction (ColumnID=-5) + // 2) aliasing from -5 to VersionColumnID in PushDownExecutor + // 3) extra cast generation based on ColumnInfo (Nullable(Int64)) vs storage type (MutSup::getVersionColumnType()) + google::protobuf::RepeatedPtrField pushed_down_filters; + { + auto * cond = pushed_down_filters.Add(); + cond->set_tp(tipb::ExprType::ColumnRef); + { + WriteBufferFromOwnString ss; + encodeDAGInt64(/*column_index=*/0, ss); + cond->set_val(ss.releaseStr()); + } + auto * field_type = cond->mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(0); // Nullable + field_type->set_flen(0); + field_type->set_decimal(0); + } + + DM::ColumnDefines columns_to_read; + columns_to_read.emplace_back(MutSup::version_col_id, MutSup::version_column_name, MutSup::getVersionColumnType()); + + auto executor = DM::PushDownExecutor::build( + DM::EMPTY_RS_OPERATOR, + nullptr, // ann_query_info +#if ENABLE_CLARA + nullptr, // fts_query_info +#endif + table_scan_column_info, + pushed_down_filters, + columns_to_read, + nullptr, // column_range + *ctx, + log); + + ASSERT_TRUE(executor); + ASSERT_TRUE(executor->filter_columns); + ASSERT_EQ(executor->filter_columns->size(), 1); + // Storage must read VersionColumnID, not -5. + EXPECT_EQ(executor->filter_columns->at(0).id, MutSup::version_col_id); + EXPECT_EQ(executor->filter_columns->at(0).name, MutSup::version_column_name); + + // Extra cast should exist because TiDB requires Nullable(Int64) while TiFlash storage uses MutSup::getVersionColumnType(). + ASSERT_TRUE(executor->extra_cast); + + Block block = Block{ + {toVec(MutSup::version_column_name, {1, 2, 3, 4})}, + }; + executor->extra_cast->execute(block); + + const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci); + ASSERT_TRUE(block.has(MutSup::version_column_name)); + EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName()); +} +CATCH + +TEST_F(HiddenCommitTSColumnTest, CastAfterTableScanForCommitTS) +try +{ + // Non-late-materialization path: + // TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in + // `_INTERNAL_VERSION` with type MutSup::getVersionColumnType() (currently UInt64). If TiDB column type differs + // (e.g. Nullable(Int64)), TiFlash should add a cast after TableScan and keep the output column name unchanged. + + TiDB::ColumnInfo commit_ts_ci; + commit_ts_ci.id = MutSup::extra_commit_ts_col_id; + commit_ts_ci.name = "commit_ts"; + commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 + commit_ts_ci.flag = 0; // Nullable + + TiDB::ColumnInfos table_scan_column_info{commit_ts_ci}; + std::vector may_need_add_cast_column{/*commit_ts*/ 1}; + + Block block = Block{ + {toVec(MutSup::version_column_name, {1, 2, 3, 4})}, + }; + + DAGExpressionAnalyzer analyzer{block, *ctx}; + ExpressionActionsChain chain; + auto & step = analyzer.initAndGetLastStep(chain); + auto & actions = step.actions; + + auto [has_cast, casted_columns] + = analyzer.buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); + ASSERT_TRUE(has_cast); + ASSERT_EQ(casted_columns.size(), 1); + + // Mimic appendExtraCastsAfterTS: project casted columns back to original names. + NamesWithAliases project_cols; + project_cols.emplace_back(casted_columns[0], MutSup::version_column_name); + actions->add(ExpressionAction::project(project_cols)); + step.required_output.push_back(MutSup::version_column_name); + + ExpressionActionsPtr extra_cast = chain.getLastActions(); + ASSERT_TRUE(extra_cast); + chain.finalize(); + chain.clear(); + + extra_cast->execute(block); + + const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci); + ASSERT_TRUE(block.has(MutSup::version_column_name)); + EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName()); +} +CATCH + +TEST_F(HiddenCommitTSColumnTest, RoughSetFilterAliasCommitTS) +try +{ + // Rough set filter (RSOperator) uses table_column_defines by ColumnID. + // TiDB requests commit_ts as ColumnID=-5, but in TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID). + // Ensure rough set filter can correctly map ColumnID=-5 to VersionColumnID. + + TiDB::ColumnInfo commit_ts_ci; + commit_ts_ci.id = MutSup::extra_commit_ts_col_id; + commit_ts_ci.name = "commit_ts"; + commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 + commit_ts_ci.flag = 0; // Nullable + TiDB::ColumnInfos scan_column_infos{commit_ts_ci}; + + google::protobuf::RepeatedPtrField filters; + { + tipb::Expr col_ref; + col_ref.set_tp(tipb::ExprType::ColumnRef); + { + WriteBufferFromOwnString ss; + encodeDAGInt64(/*column_index=*/0, ss); + col_ref.set_val(ss.releaseStr()); + } + auto * field_type = col_ref.mutable_field_type(); + field_type->set_tp(TiDB::TypeLongLong); + field_type->set_flag(0); // Nullable + field_type->set_flen(0); + field_type->set_decimal(0); + + tipb::Expr literal = constructInt64LiteralTiExpr(10); + + auto * func = filters.Add(); + func->set_tp(tipb::ExprType::ScalarFunc); + func->set_sig(tipb::ScalarFuncSig::GTInt); + *func->add_children() = col_ref; + *func->add_children() = literal; + } + + tipb::ANNQueryInfo ann_query_info; + tipb::FTSQueryInfo fts_query_info; + google::protobuf::RepeatedPtrField pushed_down_filters; + google::protobuf::RepeatedPtrField used_indexes; + std::vector runtime_filter_ids; + const int rf_max_wait_time_ms = 0; + auto dag_query = std::make_unique( + filters, + ann_query_info, + fts_query_info, + pushed_down_filters, + used_indexes, + scan_column_infos, + runtime_filter_ids, + rf_max_wait_time_ms, + ctx->getTimezoneInfo()); + + DM::ColumnDefines table_column_defines; + table_column_defines.emplace_back( + MutSup::version_col_id, + MutSup::version_column_name, + MutSup::getVersionColumnType()); + + auto rs_operator + = DM::RSOperator::build(dag_query, scan_column_infos, table_column_defines, /*enable_rs_filter*/ true, log); + ASSERT_TRUE(rs_operator); + + const auto col_ids = rs_operator->getColumnIDs(); + ASSERT_EQ(col_ids.size(), 1); + EXPECT_EQ(col_ids[0], MutSup::version_col_id); +} +CATCH + +} // namespace DB::tests