Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions crates/core/src/schema/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,15 @@ use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use once_cell::sync::Lazy;
use serde_json::Value as JsonValue;
use std::fs;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;

static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("schemas")
.join("HoodieDeleteRecord.avsc");

let content = fs::read_to_string(schema_path)
.map_err(|e| CoreError::Schema(format!("Failed to read schema file: {e}")))?;
/// Text of `schemas/HoodieDeleteRecord.avsc`, embedded into the binary at compile
/// time via `include_str!`, so no schema file has to be read at runtime.
/// The path is resolved relative to this crate's `CARGO_MANIFEST_DIR`.
static DELETE_RECORD_AVRO_SCHEMA_STR: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schemas/HoodieDeleteRecord.avsc"
));

serde_json::from_str(&content)
/// Delete-record schema parsed into a JSON value, initialized lazily on first
/// access from `DELETE_RECORD_AVRO_SCHEMA_STR`.
///
/// The stored value is a `Result`: a JSON parse failure is kept as
/// `CoreError::Schema` rather than panicking.
static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
serde_json::from_str(DELETE_RECORD_AVRO_SCHEMA_STR)
.map_err(|e| CoreError::Schema(format!("Failed to parse schema to JSON: {e}")))
});

Expand Down Expand Up @@ -110,15 +105,13 @@ pub fn avro_schema_for_delete_record(delete_record_value: &AvroValue) -> Result<
AvroSchema::parse(&json).map_err(CoreError::AvroError)
}

static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("schemas")
.join("HoodieDeleteRecordList.avsc");

let mut file = File::open(&schema_path)
.map_err(|e| CoreError::Schema(format!("Failed to open schema file: {e}")))?;
/// Text of `schemas/HoodieDeleteRecordList.avsc`, embedded into the binary at
/// compile time via `include_str!`, so no schema file has to be read at runtime.
/// The path is resolved relative to this crate's `CARGO_MANIFEST_DIR`.
static DELETE_RECORD_LIST_AVRO_SCHEMA_STR: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schemas/HoodieDeleteRecordList.avsc"
));

AvroSchema::parse_reader(&mut file).map_err(CoreError::AvroError)
/// Avro schema for the delete-record list, parsed lazily on first access from
/// `DELETE_RECORD_LIST_AVRO_SCHEMA_STR`.
///
/// The stored value is a `Result`: a parse failure is kept as
/// `CoreError::AvroError` rather than panicking.
static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
AvroSchema::parse_str(DELETE_RECORD_LIST_AVRO_SCHEMA_STR).map_err(CoreError::AvroError)
});

pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
Expand Down
18 changes: 13 additions & 5 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ impl FileSystemView {
}

/// Collect file slices from loaded file groups using the timeline view.
///
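/// This runs in two phases. First, the matching file slices are cloned out of
/// the DashMap using read-only iteration, so its read locks are released before
/// any `.await`. Then metadata is loaded on the owned clones without holding
/// any locks. The metadata is loaded into the returned clones only; the file
/// groups stored in the map are not modified by this function.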
async fn collect_file_slices(
&self,
partition_pruner: &PartitionPruner,
Expand All @@ -219,21 +223,25 @@ impl FileSystemView {
let excluding_file_groups = timeline_view.excluding_file_groups();

let mut file_slices = Vec::new();
for mut partition_entry in self.partition_to_file_groups.iter_mut() {
for partition_entry in self.partition_to_file_groups.iter() {
if !partition_pruner.should_include(partition_entry.key()) {
continue;
}
let file_groups = partition_entry.value_mut();
for fg in file_groups.iter_mut() {
let file_groups = partition_entry.value();
for fg in file_groups.iter() {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
fsl.load_metadata_if_needed(&self.storage).await?;
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
file_slices.push(fsl.clone());
}
}
}

for fsl in &mut file_slices {
fsl.load_metadata_if_needed(&self.storage).await?;
}

Ok(file_slices)
}

Expand Down
Loading