From 460e248ade7b1fc53ef2bb69ffba1253f382d9df Mon Sep 17 00:00:00 2001 From: shibil-rahman-p1 Date: Wed, 11 Mar 2026 13:07:04 +0530 Subject: [PATCH 1/2] Add scripts for WXD Confluent Integration --- .../WXD - Confluent Integration/README.md | 115 +++++++++++++ .../read_confluent_table_standalone.py | 161 ++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 Tutorials/WXD - Confluent Integration/README.md create mode 100644 Tutorials/WXD - Confluent Integration/read_confluent_table_standalone.py diff --git a/Tutorials/WXD - Confluent Integration/README.md b/Tutorials/WXD - Confluent Integration/README.md new file mode 100644 index 0000000..2eac9e8 --- /dev/null +++ b/Tutorials/WXD - Confluent Integration/README.md @@ -0,0 +1,115 @@ +# WXD - Confluent Tableflow Integration + +This tutorial demonstrates how to integrate IBM watsonx.data with Confluent Tableflow to read data from Confluent-managed Iceberg tables using Apache Spark. + +## What This Does + +The [`read_confluent_table_standalone.py`](read_confluent_table_standalone.py:1) script provides a complete solution for: + +- **Connecting to Confluent Tableflow**: Establishes a connection to Confluent's REST catalog using API credentials +- **Auto-Discovery**: Automatically discovers available namespaces (schemas) and tables in the catalog +- **Table Inspection**: Describes table schemas and displays metadata +- **Data Querying**: Retrieves and displays sample data from Confluent Tableflow tables +- **Standalone Execution**: Runs independently with embedded Spark configuration + +## Prerequisites + +- Apache Spark with Iceberg support +- PySpark installed +- Confluent Tableflow API credentials (API key and secret) +- Access to a Confluent Tableflow catalog + +## Configuration + +### Storage Authentication Options + +#### Option 1: Confluent Managed Storage (Recommended) + +For **Confluent Managed Storage**, no additional storage configuration is required. 
The storage is automatically authenticated using the same API key provided in the catalog credentials:
+
+```python
+.config("spark.sql.catalog.tableflowdemo.credential", "<API_KEY>:<API_SECRET>")
+```
+
+#### Option 2: Integrated AWS S3 Storage
+
+For **integrated AWS S3 storage** with Tableflow, you need to add the following additional configurations to connect to the storage:
+
+```python
+.config("spark.sql.catalog.tableflowdemo.s3.access-key-id", "<AWS_ACCESS_KEY_ID>")
+.config("spark.sql.catalog.tableflowdemo.s3.secret-access-key", "<AWS_SECRET_ACCESS_KEY>")
+.config("spark.sql.catalog.tableflowdemo.s3.region", "<AWS_REGION>")
+```
+
+> **Note**: `tableflowdemo` is a local alias for Spark to consume the catalog. You can use any name you prefer for this alias.
+
+## Usage
+
+### Step 1: Update Configuration
+
+Edit the script to add your Confluent credentials:
+```python
+.config("spark.sql.catalog.tableflowdemo.uri", "<TABLEFLOW_REST_CATALOG_URI>")
+.config("spark.sql.catalog.tableflowdemo.credential", "<API_KEY>:<API_SECRET>")
+```
+
+### Step 2: Run the PySpark Script
+
+You can run the PySpark script in three different ways:
+
+#### Option 1: Using SparkLab (VS Code Development Environment)
+
+Use IBM watsonx.data's SparkLab for interactive development and testing:
+
+- **Documentation**: [VS Code Development Environment - Spark Labs](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=experience-vs-code-development-environment-spark-labs)
+- **Benefits**: Interactive development, debugging support, immediate feedback
+- **Best For**: Development, testing, and iterative refinement
+
+#### Option 2: Submit Spark Application using REST API
+
+Submit the application programmatically using the Spark Application API:
+
+- **Documentation**: [Submitting Spark Application by using REST API](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=engine-submitting-spark-application-by-using-rest-api)
+- **Benefits**: Automation, integration with CI/CD pipelines, programmatic control
+- **Best For**: Production deployments, automated workflows
+
+#### Option 3: Submit
Spark Application using CPDCTL CLI + +Submit the application using the CPDCTL CLI interface: + +- **Documentation**: [Submitting Spark Application by using CPDCTL](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=engine-submitting-spark-application-by-using-cpdctl) +- **Benefits**: Command-line convenience, scriptable, easy integration with shell scripts +- **Best For**: Manual submissions, shell script automation, DevOps workflows + +### Expected Output + +The script will: +- Display all available namespaces in the catalog +- List all tables in the first namespace +- Show the schema of the first table +- Display row count and sample data (first 20 rows) + + +## Configuration Parameters + +| Parameter | Description | +|-----------|-------------| +| `spark.sql.catalog.tableflowdemo` | Catalog implementation (Iceberg Spark Catalog) | +| `spark.sql.catalog.tableflowdemo.type` | Catalog type (REST) | +| `spark.sql.catalog.tableflowdemo.uri` | REST catalog endpoint URL | +| `spark.sql.catalog.tableflowdemo.credential` | API key and secret in format `apikey:secret` | +| `spark.sql.catalog.tableflowdemo.io-impl` | File I/O implementation (S3FileIO) | +| `spark.sql.catalog.tableflowdemo.client.region` | AWS region for the catalog | +| `spark.sql.catalog.tableflowdemo.s3.remote-signing-enabled` | Enable remote signing for S3 operations | + +## Troubleshooting + +- **Authentication Errors**: Verify your API key and secret are correct +- **Connection Issues**: Check the REST catalog URI and network connectivity +- **Region Mismatch**: Ensure the region configuration matches your Confluent setup +- **S3 Access Issues**: If using integrated S3 storage, verify your S3 credentials and region + +## Additional Resources + +- [Confluent Tableflow Documentation](https://docs.confluent.io/cloud/current/topics/tableflow/overview.html#tableflow-in-ccloud) +- [IBM watsonx.data Documentation](https://www.ibm.com/docs/en/watsonxdata) \ No newline at end of file diff --git 
"""
Read data from a Confluent Tableflow Iceberg table through the Iceberg REST
catalog, with the Spark configuration embedded for standalone execution.

The script auto-discovers the first available namespace and table in the
catalog, then prints the table schema, the row count, and a sample of rows.

Usage:
    spark-submit read_confluent_table_standalone.py
"""

