Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,20 @@ std::pair<bool, std::vector<String>> DAGExpressionAnalyzer::buildExtraCastsAfter
has_cast = true;
}

if (may_need_add_cast_column[i] && table_scan_columns[i].id == MutSup::extra_commit_ts_col_id)
{
const auto & expected_type = getDataTypeByColumnInfoForComputingLayer(table_scan_columns[i]);
const auto & actual_type = actions->getSampleBlock().getByName(casted_name).type;
if (!expected_type->equals(*actual_type))
{
casted_name = appendCast(expected_type, actions, casted_name);
// We will replace the source_columns[i] with the casted column later
// so we need to update the type of the source_column[i]
source_columns[i].type = actions->getSampleBlock().getByName(casted_name).type;
has_cast = true;
}
}

casted_columns.emplace_back(std::move(casted_name));
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,8 @@ std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableSc
name = handle_column_name;
else if (cid == MutSup::extra_table_id_col_id)
name = MutSup::extra_table_id_column_name;
else if (cid == MutSup::extra_commit_ts_col_id)
name = MutSup::version_column_name;
else if (cid == DM::VectorIndexStreamCtx::VIRTUAL_DISTANCE_CD.id)
name = DM::VectorIndexStreamCtx::VIRTUAL_DISTANCE_CD.name;
#if ENABLE_CLARA
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str
case MutSup::extra_table_id_col_id:
names_and_types.emplace_back(MutSup::extra_table_id_column_name, MutSup::getExtraTableIdColumnType());
break;
case MutSup::extra_commit_ts_col_id:
names_and_types.emplace_back(
MutSup::version_column_name,
getDataTypeByColumnInfoForComputingLayer(column_info));
break;
default:
names_and_types.emplace_back(
column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name,
Expand Down Expand Up @@ -130,6 +135,8 @@ std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, Dat
extra_table_id_index = i;
break;
}
case MutSup::extra_commit_ts_col_id:
throw Exception("Not supported in disaggregated read now");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 - Disaggregated read rejects commit_ts

Disaggregated compute mode's TableScan path unconditionally throws when it encounters the hidden commit_ts ColumnID (MutSup::extra_commit_ts_col_id, -5), so any DAG request that includes that column will fail in disaggregated mode.

