Conversation
Walkthrough

Adds Apache Iceberg I/O: new iceberg and util modules, CLI split into load/save groups, cluster-level save_table routing, tests, docs, and optional dependency updates for Iceberg support.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as User/CLI
    participant IO_CLI_Save as IO CLI (cli_save)
    participant Cluster as Cluster.save_table()
    participant CrateDB as CrateDB
    participant Iceberg as Iceberg (to_iceberg)
    User->>IO_CLI_Save: run `save table` (source=crate://table, target=iceberg://...)
    IO_CLI_Save->>Cluster: instantiate cluster & call save_table(source, target)
    Cluster->>CrateDB: query schema & stream row chunks
    CrateDB-->>Cluster: return chunk
    Cluster->>Iceberg: call to_iceberg(target_url, chunk)
    Iceberg->>Iceberg: ensure catalog/namespace and write chunk
    Iceberg-->>Cluster: ack
    Cluster-->>IO_CLI_Save: return result
    IO_CLI_Save-->>User: exit
```
```mermaid
sequenceDiagram
    participant User as User/CLI
    participant IO_CLI_Load as IO CLI (cli_load)
    participant Cluster as Cluster.load_table()
    participant Iceberg as Iceberg (from_iceberg)
    participant CrateDB as CrateDB
    User->>IO_CLI_Load: run `load table` (source=iceberg://..., target=crate://table)
    IO_CLI_Load->>Cluster: instantiate cluster & call load_table(source, target)
    Cluster->>Iceberg: call from_iceberg(source_url, target_url)
    Iceberg->>Iceberg: parse URL, load catalog/table, stream Polars chunks
    Iceberg->>CrateDB: insert_bulk per chunk
    CrateDB-->>Iceberg: ack inserted
    Iceberg-->>Cluster: completion
    Cluster->>CrateDB: refresh target table
    Cluster-->>IO_CLI_Load: return result
    IO_CLI_Load-->>User: exit
```
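To make the load flow concrete, here is a minimal sketch in plain Python, assuming the module-level helper `from_iceberg(source_url, target_url)` from `cratedb_toolkit.io.iceberg` discussed in this review; the URLs, catalog, namespace, and table names are illustrative placeholders borrowed from the documentation examples below.

```python
# Minimal sketch of the "load" flow from the second diagram, assuming the
# module-level helper `from_iceberg(source_url, target_url)` exposed by
# cratedb_toolkit.io.iceberg; all URLs and names are illustrative placeholders.
from cratedb_toolkit.io.iceberg import from_iceberg

# Iceberg source, addressed via catalog/namespace/table query parameters.
source_url = "file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi"
# CrateDB target table, addressed via a crate:// cluster URL.
target_url = "crate://crate:crate@localhost:4200/demo/taxi"

# Parses the URLs, loads the Iceberg catalog/table, streams Polars chunks,
# and bulk-inserts them into CrateDB (see the sequence diagram above).
from_iceberg(source_url, target_url)
```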
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 3
🧹 Nitpick comments (1)
cratedb_toolkit/io/iceberg.py (1)
134-134: `progress` parameter is unused in both public API functions (Ruff ARG001).

Both `from_iceberg` and `to_iceberg` accept `progress: bool = False` but never reference it. If it is intentionally reserved for future use, document it; otherwise suppress the warning or rename it to `_progress` to signal the intent clearly.

♻️ Suggested fix

```diff
-def from_iceberg(source_url, target_url, progress: bool = False):
+def from_iceberg(source_url, target_url, progress: bool = False):  # noqa: ARG001
```

or rename to `_progress` to explicitly mark it as intentionally unused:

```diff
-def from_iceberg(source_url, target_url, progress: bool = False):
+def from_iceberg(source_url, target_url, _progress: bool = False):
```

Apply the same to `to_iceberg`.

Also applies to: 155-155
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cratedb_toolkit/io/iceberg.py` at line 134, The `progress` parameter is declared but never used in the public APIs; rename it to `_progress` (or prefixed underscore) in both from_iceberg and to_iceberg to mark it intentionally unused (or add a brief docstring note if you prefer to keep the name), and update their signatures and any callers to use `_progress` consistently so the unused-argument lint (ARG001) is resolved; specifically change the parameter in the functions from_iceberg and to_iceberg to `_progress: bool = False` and adjust any local references/calls accordingly.
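Beyond the two diff variants above, a minimal sketch of the "document it" option; the signature is taken from the review comment, and the function body is intentionally elided.

```python
# Sketch of the "document it" option from the nitpick above: keep `progress`
# (reserved for future use), silence ARG001 explicitly, and say why.
# Signature as quoted in the review; the body is elided.
def from_iceberg(source_url, target_url, progress: bool = False):  # noqa: ARG001
    """
    Import an Iceberg table into a CrateDB table.

    The `progress` flag is reserved for a future progress display;
    it is not evaluated yet.
    """
    ...
```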
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cratedb_toolkit/io/util.py`:
- Around line 83-97: The engine created by sa.create_engine in
pandas_from_cratedb is never disposed, leaking the connection pool if the
generator is not fully exhausted; update pandas_from_cratedb to ensure
engine.dispose() is always called (e.g., create the engine, open the connection,
yield chunks inside a try/finally and call engine.dispose() in the finally block
or wrap the generator with contextlib.contextmanager) so the pool is released
even if callers of pandas_from_cratedb or higher-level read_cratedb do not fully
iterate the generator. Ensure you reference and call engine.dispose() associated
with the engine variable created by sa.create_engine(source_url).
- Around line 52-79: The code currently creates engine = sa.create_engine(...)
and iterates under pl.Config(...) calling frame.collect_batches(...) and
batch.to_pandas().to_sql(...), but engine.dispose() is only called after the
loop and will be skipped if any exception is raised; wrap the streaming + to_sql
work in a try/finally so engine.dispose() always runs (create engine before the
try, run the pl.Config and for-loop inside the try block, and call
engine.dispose() in the finally block), ensuring disposal even if
frame.collect_batches, batch.to_pandas(), or to_sql raise.
In `@doc/io/iceberg/index.md`:
- Around line 150-155: Update the example commands so they include a valid
--cluster-url (or CRATEDB_CLUSTER_URL) and place the ?batch-size query parameter
inside the cluster URL rather than as a standalone URL fragment: modify the ctk
load table example (the "ctk load table" invocation) to pass --cluster-url with
a full crate URL (user, password, host, port, and path/table) and keep the
file+iceberg path as the source; modify the ctk save table example (the "ctk
save table" invocation) to pass
--cluster-url="crate://...?...&?batch-size=200000" with a full host/port and
table path and provide the target file+iceberg URI with explicit
catalog/namespace/table so both examples are copy-pastable and show where
batch-size belongs.
---
Duplicate comments:
In `@cratedb_toolkit/io/iceberg.py`:
- Line 175: The current line computing iceberg_append uses
asbool(URL(target_url).query_params.get("append")) which propagates a ValueError
for non-canonical strings; wrap the conversion in a safe guard: extract the raw
append_str via URL(target_url).query_params.get("append"), then call asbool
inside a try/except ValueError and on exception set iceberg_append = False
(optionally log/debug the invalid value), so asbool, URL, target_url and
iceberg_append are handled without raising to the caller.
- Around line 43-44: The __del__ method in IcebergAddress calls
self.tmpdir.cleanup() unguarded, which can raise AttributeError if __post_init__
or tempfile.TemporaryDirectory() failed and self.tmpdir was never set; modify
__del__ to first check for the attribute (e.g., hasattr(self, "tmpdir")) or use
try/except AttributeError around self.tmpdir.cleanup() so cleanup is only
attempted when tmpdir exists and to avoid exceptions during object finalization.
In `@cratedb_toolkit/io/util.py`:
- Line 64: Remove the dead/unused linter directive by deleting the trailing "#
noqa: ERA001" from the commented line containing
"table_name=cratedb_table.fullname, connection=engine,
if_table_exists=\"replace\"" in util.py (or remove the entire commented line if
it serves no purpose); this eliminates the Ruff RUF100 warning since ERA001 is
not enabled in the config. Ensure any remaining comment stays syntactically
valid and contains no other unused noqa tags.
---
Nitpick comments:
In `@cratedb_toolkit/io/iceberg.py`:
- Line 134: The `progress` parameter is declared but never used in the public
APIs; rename it to `_progress` (or prefixed underscore) in both from_iceberg and
to_iceberg to mark it intentionally unused (or add a brief docstring note if you
prefer to keep the name), and update their signatures and any callers to use
`_progress` consistently so the unused-argument lint (ARG001) is resolved;
specifically change the parameter in the functions from_iceberg and to_iceberg
to `_progress: bool = False` and adjust any local references/calls accordingly.
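As a complement to the duplicate comment about the `append` flag above, a minimal sketch of the guarded conversion; `boltons.urlutils.URL` and `sqlalchemy.util.asbool` stand in here for whatever helpers `iceberg.py` actually imports, so adjust to match.

```python
# Sketch of the guarded "append" flag parsing described in the duplicate
# comment above. boltons.urlutils.URL and sqlalchemy.util.asbool are stand-ins
# for the helpers iceberg.py actually uses; adjust the imports to match.
from boltons.urlutils import URL
from sqlalchemy.util import asbool


def parse_append_flag(target_url: str) -> bool:
    """Return the `append` query parameter as a bool, defaulting to False."""
    append_raw = URL(target_url).query_params.get("append")
    try:
        return asbool(append_raw)
    except ValueError:
        # Non-canonical values such as "?append=maybe" fall back to the default
        # instead of propagating the error to the caller.
        return False
```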
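Likewise, a reduced sketch of the guarded finalizer for `IcebergAddress`, assuming `tmpdir` is a `tempfile.TemporaryDirectory` created in `__post_init__` as the comment describes; the rest of the class is omitted.

```python
import dataclasses
import tempfile


@dataclasses.dataclass
class IcebergAddress:
    # Reduced stand-in for the real dataclass; only the cleanup logic is shown.

    def __post_init__(self):
        self.tmpdir = tempfile.TemporaryDirectory()

    def __del__(self):
        # __post_init__ may have failed before `tmpdir` was assigned, so guard
        # the attribute access instead of calling self.tmpdir.cleanup() blindly.
        tmpdir = getattr(self, "tmpdir", None)
        if tmpdir is not None:
            tmpdir.cleanup()
```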
```python
engine = sa.create_engine(str(cratedb_url))

# Note: The conversion to pandas is zero-copy,
# so we can utilize their SQL utils for free.
# https://github.com/pola-rs/polars/issues/7852
# Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
# Note: `collect_batches()` is marked as unstable and slower than native sinks;
# consider native Polars sinks (e.g., write_database) as a faster alternative if available.
# https://github.com/crate/cratedb-toolkit/pull/444#discussion_r2825382887
# Note: This variant appeared to be much slower, let's revisit and investigate why?
# table.to_polars().collect(streaming=True).write_database(
#     table_name=cratedb_table.fullname, connection=engine, if_table_exists="replace"  # noqa: ERA001
# Note: When `collect_batches` yields more than one batch, the first batch must use the
# user-specified `if_exists`, but subsequent batches must use "append".
with pl.Config(streaming_chunk_size=chunk_size):
    for i, batch in enumerate(frame.collect_batches(engine="streaming", chunk_size=chunk_size)):
        batch.to_pandas().to_sql(
            name=cratedb_table.table,
            schema=cratedb_table.schema,
            con=engine,
            if_exists=if_exists if i == 0 else "append",
            index=False,
            chunksize=chunk_size,
            method=insert_bulk,
        )

engine.dispose()
```
engine.dispose() is skipped when collect_batches or to_sql raises an exception.
engine.dispose() at line 79 is only reached on the happy path. Any exception thrown inside the pl.Config block (e.g., from collect_batches, to_pandas, or to_sql) leaves the connection pool open.
🛡️ Proposed fix

```diff
 engine = sa.create_engine(str(cratedb_url))
+try:
-with pl.Config(streaming_chunk_size=chunk_size):
+    with pl.Config(streaming_chunk_size=chunk_size):
         for i, batch in enumerate(frame.collect_batches(engine="streaming", chunk_size=chunk_size)):
             batch.to_pandas().to_sql(
                 name=cratedb_table.table,
                 schema=cratedb_table.schema,
                 con=engine,
                 if_exists=if_exists if i == 0 else "append",
                 index=False,
                 chunksize=chunk_size,
                 method=insert_bulk,
             )
+finally:
+    engine.dispose()
```

🧰 Tools
🪛 Ruff (0.15.1)
[warning] 64-64: Unused noqa directive (non-enabled: ERA001)
Remove unused noqa directive
(RUF100)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cratedb_toolkit/io/util.py` around lines 52 - 79, The code currently creates
engine = sa.create_engine(...) and iterates under pl.Config(...) calling
frame.collect_batches(...) and batch.to_pandas().to_sql(...), but
engine.dispose() is only called after the loop and will be skipped if any
exception is raised; wrap the streaming + to_sql work in a try/finally so
engine.dispose() always runs (create engine before the try, run the pl.Config
and for-loop inside the try block, and call engine.dispose() in the finally
block), ensuring disposal even if frame.collect_batches, batch.to_pandas(), or
to_sql raise.
```python
def pandas_from_cratedb(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Generator[pd.DataFrame, None, None]:
    """
    Read from a CrateDB table into pandas data frames, yielding batches/chunks.
    """
    engine = sa.create_engine(source_url)
    with engine.connect() as connection:
        for chunk in pd.read_sql_table(
            table_name=table,
            schema=schema,
            con=connection,
            chunksize=chunk_size,
        ):
            yield chunk
```
engine in pandas_from_cratedb is never disposed.
sa.create_engine(source_url) opens a connection pool that is never released. with engine.connect() closes the individual connection, but the engine's pool is held until garbage collection. In read_cratedb, this function is called inside a generator chain that callers may not exhaust, compounding the risk.
🛡️ Proposed fix

```diff
 engine = sa.create_engine(source_url)
+try:
     with engine.connect() as connection:
         for chunk in pd.read_sql_table(
             table_name=table,
             schema=schema,
             con=connection,
             chunksize=chunk_size,
         ):
             yield chunk
+finally:
+    engine.dispose()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def pandas_from_cratedb(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Generator[pd.DataFrame, None, None]:
    """
    Read from a CrateDB table into pandas data frames, yielding batches/chunks.
    """
    engine = sa.create_engine(source_url)
    try:
        with engine.connect() as connection:
            for chunk in pd.read_sql_table(
                table_name=table,
                schema=schema,
                con=connection,
                chunksize=chunk_size,
            ):
                yield chunk
    finally:
        engine.dispose()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cratedb_toolkit/io/util.py` around lines 83 - 97, The engine created by
sa.create_engine in pandas_from_cratedb is never disposed, leaking the
connection pool if the generator is not fully exhausted; update
pandas_from_cratedb to ensure engine.dispose() is always called (e.g., create
the engine, open the connection, yield chunks inside a try/finally and call
engine.dispose() in the finally block or wrap the generator with
contextlib.contextmanager) so the pool is released even if callers of
pandas_from_cratedb or higher-level read_cratedb do not fully iterate the
generator. Ensure you reference and call engine.dispose() associated with the
engine variable created by sa.create_engine(source_url).
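The prompt above also mentions a `contextlib.contextmanager` wrapper as an alternative to the inline try/finally; here is a self-contained sketch of that variant, with names mirroring `pandas_from_cratedb` for illustration rather than prescribing the final API.

```python
# Sketch of the contextlib-based alternative mentioned in the prompt above:
# tie engine disposal to a context manager instead of an inline try/finally.
# Names mirror pandas_from_cratedb; they are illustrative, not the final API.
import contextlib
import typing as t

import pandas as pd
import sqlalchemy as sa


@contextlib.contextmanager
def cratedb_engine(source_url: str) -> t.Iterator[sa.engine.Engine]:
    engine = sa.create_engine(source_url)
    try:
        yield engine
    finally:
        # Release the connection pool on both the happy path and on errors.
        engine.dispose()


def pandas_from_cratedb(
    source_url: str, schema: str, table: str, chunk_size: int
) -> t.Generator[pd.DataFrame, None, None]:
    """Read from a CrateDB table into pandas data frames, yielding batches/chunks."""
    with cratedb_engine(source_url) as engine, engine.connect() as connection:
        for chunk in pd.read_sql_table(
            table_name=table,
            schema=schema,
            con=connection,
            chunksize=chunk_size,
        ):
            yield chunk
```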
```shell
ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000"
```

```shell
ctk save table --cluster-url="crate://?batch-size=200000"
```
Incomplete example commands in the batch-size section may mislead users.
Line 151 omits --cluster-url (or CRATEDB_CLUSTER_URL), so copy-pasting the command fails. Line 154 shows crate://?batch-size=200000 as the cluster URL, which has no host, port, or table — it is not a usable standalone URL and does not illustrate where the parameter belongs within a real cluster URL.
📝 Suggested fix

```diff
-ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000"
+ctk load table "file+iceberg://./var/lib/iceberg/?batch-size=200000" \
+  --cluster-url="crate://crate:crate@localhost:4200/demo/taxi"
```

```diff
-ctk save table --cluster-url="crate://?batch-size=200000"
+ctk save table \
+  --cluster-url="crate://crate:crate@localhost:4200/demo/taxi?batch-size=200000" \
+  "file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@doc/io/iceberg/index.md` around lines 150 - 155, Update the example commands
so they include a valid --cluster-url (or CRATEDB_CLUSTER_URL) and place the
?batch-size query parameter inside the cluster URL rather than as a standalone
URL fragment: modify the ctk load table example (the "ctk load table"
invocation) to pass --cluster-url with a full crate URL (user, password, host,
port, and path/table) and keep the file+iceberg path as the source; modify the
ctk save table example (the "ctk save table" invocation) to pass
--cluster-url="crate://...?...&?batch-size=200000" with a full host/port and
table path and provide the target file+iceberg URI with explicit
catalog/namespace/table so both examples are copy-pastable and show where
batch-size belongs.
About
Import from and export to Apache Iceberg tables, for interoperability purposes.
Documentation
https://cratedb-toolkit--444.org.readthedocs.build/io/iceberg/
References
/cc @karynzv, @hammerhead, @zolbatar, @grbade, @WalBeh