Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;
use std::borrow::Cow;
use std::fmt;

Check warning on line 3 in crates/core/src/delta_datafusion/table_provider.rs

View workflow job for this annotation

GitHub Actions / check

Diff in /home/runner/work/delta-rs/delta-rs/crates/core/src/delta_datafusion/table_provider.rs
use std::sync::Arc;

use arrow::array::BooleanArray;
Expand Down Expand Up @@ -46,7 +46,7 @@
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
use futures::StreamExt as _;
use itertools::Itertools;
use object_store::ObjectMeta;

Check warning on line 49 in crates/core/src/delta_datafusion/table_provider.rs

View workflow job for this annotation

GitHub Actions / check

Diff in /home/runner/work/delta-rs/delta-rs/crates/core/src/delta_datafusion/table_provider.rs
use serde::{Deserialize, Serialize};
use url::Url;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
Expand Down Expand Up @@ -251,7 +251,7 @@
/// Whether to wrap partition values in a dictionary encoding to potentially save space
pub(super) wrap_partition_values: Option<bool>,
/// Whether to push down filter in end result or just prune the files
pub(super) enable_parquet_pushdown: bool,

Check warning on line 254 in crates/core/src/delta_datafusion/table_provider.rs

View workflow job for this annotation

GitHub Actions / check

Diff in /home/runner/work/delta-rs/delta-rs/crates/core/src/delta_datafusion/table_provider.rs
/// Schema to scan table with
pub(super) schema: Option<SchemaRef>,
pub(super) options: std::collections::HashMap<String, String>
Expand Down Expand Up @@ -1048,29 +1048,6 @@
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let source_uri = Url::parse(self.table_uri.as_str())
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let url_key = object_store_url(&source_uri);
let runtime = context.runtime_env();
// self.config
if runtime.object_store(url_key).is_err() {
let storage_config = StorageConfig::parse_options(
self.config.options.clone()
)?;
let source_store = logstore_for(source_uri,storage_config)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let object_store_url = source_store.object_store_url();

// check if delta store is already registered
if runtime.object_store(object_store_url.clone()).is_err() {
runtime.register_object_store(
object_store_url.as_ref(),
source_store.object_store(None),
);
}
}
// Now that everything is set up, execute the inner Delta

self.parquet_scan.execute(partition, context)
}

Expand Down
Loading