Evidence:

  • Line 137: case MutSup::extra_commit_ts_col_id: throw Exception(\"Not supported in disaggregated read now\");
  • Called from StorageDisaggregatedRemote.cpp:759 and :833
  • Non-disaggregated paths map -5 to _INTERNAL_VERSION instead of rejecting it

Suggested fix: Implement the same aliasing behavior (-5 → version column) for disaggregated mode instead of throwing.

default:
Comment on lines +138 to 140
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add an ErrorCodes value to this Exception.

The current throw omits an error code, which makes error handling inconsistent.

🔧 Suggested fix
-        case MutSup::extra_commit_ts_col_id:
-            throw Exception("Not supported in disaggregated read now");
+        case MutSup::extra_commit_ts_col_id:
+            throw Exception("Not supported in disaggregated read now", ErrorCodes::NOT_IMPLEMENTED);
As per coding guidelines, "Use DB::Exception for error handling in C++. Pattern: throw Exception("Message", ErrorCodes::SOME_CODE);"
🤖 Prompt for AI Agents
In `@dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp` around lines 136 - 138,
The throw in the switch case for MutSup::extra_commit_ts_col_id in
GenSchemaAndColumn.cpp omits an ErrorCodes value; update the throw to include a
DB::ErrorCodes enum (e.g., ErrorCodes::NOT_IMPLEMENTED) so it follows the
pattern throw Exception("Not supported in disaggregated read now",
ErrorCodes::NOT_IMPLEMENTED); — locate the case handling
MutSup::extra_commit_ts_col_id and replace the current throw Exception(...) with
one that passes the appropriate ErrorCodes::... constant.

column_defines->emplace_back(DM::ColumnDefine{
column_info.id,
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ RemoteRequest RemoteRequest::build(
ci.tp = TiDB::TypeLongLong;
schema.emplace_back(std::make_pair(MutSup::extra_table_id_column_name, std::move(ci)));
}
else if (col_id == MutSup::extra_commit_ts_col_id)
{
schema.emplace_back(std::make_pair(MutSup::version_column_name, col));
}
else
{
// https://github.com/pingcap/tiflash/issues/8601
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ PushDownExecutorPtr PushDownExecutor::build(
for (const auto & column : columns_to_read)
columns_to_read_map.emplace(column.id, column);

// TiDB may request a hidden commit_ts column in TableScan with a special ColumnID.
// In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping.
if (const auto it = columns_to_read_map.find(MutSup::version_col_id); it != columns_to_read_map.end())
columns_to_read_map.try_emplace(MutSup::extra_commit_ts_col_id, it->second);

// Get the columns of the filter, is a subset of columns_to_read
std::unordered_set<ColumnID> filter_col_id_set;
for (const auto & expr : pushed_down_filters)
Expand Down
16 changes: 13 additions & 3 deletions dbms/src/Storages/DeltaMerge/Filter/RSOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Filter/Unsupported.h>
#include <Storages/DeltaMerge/FilterParser/FilterParser.h>
#include <Storages/MutableSupport.h>
#include <TiDB/Schema/TiDB.h>


namespace DB::DM
{

Expand Down Expand Up @@ -70,18 +70,28 @@ RSOperatorPtr RSOperator::build(
FilterParser::ColumnIDToAttrMap column_id_to_attr;
for (const auto & col_info : scan_column_infos)
{
ColumnID col_id = col_info.id;
// TiDB may request a hidden commit_ts column in TableScan with a special ColumnID.
// In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping.
if (unlikely(col_id == MutSup::extra_commit_ts_col_id))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 - Rough set filter commit_ts alias uses wrong map key

RSOperator::build intends to alias TiDB's hidden commit_ts ColumnID (-5) to TiFlash's internal version ColumnID (-1024), but only populates the map under key -1024. Later, FilterParser resolves ColumnRef to the original ColumnID (-5) and calls id_to_attr.at(-5), which will throw std::out_of_range if a filter references commit_ts.

Evidence: Line 76 rewrites col_id from -5 to -1024, but map insertion at line 89 uses column_id_to_attr[cd.id], creating key -1024 only. Later lookup at FilterParser.cpp:168 uses the original -5.

Suggested fix: Populate the map under both keys, or adjust the lookup to use the aliased key.

col_id = MutSup::version_col_id;
auto iter = std::find_if(
table_column_defines.cbegin(),
table_column_defines.cend(),
[col_id = col_info.id](const ColumnDefine & cd) { return cd.id == col_id; });
[col_id](const ColumnDefine & cd) { return cd.id == col_id; });
if (iter == table_column_defines.cend())
{
// Some columns may not be in the table schema, such as extra table id column.
column_id_to_attr[col_info.id] = Attr{.col_name = "", .col_id = col_info.id, .type = nullptr};
continue;
}
const auto & cd = *iter;
column_id_to_attr[cd.id] = Attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type};
Attr attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type};
column_id_to_attr[cd.id] = attr;
// When commit_ts (extra_commit_ts_col_id) is aliased to version_col_id, also register under
// col_info.id so that FilterParser lookups by column id from expressions find the Attr.
if (unlikely(col_info.id != cd.id))
column_id_to_attr[col_info.id] = attr;
}

auto rs_operator = FilterParser::parseDAGQuery(*dag_query, scan_column_infos, column_id_to_attr, tracing_logger);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ std::optional<Attr> FilterParser::createAttr(
return std::nullopt;
}
auto col_id = cop::getColumnIDForColumnExpr(expr, scan_column_infos);
// TiDB may request a hidden commit_ts column in TableScan with a special ColumnID.
// In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping.
if (unlikely(col_id == MutSup::extra_commit_ts_col_id))
col_id = MutSup::version_col_id;
auto it = std::find_if( //
table_column_defines.cbegin(),
table_column_defines.cend(),
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/MutableSupport.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class MutSup : public ext::Singleton<MutSup>

inline static constexpr ColumnID extra_handle_id = -1;
inline static constexpr ColumnID extra_table_id_col_id = -3;
inline static constexpr ColumnID extra_commit_ts_col_id = -5;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Rename to camelCase to match naming guideline.

extra_commit_ts_col_id introduces a new snake_case identifier.

🔧 Suggested rename (plus update all usages)
-    inline static constexpr ColumnID extra_commit_ts_col_id = -5;
+    inline static constexpr ColumnID extraCommitTsColId = -5;
As per coding guidelines, "Method and variable names in C++ must use camelCase (e.g., readBlock, totalBytes)"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline static constexpr ColumnID extra_commit_ts_col_id = -5;
inline static constexpr ColumnID extraCommitTsColId = -5;
🤖 Prompt for AI Agents
In `@dbms/src/Storages/MutableSupport.h` at line 46, Rename the snake_case
constant extra_commit_ts_col_id to camelCase extraCommitTsColId and update every
usage and reference accordingly; locate the declaration inline static constexpr
ColumnID extra_commit_ts_col_id in MutableSupport.h and change the identifier,
then update all call sites, static casts, comments, and any tests or
documentation that use extra_commit_ts_col_id so they reference
extraCommitTsColId to comply with the project's naming guideline.

inline static constexpr ColumnID version_col_id = -1024;
inline static constexpr ColumnID delmark_col_id = -1025;
inline static constexpr ColumnID invalid_col_id = -10000;
Expand Down
249 changes: 249 additions & 0 deletions dbms/src/Storages/tests/gtest_hidden_commit_ts_column.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/registerFunctions.h>
#include <IO/Buffer/WriteBufferFromString.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TiDB/Decode/TypeMapping.h>
#include <TiDB/Schema/TiDB.h>
#include <common/logger_useful.h>
#include <gtest/gtest.h>
#include <tipb/executor.pb.h>

namespace DB::tests
{
class HiddenCommitTSColumnTest : public ::testing::Test
{
public:
static void SetUpTestCase()
{
try
{
registerFunctions();
}
catch (DB::Exception &)
{
// Maybe another test has already registed, ignore exception here.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Typo: "registed" → "registered".

-            // Maybe another test has already registed, ignore exception here.
+            // Maybe another test has already registered, ignore exception here.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Maybe another test has already registed, ignore exception here.
// Maybe another test has already registered, ignore exception here.
🤖 Prompt for AI Agents
In `@dbms/src/Storages/tests/gtest_hidden_commit_ts_column.cpp` at line 46, Update
the inline comment that reads "Maybe another test has already registed, ignore
exception here." to correct the typo: change "registed" to "registered" so it
reads "Maybe another test has already registered, ignore exception here." — edit
this comment in gtest_hidden_commit_ts_column.cpp (the comment string above the
exception-handling block) to fix the spelling only.

}
}

protected:
LoggerPtr log = Logger::get();
ContextPtr ctx = TiFlashTestEnv::getContext();
};

TEST_F(HiddenCommitTSColumnTest, PushDownFilterAliasAndCast)
try
{
// TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in
// `_INTERNAL_VERSION` with ColumnID=VersionColumnID. When TiDB column type differs (e.g. Nullable(Int64)),
// TiFlash should add a cast.

TiDB::ColumnInfo commit_ts_ci;
commit_ts_ci.id = MutSup::extra_commit_ts_col_id;
commit_ts_ci.name = "commit_ts";
commit_ts_ci.tp = TiDB::TypeLongLong; // Int64
commit_ts_ci.flag = 0; // Nullable(Int64)

TiDB::ColumnInfos table_scan_column_info{commit_ts_ci};

// Use a single ColumnRef as filter condition: "where commit_ts".
// This is enough to trigger:
// 1) filter column id extraction (ColumnID=-5)
// 2) aliasing from -5 to VersionColumnID in PushDownExecutor
// 3) extra cast generation based on ColumnInfo (Nullable(Int64)) vs storage type (MutSup::getVersionColumnType())
google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters;
{
auto * cond = pushed_down_filters.Add();
cond->set_tp(tipb::ExprType::ColumnRef);
{
WriteBufferFromOwnString ss;
encodeDAGInt64(/*column_index=*/0, ss);
cond->set_val(ss.releaseStr());
}
auto * field_type = cond->mutable_field_type();
field_type->set_tp(TiDB::TypeLongLong);
field_type->set_flag(0); // Nullable
field_type->set_flen(0);
field_type->set_decimal(0);
}

DM::ColumnDefines columns_to_read;
columns_to_read.emplace_back(MutSup::version_col_id, MutSup::version_column_name, MutSup::getVersionColumnType());

auto executor = DM::PushDownExecutor::build(
DM::EMPTY_RS_OPERATOR,
nullptr, // ann_query_info
#if ENABLE_CLARA
nullptr, // fts_query_info
#endif
table_scan_column_info,
pushed_down_filters,
columns_to_read,
nullptr, // column_range
*ctx,
log);

ASSERT_TRUE(executor);
ASSERT_TRUE(executor->filter_columns);
ASSERT_EQ(executor->filter_columns->size(), 1);
// Storage must read VersionColumnID, not -5.
EXPECT_EQ(executor->filter_columns->at(0).id, MutSup::version_col_id);
EXPECT_EQ(executor->filter_columns->at(0).name, MutSup::version_column_name);

// Extra cast should exist because TiDB requires Nullable(Int64) while TiFlash storage uses MutSup::getVersionColumnType().
ASSERT_TRUE(executor->extra_cast);

Block block = Block{
{toVec<UInt64>(MutSup::version_column_name, {1, 2, 3, 4})},
};
executor->extra_cast->execute(block);

const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci);
ASSERT_TRUE(block.has(MutSup::version_column_name));
EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName());
}
CATCH

TEST_F(HiddenCommitTSColumnTest, CastAfterTableScanForCommitTS)
try
{
// Non-late-materialization path:
// TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in
// `_INTERNAL_VERSION` with type MutSup::getVersionColumnType() (currently UInt64). If TiDB column type differs
// (e.g. Nullable(Int64)), TiFlash should add a cast after TableScan and keep the output column name unchanged.

TiDB::ColumnInfo commit_ts_ci;
commit_ts_ci.id = MutSup::extra_commit_ts_col_id;
commit_ts_ci.name = "commit_ts";
commit_ts_ci.tp = TiDB::TypeLongLong; // Int64
commit_ts_ci.flag = 0; // Nullable

TiDB::ColumnInfos table_scan_column_info{commit_ts_ci};
std::vector<UInt8> may_need_add_cast_column{/*commit_ts*/ 1};

Block block = Block{
{toVec<UInt64>(MutSup::version_column_name, {1, 2, 3, 4})},
};

DAGExpressionAnalyzer analyzer{block, *ctx};
ExpressionActionsChain chain;
auto & step = analyzer.initAndGetLastStep(chain);
auto & actions = step.actions;

auto [has_cast, casted_columns]
= analyzer.buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info);
ASSERT_TRUE(has_cast);
ASSERT_EQ(casted_columns.size(), 1);

// Mimic appendExtraCastsAfterTS: project casted columns back to original names.
NamesWithAliases project_cols;
project_cols.emplace_back(casted_columns[0], MutSup::version_column_name);
actions->add(ExpressionAction::project(project_cols));
step.required_output.push_back(MutSup::version_column_name);

ExpressionActionsPtr extra_cast = chain.getLastActions();
ASSERT_TRUE(extra_cast);
chain.finalize();
chain.clear();

extra_cast->execute(block);

const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci);
ASSERT_TRUE(block.has(MutSup::version_column_name));
EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName());
}
CATCH

TEST_F(HiddenCommitTSColumnTest, RoughSetFilterAliasCommitTS)
try
{
// Rough set filter (RSOperator) uses table_column_defines by ColumnID.
// TiDB requests commit_ts as ColumnID=-5, but in TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID).
// Ensure rough set filter can correctly map ColumnID=-5 to VersionColumnID.

TiDB::ColumnInfo commit_ts_ci;
commit_ts_ci.id = MutSup::extra_commit_ts_col_id;
commit_ts_ci.name = "commit_ts";
commit_ts_ci.tp = TiDB::TypeLongLong; // Int64
commit_ts_ci.flag = 0; // Nullable
TiDB::ColumnInfos scan_column_infos{commit_ts_ci};

google::protobuf::RepeatedPtrField<tipb::Expr> filters;
{
tipb::Expr col_ref;
col_ref.set_tp(tipb::ExprType::ColumnRef);
{
WriteBufferFromOwnString ss;
encodeDAGInt64(/*column_index=*/0, ss);
col_ref.set_val(ss.releaseStr());
}
auto * field_type = col_ref.mutable_field_type();
field_type->set_tp(TiDB::TypeLongLong);
field_type->set_flag(0); // Nullable
field_type->set_flen(0);
field_type->set_decimal(0);

tipb::Expr literal = constructInt64LiteralTiExpr(10);

auto * func = filters.Add();
func->set_tp(tipb::ExprType::ScalarFunc);
func->set_sig(tipb::ScalarFuncSig::GTInt);
*func->add_children() = col_ref;
*func->add_children() = literal;
}

tipb::ANNQueryInfo ann_query_info;
tipb::FTSQueryInfo fts_query_info;
google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters;
google::protobuf::RepeatedPtrField<tipb::ColumnarIndexInfo> used_indexes;
std::vector<int> runtime_filter_ids;
const int rf_max_wait_time_ms = 0;
auto dag_query = std::make_unique<DAGQueryInfo>(
filters,
ann_query_info,
fts_query_info,
pushed_down_filters,
used_indexes,
scan_column_infos,
runtime_filter_ids,
rf_max_wait_time_ms,
ctx->getTimezoneInfo());

DM::ColumnDefines table_column_defines;
table_column_defines.emplace_back(
MutSup::version_col_id,
MutSup::version_column_name,
MutSup::getVersionColumnType());

auto rs_operator
= DM::RSOperator::build(dag_query, scan_column_infos, table_column_defines, /*enable_rs_filter*/ true, log);
ASSERT_TRUE(rs_operator);

const auto col_ids = rs_operator->getColumnIDs();
ASSERT_EQ(col_ids.size(), 1);
EXPECT_EQ(col_ids[0], MutSup::version_col_id);
}
CATCH

} // namespace DB::tests