import json  # NOTE(review): currently unused in this script; candidate for removal

from pyspark.sql import SparkSession

# Local alias under which Spark registers the Confluent Tableflow catalog.
# Any name works, as long as it matches the ``spark.sql.catalog.*`` keys below.
CATALOG_NAME = "tableflowdemo"

# --- Placeholders: replace with your Confluent Tableflow details. -----------
# REST catalog endpoint; substitute {CLOUD_REGION}, {ORG_ID} and {ENV_ID}.
TABLEFLOW_URI = (
    "https://tableflow.{CLOUD_REGION}.aws.confluent.cloud/iceberg/catalog/"
    "organizations/{ORG_ID}/environments/{ENV_ID}"
)
# Credential in the form "apikey:secret". Do not commit real credentials to
# source control -- prefer injecting them at submit time.
TABLEFLOW_CREDENTIAL = "<API_KEY>:<API_SECRET>"
# AWS region of the catalog, e.g. "us-east-1".
CLOUD_REGION = "{CLOUD_REGION}"


def escape_name(name: str) -> str:
    """Escape a table/schema identifier with backticks for use in Spark SQL."""
    return f"`{name}`"


def create_spark_session_with_confluent_config() -> SparkSession:
    """
    Create a Spark session wired to the Confluent Tableflow REST catalog.

    All catalog properties are embedded so the script can run standalone
    (e.g. via ``spark-submit``) without external configuration files.

    Returns:
        SparkSession: session with the Iceberg REST catalog registered
        under the ``CATALOG_NAME`` alias.
    """
    prefix = f"spark.sql.catalog.{CATALOG_NAME}"
    spark = (
        SparkSession.builder
        .appName("Read Confluent Tableflow Table")
        .config(prefix, "org.apache.iceberg.spark.SparkCatalog")
        .config(f"{prefix}.type", "rest")
        .config(f"{prefix}.uri", TABLEFLOW_URI)
        .config(f"{prefix}.credential", TABLEFLOW_CREDENTIAL)
        .config(f"{prefix}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        # No need to push Iceberg metrics back to the REST catalog here.
        .config(f"{prefix}.rest-metrics-reporting-enabled", "false")
        # Confluent-managed storage signs S3 requests remotely, so no
        # separate S3 credentials are required for this setup.
        .config(f"{prefix}.s3.remote-signing-enabled", "true")
        .config(f"{prefix}.client.region", CLOUD_REGION)
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate()
    )

    print("\n" + "=" * 100)
    print("SPARK SESSION CREATED WITH CONFLUENT TABLEFLOW CONFIGURATION")
    print("=" * 100)
    print(f"Catalog: {CATALOG_NAME}")
    # Bug fix: this used to be hard-coded to "us-east-1", which could
    # disagree with the region actually configured for the catalog above.
    print(f"Region: {CLOUD_REGION}")
    print("Catalog Type: Iceberg REST")
    print("=" * 100 + "\n")

    return spark


def read_and_display_table(spark: SparkSession, catalog: str) -> None:
    """
    Read and display table contents in a human-readable format.

    Discovers the namespaces and tables in *catalog*, then queries the first
    available table: schema, row count, and the first 20 rows.

    Args:
        spark: active SparkSession.
        catalog: Spark catalog alias (e.g. ``"tableflowdemo"``).

    Raises:
        Exception: re-raised after logging if any Spark/SQL step fails.
    """
    try:
        print("\n" + "=" * 100)
        print(f"READING CONFLUENT TABLEFLOW CATALOG: {catalog}")
        print("=" * 100)

        # Discover the available namespaces (schemas).
        print("\n=== Available Namespaces ===")
        namespaces_df = spark.sql(f"SHOW NAMESPACES IN {catalog}")
        namespaces_df.show(truncate=False)

        namespaces = namespaces_df.collect()
        if not namespaces:
            print(f"No namespaces found in catalog '{catalog}'")
            return

        # SHOW NAMESPACES may return backtick-quoted names; strip them so we
        # can re-quote consistently with escape_name() below.
        schema = namespaces[0]['namespace'].strip('`')
        print(f"\nUsing first namespace: {schema}")

        # Discover the tables inside the first namespace.
        print(f"\n=== Tables in {catalog}.{escape_name(schema)} ===")
        tables_df = spark.sql(f"SHOW TABLES IN {catalog}.{escape_name(schema)}")
        tables_df.show(truncate=False)

        tables = tables_df.collect()
        if not tables:
            print(f"No tables found in namespace '{schema}'")
            return

        table = tables[0]['tableName']
        print(f"\nUsing first table: {table}")

        # Fully-qualified, backtick-escaped identifier used by every query.
        qualified = f"{catalog}.{escape_name(schema)}.{table}"

        # Describe the table schema.
        print(f"\n=== Describe Table: {qualified} ===")
        spark.sql(f"DESCRIBE TABLE {qualified}").show(truncate=False)

        # Row count.
        print("\n=== Row Count ===")
        row_count = spark.sql(
            f"SELECT COUNT(*) as count FROM {qualified}"
        ).collect()[0]['count']
        print(f"Total rows: {row_count}")

        # Sample data (show() limits the rows pulled to the driver).
        print("\n=== Sample Data (First 20 Rows) ===")
        spark.sql(f"SELECT * FROM {qualified}").show(n=20, truncate=False)

        print("\n" + "=" * 100)
        print(f"SUMMARY: Successfully queried {catalog}.{schema}.{table}")
        print(f"Total rows in table: {row_count}")
        print("=" * 100 + "\n")

    except Exception as e:
        print(f"\nERROR reading table: {e}\n")
        raise


def main():
    """
    Entry point: create the Spark session, auto-discover and display the
    first available Confluent Tableflow table, then shut Spark down.
    """
    spark = None
    try:
        # Single source of truth for the catalog alias (see CATALOG_NAME).
        catalog = CATALOG_NAME

        print("\n" + "=" * 100)
        print("CONFLUENT TABLEFLOW READER - STANDALONE MODE")
        print("=" * 100)
        print(f"Catalog: {catalog}")
        print("=" * 100 + "\n")

        spark = create_spark_session_with_confluent_config()
        read_and_display_table(spark, catalog)

    except Exception as e:
        print(f"\nError during table read: {e}")
        import traceback
        traceback.print_exc()
        raise
    finally:
        # Always release the Spark session, even on failure.
        if spark:
            print("\nStopping Spark session...")
            spark.stop()
            print("Spark session stopped.\n")


if __name__ == "__main__":
    main()
PySpark installed
+- IBM watsonx.data instance with a Spark engine deployed
 - Confluent Tableflow API credentials (API key and secret)
 - Access to a Confluent Tableflow catalog