From 8fd84a02a25334720398fd12c70ccb2f728b026c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20EIckler?= <797483+eickler@users.noreply.github.com> Date: Wed, 31 Dec 2025 10:23:27 +0100 Subject: [PATCH 1/5] test: Test cases for safeguarding; nested test will fail --- crates/integrations/datafusion/src/catalog.rs | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b4..2fb7dfcebc 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -96,3 +96,102 @@ impl CatalogProvider for IcebergCatalogProvider { self.schemas.get(name).cloned() } } + +#[cfg(test)] +mod tests { + use iceberg::memory::MemoryCatalogBuilder; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + use iceberg::{CatalogBuilder, NamespaceIdent, TableCreation}; + + use super::*; + + async fn create_catalog() -> Arc { + let mut props = HashMap::new(); + props.insert("warehouse".to_string(), "memory://".to_string()); + + let catalog = MemoryCatalogBuilder::default() + .load("test", props) + .await + .unwrap(); + + Arc::new(catalog) + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_empty() { + let catalog = create_catalog().await; + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + + assert!(provider.schema_names().is_empty()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_single_namespace() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + let schema_names = provider.schema_names(); + + assert_eq!(schema_names.len(), 1); + assert!(schema_names.contains(&"a".to_string())); + assert!(provider.schema("a").is_some()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_with_table() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t".to_string()) + .schema(schema) + .build(); + + catalog.create_table(&ns, table_creation).await.unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + + let schema_provider = provider.schema("a").unwrap(); + let table_names = schema_provider.table_names(); + + assert!(table_names.contains(&"t".to_string())); + assert!(schema_provider.table("t").await.unwrap().is_some()); + } + + #[tokio::test] + async fn test_iceberg_catalog_provider_nested_namespaces() { + let catalog = create_catalog().await; + let ns1 = NamespaceIdent::new("a".to_string()); + let ns2 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string()]).unwrap(); + catalog + .create_namespace(&ns1, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns2, HashMap::new()) + .await + .unwrap(); + + let provider = IcebergCatalogProvider::try_new(catalog).await.unwrap(); + let schema_names = provider.schema_names(); + + // This will fail since only list_namespaces(None) is used. + assert!(schema_names.contains(&"a".to_string())); + assert!(schema_names.contains(&"a.b".to_string())); + assert_eq!(schema_names.len(), 2); + + assert!(provider.schema("a").is_some()); + assert!(provider.schema("a.b").is_some()); + } +} From 8fda186237116008f9d5301281d80ee5f95ea841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20EIckler?= <797483+eickler@users.noreply.github.com> Date: Wed, 31 Dec 2025 10:40:31 +0100 Subject: [PATCH 2/5] feat: Support for nested namespaces --- crates/integrations/datafusion/src/catalog.rs | 81 +++++++++++++++---- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 2fb7dfcebc..695a612c1c 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -50,30 +50,21 @@ impl IcebergCatalogProvider { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. - let schema_names: Vec<_> = client - .list_namespaces(None) - .await? - .iter() - .flat_map(|ns| ns.as_ref().clone()) - .collect(); + let namespace_idents = fetch_all_namespaces(client.as_ref()).await?; let providers = try_join_all( - schema_names + namespace_idents .iter() - .map(|name| { - IcebergSchemaProvider::try_new( - client.clone(), - NamespaceIdent::new(name.clone()), - ) - }) + .map(|nsi| IcebergSchemaProvider::try_new(client.clone(), nsi.clone())) .collect::>(), ) .await?; - let schemas: HashMap> = schema_names + let schemas: HashMap> = namespace_idents .into_iter() .zip(providers.into_iter()) - .map(|(name, provider)| { + .map(|(nsi, provider)| { + let name = nsi.as_ref().join("."); let provider = Arc::new(provider) as Arc; (name, provider) }) @@ -83,6 +74,22 @@ impl IcebergCatalogProvider { } } +async fn fetch_all_namespaces(client: &dyn Catalog) -> Result> { + let mut all_namespaces = Vec::new(); + let mut queue = std::collections::VecDeque::new(); + queue.push_back(None); + + while let Some(parent) = queue.pop_front() { + let children = client.list_namespaces(parent.as_ref()).await?; + for child in children { + all_namespaces.push(child.clone()); + queue.push_back(Some(child)); + } + } + + Ok(all_namespaces) +} + impl CatalogProvider for IcebergCatalogProvider { fn as_any(&self) -> &dyn Any { self @@ -194,4 +201,48 @@ mod tests { assert!(provider.schema("a").is_some()); assert!(provider.schema("a.b").is_some()); } + + #[tokio::test] + async fn test_fetch_all_namespaces_empty() { + let catalog = create_catalog().await; + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert!(namespaces.is_empty()); + } + + #[tokio::test] + async fn test_fetch_all_namespaces_one() { + let catalog = create_catalog().await; + let ns = NamespaceIdent::new("a".to_string()); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert_eq!(namespaces.len(), 1); + assert!(namespaces.contains(&ns)); + } + + #[tokio::test] + async fn test_fetch_all_namespaces_nested() { + let catalog = create_catalog().await; + let ns1 = NamespaceIdent::new("a".to_string()); + let ns2 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string()]).unwrap(); + let ns3 = NamespaceIdent::from_vec(vec!["a".to_string(), "b".to_string(), "c".to_string()]) + .unwrap(); + catalog + .create_namespace(&ns1, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns2, HashMap::new()) + .await + .unwrap(); + catalog + .create_namespace(&ns3, HashMap::new()) + .await + .unwrap(); + + let namespaces = fetch_all_namespaces(catalog.as_ref()).await.unwrap(); + assert!(namespaces.contains(&ns1)); + assert!(namespaces.contains(&ns2)); + assert!(namespaces.contains(&ns3)); + } } From 26d526800d5032dd363268e6796e1ab640fd2987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20EIckler?= <797483+eickler@users.noreply.github.com> Date: Wed, 31 Dec 2025 11:31:10 +0100 Subject: [PATCH 3/5] test: Added integration test for hierarchical namespace --- crates/integrations/datafusion/src/catalog.rs | 7 +- .../tests/integration_datafusion_test.rs | 79 ++++++++++++++++--- 2 files changed, 75 insertions(+), 11 deletions(-) diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 695a612c1c..83ac86cbf8 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -106,7 +106,7 @@ impl CatalogProvider for IcebergCatalogProvider { #[cfg(test)] mod tests { - use iceberg::memory::MemoryCatalogBuilder; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{CatalogBuilder, NamespaceIdent, TableCreation}; @@ -114,7 +114,10 @@ mod tests { async fn create_catalog() -> Arc { let mut props = HashMap::new(); - props.insert("warehouse".to_string(), "memory://".to_string()); + props.insert( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://".to_string(), + ); let catalog = MemoryCatalogBuilder::default() .load("test", props) diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 6f8898abb8..00e660c56e 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -598,8 +598,8 @@ async fn test_insert_into_nested() -> Result<()> { // Insert data with nested structs let insert_sql = r#" INSERT INTO catalog.test_insert_nested.nested_table - SELECT - 1 as id, + SELECT + 1 as id, 'Alice' as name, named_struct( 'address', named_struct( @@ -613,8 +613,8 @@ async fn test_insert_into_nested() -> Result<()> { ) ) as profile UNION ALL - SELECT - 2 as id, + SELECT + 2 as id, 'Bob' as name, named_struct( 'address', named_struct( @@ -736,15 +736,15 @@ async fn test_insert_into_nested() -> Result<()> { let df = ctx .sql( r#" - SELECT - id, + SELECT + id, name, profile.address.street, profile.address.city, profile.address.zip, profile.contact.email, profile.contact.phone - FROM catalog.test_insert_nested.nested_table + FROM catalog.test_insert_nested.nested_table ORDER BY id "#, ) @@ -850,8 +850,8 @@ async fn test_insert_into_partitioned() -> Result<()> { let df = ctx .sql( r#" - INSERT INTO catalog.test_partitioned_write.partitioned_table - VALUES + INSERT INTO catalog.test_partitioned_write.partitioned_table + VALUES (1, 'electronics', 'laptop'), (2, 'electronics', 'phone'), (3, 'books', 'novel'), @@ -943,3 +943,64 @@ async fn test_insert_into_partitioned() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_child_namespace_crud() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog().await; + + let parent_ns = NamespaceIdent::new("parent_ns".to_string()); + set_test_namespace(&iceberg_catalog, &parent_ns).await?; + let child_ns = NamespaceIdent::from_vec(vec!["parent_ns".to_string(), "child_ns".to_string()])?; + set_test_namespace(&iceberg_catalog, &child_ns).await?; + + let creation = get_table_creation(temp_path(), "t", None)?; + iceberg_catalog.create_table(&child_ns, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + + let df = ctx + .sql("INSERT INTO catalog.\"parent_ns.child_ns\".t VALUES (1, 'test')") + .await + .unwrap(); + + let batches = df.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + let rows_inserted = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(rows_inserted.value(0), 1); + + let df = ctx + .sql("SELECT * FROM catalog.\"parent_ns.child_ns\".t") + .await + .unwrap(); + + let batches = df.collect().await.unwrap(); + + check_record_batches( + batches, + expect![[r#" + Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} }, + Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]], + expect![[r#" + foo1: PrimitiveArray + [ + 1, + ], + foo2: StringArray + [ + "test", + ]"#]], + &[], + Some("foo1"), + ); + + Ok(()) +} From ff8d1f86110dad7c687556aae7511e41a119d9cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20EIckler?= <797483+eickler@users.noreply.github.com> Date: Sat, 3 Jan 2026 12:09:24 +0100 Subject: [PATCH 4/5] refactor: Change integration test to sqllogictest --- .../tests/integration_datafusion_test.rs | 61 ------------------- crates/sqllogictest/src/engine/datafusion.rs | 27 +++++++- .../testdata/schedules/df_test.toml | 4 ++ .../testdata/slts/df_test/namespaces.slt | 28 +++++++++ .../testdata/slts/df_test/show_tables.slt | 5 +- 5 files changed, 62 insertions(+), 63 deletions(-) create mode 100644 crates/sqllogictest/testdata/slts/df_test/namespaces.slt diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 00e660c56e..08c49da01a 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -943,64 +943,3 @@ async fn test_insert_into_partitioned() -> Result<()> { Ok(()) } - -#[tokio::test] -async fn test_child_namespace_crud() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog().await; - - let parent_ns = NamespaceIdent::new("parent_ns".to_string()); - set_test_namespace(&iceberg_catalog, &parent_ns).await?; - let child_ns = NamespaceIdent::from_vec(vec!["parent_ns".to_string(), "child_ns".to_string()])?; - set_test_namespace(&iceberg_catalog, &child_ns).await?; - - let creation = get_table_creation(temp_path(), "t", None)?; - iceberg_catalog.create_table(&child_ns, creation).await?; - - let client = Arc::new(iceberg_catalog); - let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); - - let ctx = SessionContext::new(); - ctx.register_catalog("catalog", catalog); - - let df = ctx - .sql("INSERT INTO catalog.\"parent_ns.child_ns\".t VALUES (1, 'test')") - .await - .unwrap(); - - let batches = df.collect().await.unwrap(); - assert_eq!(batches.len(), 1); - let batch = &batches[0]; - let rows_inserted = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(rows_inserted.value(0), 1); - - let df = ctx - .sql("SELECT * FROM catalog.\"parent_ns.child_ns\".t") - .await - .unwrap(); - - let batches = df.collect().await.unwrap(); - - check_record_batches( - batches, - expect![[r#" - Field { "foo1": Int32, metadata: {"PARQUET:field_id": "1"} }, - Field { "foo2": Utf8, metadata: {"PARQUET:field_id": "2"} }"#]], - expect![[r#" - foo1: PrimitiveArray - [ - 1, - ], - foo2: StringArray - [ - "test", - ]"#]], - &[], - Some("foo1"), - ); - - Ok(()) -} diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e9f93287d8..67d8915bd4 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -24,7 +24,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, table}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; @@ -96,6 +96,7 @@ impl DataFusionEngine { // Create test tables Self::create_unpartitioned_table(&catalog, &namespace).await?; Self::create_partitioned_table(&catalog, &namespace).await?; + Self::create_namespaced_table(&catalog).await?; Ok(Arc::new( IcebergCatalogProvider::try_new(Arc::new(catalog)).await?, @@ -161,4 +162,28 @@ impl DataFusionEngine { Ok(()) } + + async fn create_namespaced_table(catalog: &impl Catalog) -> anyhow::Result<()> { + let parent_ns = NamespaceIdent::new("parent_ns".to_string()); + catalog.create_namespace(&parent_ns, HashMap::new()).await?; + let child_ns = + NamespaceIdent::from_vec(vec!["parent_ns".to_string(), "child_ns".to_string()])?; + catalog.create_namespace(&child_ns, HashMap::new()).await?; + + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "foo1", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "foo2", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let table_creation = TableCreation::builder() + .name("t".to_string()) + .schema(schema.clone()) + .build(); + + catalog.create_table(&child_ns, table_creation).await?; + + Ok(()) + } } diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index df5e638d5a..39c6c08af8 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -25,3 +25,7 @@ slt = "df_test/show_tables.slt" [[steps]] engine = "df" slt = "df_test/insert_into.slt" + +[[steps]] +engine = "df" +slt = "df_test/namespaces.slt" diff --git a/crates/sqllogictest/testdata/slts/df_test/namespaces.slt b/crates/sqllogictest/testdata/slts/df_test/namespaces.slt new file mode 100644 index 0000000000..05af95b52a --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/namespaces.slt @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Insert a single row into the namespaced table and verify the count +query I +INSERT INTO default."parent_ns.child_ns".t VALUES (1, 'test') +---- +1 + +# Verify the inserted row +query IT +SELECT * FROM default."parent_ns.child_ns".t +---- +1 test diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index c5da5f6276..95378ef1c1 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -37,4 +37,7 @@ default information_schema parameters VIEW default information_schema routines VIEW default information_schema schemata VIEW default information_schema tables VIEW -default information_schema views VIEW \ No newline at end of file +default information_schema views VIEW +default parent_ns.child_ns t BASE TABLE +default parent_ns.child_ns t$manifests BASE TABLE +default parent_ns.child_ns t$snapshots BASE TABLE From b76bf316ce51c1b7a598173bab3234d577e9f4f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20EIckler?= <797483+eickler@users.noreply.github.com> Date: Sat, 3 Jan 2026 12:30:34 +0100 Subject: [PATCH 5/5] test: Typo fix How the heck did that get here. --- crates/sqllogictest/src/engine/datafusion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index 67d8915bd4..9ed220f29d 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -24,7 +24,7 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, table}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar;