diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b4..83ac86cbf8 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -50,30 +50,21 @@ impl IcebergCatalogProvider { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. - let schema_names: Vec<_> = client - .list_namespaces(None) - .await? - .iter() - .flat_map(|ns| ns.as_ref().clone()) - .collect(); + let namespace_idents = fetch_all_namespaces(client.as_ref()).await?; let providers = try_join_all( - schema_names + namespace_idents .iter() - .map(|name| { - IcebergSchemaProvider::try_new( - client.clone(), - NamespaceIdent::new(name.clone()), - ) - }) + .map(|nsi| IcebergSchemaProvider::try_new(client.clone(), nsi.clone())) .collect::>(), ) .await?; - let schemas: HashMap> = schema_names + let schemas: HashMap> = namespace_idents .into_iter() .zip(providers.into_iter()) - .map(|(name, provider)| { + .map(|(nsi, provider)| { + let name = nsi.as_ref().join("."); let provider = Arc::new(provider) as Arc; (name, provider) }) @@ -83,6 +74,22 @@ impl IcebergCatalogProvider { } } +async fn fetch_all_namespaces(client: &dyn Catalog) -> Result> { + let mut all_namespaces = Vec::new(); + let mut queue = std::collections::VecDeque::new(); + queue.push_back(None); + + while let Some(parent) = queue.pop_front() { + let children = client.list_namespaces(parent.as_ref()).await?; + for child in children { + all_namespaces.push(child.clone()); + queue.push_back(Some(child)); + } + } + + Ok(all_namespaces) +} + impl CatalogProvider for IcebergCatalogProvider { fn as_any(&self) -> &dyn Any { self @@ -96,3 +103,149 @@ impl CatalogProvider for IcebergCatalogProvider { self.schemas.get(name).cloned() } } + +#[cfg(test)] +mod tests { + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + use iceberg::{CatalogBuilder, NamespaceIdent, TableCreation}; + + use super::*; + + async fn create_catalog() -> Arc { + let mut props = HashMap::new(); + props.insert( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://".to_string(), + ); + + let catalog = MemoryCatalogBuilder::default() + .load("test", props) + .await + .unwrap(); + + Arc::new(catalog) + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_empty() { + let catalog = create_catalog().await; + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + + assert!(provider.schema_names().is_empty()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_single_namespace() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + let schema_names = provider.schema_names(); + + assert_eq!(schema_names.len(), 1); + assert!(schema_names.contains(&"a".to_string())); + assert!(provider.schema("a").is_some()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_with_table() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t".to_string()) + .schema(schema) + .build(); + + catalog.create_table(&ns, table_creation).await.unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + + let schema_provider = provider.schema("a").unwrap(); + let table_names = schema_provider.table_names(); + + assert!(table_names.contains(&"t".to_string())); + assert!(schema_provider.table("t").await.unwrap().is_some()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_nested_namespaces() { + let catalog = create_catalog().await; + let ns1 = NamespaceIdent::new("a".to_string()); + let ns2 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string()]).unwrap(); + catalog + .create_namespace(&ns1, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns2, HashMap::new()) + .await + .unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + let schema_names = provider.schema_names(); + + // This will fail since only list_namespaces(None) is used. + assert!(schema_names.contains(&"a".to_string())); + assert!(schema_names.contains(&"a.b".to_string())); + assert_eq!(schema_names.len(), 2); + + assert!(provider.schema("a").is_some()); + assert!(provider.schema("a.b").is_some()); + } + + #[tokio::test] + async fn test_fetch_all_namespaces_empty() { + let catalog = create_catalog().await; + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert!(namespaces.is_empty()); + } + + #[tokio::test] + async fn test_fetch_all_namespaces_one() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert_eq!(namespaces.len(), 1); + assert!(namespaces.contains(&ns)); + } + + #[tokio::test] + async fn test_fetch_all_namespaces_nested() { + let catalog = create_catalog().await; + let ns1 = NamespaceIdent::new("a".to_string()); + let ns2 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string()]).unwrap(); + let ns3 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string(), "c".to_string()]) + .unwrap(); + catalog + .create_namespace(&ns1, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns2, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns3, HashMap::new()) + .await + .unwrap(); + + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert!(namespaces.contains(&ns1)); + assert!(namespaces.contains(&ns2)); + assert!(namespaces.contains(&ns3)); + } +} diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 6f8898abb8..08c49da01a 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -598,8 +598,8 @@ async fn test_insert_into_nested() -> Result<()> { // Insert data with nested structs let insert_sql = r#" INSERT INTO catalog.test_insert_nested.nested_table - SELECT - 1 as id, + SELECT + 1 as id, 'Alice' as name, named_struct( 'address', named_struct( @@ -613,8 +613,8 @@ async fn test_insert_into_nested() -> Result<()> { ) ) as profile UNION ALL - SELECT - 2 as id, + SELECT + 2 as id, 'Bob' as name, named_struct( 'address', named_struct( @@ -736,15 +736,15 @@ async fn test_insert_into_nested() -> Result<()> { let df = ctx .sql( r#" - SELECT - id, + SELECT + id, name, profile.address.street, profile.address.city, profile.address.zip, profile.contact.email, profile.contact.phone - FROM catalog.test_insert_nested.nested_table + FROM catalog.test_insert_nested.nested_table ORDER BY id "#, ) @@ -850,8 +850,8 @@ async fn test_insert_into_partitioned() -> Result<()> { let df = ctx .sql( r#" - INSERT INTO catalog.test_partitioned_write.partitioned_table - VALUES + INSERT INTO catalog.test_partitioned_write.partitioned_table + VALUES (1, 'electronics', 'laptop'), (2, 'electronics', 'phone'), (3, 'books', 'novel'), diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e9f93287d8..9ed220f29d 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -96,6 +96,7 @@ impl DataFusionEngine { // Create test tables Self::create_unpartitioned_table(&catalog, &namespace).await?; Self::create_partitioned_table(&catalog, &namespace).await?; + Self::create_namespaced_table(&catalog).await?; Ok(Arc::new( IcebergCatalogProvider::try_new(Arc::new(catalog)).await?, @@ -161,4 +162,28 @@ impl DataFusionEngine { Ok(()) } + + async fn create_namespaced_table(catalog: &impl Catalog) -> anyhow::Result<()> { + let parent_ns = NamespaceIdent::new("parent_ns".to_string()); + catalog.create_namespace(&parent_ns, HashMap::new()).await?; + let child_ns = + NamespaceIdent::from_vec(vec!["parent_ns".to_string(), "child_ns".to_string()])?; + catalog.create_namespace(&child_ns, HashMap::new()).await?; + + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "foo2", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let table_creation = TableCreation::builder() + .name("t".to_string()) + .schema(schema.clone()) + .build(); + + catalog.create_table(&child_ns, table_creation).await?; + + Ok(()) + } } diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index df5e638d5a..39c6c08af8 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -25,3 +25,7 @@ slt = "df_test/show_tables.slt" [[steps]] engine = "df" slt = "df_test/insert_into.slt" + +[[steps]] +engine = "df" +slt = "df_test/namespaces.slt" diff --git a/crates/sqllogictest/testdata/slts/df_test/namespaces.slt b/crates/sqllogictest/testdata/slts/df_test/namespaces.slt new file mode 100644 index 0000000000..05af95b52a --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/namespaces.slt @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Insert a single row into the namespaced table and verify the count +query I +INSERT INTO default."parent_ns.child_ns".t VALUES (1, 'test') +---- +1 + +# Verify the inserted row +query IT +SELECT * FROM default."parent_ns.child_ns".t +---- +1 test diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index c5da5f6276..95378ef1c1 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -37,4 +37,7 @@ default information_schema parameters VIEW default information_schema routines VIEW default information_schema schemata VIEW default information_schema tables VIEW -default information_schema views VIEW \ No newline at end of file +default information_schema views VIEW +default parent_ns.child_ns t BASE TABLE +default parent_ns.child_ns t$manifests BASE TABLE +default parent_ns.child_ns t$snapshots BASE TABLE