diff --git a/src/Storages/MaterializedView/RefreshTask.cpp b/src/Storages/MaterializedView/RefreshTask.cpp
index 87599650b73b..90e03ae3bdac 100644
--- a/src/Storages/MaterializedView/RefreshTask.cpp
+++ b/src/Storages/MaterializedView/RefreshTask.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -59,6 +60,33 @@ namespace ErrorCodes
     extern const int INCORRECT_QUERY;
 }
 
+namespace RefreshTimeout
+{
+    extern const int REFRESH_TIMEOUT_SEC = 60 * 60 * 2; // 2 hours
+}
+
+/*
+  The RefreshTask class is responsible for refreshing a materialized view.
+  It can run in a sharded environment where each shard consists of multiple replicas.
+
+  How it works:
+  At the scheduled time, the RefreshTask runs on every replica of every shard.
+  The replica that grabs the lock, by creating the ephemeral "running" znode with the current timestamp,
+  becomes the global leader for all shards.
+  The global leader first deletes all "refresh_<timestamp>" directories older than 24 hours.
+  It then creates a new temporary table and writes its name to the "refresh_<timestamp>/temporary_table" znode.
+  After that, every shard reads the temporary table name and tries to write new data to it.
+  Every shard elects its own leader by creating a znode named after the shard: "refresh_<timestamp>/<shard_name>".
+  If the znode creation succeeds, that replica becomes the shard leader and starts writing the new data to the temporary table.
+  If the znode creation fails, the replica waits for its shard's leader to finish writing the data.
+  When a shard leader finishes writing its data, it creates the znode "refresh_<timestamp>/finished/<shard_name>".
+  The global leader participates in the data renewal process as well.
+
+  Finally, the global leader checks whether all shards have finished writing their data.
+  Once they have, it swaps the temporary table with the main table and deletes the "running" znode.
+  After that, it deletes the "refresh_<timestamp>" directory.
+*/
+
 RefreshTask::RefreshTask(
     StorageMaterializedView * view_, ContextPtr context, const DB::ASTRefreshStrategy & strategy, bool /*attach*/, bool coordinated, bool empty, bool is_restore_from_backup)
     : log(getLogger("RefreshTask"))
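
The election step described in the comment above is the standard ephemeral-znode locking pattern; tryBecomeGlobalLeader() and tryBecomeShardLeader() below are both instances of it. A minimal sketch of the pattern in isolation, using only calls this patch already relies on (tryCreate, CreateMode::Ephemeral, Coordination::Error); the function name and parameters are illustrative, not part of the patch:

    /// Sketch only: whoever creates the ephemeral znode first holds the lock; the znode
    /// disappears automatically when the owner's ZooKeeper session expires.
    bool tryAcquireEphemeralLock(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & lock_path, const String & payload)
    {
        auto code = zookeeper->tryCreate(lock_path, payload, zkutil::CreateMode::Ephemeral);
        if (code == Coordination::Error::ZOK)
            return true;    /// we are the leader until the session ends or we remove the znode
        if (code == Coordination::Error::ZNODEEXISTS)
            return false;   /// another replica already holds the lock
        throw Coordination::Exception::fromPath(code, lock_path);
    }
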
@@ -81,24 +109,27 @@ RefreshTask::RefreshTask(
         Macros::MacroExpansionInfo info;
         info.table_id = view->getStorageID();
         const auto database = DatabaseCatalog::instance().getDatabase(view_->getStorageID().database_name);
+        // Use "_common" instead of a real shard name, to create a common znode for all shards.
+        info.shard = "_common";
+        coordination.shard_name = "_common";
+        coordination.path = macros->expand(server_settings[ServerSetting::default_replica_path], info);
         if (const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(database.get()))
+        {
             info.shard = replicated_db->getShardName();
-        coordination.path = macros->expand(server_settings[ServerSetting::default_replica_path], info);
+            coordination.shard_name = replicated_db->getShardName();
+        }
         coordination.replica_name = context->getMacros()->expand(server_settings[ServerSetting::default_replica_name], info);
 
         auto zookeeper = context->getZooKeeper();
-        String replica_path = coordination.path + "/replicas/" + coordination.replica_name;
-        bool replica_path_existed = zookeeper->exists(replica_path);
+        bool root_znode_exists = zookeeper->exists(coordination.path);
 
         /// Create znodes even if it's ATTACH query. This seems weird, possibly incorrect, but
         /// currently both DatabaseReplicated and DatabaseShared seem to require this behavior.
-        if (!replica_path_existed)
+        if (!root_znode_exists)
         {
             zookeeper->createAncestors(coordination.path);
             std::vector<std::future<Coordination::CreateResponse>> futures;
             futures.emplace_back(zookeeper->asyncTryCreateNoThrow(coordination.path, coordination.root_znode.toString(), zkutil::CreateMode::Persistent));
-            futures.emplace_back(zookeeper->asyncTryCreateNoThrow(coordination.path + "/replicas", "", zkutil::CreateMode::Persistent));
-            futures.emplace_back(zookeeper->asyncTryCreateNoThrow(replica_path, "", zkutil::CreateMode::Persistent));
 
             /// When restoring multiple tables from backup (e.g. a RESTORE DATABASE), the restored
             /// refreshable materialized views shouldn't start refreshing on any replica until all
@@ -209,14 +240,11 @@ void RefreshTask::drop(ContextPtr context)
     {
         auto zookeeper = context->getZooKeeper();
 
-        zookeeper->tryRemove(coordination.path + "/replicas/" + coordination.replica_name);
-
         /// Redundant, refreshTask() is supposed to clean up after itself, but let's be paranoid.
         removeRunningZnodeIfMine(zookeeper);
 
         /// If no replicas left, remove the coordination znode.
         Coordination::Requests ops;
-        ops.emplace_back(zkutil::makeRemoveRequest(coordination.path + "/replicas", -1));
         String paused_path = coordination.path + "/paused";
         if (zookeeper->exists(paused_path))
             ops.emplace_back(zkutil::makeRemoveRequest(paused_path, -1));
@@ -468,7 +496,8 @@ void RefreshTask::refreshTask()
             chassert(lock.owns_lock());
 
             /// Check if another replica is already running a refresh.
-            if (coordination.running_znode_exists)
+            /// In sharded mode, we still participate even if not the global leader.
+            if (coordination.running_znode_exists && !coordination.is_global_leader)
             {
                 if (coordination.root_znode.last_attempt_replica == coordination.replica_name)
                 {
@@ -485,6 +514,45 @@ void RefreshTask::refreshTask()
                 }
                 else
                 {
+                    /// Another replica is the global leader, but we may still be a shard leader.
+                    /// Check if there's an active refresh directory we should participate in.
+                    if (coordination.coordinated && !coordination.current_refresh_dir.empty())
+                    {
+                        /// Try to become shard leader and participate in the refresh.
+                        lock.unlock();
+
+                        bool became_shard_leader = tryBecomeShardLeader(zookeeper);
+                        if (became_shard_leader)
+                        {
+                            LOG_DEBUG(log, "Participating as shard leader for shard {}, current_refresh_dir: {}", coordination.shard_name, coordination.current_refresh_dir);
+
+                            [[maybe_unused]] String temp_table = getOrWaitForTemporaryTable(zookeeper);
+                            LOG_DEBUG(log, "Shard {} got temporary table: {}", coordination.shard_name, temp_table);
+
+                            /// Execute our part of the refresh (write data to the temporary table).
+                            /// Note: In sharded mode, each shard writes its local data to the shared
+                            /// temporary table. The shard leader executes the query and writes the data.
+                            try
+                            {
+                                executeRefreshUnlocked(refresh_append, -1);
+                                markShardFinished(zookeeper);
+                            }
+                            catch (...)
+                            {
+                                LOG_ERROR(log, "Shard {} failed to write data: {}", coordination.shard_name, getCurrentExceptionMessage(true));
+                            }
+
+                            coordination.is_shard_leader = false;
+                        }
+                        else
+                        {
+                            /// Wait for another replica in our shard to finish.
+                            LOG_DEBUG(log, "Another replica is shard leader for shard {}, waiting", coordination.shard_name);
+                        }
+
+                        lock.lock();
+                    }
+
                     setState(RefreshState::RunningOnAnotherReplica, lock);
                     break;
                 }
@@ -536,12 +604,53 @@ void RefreshTask::refreshTask()
                 break;
             }
 
-            /// Write to keeper.
-            if (!updateCoordinationState(start_znode, true, zookeeper, lock))
+            lock.unlock();
+
+            /// Try to become the global leader for this refresh.
+            LOG_DEBUG(log, "Attempting to become global leader for refresh");
+            bool became_global_leader = tryBecomeGlobalLeader(zookeeper, start_time);
+
+            if (became_global_leader && coordination.coordinated)
+            {
+                /// As global leader:
+                /// 1. Clean up old refresh directories older than 24 hours.
+                LOG_DEBUG(log, "Global leader cleaning up old refresh directories");
+                cleanupOldRefreshDirectories(zookeeper);
+
+                /// 2. Create a new refresh directory for this refresh.
+                LOG_DEBUG(log, "Global leader creating refresh directory");
+                createRefreshDirectory(zookeeper, start_time);
+            }
+
+            // The global leader should succeed in becoming shard leader, because it has not yet created a temporary table.
+            [[maybe_unused]] bool became_shard_leader = tryBecomeShardLeader(zookeeper);
+            LOG_DEBUG(log, "tryBecomeShardLeader result: {}", became_shard_leader);
+            assert(became_shard_leader == coordination.is_global_leader);
+
+            lock.lock();
+
+            /// Write to keeper (update root znode).
+            LOG_DEBUG(log, "Updating coordination state (is_global_leader={})", coordination.is_global_leader);
+            if (!updateCoordinationState(start_znode, coordination.is_global_leader, zookeeper, lock))
             {
+                /// Clean up the artifacts we created before losing the race.
+                if (coordination.is_shard_leader && !coordination.current_refresh_dir.empty())
+                {
+                    String shard_leader_path = coordination.current_refresh_dir + "/" + coordination.shard_name;
+                    LOG_DEBUG(log, "Cleaning up shard leader znode: {}", shard_leader_path);
+                    zookeeper->tryRemove(shard_leader_path);
+                }
+                if (coordination.is_global_leader && !coordination.current_refresh_dir.empty())
+                {
+                    LOG_DEBUG(log, "Cleaning up refresh directory: {}", coordination.current_refresh_dir);
+                    cleanupRefreshDirectory(zookeeper);
+                }
+                coordination.is_global_leader = false;
+                coordination.is_shard_leader = false;
                 schedule_keeper_retry();
                 return;
             }
+            LOG_DEBUG(log, "Coordination state updated successfully");
             chassert(lock.owns_lock());
 
             /// Perform a refresh.
@@ -560,7 +669,38 @@ void RefreshTask::refreshTask()
 
             try
             {
-                new_table_uuid = executeRefreshUnlocked(append, root_znode_version);
+                if (coordination.is_global_leader)
+                {
+                    /// Execute the refresh: create the temporary table and write data.
+                    new_table_uuid = executeRefreshUnlocked(append, root_znode_version);
+
+                    /// Mark our shard as finished.
+                    if (coordination.coordinated)
+                        markShardFinished(zookeeper);
+
+                    /// Poll until all shards are finished or the timeout expires.
+                    while (true)
+                    {
+                        if (std::chrono::steady_clock::now() - start_time_steady > std::chrono::seconds(RefreshTimeout::REFRESH_TIMEOUT_SEC))
+                        {
+                            throw Exception(ErrorCodes::REFRESH_FAILED, "Timeout waiting for all shards to finish");
+                        }
+
+                        if (checkAllShardsFinished(zookeeper))
+                        {
+                            LOG_DEBUG(log, "All shards finished, proceeding with table swap");
+                            break;
+                        }
+
+                        if (execution.interrupt_execution.load())
+                            throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled while waiting for shards");
+
+                        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100ms before polling again
+                    }
+
+                    if (!append)
+                        exchangeTargetTableAfterRefresh(new_table_uuid, append);
+                }
+
                 refreshed = true;
             }
             catch (...)
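
The wait loop above, and the similar one in getOrWaitForTemporaryTable() further down, follow the same poll/sleep shape: check a condition, check for cancellation, otherwise sleep 100 ms until a deadline. A minimal sketch of that shape with caller-supplied callables; pollUntil and its parameters are hypothetical, not something this patch adds:

    #include <chrono>
    #include <functional>
    #include <thread>

    /// Sketch only: returns true if `condition` became true before `timeout`,
    /// false on timeout or cancellation.
    bool pollUntil(const std::function<bool()> & condition, const std::function<bool()> & cancelled, std::chrono::seconds timeout)
    {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (std::chrono::steady_clock::now() < deadline)
        {
            if (condition())    /// e.g. checkAllShardsFinished(zookeeper)
                return true;
            if (cancelled())    /// e.g. execution.interrupt_execution.load()
                return false;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        return false;
    }
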
@@ -568,7 +708,7 @@ void RefreshTask::refreshTask()
                 if (execution.interrupt_execution.load())
                 {
                     error_message = "cancelled";
-                    LOG_INFO(log, "{}: Refresh cancelled", view->getStorageID().getFullTableName());
+                    LOG_DEBUG(log, "{}: Refresh cancelled", view->getStorageID().getFullTableName());
                 }
                 else
                 {
@@ -577,8 +717,25 @@ void RefreshTask::refreshTask()
                 }
             }
 
+            /// The global leader cleans up the refresh directory.
+            if (coordination.is_global_leader && coordination.coordinated)
+            {
+                try
+                {
+                    cleanupRefreshDirectory(zookeeper);
+                }
+                catch (...)
+                {
+                    LOG_WARNING(log, "Failed to cleanup refresh directory: {}", getCurrentExceptionMessage(true));
+                }
+            }
+
             lock.lock();
 
+            /// Reset coordination state for the next refresh.
+            coordination.is_global_leader = false;
+            coordination.is_shard_leader = false;
+
             setState(RefreshState::Scheduling, lock);
 
             auto end_time = std::chrono::floor<std::chrono::seconds>(currentTime());
@@ -620,6 +777,8 @@ void RefreshTask::refreshTask()
             tryLogCurrentException(log, "Keeper error");
             if (!lock.owns_lock())
                 lock.lock();
+            coordination.is_global_leader = false;
+            coordination.is_shard_leader = false;
             schedule_keeper_retry();
         }
         catch (...)
@@ -629,6 +788,8 @@ void RefreshTask::refreshTask()
             scheduling.stop_requested = true;
             coordination.watches->should_reread_znodes.store(true);
             coordination.running_znode_exists = false;
+            coordination.is_global_leader = false;
+            coordination.is_shard_leader = false;
             lock.unlock();
 
             tryLogCurrentException(log,
@@ -648,10 +809,15 @@ void RefreshTask::refreshTask()
 
 UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
 {
-    LOG_DEBUG(log, "Refreshing view {}", view->getStorageID().getFullTableName());
+    // Only executed after the replica has become a shard leader or the global leader.
+    LOG_DEBUG(log, "Refreshing view {} (global_leader={}, shard_leader={})",
+        view->getStorageID().getFullTableName(), coordination.is_global_leader, coordination.is_shard_leader);
     execution.progress.reset();
 
     ContextMutablePtr refresh_context = view->createRefreshContext();
+    std::shared_ptr<zkutil::ZooKeeper> zookeeper;
+    if (coordination.coordinated)
+        zookeeper = view->getContext()->getZooKeeper();
 
     if (!append)
     {
@@ -666,9 +832,44 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
     try
     {
         {
-            /// Create a table.
-            auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop);
-            new_table_id = refresh_query->table_id;
+            std::shared_ptr<ASTInsertQuery> refresh_query;
+            std::unique_ptr<CurrentThread::QueryScope> query_scope;
+            /// If we're the global leader, create the temporary table and publish its name
+            /// to ZooKeeper so other shards can find it.
+            if (coordination.is_global_leader && coordination.coordinated)
+            {
+                /// Create a table (or get the shared temporary table name in multi-shard mode).
+                std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop, new_table_id);
+                new_table_id = refresh_query->table_id;
+                coordination.temporary_table_name = new_table_id.table_name;
+                getOrWaitForTemporaryTable(zookeeper); // This writes the temporary table name to the znode.
+            }
+            /// If we're a shard leader but not the global leader, we need to get the temporary
+            /// table name from ZooKeeper (the table was already created by the global leader).
+            else if (coordination.is_shard_leader && coordination.coordinated && !coordination.is_global_leader)
+            {
+                /// Note: In this case, we write to the same temporary table that the global leader created.
+                /// The refresh_query->table_id should be updated to point to the shared table.
+                String shared_temp_table = getOrWaitForTemporaryTable(zookeeper);
+                coordination.temporary_table_name = shared_temp_table;
+
+                /// Find the table by name, waiting up to 10 seconds for it to be replicated.
+                for (int i = 0; i < 100; ++i)
+                {
+                    auto table = DatabaseCatalog::instance().getDatabase(view->getStorageID().database_name)->tryGetTable(coordination.temporary_table_name, view->getContext());
+                    if (table)
+                    {
+                        new_table_id = table->getStorageID();
+                        break;
+                    }
+                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+                }
+
+                std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop, new_table_id);
+            }
+            else
+            {
+                /// Non-coordinated refresh or append mode.
+                std::tie(refresh_query, query_scope) = view->prepareRefresh(append, refresh_context, table_to_drop, new_table_id);
+                new_table_id = refresh_query->table_id;
+            }
 
             /// Add the query to system.processes and allow it to be killed with KILL QUERY.
             String query_for_logging = refresh_query->formatForLogging(
@@ -684,7 +885,7 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
                 execution.progress.incrementPiecewiseAtomically(prog);
             });
 
-            /// Run the query.
+            /// Run the query - each shard leader writes its portion of the data.
 
             BlockIO block_io = InterpreterInsertQuery(
                 refresh_query,
@@ -724,21 +925,47 @@ UUID RefreshTask::executeRefreshUnlocked(bool append, int32_t root_znode_version)
                 throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled");
         }
 
-        /// Exchange tables.
-        if (!append)
-            table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context);
+        /// Note: The table exchange is done separately via exchangeTargetTableAfterRefresh()
+        /// after all shards have finished writing data.
     }
     catch (...)
     {
-        if (table_to_drop.has_value())
+        /// Only drop the temporary table if we created it (global leader or non-coordinated).
+        if (table_to_drop.has_value() && (coordination.is_global_leader || !coordination.coordinated))
            view->dropTempTable(table_to_drop.value(), refresh_context);
         throw;
     }
 
+    return new_table_id.uuid;
+}
+
+void RefreshTask::exchangeTargetTableAfterRefresh(UUID new_table_uuid, bool append)
+{
+    if (append)
+        return;
+
+    /// Only the global leader or a non-coordinated refresh does the exchange.
+    if (!coordination.is_global_leader && coordination.coordinated)
+        return;
+
+    LOG_DEBUG(log, "Exchanging target table with new table uuid={}", toString(new_table_uuid));
+
+    ContextMutablePtr refresh_context = view->createRefreshContext();
+
+    auto table = DatabaseCatalog::instance().getDatabase(view->getStorageID().database_name)->tryGetTable(coordination.temporary_table_name, view->getContext());
+    if (!table)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Table {} does not exist", coordination.temporary_table_name);
+    auto new_table_id = table->getStorageID();
+
+    /// Exchange tables and get the old table to drop.
+    auto table_to_drop = view->exchangeTargetTable(new_table_id, refresh_context);
+
+    /// Drop the old table.
     if (table_to_drop.has_value())
         view->dropTempTable(table_to_drop.value(), refresh_context);
 
-    return new_table_id.uuid;
+    LOG_INFO(log, "Target table exchange completed");
 }
 
 void RefreshTask::updateDependenciesIfNeeded(std::unique_lock<std::mutex> & lock)
 {
@@ -909,6 +1136,25 @@ void RefreshTask::readZnodesIfNeeded(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::unique_lock<std::mutex> & lock)
     coordination.running_znode_exists = responses[1].error == Coordination::Error::ZOK;
     coordination.paused_znode_exists = responses[2].error == Coordination::Error::ZOK;
 
+    /// Get the current refresh directory from the "running" znode if it exists.
+    /// The "running" znode contains "replica_name:timestamp".
+    coordination.current_refresh_dir.clear();
+    if (coordination.running_znode_exists && !responses[1].data.empty())
+    {
+        size_t colon_pos = responses[1].data.rfind(':');
+        if (colon_pos != String::npos)
+        {
+            try
+            {
+                String timestamp_str = responses[1].data.substr(colon_pos + 1);
+                coordination.current_refresh_dir = coordination.path + "/refresh_" + timestamp_str;
+            }
+            catch (...)
+            {
+                coordination.current_refresh_dir.clear();
+            }
+        }
+    }
+
     if (coordination.root_znode.last_completed_timeslot != prev_last_completed_timeslot)
     {
         lock.unlock();
@@ -925,10 +1171,24 @@ bool RefreshTask::updateCoordinationState(CoordinationZnode root, bool running, std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::unique_lock<std::mutex> & lock)
 {
     Coordination::Requests ops;
     ops.emplace_back(zkutil::makeSetRequest(coordination.path, root.toString(), root.version));
-    if (running)
+
+    /// If we want to start running and the running znode already exists (we created it in tryBecomeGlobalLeader),
+    /// just verify that it exists instead of trying to create it again.
+    /// If we want to stop running, remove the znode.
+    /// If we want to start running and the znode doesn't exist yet, create it.
+    if (running && coordination.running_znode_exists)
+    {
+        /// We already created the running znode in tryBecomeGlobalLeader, just check that it exists.
+        ops.emplace_back(zkutil::makeCheckRequest(coordination.path + "/running", -1));
+    }
+    else if (running)
+    {
         ops.emplace_back(zkutil::makeCreateRequest(coordination.path + "/running", coordination.replica_name, zkutil::CreateMode::Ephemeral));
+    }
     else
+    {
         ops.emplace_back(zkutil::makeRemoveRequest(coordination.path + "/running", -1));
+    }
 
     Coordination::Responses responses;
@@ -936,7 +1196,9 @@ bool RefreshTask::updateCoordinationState(CoordinationZnode root, bool running, std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::unique_lock<std::mutex> & lock)
     auto code = zookeeper->tryMulti(ops, responses);
     lock.lock();
 
-    if (running && responses[0]->error == Coordination::Error::ZBADVERSION)
+    if (running && (responses[0]->error == Coordination::Error::ZBADVERSION ||
+                    code == Coordination::Error::ZNODEEXISTS ||
+                    code == Coordination::Error::ZNONODE))
         /// Lost the race, this is normal, don't log a stack trace.
         return false;
     zkutil::KeeperMultiException::check(code, ops, responses);
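
For reference, the contract between tryBecomeGlobalLeader() below (which writes the "running" znode) and readZnodesIfNeeded() above is simply that the payload is "<replica_name>:<unix_timestamp>", and the timestamp part names the per-refresh directory. A small sketch of that round trip; the two helper functions are illustrative only (String and Int64 are the ClickHouse aliases used throughout this file, std::optional needs <optional>):

    /// Sketch only: the payload written by the global leader ...
    String makeRunningPayload(const String & replica_name, Int64 unix_seconds)
    {
        return replica_name + ":" + std::to_string(unix_seconds);   /// e.g. "r1:1700000000"
    }

    /// ... and how any replica maps it back to the refresh directory, as readZnodesIfNeeded() does.
    std::optional<String> refreshDirFromRunningPayload(const String & coordination_path, const String & payload)
    {
        size_t colon_pos = payload.rfind(':');
        if (colon_pos == String::npos)
            return std::nullopt;
        return coordination_path + "/refresh_" + payload.substr(colon_pos + 1);
    }
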
@@ -1076,4 +1338,279 @@ void RefreshTask::CoordinationZnode::parse(const String & data)
     last_attempt_time = std::chrono::sys_seconds(std::chrono::seconds(last_attempt_time_int));
 }
 
+void RefreshTask::cleanupOldRefreshDirectories(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::seconds max_age)
+{
+    if (!coordination.coordinated)
+        return;
+
+    auto now = std::chrono::system_clock::now();
+    auto cutoff = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count() - max_age.count();
+
+    Strings children;
+    auto code = zookeeper->tryGetChildren(coordination.path, children);
+    if (code != Coordination::Error::ZOK)
+        return;
+
+    for (const auto & child : children)
+    {
+        if (child.starts_with("refresh_"))
+        {
+            try
+            {
+                /// Extract the timestamp from "refresh_<timestamp>".
+                Int64 timestamp = std::stoll(child.substr(8));
+                if (timestamp < cutoff)
+                {
+                    String refresh_path = coordination.path + "/" + child;
+                    LOG_DEBUG(log, "Cleaning up old refresh directory: {}", refresh_path);
+
+                    /// Remove all children first.
+                    Strings refresh_children;
+                    if (zookeeper->tryGetChildren(refresh_path, refresh_children) == Coordination::Error::ZOK)
+                    {
+                        for (const auto & refresh_child : refresh_children)
+                            zookeeper->tryRemove(refresh_path + "/" + refresh_child);
+                    }
+                    zookeeper->tryRemove(refresh_path);
+                }
+            }
+            catch (const std::exception & e)
+            {
+                LOG_WARNING(log, "Failed to parse or clean up refresh directory {}: {}", child, e.what());
+            }
+        }
+    }
+}
+
+String RefreshTask::createRefreshDirectory(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::sys_seconds timestamp)
+{
+    Int64 ts = std::chrono::duration_cast<std::chrono::seconds>(timestamp.time_since_epoch()).count();
+    String refresh_dir = coordination.path + "/refresh_" + std::to_string(ts);
+
+    auto code = zookeeper->tryCreate(refresh_dir, "", zkutil::CreateMode::Persistent);
+    if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
+        throw Coordination::Exception::fromPath(code, refresh_dir);
+
+    coordination.current_refresh_dir = refresh_dir;
+    return refresh_dir;
+}
+
+bool RefreshTask::tryBecomeGlobalLeader(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::sys_seconds timestamp)
+{
+    if (!coordination.coordinated)
+    {
+        coordination.is_global_leader = true;
+        return true;
+    }
+
+    /// Try to create the ephemeral "running" znode with the current timestamp.
+    Int64 ts = std::chrono::duration_cast<std::chrono::seconds>(timestamp.time_since_epoch()).count();
+    String running_data = coordination.replica_name + ":" + std::to_string(ts);
+
+    auto code = zookeeper->tryCreate(coordination.path + "/running", running_data, zkutil::CreateMode::Ephemeral);
+    if (code == Coordination::Error::ZOK)
+    {
+        coordination.is_global_leader = true;
+        coordination.running_znode_exists = true;
+        LOG_DEBUG(log, "Became global leader for refresh");
+        return true;
+    }
+    else if (code == Coordination::Error::ZNODEEXISTS)
+    {
+        coordination.is_global_leader = false;
+        coordination.running_znode_exists = true;
+        LOG_DEBUG(log, "Another replica is already global leader");
+        return false;
+    }
+    else
+    {
+        throw Coordination::Exception::fromPath(code, coordination.path + "/running");
+    }
+}
+
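
The functions in this file assemble the per-refresh znode paths ad hoc in several places; the sketch below only gathers them in one spot to make the layout easier to follow. RefreshZnodePaths is a hypothetical grouping, not something this patch introduces, and it mirrors the tree documented in RefreshTask.h further down:

    /// Sketch only: znode paths used by one coordinated refresh, relative to coordination.path.
    struct RefreshZnodePaths
    {
        String root;         /// coordination.path
        Int64 timestamp = 0; /// refresh start time, unix seconds

        String running() const { return root + "/running"; }                                    /// ephemeral, global leader lock
        String refreshDir() const { return root + "/refresh_" + std::to_string(timestamp); }    /// per-refresh directory
        String temporaryTable() const { return refreshDir() + "/temporary_table"; }             /// holds the shared temp table name
        String shardLeader(const String & shard) const { return refreshDir() + "/" + shard; }   /// ephemeral, shard leader lock
        String shardFinished(const String & shard) const { return refreshDir() + "/finished/" + shard; }
    };
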
+bool RefreshTask::tryBecomeShardLeader(std::shared_ptr<zkutil::ZooKeeper> zookeeper)
+{
+    if (!coordination.coordinated)
+    {
+        coordination.is_shard_leader = true;
+        return true;
+    }
+
+    if (coordination.current_refresh_dir.empty() && !coordination.is_global_leader)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot become shard leader without a current refresh directory");
+
+    /// Try to create a znode for this shard in the refresh directory.
+    String shard_leader_path = coordination.current_refresh_dir + "/" + coordination.shard_name;
+
+    auto code = zookeeper->tryCreate(shard_leader_path, coordination.replica_name, zkutil::CreateMode::Ephemeral);
+    if (code == Coordination::Error::ZOK)
+    {
+        coordination.is_shard_leader = true;
+        LOG_DEBUG(log, "Became shard leader for shard {}", coordination.shard_name);
+        return true;
+    }
+    else if (code == Coordination::Error::ZNODEEXISTS)
+    {
+        coordination.is_shard_leader = false;
+        LOG_DEBUG(log, "Another replica is already shard leader for shard {}", coordination.shard_name);
+        return false;
+    }
+    else
+    {
+        throw Coordination::Exception::fromPath(code, shard_leader_path);
+    }
+}
+
+String RefreshTask::getOrWaitForTemporaryTable(std::shared_ptr<zkutil::ZooKeeper> zookeeper)
+{
+    if (!coordination.coordinated)
+        return coordination.temporary_table_name;
+
+    if (coordination.current_refresh_dir.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get temporary table without a current refresh directory");
+
+    String temp_table_path = coordination.current_refresh_dir + "/temporary_table";
+
+    /// If we're the global leader, write the temporary table name.
+    if (coordination.is_global_leader && !coordination.temporary_table_name.empty())
+    {
+        LOG_DEBUG(log, "Global leader creating temporary_table znode at {} with value {}", temp_table_path, coordination.temporary_table_name);
+        auto code = zookeeper->tryCreate(temp_table_path, coordination.temporary_table_name, zkutil::CreateMode::Persistent);
+        if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
+            throw Coordination::Exception::fromPath(code, temp_table_path);
+        LOG_DEBUG(log, "Global leader created temporary_table znode, code={}", static_cast<int>(code));
+        return coordination.temporary_table_name;
+    }
+
+    /// Wait for the temporary table name to become available.
+    for (int attempt = 0; attempt < RefreshTimeout::REFRESH_TIMEOUT_SEC * 10; ++attempt)
+    {
+        String data;
+        if (zookeeper->tryGet(temp_table_path, data) && !data.empty())
+        {
+            LOG_DEBUG(log, "Got temporary table name from znode: {}", data);
+            coordination.temporary_table_name = data;
+            return data;
+        }
+
+        if (execution.interrupt_execution.load())
+            throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh cancelled while waiting for temporary table");
+
+        if (attempt % 50 == 0) // Log every 5 seconds
+            LOG_INFO(log, "Waiting for temporary table znode at {} (attempt {})", temp_table_path, attempt);
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "Timeout waiting for temporary table znode at {}", temp_table_path);
+}
+
+void RefreshTask::markShardFinished(std::shared_ptr<zkutil::ZooKeeper> zookeeper)
+{
+    if (!coordination.coordinated)
+        return;
+
+    if (coordination.current_refresh_dir.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot mark shard finished without a current refresh directory");
+
+    /// Store the finished status in the current refresh directory.
+    String finished_dir = coordination.current_refresh_dir + "/finished";
+    zookeeper->tryCreate(finished_dir, "", zkutil::CreateMode::Persistent);
+
+    String finished_path = finished_dir + "/" + coordination.shard_name;
+
+    /// Create the finished znode to signal completion.
+    auto code = zookeeper->tryCreate(finished_path, "", zkutil::CreateMode::Persistent);
+    if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNODEEXISTS)
+        throw Coordination::Exception::fromPath(code, finished_path);
+
+    LOG_DEBUG(log, "Marked shard {} as finished in {}", coordination.shard_name, finished_path);
+}
+
+size_t RefreshTask::getAllShardsCount(std::shared_ptr<zkutil::ZooKeeper> /* zookeeper */)
+{
+    if (!coordination.coordinated)
+    {
+        return 1;
+    }
+
+    /// Get shard names from the cluster configuration rather than ZooKeeper.
+    /// This provides a consistent view of all shards and avoids race conditions.
+    try
+    {
+        const auto database = DatabaseCatalog::instance().getDatabase(view->getStorageID().database_name);
+        const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(database.get());
+        if (!replicated_db)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Database {} is not a replicated database", view->getStorageID().database_name);
+        ClusterPtr cluster = replicated_db->tryGetCluster();
+        if (!cluster)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster not found for database {}", view->getStorageID().database_name);
+        return cluster->getShardsInfo().size();
+    }
+    catch (...)
+    {
+        LOG_WARNING(log, "Failed to get shards from cluster: {}", getCurrentExceptionMessage(true));
+        throw;
+    }
+}
+
+bool RefreshTask::checkAllShardsFinished(std::shared_ptr<zkutil::ZooKeeper> zookeeper)
+{
+    if (!coordination.coordinated)
+        return true;
+
+    if (coordination.current_refresh_dir.empty())
+        return false;
+
+    auto all_shards = getAllShardsCount(zookeeper);
+    if (all_shards == 1)
+        return true;
+
+    String finished_dir = coordination.current_refresh_dir + "/finished";
+
+    size_t num_finished_shards = 0;
+    Strings finished_children;
+    if (zookeeper->tryGetChildren(finished_dir, finished_children) == Coordination::Error::ZOK)
+    {
+        num_finished_shards = finished_children.size();
+    }
+    else
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to get children of finished directory: {}", finished_dir);
+    }
+
+    if (num_finished_shards == all_shards)
+        return true;
+
+    LOG_DEBUG(log, "Waiting for shards to finish: {} / {}", num_finished_shards, all_shards);
+    return false;
+}
+
+void RefreshTask::cleanupRefreshDirectory(std::shared_ptr<zkutil::ZooKeeper> zookeeper)
+{
+    if (!coordination.coordinated || coordination.current_refresh_dir.empty())
+        return;
+
+    LOG_DEBUG(log, "Cleaning up refresh directory: {}", coordination.current_refresh_dir);
+
+    /// Remove the "finished" subdirectory and its children.
+    String finished_dir = coordination.current_refresh_dir + "/finished";
+    Strings finished_children;
+    if (zookeeper->tryGetChildren(finished_dir, finished_children) == Coordination::Error::ZOK)
+    {
+        for (const auto & child : finished_children)
+            zookeeper->tryRemove(finished_dir + "/" + child);
+    }
+    zookeeper->tryRemove(finished_dir);
+
+    /// Remove all other children of the refresh directory.
+    Strings children;
+    if (zookeeper->tryGetChildren(coordination.current_refresh_dir, children) == Coordination::Error::ZOK)
+    {
+        for (const auto & child : children)
+            zookeeper->tryRemove(coordination.current_refresh_dir + "/" + child);
+    }
+    zookeeper->tryRemove(coordination.current_refresh_dir);
+
+    coordination.current_refresh_dir.clear();
+}
+
 }
diff --git a/src/Storages/MaterializedView/RefreshTask.h b/src/Storages/MaterializedView/RefreshTask.h
index 10ff3f0f184f..786be1ea2880 100644
--- a/src/Storages/MaterializedView/RefreshTask.h
+++ b/src/Storages/MaterializedView/RefreshTask.h
@@ -172,18 +172,26 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
     /// When coordination is enabled, we have these znodes in Keeper:
     ///
     /// keeper_path (CoordinationZnode)
-    /// ├── "replicas"
-    /// │   ├── name1
-    /// │   ├── name2
-    /// │   └── name3
-    /// ├── ["running"] (ephemeral)
-    /// └── ["paused"]
+    /// ├── "shards"
+    /// │   ├── shard1
+    /// │   ├── shard2
+    /// │   └── shard3
+    /// ├── ["running"] (ephemeral, contains the global leader's replica name and timestamp)
+    /// ├── ["paused"]
+    /// └── "refresh_<timestamp>" (created for each refresh)
+    ///     ├── "temporary_table" (contains the name of the temporary table)
+    ///     ├── "<shard_name>" (ephemeral, created by the shard leader to claim leadership)
+    ///     └── "finished"
+    ///         ├── shard1 (created when shard1 completes its data write)
+    ///         ├── shard2 (created when shard2 completes its data write)
+    ///         └── shard3 (created when shard3 completes its data write)
 
     struct WatchState
     {
         std::atomic_bool should_reread_znodes {true};
         std::atomic_bool root_watch_active {false};
         std::atomic_bool children_watch_active {false};
+        std::atomic_bool refresh_dir_watch_active {false};
     };
 
     CoordinationZnode root_znode;
@@ -197,6 +205,16 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
         bool read_only = false;
         String path;
         String replica_name;
+        String shard_name;
+
+        /// Current refresh directory path (e.g. "refresh_<timestamp>")
+        String current_refresh_dir;
+        /// Whether this replica is the global leader for the current refresh
+        bool is_global_leader = false;
+        /// Whether this replica is the shard leader for the current refresh
+        bool is_shard_leader = false;
+        /// Temporary table name for the current refresh
+        String temporary_table_name;
     };
 
     struct ExecutionState
@@ -270,10 +288,15 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
     /// (e.g. stop_requested, cancel_requested), they don't do anything significant themselves.
     void refreshTask();
 
-    /// Perform an actual refresh: create new table, run INSERT SELECT, exchange tables, drop old table.
+    /// Perform an actual refresh: create new table, run INSERT SELECT.
+    /// The table exchange is done separately via exchangeTargetTableAfterRefresh().
     /// Mutex must be unlocked. Called only from refresh_task.
     UUID executeRefreshUnlocked(bool append, int32_t root_znode_version);
 
+    /// Exchange the target table with the newly created temporary table after all shards have finished.
+    /// Only called by the global leader (or in non-coordinated mode).
+    void exchangeTargetTableAfterRefresh(UUID new_table_uuid, bool append);
+
     /// Assigns dependencies_satisfied_until.
     void updateDependenciesIfNeeded(std::unique_lock<std::mutex> & lock);
 
@@ -284,6 +307,17 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
     bool updateCoordinationState(CoordinationZnode root, bool running, std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::unique_lock<std::mutex> & lock);
     void removeRunningZnodeIfMine(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
 
+    /// Multi-shard coordination methods
+    void cleanupOldRefreshDirectories(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::seconds max_age = std::chrono::hours(24));
+    String createRefreshDirectory(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::sys_seconds timestamp);
+    bool tryBecomeGlobalLeader(std::shared_ptr<zkutil::ZooKeeper> zookeeper, std::chrono::sys_seconds timestamp);
+    bool tryBecomeShardLeader(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+    String getOrWaitForTemporaryTable(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+    void markShardFinished(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+    bool checkAllShardsFinished(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+    void cleanupRefreshDirectory(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+    size_t getAllShardsCount(std::shared_ptr<zkutil::ZooKeeper> zookeeper);
+
     void setState(RefreshState s, std::unique_lock<std::mutex> & lock);
     void scheduleRefresh(std::lock_guard<std::mutex> & lock);
     void interruptExecution();
diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp
index 1775ecba16a5..771e02261b0f 100644
--- a/src/Storages/StorageMaterializedView.cpp
+++ b/src/Storages/StorageMaterializedView.cpp
@@ -515,15 +515,17 @@ ContextMutablePtr StorageMaterializedView::createRefreshContext() const
 }
 
 std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
-StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const
+StorageMaterializedView::prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id, StorageID override_target_table) const
 {
     auto inner_table_id = getTargetTableId();
-    StorageID target_table = inner_table_id;
+    auto target_table = inner_table_id;
+    if (!override_target_table.empty())
+        target_table = override_target_table;
 
     auto select_query = getInMemoryMetadataPtr()->getSelectQuery().select_query;
     InterpreterSetQuery::applySettingsFromQuery(select_query, refresh_context);
 
-    if (!append)
+    if (!append && override_target_table.empty())
     {
         CurrentThread::QueryScope query_scope(refresh_context);
 
diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h
index d66dca8b7e32..c46aacdec399 100644
--- a/src/Storages/StorageMaterializedView.h
+++ b/src/Storages/StorageMaterializedView.h
@@ -128,7 +128,7 @@ class StorageMaterializedView final : public IStorage, WithMutableContext
     /// out_temp_table_id may be assigned before throwing an exception, in which case the caller
     /// must drop the temp table before rethrowing.
     std::tuple<std::shared_ptr<ASTInsertQuery>, std::unique_ptr<CurrentThread::QueryScope>>
-    prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id) const;
+    prepareRefresh(bool append, ContextMutablePtr refresh_context, std::optional<StorageID> & out_temp_table_id, StorageID override_target_table) const;
 
     std::optional<StorageID> exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context) const;
     void dropTempTable(StorageID table, ContextMutablePtr refresh_context);
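
To restate the cleanup contract mentioned in the header comment above (out_temp_table_id may be assigned before prepareRefresh() throws): the caller owns dropping the temp table on failure, which is what executeRefreshUnlocked() does. A condensed, illustrative sketch of that call pattern; the variable names follow executeRefreshUnlocked(), and the empty StorageID stands in for "no override", as in the non-coordinated branch of this patch:

    std::optional<StorageID> table_to_drop;
    try
    {
        auto [refresh_query, query_scope] = view->prepareRefresh(append, refresh_context, table_to_drop, StorageID::createEmpty());
        /// ... run the INSERT SELECT ...
    }
    catch (...)
    {
        /// prepareRefresh() may have created the inner temp table before throwing; clean it up here.
        if (table_to_drop.has_value())
            view->dropTempTable(table_to_drop.value(), refresh_context);
        throw;
    }
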