-
Notifications
You must be signed in to change notification settings - Fork 411
Support commit ts in TiFlash #10723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Support commit ts in TiFlash #10723
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,6 +82,11 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str | |
| case MutSup::extra_table_id_col_id: | ||
| names_and_types.emplace_back(MutSup::extra_table_id_column_name, MutSup::getExtraTableIdColumnType()); | ||
| break; | ||
| case MutSup::extra_commit_ts_col_id: | ||
| names_and_types.emplace_back( | ||
| MutSup::version_column_name, | ||
| getDataTypeByColumnInfoForComputingLayer(column_info)); | ||
| break; | ||
| default: | ||
| names_and_types.emplace_back( | ||
| column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name, | ||
|
|
@@ -130,6 +135,8 @@ std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, Dat | |
| extra_table_id_index = i; | ||
| break; | ||
| } | ||
| case MutSup::extra_commit_ts_col_id: | ||
| throw Exception("Not supported in disaggregated read now"); | ||
| default: | ||
|
Comment on lines
+138
to
140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add an ErrorCodes value to this Exception. The current throw omits an error code, which makes error handling inconsistent. 🔧 Suggested fix- case MutSup::extra_commit_ts_col_id:
- throw Exception("Not supported in disaggregated read now");
+ case MutSup::extra_commit_ts_col_id:
+ throw Exception("Not supported in disaggregated read now", ErrorCodes::NOT_IMPLEMENTED);🤖 Prompt for AI Agents |
||
| column_defines->emplace_back(DM::ColumnDefine{ | ||
| column_info.id, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,9 @@ | |
| #include <Storages/DeltaMerge/Filter/RSOperator.h> | ||
| #include <Storages/DeltaMerge/Filter/Unsupported.h> | ||
| #include <Storages/DeltaMerge/FilterParser/FilterParser.h> | ||
| #include <Storages/MutableSupport.h> | ||
| #include <TiDB/Schema/TiDB.h> | ||
|
|
||
|
|
||
| namespace DB::DM | ||
| { | ||
|
|
||
|
|
@@ -70,18 +70,28 @@ RSOperatorPtr RSOperator::build( | |
| FilterParser::ColumnIDToAttrMap column_id_to_attr; | ||
| for (const auto & col_info : scan_column_infos) | ||
| { | ||
| ColumnID col_id = col_info.id; | ||
| // TiDB may request a hidden commit_ts column in TableScan with a special ColumnID. | ||
| // In TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID), so create an alias mapping. | ||
| if (unlikely(col_id == MutSup::extra_commit_ts_col_id)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2 - Rough set filter commit_ts alias uses wrong map key
Evidence: Line 76 rewrites Suggested fix: Populate the map under both keys, or adjust the lookup to use the aliased key. |
||
| col_id = MutSup::version_col_id; | ||
| auto iter = std::find_if( | ||
| table_column_defines.cbegin(), | ||
| table_column_defines.cend(), | ||
| [col_id = col_info.id](const ColumnDefine & cd) { return cd.id == col_id; }); | ||
| [col_id](const ColumnDefine & cd) { return cd.id == col_id; }); | ||
| if (iter == table_column_defines.cend()) | ||
| { | ||
| // Some columns may not be in the table schema, such as extra table id column. | ||
| column_id_to_attr[col_info.id] = Attr{.col_name = "", .col_id = col_info.id, .type = nullptr}; | ||
| continue; | ||
| } | ||
| const auto & cd = *iter; | ||
| column_id_to_attr[cd.id] = Attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type}; | ||
| Attr attr{.col_name = cd.name, .col_id = cd.id, .type = cd.type}; | ||
| column_id_to_attr[cd.id] = attr; | ||
| // When commit_ts (extra_commit_ts_col_id) is aliased to version_col_id, also register under | ||
| // col_info.id so that FilterParser lookups by column id from expressions find the Attr. | ||
| if (unlikely(col_info.id != cd.id)) | ||
| column_id_to_attr[col_info.id] = attr; | ||
| } | ||
|
|
||
| auto rs_operator = FilterParser::parseDAGQuery(*dag_query, scan_column_infos, column_id_to_attr, tracing_logger); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -43,6 +43,7 @@ class MutSup : public ext::Singleton<MutSup> | |||||
|
|
||||||
| inline static constexpr ColumnID extra_handle_id = -1; | ||||||
| inline static constexpr ColumnID extra_table_id_col_id = -3; | ||||||
| inline static constexpr ColumnID extra_commit_ts_col_id = -5; | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename to camelCase to match naming guideline.
🔧 Suggested rename (plus update all usages)- inline static constexpr ColumnID extra_commit_ts_col_id = -5;
+ inline static constexpr ColumnID extraCommitTsColId = -5;📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||
| inline static constexpr ColumnID version_col_id = -1024; | ||||||
| inline static constexpr ColumnID delmark_col_id = -1025; | ||||||
| inline static constexpr ColumnID invalid_col_id = -10000; | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,249 @@ | ||||||
| // Copyright 2026 PingCAP, Inc. | ||||||
| // | ||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
| // you may not use this file except in compliance with the License. | ||||||
| // You may obtain a copy of the License at | ||||||
| // | ||||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| // | ||||||
| // Unless required by applicable law or agreed to in writing, software | ||||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| // See the License for the specific language governing permissions and | ||||||
| // limitations under the License. | ||||||
|
|
||||||
| #include <Flash/Coprocessor/DAGCodec.h> | ||||||
| #include <Flash/Coprocessor/DAGExpressionAnalyzer.h> | ||||||
| #include <Flash/Coprocessor/DAGQueryInfo.h> | ||||||
| #include <Flash/Coprocessor/DAGUtils.h> | ||||||
| #include <Functions/registerFunctions.h> | ||||||
| #include <IO/Buffer/WriteBufferFromString.h> | ||||||
| #include <Interpreters/Context.h> | ||||||
| #include <Storages/DeltaMerge/DeltaMergeDefines.h> | ||||||
| #include <Storages/DeltaMerge/Filter/PushDownExecutor.h> | ||||||
| #include <Storages/DeltaMerge/Filter/RSOperator.h> | ||||||
| #include <TestUtils/FunctionTestUtils.h> | ||||||
| #include <TestUtils/TiFlashTestBasic.h> | ||||||
| #include <TiDB/Decode/TypeMapping.h> | ||||||
| #include <TiDB/Schema/TiDB.h> | ||||||
| #include <common/logger_useful.h> | ||||||
| #include <gtest/gtest.h> | ||||||
| #include <tipb/executor.pb.h> | ||||||
|
|
||||||
| namespace DB::tests | ||||||
| { | ||||||
| class HiddenCommitTSColumnTest : public ::testing::Test | ||||||
| { | ||||||
| public: | ||||||
| static void SetUpTestCase() | ||||||
| { | ||||||
| try | ||||||
| { | ||||||
| registerFunctions(); | ||||||
| } | ||||||
| catch (DB::Exception &) | ||||||
| { | ||||||
| // Maybe another test has already registed, ignore exception here. | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo: "registed" → "registered". - // Maybe another test has already registed, ignore exception here.
+ // Maybe another test has already registered, ignore exception here.📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| protected: | ||||||
| LoggerPtr log = Logger::get(); | ||||||
| ContextPtr ctx = TiFlashTestEnv::getContext(); | ||||||
| }; | ||||||
|
|
||||||
| TEST_F(HiddenCommitTSColumnTest, PushDownFilterAliasAndCast) | ||||||
| try | ||||||
| { | ||||||
| // TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in | ||||||
| // `_INTERNAL_VERSION` with ColumnID=VersionColumnID. When TiDB column type differs (e.g. Nullable(Int64)), | ||||||
| // TiFlash should add a cast. | ||||||
|
|
||||||
| TiDB::ColumnInfo commit_ts_ci; | ||||||
| commit_ts_ci.id = MutSup::extra_commit_ts_col_id; | ||||||
| commit_ts_ci.name = "commit_ts"; | ||||||
| commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 | ||||||
| commit_ts_ci.flag = 0; // Nullable(Int64) | ||||||
|
|
||||||
| TiDB::ColumnInfos table_scan_column_info{commit_ts_ci}; | ||||||
|
|
||||||
| // Use a single ColumnRef as filter condition: "where commit_ts". | ||||||
| // This is enough to trigger: | ||||||
| // 1) filter column id extraction (ColumnID=-5) | ||||||
| // 2) aliasing from -5 to VersionColumnID in PushDownExecutor | ||||||
| // 3) extra cast generation based on ColumnInfo (Nullable(Int64)) vs storage type (MutSup::getVersionColumnType()) | ||||||
| google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters; | ||||||
| { | ||||||
| auto * cond = pushed_down_filters.Add(); | ||||||
| cond->set_tp(tipb::ExprType::ColumnRef); | ||||||
| { | ||||||
| WriteBufferFromOwnString ss; | ||||||
| encodeDAGInt64(/*column_index=*/0, ss); | ||||||
| cond->set_val(ss.releaseStr()); | ||||||
| } | ||||||
| auto * field_type = cond->mutable_field_type(); | ||||||
| field_type->set_tp(TiDB::TypeLongLong); | ||||||
| field_type->set_flag(0); // Nullable | ||||||
| field_type->set_flen(0); | ||||||
| field_type->set_decimal(0); | ||||||
| } | ||||||
|
|
||||||
| DM::ColumnDefines columns_to_read; | ||||||
| columns_to_read.emplace_back(MutSup::version_col_id, MutSup::version_column_name, MutSup::getVersionColumnType()); | ||||||
|
|
||||||
| auto executor = DM::PushDownExecutor::build( | ||||||
| DM::EMPTY_RS_OPERATOR, | ||||||
| nullptr, // ann_query_info | ||||||
| #if ENABLE_CLARA | ||||||
| nullptr, // fts_query_info | ||||||
| #endif | ||||||
| table_scan_column_info, | ||||||
| pushed_down_filters, | ||||||
| columns_to_read, | ||||||
| nullptr, // column_range | ||||||
| *ctx, | ||||||
| log); | ||||||
|
|
||||||
| ASSERT_TRUE(executor); | ||||||
| ASSERT_TRUE(executor->filter_columns); | ||||||
| ASSERT_EQ(executor->filter_columns->size(), 1); | ||||||
| // Storage must read VersionColumnID, not -5. | ||||||
| EXPECT_EQ(executor->filter_columns->at(0).id, MutSup::version_col_id); | ||||||
| EXPECT_EQ(executor->filter_columns->at(0).name, MutSup::version_column_name); | ||||||
|
|
||||||
| // Extra cast should exist because TiDB requires Nullable(Int64) while TiFlash storage uses MutSup::getVersionColumnType(). | ||||||
| ASSERT_TRUE(executor->extra_cast); | ||||||
|
|
||||||
| Block block = Block{ | ||||||
| {toVec<UInt64>(MutSup::version_column_name, {1, 2, 3, 4})}, | ||||||
| }; | ||||||
| executor->extra_cast->execute(block); | ||||||
|
|
||||||
| const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci); | ||||||
| ASSERT_TRUE(block.has(MutSup::version_column_name)); | ||||||
| EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName()); | ||||||
| } | ||||||
| CATCH | ||||||
|
|
||||||
| TEST_F(HiddenCommitTSColumnTest, CastAfterTableScanForCommitTS) | ||||||
| try | ||||||
| { | ||||||
| // Non-late-materialization path: | ||||||
| // TiDB may request a hidden column with ColumnID=-5 (commit_ts). In TiFlash storage layer it is stored in | ||||||
| // `_INTERNAL_VERSION` with type MutSup::getVersionColumnType() (currently UInt64). If TiDB column type differs | ||||||
| // (e.g. Nullable(Int64)), TiFlash should add a cast after TableScan and keep the output column name unchanged. | ||||||
|
|
||||||
| TiDB::ColumnInfo commit_ts_ci; | ||||||
| commit_ts_ci.id = MutSup::extra_commit_ts_col_id; | ||||||
| commit_ts_ci.name = "commit_ts"; | ||||||
| commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 | ||||||
| commit_ts_ci.flag = 0; // Nullable | ||||||
|
|
||||||
| TiDB::ColumnInfos table_scan_column_info{commit_ts_ci}; | ||||||
| std::vector<UInt8> may_need_add_cast_column{/*commit_ts*/ 1}; | ||||||
|
|
||||||
| Block block = Block{ | ||||||
| {toVec<UInt64>(MutSup::version_column_name, {1, 2, 3, 4})}, | ||||||
| }; | ||||||
|
|
||||||
| DAGExpressionAnalyzer analyzer{block, *ctx}; | ||||||
| ExpressionActionsChain chain; | ||||||
| auto & step = analyzer.initAndGetLastStep(chain); | ||||||
| auto & actions = step.actions; | ||||||
|
|
||||||
| auto [has_cast, casted_columns] | ||||||
| = analyzer.buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); | ||||||
| ASSERT_TRUE(has_cast); | ||||||
| ASSERT_EQ(casted_columns.size(), 1); | ||||||
|
|
||||||
| // Mimic appendExtraCastsAfterTS: project casted columns back to original names. | ||||||
| NamesWithAliases project_cols; | ||||||
| project_cols.emplace_back(casted_columns[0], MutSup::version_column_name); | ||||||
| actions->add(ExpressionAction::project(project_cols)); | ||||||
| step.required_output.push_back(MutSup::version_column_name); | ||||||
|
|
||||||
| ExpressionActionsPtr extra_cast = chain.getLastActions(); | ||||||
| ASSERT_TRUE(extra_cast); | ||||||
| chain.finalize(); | ||||||
| chain.clear(); | ||||||
|
|
||||||
| extra_cast->execute(block); | ||||||
|
|
||||||
| const auto expected_type = getDataTypeByColumnInfoForComputingLayer(commit_ts_ci); | ||||||
| ASSERT_TRUE(block.has(MutSup::version_column_name)); | ||||||
| EXPECT_EQ(block.getByName(MutSup::version_column_name).type->getName(), expected_type->getName()); | ||||||
| } | ||||||
| CATCH | ||||||
|
|
||||||
| TEST_F(HiddenCommitTSColumnTest, RoughSetFilterAliasCommitTS) | ||||||
| try | ||||||
| { | ||||||
| // Rough set filter (RSOperator) uses table_column_defines by ColumnID. | ||||||
| // TiDB requests commit_ts as ColumnID=-5, but in TiFlash it is stored in `_INTERNAL_VERSION` (VersionColumnID). | ||||||
| // Ensure rough set filter can correctly map ColumnID=-5 to VersionColumnID. | ||||||
|
|
||||||
| TiDB::ColumnInfo commit_ts_ci; | ||||||
| commit_ts_ci.id = MutSup::extra_commit_ts_col_id; | ||||||
| commit_ts_ci.name = "commit_ts"; | ||||||
| commit_ts_ci.tp = TiDB::TypeLongLong; // Int64 | ||||||
| commit_ts_ci.flag = 0; // Nullable | ||||||
| TiDB::ColumnInfos scan_column_infos{commit_ts_ci}; | ||||||
|
|
||||||
| google::protobuf::RepeatedPtrField<tipb::Expr> filters; | ||||||
| { | ||||||
| tipb::Expr col_ref; | ||||||
| col_ref.set_tp(tipb::ExprType::ColumnRef); | ||||||
| { | ||||||
| WriteBufferFromOwnString ss; | ||||||
| encodeDAGInt64(/*column_index=*/0, ss); | ||||||
| col_ref.set_val(ss.releaseStr()); | ||||||
| } | ||||||
| auto * field_type = col_ref.mutable_field_type(); | ||||||
| field_type->set_tp(TiDB::TypeLongLong); | ||||||
| field_type->set_flag(0); // Nullable | ||||||
| field_type->set_flen(0); | ||||||
| field_type->set_decimal(0); | ||||||
|
|
||||||
| tipb::Expr literal = constructInt64LiteralTiExpr(10); | ||||||
|
|
||||||
| auto * func = filters.Add(); | ||||||
| func->set_tp(tipb::ExprType::ScalarFunc); | ||||||
| func->set_sig(tipb::ScalarFuncSig::GTInt); | ||||||
| *func->add_children() = col_ref; | ||||||
| *func->add_children() = literal; | ||||||
| } | ||||||
|
|
||||||
| tipb::ANNQueryInfo ann_query_info; | ||||||
| tipb::FTSQueryInfo fts_query_info; | ||||||
| google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters; | ||||||
| google::protobuf::RepeatedPtrField<tipb::ColumnarIndexInfo> used_indexes; | ||||||
| std::vector<int> runtime_filter_ids; | ||||||
| const int rf_max_wait_time_ms = 0; | ||||||
| auto dag_query = std::make_unique<DAGQueryInfo>( | ||||||
| filters, | ||||||
| ann_query_info, | ||||||
| fts_query_info, | ||||||
| pushed_down_filters, | ||||||
| used_indexes, | ||||||
| scan_column_infos, | ||||||
| runtime_filter_ids, | ||||||
| rf_max_wait_time_ms, | ||||||
| ctx->getTimezoneInfo()); | ||||||
|
|
||||||
| DM::ColumnDefines table_column_defines; | ||||||
| table_column_defines.emplace_back( | ||||||
| MutSup::version_col_id, | ||||||
| MutSup::version_column_name, | ||||||
| MutSup::getVersionColumnType()); | ||||||
|
|
||||||
| auto rs_operator | ||||||
| = DM::RSOperator::build(dag_query, scan_column_infos, table_column_defines, /*enable_rs_filter*/ true, log); | ||||||
| ASSERT_TRUE(rs_operator); | ||||||
|
|
||||||
| const auto col_ids = rs_operator->getColumnIDs(); | ||||||
| ASSERT_EQ(col_ids.size(), 1); | ||||||
| EXPECT_EQ(col_ids[0], MutSup::version_col_id); | ||||||
| } | ||||||
| CATCH | ||||||
|
|
||||||
| } // namespace DB::tests | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2 - Disaggregated read rejects commit_ts
Disaggregated compute mode's TableScan path unconditionally throws when it encounters the hidden commit_ts ColumnID (
MutSup::extra_commit_ts_col_id, -5), so any DAG request that includes that column will fail in disaggregated mode.Evidence:
case MutSup::extra_commit_ts_col_id: throw Exception(\"Not supported in disaggregated read now\");StorageDisaggregatedRemote.cpp:759and:833_INTERNAL_VERSIONinstead of rejecting itSuggested fix: Implement the same aliasing behavior (-5 → version column) for disaggregated mode instead of throwing.