
I/O: Adapter for Apache Iceberg #444

Open

amotl wants to merge 25 commits into main from iceberg

Conversation

amotl (Member) commented on Jun 4, 2025

coderabbitai (bot) commented on Jun 4, 2025

Warning

Rate limit exceeded

@amotl has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 18 minutes and 6 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

Walkthrough

Adds Apache Iceberg I/O: new iceberg and util modules, CLI split into load/save groups, cluster-level save_table routing, tests, docs, and optional dependency updates for Iceberg support.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Top-level CLI**<br>`cratedb_toolkit/cli.py` | Register two IO command groups: `load`, wired to `io_cli_load`, and the new `save`, wired to `io_cli_save`. |
| **IO CLI**<br>`cratedb_toolkit/io/cli.py` | Rename `cli` to `cli_load`, add a `cli_save` group and a `save table` command; `load_table` now uses `cli_load`; the new `save_table` implements export options and normalizes empty transformation paths. |
| **Iceberg IO module**<br>`cratedb_toolkit/io/iceberg.py` | New module with an `IcebergAddress` dataclass, `from_iceberg()` (Iceberg → CrateDB) and `to_iceberg()` (CrateDB → Iceberg), URL/catalog parsing, chunked transfers, and catalog/storage property handling. |
| **IO utilities**<br>`cratedb_toolkit/io/util.py` | New helpers `parse_uri`, `polars_to_cratedb`, `pandas_from_cratedb`, and `read_cratedb` to parse URIs and stream data between CrateDB and Polars/pandas. |
| **Cluster core**<br>`cratedb_toolkit/cluster/core.py` | Add `save_table()` to `ManagedCluster` and `StandaloneCluster`; extend `load_table()` to route `iceberg` schemes to the Iceberg IO module; `StandaloneCluster` supports `to_iceberg`, while the managed `save` is `NotImplemented`. |
| **Tests**<br>`tests/io/test_iceberg.py` | Add end-to-end tests and an `example_iceberg` fixture covering CLI load/save flows, metadata discovery, schema/row-count assertions, and error cases for missing namespace/name. |
| **Dependencies & tooling**<br>`pyproject.toml` | Add optional Iceberg extras and new optional groups referencing `pyiceberg[...]`; update the poe `watch` task path and add an `io-opentable` group. |
| **Docs / Backlog**<br>`doc/io/iceberg/index.md`, `doc/backlog/io.md`, `doc/backlog/index.md`, `doc/io/index.md` | Add user-facing Iceberg I/O documentation and backlog notes, and register the pages in the toctree. |
| **Changelog**<br>`CHANGES.md` | Add an unreleased note about the new Apache Iceberg I/O adapter. |
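
For a concrete picture of the new module surface, here is a minimal programmatic sketch of the Iceberg → CrateDB direction. The function name, keyword parameters, and URLs are taken from the review comments further down in this thread; everything else, including the exact URL layout, should be treated as illustrative rather than authoritative.

```python
# Illustrative sketch only; signature and URLs follow the review comments below,
# the surrounding setup is assumed.
from cratedb_toolkit.io.iceberg import from_iceberg

# Iceberg -> CrateDB
from_iceberg(
    source_url="file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi",
    target_url="crate://crate:crate@localhost:4200/demo/taxi",
)
# to_iceberg() covers the reverse direction (CrateDB -> Iceberg); see the save flow below.
```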

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as User/CLI
    participant IO_CLI_Save as IO CLI (cli_save)
    participant Cluster as Cluster.save_table()
    participant CrateDB as CrateDB
    participant Iceberg as Iceberg (to_iceberg)

    User->>IO_CLI_Save: run `save table` (source=crate://table, target=iceberg://...)
    IO_CLI_Save->>Cluster: instantiate cluster & call save_table(source, target)
    Cluster->>CrateDB: query schema & stream row chunks
    CrateDB-->>Cluster: return chunk
    Cluster->>Iceberg: call to_iceberg(target_url, chunk)
    Iceberg->>Iceberg: ensure catalog/namespace and write chunk
    Iceberg-->>Cluster: ack
    Cluster-->>IO_CLI_Save: return result
    IO_CLI_Save-->>User: exit
```
```mermaid
sequenceDiagram
    participant User as User/CLI
    participant IO_CLI_Load as IO CLI (cli_load)
    participant Cluster as Cluster.load_table()
    participant Iceberg as Iceberg (from_iceberg)
    participant CrateDB as CrateDB

    User->>IO_CLI_Load: run `load table` (source=iceberg://..., target=crate://table)
    IO_CLI_Load->>Cluster: instantiate cluster & call load_table(source, target)
    Cluster->>Iceberg: call from_iceberg(source_url, target_url)
    Iceberg->>Iceberg: parse URL, load catalog/table, stream Polars chunks
    Iceberg->>CrateDB: insert_bulk per chunk
    CrateDB-->>Iceberg: ack inserted
    Iceberg-->>Cluster: completion
    Cluster->>CrateDB: refresh target table
    Cluster-->>IO_CLI_Load: return result
    IO_CLI_Load-->>User: exit
```
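
The scheme dispatch behind both flows can be pictured with a short sketch. This is not the actual implementation of the `StandaloneCluster` methods; the real methods carry more parameters and behavior, and the exact `to_iceberg` signature is not shown verbatim in this thread, so a `(source_url, target_url)` shape is assumed here.

```python
# Rough sketch of the scheme-based routing described in the walkthrough; assumptions noted inline.
from urllib.parse import urlparse

from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg


def load_table(source_url: str, target_url: str):
    # e.g. "file+iceberg" (exact scheme naming assumed)
    if "iceberg" in urlparse(source_url).scheme:
        return from_iceberg(source_url, target_url)
    raise NotImplementedError("Sketch covers only Iceberg sources")


def save_table(source_url: str, target_url: str):
    if "iceberg" in urlparse(target_url).scheme:
        return to_iceberg(source_url, target_url)
    raise NotImplementedError("Sketch covers only Iceberg targets")
```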

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • hammerhead

Poem

🐰 I hopped along an iceberg trail so bright,
Split load and save with a twitch of delight,
Catalogs hummed while Polars streamed the rows,
Tables found new homes where the cold wind blows,
A rabbit cheers — two commands and data flows!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title "I/O: Adapter for Apache Iceberg" accurately and concisely summarizes the main change: adding I/O adapter support for Apache Iceberg tables. |
| Description check | ✅ Passed | The description is directly related to the changeset, explaining the purpose (import/export Apache Iceberg tables) and providing relevant context, including documentation and references. |
| Docstring coverage | ✅ Passed | Docstring coverage is 93.33%, which is sufficient; the required threshold is 80.00%. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch iceberg

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


amotl requested review from matriv and seut on February 19, 2026, at 13:09
amotl marked this pull request as ready for review on February 19, 2026, at 13:16


coderabbitai (bot) left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
cratedb_toolkit/io/iceberg.py (1)

134-134: progress parameter is unused in both public API functions (Ruff ARG001).

Both from_iceberg and to_iceberg accept progress: bool = False but never reference it. If it is intentionally reserved for future use, document it; otherwise suppress with _progress to signal the intent clearly.

♻️ Suggested fix

```diff
-def from_iceberg(source_url, target_url, progress: bool = False):
+def from_iceberg(source_url, target_url, progress: bool = False):  # noqa: ARG001
```

or rename to _progress to explicitly mark as intentionally unused:

```diff
-def from_iceberg(source_url, target_url, progress: bool = False):
+def from_iceberg(source_url, target_url, _progress: bool = False):
```

Apply the same to to_iceberg.

Also applies to: 155-155

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/iceberg.py` at line 134, The `progress` parameter is
declared but never used in the public APIs; rename it to `_progress` (or
prefixed underscore) in both from_iceberg and to_iceberg to mark it
intentionally unused (or add a brief docstring note if you prefer to keep the
name), and update their signatures and any callers to use `_progress`
consistently so the unused-argument lint (ARG001) is resolved; specifically
change the parameter in the functions from_iceberg and to_iceberg to `_progress:
bool = False` and adjust any local references/calls accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cratedb_toolkit/io/util.py`:
- Around line 83-97: The engine created by sa.create_engine in
pandas_from_cratedb is never disposed, leaking the connection pool if the
generator is not fully exhausted; update pandas_from_cratedb to ensure
engine.dispose() is always called (e.g., create the engine, open the connection,
yield chunks inside a try/finally and call engine.dispose() in the finally block
or wrap the generator with contextlib.contextmanager) so the pool is released
even if callers of pandas_from_cratedb or higher-level read_cratedb do not fully
iterate the generator. Ensure you reference and call engine.dispose() associated
with the engine variable created by sa.create_engine(source_url).
- Around line 52-79: The code currently creates engine = sa.create_engine(...)
and iterates under pl.Config(...) calling frame.collect_batches(...) and
batch.to_pandas().to_sql(...), but engine.dispose() is only called after the
loop and will be skipped if any exception is raised; wrap the streaming + to_sql
work in a try/finally so engine.dispose() always runs (create engine before the
try, run the pl.Config and for-loop inside the try block, and call
engine.dispose() in the finally block), ensuring disposal even if
frame.collect_batches, batch.to_pandas(), or to_sql raise.

In `@doc/io/iceberg/index.md`:
- Around line 150-155: Update the example commands so they include a valid
--cluster-url (or CRATEDB_CLUSTER_URL) and place the ?batch-size query parameter
inside the cluster URL rather than as a standalone URL fragment: modify the ctk
load table example (the "ctk load table" invocation) to pass --cluster-url with
a full crate URL (user, password, host, port, and path/table) and keep the
file+iceberg path as the source; modify the ctk save table example (the "ctk
save table" invocation) to pass
--cluster-url="crate://...?...&?batch-size=200000" with a full host/port and
table path and provide the target file+iceberg URI with explicit
catalog/namespace/table so both examples are copy-pastable and show where
batch-size belongs.

---

Duplicate comments:
In `@cratedb_toolkit/io/iceberg.py`:
- Line 175: The current line computing iceberg_append uses
asbool(URL(target_url).query_params.get("append")) which propagates a ValueError
for non-canonical strings; wrap the conversion in a safe guard: extract the raw
append_str via URL(target_url).query_params.get("append"), then call asbool
inside a try/except ValueError and on exception set iceberg_append = False
(optionally log/debug the invalid value), so asbool, URL, target_url and
iceberg_append are handled without raising to the caller.
- Around line 43-44: The __del__ method in IcebergAddress calls
self.tmpdir.cleanup() unguarded, which can raise AttributeError if __post_init__
or tempfile.TemporaryDirectory() failed and self.tmpdir was never set; modify
__del__ to first check for the attribute (e.g., hasattr(self, "tmpdir")) or use
try/except AttributeError around self.tmpdir.cleanup() so cleanup is only
attempted when tmpdir exists and to avoid exceptions during object finalization.

In `@cratedb_toolkit/io/util.py`:
- Line 64: Remove the dead/unused linter directive by deleting the trailing "#
noqa: ERA001" from the commented line containing
"table_name=cratedb_table.fullname, connection=engine,
if_table_exists=\"replace\"" in util.py (or remove the entire commented line if
it serves no purpose); this eliminates the Ruff RUF100 warning since ERA001 is
not enabled in the config. Ensure any remaining comment stays syntactically
valid and contains no other unused noqa tags.

---

Nitpick comments:
In `@cratedb_toolkit/io/iceberg.py`:
- Line 134: The `progress` parameter is declared but never used in the public
APIs; rename it to `_progress` (or prefixed underscore) in both from_iceberg and
to_iceberg to mark it intentionally unused (or add a brief docstring note if you
prefer to keep the name), and update their signatures and any callers to use
`_progress` consistently so the unused-argument lint (ARG001) is resolved;
specifically change the parameter in the functions from_iceberg and to_iceberg
to `_progress: bool = False` and adjust any local references/calls accordingly.
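
For the two `iceberg.py` guards listed under "Duplicate comments" above (tolerant parsing of the `append` query parameter and a defensive `__del__`), a minimal sketch might look as follows. The import locations of `asbool` and `URL` are assumptions; the review comments only reference the call sites, and the dataclass is heavily reduced here.

```python
# Sketch only: assumed imports and a trimmed-down IcebergAddress for illustration.
import tempfile
from dataclasses import dataclass

from boltons.urlutils import URL       # assumed import location
from sqlalchemy.util import asbool     # assumed import location


def parse_append_flag(target_url: str) -> bool:
    """Tolerant parsing of the `append` query parameter; defaults to False."""
    try:
        return asbool(URL(target_url).query_params.get("append"))
    except ValueError:
        return False  # non-canonical values no longer raise to the caller


@dataclass
class IcebergAddress:  # the real dataclass carries more fields
    def __post_init__(self):
        self.tmpdir = tempfile.TemporaryDirectory()

    def __del__(self):
        # Only attempt cleanup when __post_init__ actually created the directory.
        if hasattr(self, "tmpdir"):
            self.tmpdir.cleanup()
```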

Comment on lines +52 to +79
```python
    engine = sa.create_engine(str(cratedb_url))

    # Note: The conversion to pandas is zero-copy,
    # so we can utilize their SQL utils for free.
    # https://github.com/pola-rs/polars/issues/7852
    # Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
    # https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
    # Note: `collect_batches()` is marked as unstable and slower than native sinks;
    # consider native Polars sinks (e.g., write_database) as a faster alternative if available.
    # https://github.com/crate/cratedb-toolkit/pull/444#discussion_r2825382887
    # Note: This variant appeared to be much slower, let's revisit and investigate why?
    # table.to_polars().collect(streaming=True).write_database(
    #     table_name=cratedb_table.fullname, connection=engine, if_table_exists="replace"  # noqa: ERA001
    # Note: When `collect_batches` yields more than one batch, the first batch must use the
    # user-specified `if_exists`, but subsequent batches must use "append".
    with pl.Config(streaming_chunk_size=chunk_size):
        for i, batch in enumerate(frame.collect_batches(engine="streaming", chunk_size=chunk_size)):
            batch.to_pandas().to_sql(
                name=cratedb_table.table,
                schema=cratedb_table.schema,
                con=engine,
                if_exists=if_exists if i == 0 else "append",
                index=False,
                chunksize=chunk_size,
                method=insert_bulk,
            )

    engine.dispose()
```


⚠️ Potential issue | 🟡 Minor

engine.dispose() is skipped when collect_batches or to_sql raises an exception.

engine.dispose() at line 79 is only reached on the happy path. Any exception thrown inside the pl.Config block (e.g., from collect_batches, to_pandas, or to_sql) leaves the connection pool open.

🛡️ Proposed fix

```diff
     engine = sa.create_engine(str(cratedb_url))
-    with pl.Config(streaming_chunk_size=chunk_size):
-        for i, batch in enumerate(frame.collect_batches(engine="streaming", chunk_size=chunk_size)):
-            batch.to_pandas().to_sql(
-                name=cratedb_table.table,
-                schema=cratedb_table.schema,
-                con=engine,
-                if_exists=if_exists if i == 0 else "append",
-                index=False,
-                chunksize=chunk_size,
-                method=insert_bulk,
-            )
-
-    engine.dispose()
+    try:
+        with pl.Config(streaming_chunk_size=chunk_size):
+            for i, batch in enumerate(frame.collect_batches(engine="streaming", chunk_size=chunk_size)):
+                batch.to_pandas().to_sql(
+                    name=cratedb_table.table,
+                    schema=cratedb_table.schema,
+                    con=engine,
+                    if_exists=if_exists if i == 0 else "append",
+                    index=False,
+                    chunksize=chunk_size,
+                    method=insert_bulk,
+                )
+    finally:
+        engine.dispose()
```
🧰 Tools
🪛 Ruff (0.15.1)

[warning] 64-64: Unused noqa directive (non-enabled: ERA001)

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/util.py` around lines 52 - 79, The code currently creates
engine = sa.create_engine(...) and iterates under pl.Config(...) calling
frame.collect_batches(...) and batch.to_pandas().to_sql(...), but
engine.dispose() is only called after the loop and will be skipped if any
exception is raised; wrap the streaming + to_sql work in a try/finally so
engine.dispose() always runs (create engine before the try, run the pl.Config
and for-loop inside the try block, and call engine.dispose() in the finally
block), ensuring disposal even if frame.collect_batches, batch.to_pandas(), or
to_sql raise.

Comment on lines +83 to +97
```python
def pandas_from_cratedb(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Generator[pd.DataFrame, None, None]:
    """
    Read from a CrateDB table into pandas data frames, yielding batches/chunks.
    """
    engine = sa.create_engine(source_url)
    with engine.connect() as connection:
        for chunk in pd.read_sql_table(
            table_name=table,
            schema=schema,
            con=connection,
            chunksize=chunk_size,
        ):
            yield chunk
```


⚠️ Potential issue | 🟡 Minor

engine in pandas_from_cratedb is never disposed.

sa.create_engine(source_url) opens a connection pool that is never released. with engine.connect() closes the individual connection, but the engine's pool is held until garbage collection. In read_cratedb, this function is called inside a generator chain that callers may not exhaust, compounding the risk.

🛡️ Proposed fix

```diff
     engine = sa.create_engine(source_url)
-    with engine.connect() as connection:
-        for chunk in pd.read_sql_table(
-            table_name=table,
-            schema=schema,
-            con=connection,
-            chunksize=chunk_size,
-        ):
-            yield chunk
+    try:
+        with engine.connect() as connection:
+            for chunk in pd.read_sql_table(
+                table_name=table,
+                schema=schema,
+                con=connection,
+                chunksize=chunk_size,
+            ):
+                yield chunk
+    finally:
+        engine.dispose()
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```python
def pandas_from_cratedb(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Generator[pd.DataFrame, None, None]:
    """
    Read from a CrateDB table into pandas data frames, yielding batches/chunks.
    """
    engine = sa.create_engine(source_url)
    try:
        with engine.connect() as connection:
            for chunk in pd.read_sql_table(
                table_name=table,
                schema=schema,
                con=connection,
                chunksize=chunk_size,
            ):
                yield chunk
    finally:
        engine.dispose()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cratedb_toolkit/io/util.py` around lines 83 - 97, The engine created by
sa.create_engine in pandas_from_cratedb is never disposed, leaking the
connection pool if the generator is not fully exhausted; update
pandas_from_cratedb to ensure engine.dispose() is always called (e.g., create
the engine, open the connection, yield chunks inside a try/finally and call
engine.dispose() in the finally block or wrap the generator with
contextlib.contextmanager) so the pool is released even if callers of
pandas_from_cratedb or higher-level read_cratedb do not fully iterate the
generator. Ensure you reference and call engine.dispose() associated with the
engine variable created by sa.create_engine(source_url).
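
Besides the try/finally form, the `contextlib.contextmanager` variant mentioned in the prompt could look roughly like this. `cratedb_reader` is a hypothetical name, not part of the PR, and callers would need to switch to a `with` block.

```python
# Hypothetical alternative sketch: wrap the engine lifecycle in a context manager so the
# pool is disposed even when the chunks are not fully consumed by the caller.
import contextlib
import typing as t

import pandas as pd
import sqlalchemy as sa


@contextlib.contextmanager
def cratedb_reader(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Iterator[t.Iterator[pd.DataFrame]]:
    engine = sa.create_engine(source_url)
    try:
        with engine.connect() as connection:
            yield pd.read_sql_table(
                table_name=table,
                schema=schema,
                con=connection,
                chunksize=chunk_size,
            )
    finally:
        engine.dispose()  # always release the pool, even on partial iteration


# Usage:
# with cratedb_reader("crate://localhost:4200", "doc", "demo", 10_000) as chunks:
#     for frame in chunks:
#         ...
```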

Comment on lines +150 to +155
```shell
ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000"
```
```shell
ctk save table --cluster-url="crate://?batch-size=200000"
```


⚠️ Potential issue | 🟡 Minor

Incomplete example commands in the batch-size section may mislead users.

Line 151 omits --cluster-url (or CRATEDB_CLUSTER_URL), so copy-pasting the command fails. Line 154 shows crate://?batch-size=200000 as the cluster URL, which has no host, port, or table — it is not a usable standalone URL and does not illustrate where the parameter belongs within a real cluster URL.

📝 Suggested fix

```diff
-ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000"
+ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000" \
+    --cluster-url="crate://crate:crate@localhost:4200/demo/taxi"
-ctk save table --cluster-url="crate://?batch-size=200000"
+ctk save table \
+    --cluster-url="crate://crate:crate@localhost:4200/demo/taxi?batch-size=200000" \
+    "file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@doc/io/iceberg/index.md` around lines 150 - 155, Update the example commands
so they include a valid --cluster-url (or CRATEDB_CLUSTER_URL) and place the
?batch-size query parameter inside the cluster URL rather than as a standalone
URL fragment: modify the ctk load table example (the "ctk load table"
invocation) to pass --cluster-url with a full crate URL (user, password, host,
port, and path/table) and keep the file+iceberg path as the source; modify the
ctk save table example (the "ctk save table" invocation) to pass
--cluster-url="crate://...?...&?batch-size=200000" with a full host/port and
table path and provide the target file+iceberg URI with explicit
catalog/namespace/table so both examples are copy-pastable and show where
batch-size belongs.

