`Tutorials/WXD - Confluent Integration/README.md`
# WXD - Confluent Tableflow Integration

This tutorial demonstrates how to integrate IBM watsonx.data with Confluent Tableflow to read data from Confluent-managed Iceberg tables using Apache Spark.

## What This Does

The [`read_confluent_table_standalone.py`](read_confluent_table_standalone.py) script provides a complete solution for:

- **Connecting to Confluent Tableflow**: Establishes a connection to Confluent's REST catalog using API credentials
- **Auto-Discovery**: Automatically discovers available namespaces (schemas) and tables in the catalog
- **Table Inspection**: Describes table schemas and displays metadata
- **Data Querying**: Retrieves and displays sample data from Confluent Tableflow tables
- **Standalone Execution**: Runs independently with embedded Spark configuration

## Prerequisites

- A watsonx.data instance with a Spark engine deployed
- Confluent Tableflow API credentials (API key and secret)
- Access to a Confluent Tableflow catalog

## Configuration

### Storage Authentication Options

#### Option 1: Confluent Managed Storage (Recommended)

For **Confluent Managed Storage**, no additional storage configuration is required. The storage is automatically authenticated using the same API key provided in the catalog credentials:

```python
.config("spark.sql.catalog.tableflowdemo.credential", "<apikey>:<secret>")
```

#### Option 2: Integrated AWS S3 Storage

For **integrated AWS S3 storage** with Tableflow, add the following configurations to connect to the storage:

```python
.config("spark.sql.catalog.tableflowdemo.s3.access-key-id", "<s3-accesskey>")
.config("spark.sql.catalog.tableflowdemo.s3.secret-access-key", "<s3-secretkey>")
.config("spark.sql.catalog.tableflowdemo.s3.region", "<s3-region>")
```

> **Note**: `tableflowdemo` is a local alias for Spark to consume the catalog. You can use any name you prefer for this alias.
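
Putting the two snippets together, a minimal session builder for the integrated-S3 case might look like the sketch below. All angle-bracket values are placeholders for your own Confluent and S3 credentials, and `tableflowdemo` is just the local alias discussed above:

```python
from pyspark.sql import SparkSession

# Sketch only: replace every <placeholder> with your own values.
spark = (
    SparkSession.builder
    .appName("Tableflow integrated-S3 example")
    # Register the Iceberg catalog under the local alias "tableflowdemo"
    .config("spark.sql.catalog.tableflowdemo", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.tableflowdemo.type", "rest")
    .config("spark.sql.catalog.tableflowdemo.uri", "<rest-catalog-uri>")
    .config("spark.sql.catalog.tableflowdemo.credential", "<apikey>:<secret>")
    # Additional settings required only for integrated AWS S3 storage
    .config("spark.sql.catalog.tableflowdemo.s3.access-key-id", "<s3-accesskey>")
    .config("spark.sql.catalog.tableflowdemo.s3.secret-access-key", "<s3-secretkey>")
    .config("spark.sql.catalog.tableflowdemo.s3.region", "<s3-region>")
    .getOrCreate()
)
```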

## Usage

### Step 1: Update Configuration

Edit the script to add your Confluent credentials:
```python
.config("spark.sql.catalog.tableflowdemo.uri", "<rest-catalog-uri>")
.config("spark.sql.catalog.tableflowdemo.credential", "<apikey>:<secret>")
```
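
Once the credentials are in place, a quick connectivity check is to list the catalog's namespaces before running the full script (this assumes a `spark` session configured with the `tableflowdemo` alias as above):

```python
# If authentication and the REST URI are correct, this prints the
# namespaces in the Confluent Tableflow catalog; otherwise it raises
# an error pointing at the misconfigured setting.
spark.sql("SHOW NAMESPACES IN tableflowdemo").show(truncate=False)
```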

### Step 2: Run the PySpark Script

You can run the PySpark script in three different ways:

#### Option 1: Using SparkLab (VS Code Development Environment)

Use IBM watsonx.data's SparkLab for interactive development and testing:

- **Documentation**: [VS Code Development Environment - Spark Labs](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=experience-vs-code-development-environment-spark-labs)
- **Benefits**: Interactive development, debugging support, immediate feedback
- **Best For**: Development, testing, and iterative refinement

#### Option 2: Submit Spark Application using REST API

Submit the application programmatically using the Spark Application API:

- **Documentation**: [Submitting Spark Application by using REST API](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=engine-submitting-spark-application-by-using-rest-api)
- **Benefits**: Automation, integration with CI/CD pipelines, programmatic control
- **Best For**: Production deployments, automated workflows

#### Option 3: Submit Spark Application using CPDCTL CLI

Submit the application using the CPDCTL CLI interface:

- **Documentation**: [Submitting Spark Application by using CPDCTL](https://www.ibm.com/docs/en/watsonxdata/standard/2.3.x?topic=engine-submitting-spark-application-by-using-cpdctl)
- **Benefits**: Command-line convenience, scriptable, easy integration with shell scripts
- **Best For**: Manual submissions, shell script automation, DevOps workflows

### Expected Output

The script will:
- Display all available namespaces in the catalog
- List all tables in the first namespace
- Show the schema of the first table
- Display row count and sample data (first 20 rows)
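
The script auto-discovers the first namespace and table. To query a specific table instead, the same catalog alias can be used directly; `<namespace>` and `<table>` below are placeholders for your own names:

```python
# Backticks guard names with special characters (Confluent topic
# names often contain dots or dashes).
df = spark.sql("SELECT * FROM tableflowdemo.`<namespace>`.`<table>`")
df.show(n=20, truncate=False)
```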


## Configuration Parameters

| Parameter | Description |
|-----------|-------------|
| `spark.sql.catalog.tableflowdemo` | Catalog implementation (Iceberg Spark Catalog) |
| `spark.sql.catalog.tableflowdemo.type` | Catalog type (REST) |
| `spark.sql.catalog.tableflowdemo.uri` | REST catalog endpoint URL |
| `spark.sql.catalog.tableflowdemo.credential` | API key and secret in format `apikey:secret` |
| `spark.sql.catalog.tableflowdemo.io-impl` | File I/O implementation (S3FileIO) |
| `spark.sql.catalog.tableflowdemo.client.region` | AWS region for the catalog |
| `spark.sql.catalog.tableflowdemo.s3.remote-signing-enabled` | Enable remote signing for S3 operations |
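
The `credential` parameter must be the API key and secret joined by a single colon. A small helper like the following (hypothetical, not part of the script) can validate the format before wiring it into the session config:

```python
def format_credential(api_key: str, secret: str) -> str:
    """Join an API key and secret into the `apikey:secret` form
    expected by spark.sql.catalog.<alias>.credential."""
    if not api_key or not secret:
        raise ValueError("API key and secret must both be non-empty")
    if ":" in api_key:
        raise ValueError("API key must not contain ':'")
    return f"{api_key}:{secret}"
```

For example, `format_credential("mykey", "mysecret")` returns `"mykey:mysecret"`.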

## Troubleshooting

- **Authentication Errors**: Verify your API key and secret are correct
- **Connection Issues**: Check the REST catalog URI and network connectivity
- **Region Mismatch**: Ensure the region configuration matches your Confluent setup
- **S3 Access Issues**: If using integrated S3 storage, verify your S3 credentials and region

## Additional Resources

- [Confluent Tableflow Documentation](https://docs.confluent.io/cloud/current/topics/tableflow/overview.html#tableflow-in-ccloud)
- [IBM watsonx.data Documentation](https://www.ibm.com/docs/en/watsonxdata)
`Tutorials/WXD - Confluent Integration/read_confluent_table_standalone.py`
"""
This module reads data from a Confluent Tableflow Iceberg table
with embedded Spark configuration for standalone execution.

Usage:
spark-submit read_confluent_table_standalone.py
"""

from pyspark.sql import SparkSession


def create_spark_session_with_confluent_config() -> SparkSession:
"""
Create Spark session with Confluent Tableflow configuration embedded.

Returns:
SparkSession: Configured Spark session with Confluent Tableflow catalog
"""
spark = (
SparkSession.builder
.appName("Read Confluent Tableflow Table")
.config("spark.sql.catalog.tableflowdemo", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.tableflowdemo.type", "rest")
.config("spark.sql.catalog.tableflowdemo.uri",
"https://tableflow.{CLOUD_REGION}.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}")
.config("spark.sql.catalog.tableflowdemo.credential",
"<apikey>:<secret>")
.config("spark.sql.catalog.tableflowdemo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.tableflowdemo.rest-metrics-reporting-enabled", "false")
.config("spark.sql.catalog.tableflowdemo.s3.remote-signing-enabled", "true")
.config("spark.sql.catalog.tableflowdemo.client.region", "{CLOUD_REGION}")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
)

print("\n" + "="*100)
print("SPARK SESSION CREATED WITH CONFLUENT TABLEFLOW CONFIGURATION")
print("="*100)
    print("Catalog: tableflowdemo")
    print(f"Region: {spark.conf.get('spark.sql.catalog.tableflowdemo.client.region')}")
    print("Catalog Type: Iceberg REST")
print("="*100 + "\n")

return spark


def read_and_display_table(spark: SparkSession, catalog: str) -> None:
"""
Read and display table contents in human-readable format.
Automatically discovers namespaces and tables, then queries the first available table.

Args:
spark: SparkSession instance
catalog: Catalog name (e.g., "tableflowdemo")
"""
try:
print("\n" + "="*100)
print(f"READING CONFLUENT TABLEFLOW CATALOG: {catalog}")
print("="*100)

# Display and store available namespaces (schemas)
print("\n=== Available Namespaces ===")
namespaces_df = spark.sql(f"SHOW NAMESPACES IN {catalog}")
namespaces_df.show(truncate=False)

# Get the first namespace
namespaces = namespaces_df.collect()
if not namespaces:
print(f"No namespaces found in catalog '{catalog}'")
return

# Strip backticks if they exist in the namespace value
first_namespace = namespaces[0]['namespace'].strip('`')
print(f"\nUsing first namespace: {first_namespace}")

# Display and store available tables in the first namespace
print(f"\n=== Tables in {catalog}.`{first_namespace}` ===")
tables_df = spark.sql(f"SHOW TABLES IN {catalog}.`{first_namespace}`")
tables_df.show(truncate=False)

# Get the first table
tables = tables_df.collect()
if not tables:
print(f"No tables found in namespace '{first_namespace}'")
return

first_table = tables[0]['tableName']
print(f"\nUsing first table: {first_table}")

# Now query the first table
schema = first_namespace
table = first_table

# Describe the table schema
print(f"\n=== Describe Table: {catalog}.`{schema}`.{table} ===")
spark.sql(f"DESCRIBE TABLE {catalog}.`{schema}`.{table}").show(truncate=False)

# Get row count
        print("\n=== Row Count ===")
row_count = spark.sql(f"SELECT COUNT(*) as count FROM {catalog}.`{schema}`.{table}").collect()[0]['count']
print(f"Total rows: {row_count}")

# Query the Iceberg table
        print("\n=== Sample Data (First 20 Rows) ===")
spark.sql(f"SELECT * FROM {catalog}.`{schema}`.{table}").show(n=20, truncate=False)

print("\n" + "="*100)
print(f"SUMMARY: Successfully queried {catalog}.{schema}.{table}")
print(f"Total rows in table: {row_count}")
print("="*100 + "\n")

except Exception as e:
print(f"\nERROR reading table: {e}\n")
raise


def main():
"""
Main function to read and display Confluent Tableflow table.
Automatically discovers and queries the first available table.
"""
spark = None

try:
# Configuration - The catalog name from Spark config
catalog = "tableflowdemo"

print("\n" + "="*100)
print("CONFLUENT TABLEFLOW READER - STANDALONE MODE")
print("="*100)
print(f"Catalog: {catalog}")
print("="*100 + "\n")

# Create Spark session with embedded configuration
spark = create_spark_session_with_confluent_config()

# Read and display table (auto-discovers first namespace and table)
read_and_display_table(spark, catalog)

except Exception as e:
print(f"\nError during table read: {e}")
import traceback
traceback.print_exc()
raise
finally:
if spark:
print("\nStopping Spark session...")
spark.stop()
print("Spark session stopped.\n")


if __name__ == "__main__":
main()
