From 742e16503e053a6409bcc21582acaa9353b51b6a Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 9 Mar 2026 01:04:24 -0500 Subject: [PATCH] fix: avoid dashmap deadlock for correlated queries --- crates/core/src/schema/delete.rs | 31 ++++++++++++------------------- crates/core/src/table/fs_view.rs | 18 +++++++++++++----- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/core/src/schema/delete.rs b/crates/core/src/schema/delete.rs index 7f5efd1d..351e9dcb 100644 --- a/crates/core/src/schema/delete.rs +++ b/crates/core/src/schema/delete.rs @@ -25,20 +25,15 @@ use arrow_array::{RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use once_cell::sync::Lazy; use serde_json::Value as JsonValue; -use std::fs; -use std::fs::File; -use std::path::PathBuf; use std::sync::Arc; -static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy> = Lazy::new(|| { - let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("schemas") - .join("HoodieDeleteRecord.avsc"); - - let content = fs::read_to_string(schema_path) - .map_err(|e| CoreError::Schema(format!("Failed to read schema file: {e}")))?; +static DELETE_RECORD_AVRO_SCHEMA_STR: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schemas/HoodieDeleteRecord.avsc" +)); - serde_json::from_str(&content) +static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy> = Lazy::new(|| { + serde_json::from_str(DELETE_RECORD_AVRO_SCHEMA_STR) .map_err(|e| CoreError::Schema(format!("Failed to parse schema to JSON: {e}"))) }); @@ -110,15 +105,13 @@ pub fn avro_schema_for_delete_record(delete_record_value: &AvroValue) -> Result< AvroSchema::parse(&json).map_err(CoreError::AvroError) } -static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy> = Lazy::new(|| { - let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("schemas") - .join("HoodieDeleteRecordList.avsc"); - - let mut file = File::open(&schema_path) - .map_err(|e| CoreError::Schema(format!("Failed to open schema file: {e}")))?; +static DELETE_RECORD_LIST_AVRO_SCHEMA_STR: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schemas/HoodieDeleteRecordList.avsc" +)); - AvroSchema::parse_reader(&mut file).map_err(CoreError::AvroError) +static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy> = Lazy::new(|| { + AvroSchema::parse_str(DELETE_RECORD_LIST_AVRO_SCHEMA_STR).map_err(CoreError::AvroError) }); pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> { diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index 223a86f7..a3740c1e 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -210,6 +210,10 @@ impl FileSystemView { } /// Collect file slices from loaded file groups using the timeline view. + /// + /// File slices are first collected from the DashMap using read locks (released + /// promptly), then metadata is loaded on the owned clones without holding any + /// locks. async fn collect_file_slices( &self, partition_pruner: &PartitionPruner, @@ -219,21 +223,25 @@ impl FileSystemView { let excluding_file_groups = timeline_view.excluding_file_groups(); let mut file_slices = Vec::new(); - for mut partition_entry in self.partition_to_file_groups.iter_mut() { + for partition_entry in self.partition_to_file_groups.iter() { if !partition_pruner.should_include(partition_entry.key()) { continue; } - let file_groups = partition_entry.value_mut(); - for fg in file_groups.iter_mut() { + let file_groups = partition_entry.value(); + for fg in file_groups.iter() { if excluding_file_groups.contains(fg) { continue; } - if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) { - fsl.load_metadata_if_needed(&self.storage).await?; + if let Some(fsl) = fg.get_file_slice_as_of(timestamp) { file_slices.push(fsl.clone()); } } } + + for fsl in &mut file_slices { + fsl.load_metadata_if_needed(&self.storage).await?; + } + Ok(file_slices) }