4 changes: 3 additions & 1 deletion docs/dev/contribute.md
@@ -19,10 +19,12 @@ cd foundry-dev-tools
````{tab} Conda/Mamba
First create an environment specifically for foundry-dev-tools with pdm and activate it:
```shell
mamba create -n foundry-dev-tools pdm openjdk===17
mamba create -n foundry-dev-tools python=3.12 pdm openjdk=17
mamba activate foundry-dev-tools
```
````
Note that Python versions newer than 3.12 currently fail during `pdm install` with: `configured Python interpreter version (3.14) is newer than PyO3's maximum supported version (3.12)`.

````{tab} Without Conda/Mamba
If you don't want to use conda or mamba, you'll need to install pdm through other means.
It is available in most Linux package managers, Homebrew, Scoop and also via pip.
4 changes: 1 addition & 3 deletions docs/examples/dataset.md
@@ -260,9 +260,7 @@ import polars as pl

ctx = FoundryContext()
ds = ctx.get_dataset_by_path("/path/to/test_dataset")
arrow_table = ds.query_foundry_sql("SELECT *",return_type="arrow")

df = pl.from_arrow(arrow_table)
df = ds.query_foundry_sql("SELECT *", return_type="polars")
print(df)
```
````
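For comparison, the pattern this hunk removes and its replacement, side by side (a sketch; `ds` is the dataset object from the example above, and polars must be installed):

```python
import polars as pl

# Before: fetch an Arrow table, then convert manually.
arrow_table = ds.query_foundry_sql("SELECT *", return_type="arrow")
df_old = pl.from_arrow(arrow_table)

# After: let the client return a polars DataFrame directly.
df_new = ds.query_foundry_sql("SELECT *", return_type="polars")
assert df_old.equals(df_new)
```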
30 changes: 25 additions & 5 deletions libs/foundry-dev-tools/src/foundry_dev_tools/clients/data_proxy.py
@@ -27,6 +27,7 @@
from pathlib import Path

import pandas as pd
import polars as pl
import pyarrow as pa
import pyspark
import requests
@@ -111,7 +112,17 @@ def query_foundry_sql_legacy(
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
) -> pd.DataFrame: ...

@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: Literal["polars"],
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> pl.DataFrame: ...

@overload
def query_foundry_sql_legacy(
@@ -151,7 +162,7 @@ def query_foundry_sql_legacy(
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

def query_foundry_sql_legacy(
self,
@@ -160,7 +171,7 @@
branch: Ref = "master",
sql_dialect: SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Queries the dataproxy query API with spark SQL.

Example:
@@ -206,20 +217,29 @@ def query_foundry_sql_legacy(
response_json = response.json()
if return_type == "raw":
return response_json["foundrySchema"], response_json["rows"]
# The arrow, pandas, and polars return types rely on the FakeModule
# implementation in their _optional packages. FakeModule raises an ImportError
# as soon as an attribute of the missing module is accessed, so there is no
# need to catch ImportError explicitly here.
if return_type == "pandas":
from foundry_dev_tools._optional.pandas import pd

return pd.DataFrame(
data=response_json["rows"],
columns=[e["name"] for e in response_json["foundrySchema"]["fieldSchemaList"]],
)
if return_type == "arrow":
if return_type in {"arrow", "polars"}:
from foundry_dev_tools._optional.pyarrow import pa

return pa.table(
table = pa.table(
data=response_json["rows"],
names=[e["name"] for e in response_json["foundrySchema"]["fieldSchemaList"]],
)
if return_type == "arrow":
return table

from foundry_dev_tools._optional.polars import pl

return pl.from_arrow(table)
if return_type == "spark":
from foundry_dev_tools.utils.converter.foundry_spark import (
foundry_schema_to_spark_schema,
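The comment on the arrow/pandas/polars branches refers to the lazy optional-import pattern in `foundry_dev_tools._optional`. A minimal sketch of how such a stand-in could behave — the class name mirrors the comment, but this simplified implementation is an assumption:

```python
class FakeModule:
    """Hypothetical stand-in for an optional dependency that is not installed."""

    def __init__(self, name: str) -> None:
        self._name = name

    def __getattr__(self, attr: str):
        # Any attribute access (e.g. pl.DataFrame) raises ImportError,
        # so call sites can import unconditionally and fail lazily.
        msg = f"Optional dependency '{self._name}' is required but not installed."
        raise ImportError(msg)


try:
    import polars as pl
except ImportError:
    pl = FakeModule("polars")
```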
libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py
@@ -17,6 +17,7 @@

if TYPE_CHECKING:
import pandas as pd
import polars as pl
import pyarrow as pa
import pyspark
import requests
@@ -35,7 +36,7 @@ def query_foundry_sql(
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
) -> pd.DataFrame: ...

@overload
def query_foundry_sql(
@@ -57,6 +58,16 @@ def query_foundry_sql(
timeout: int = ...,
) -> pa.Table: ...

@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["polars"],
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> pl.DataFrame: ...

@overload
def query_foundry_sql(
self,
@@ -75,16 +86,16 @@
branch: Ref = ...,
sql_dialect: SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

def query_foundry_sql(
def query_foundry_sql( # noqa: C901
self,
query: str,
return_type: SQLReturnType = "pandas",
branch: Ref = "master",
sql_dialect: SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Queries the Foundry SQL server with spark SQL dialect.

Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint.
@@ -105,9 +116,9 @@ def query_foundry_sql(
timeout: Query Timeout, default value is 600 seconds

Returns:
:external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:
:external+pandas:py:class:`~pandas.DataFrame` | :external+polars:py:class:`~polars.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:

A pandas DataFrame, Spark DataFrame or pyarrow.Table with the result.
A pandas, polars, or Spark DataFrame, or a pyarrow.Table, with the result.

Raises:
ValueError: Only direct read eligible queries can be returned as arrow Table.
@@ -139,6 +150,15 @@ def query_foundry_sql(
arrow_stream_reader = self.read_fsql_query_results_arrow(query_id=query_id)
if return_type == "pandas":
return arrow_stream_reader.read_pandas()
if return_type == "polars":
# The FakeModule implementation used in the _optional packages
# raises an ImportError when attributes of the missing module are accessed.
# That ImportError is caught below to fall back to query_foundry_sql_legacy,
# which raises ImportError again if polars is not installed.
from foundry_dev_tools._optional.polars import pl

arrow_table = arrow_stream_reader.read_all()
return pl.from_arrow(arrow_table)

if return_type == "spark":
from foundry_dev_tools.utils.converter.foundry_spark import (
@@ -147,16 +167,12 @@

return arrow_stream_to_spark_dataframe(arrow_stream_reader)
return arrow_stream_reader.read_all()

return self._query_fsql(
query=query,
branch=branch,
return_type=return_type,
)
except (
FoundrySqlSerializationFormatNotImplementedError,
ImportError,
) as exc:
# Swallow exception when return_type != 'arrow'
# to fall back to query_foundry_sql_legacy
if return_type == "arrow":
msg = (
"Only direct read eligible queries can be returned as arrow Table. Consider using setting"
@@ -166,6 +182,8 @@
msg,
) from exc

# This fallback is not only used when return_type is 'raw', but also when one of
# the above exceptions is caught and return_type != 'arrow'.
warnings.warn("Falling back to query_foundry_sql_legacy!")
return self.context.data_proxy.query_foundry_sql_legacy(
query=query,
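The polars branch above hands the Arrow IPC stream to polars in one step. A standalone sketch of that hand-off, assuming pyarrow and polars are installed (the table literal stands in for the stream reader's `read_all()` result):

```python
import polars as pl
import pyarrow as pa

# Stand-in for arrow_stream_reader.read_all() on a real query result.
arrow_table = pa.table({"sepal_length": [5.1, 4.9], "sepal_width": [3.5, 3.0]})

# pl.from_arrow builds a polars DataFrame from the Arrow table,
# zero-copy where the memory layout allows it.
df = pl.from_arrow(arrow_table)
assert isinstance(df, pl.DataFrame)
```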
19 changes: 15 additions & 4 deletions libs/foundry-dev-tools/src/foundry_dev_tools/foundry_api_client.py
@@ -2,7 +2,7 @@

One of the goals of this module is to be self-contained so that it can be
dropped into any Python installation with a minimal dependency on 'requests'.
Optional dependencies for the SQL functionality are pandas, polars, and pyarrow.
Optional dependencies for the SQL functionality to work are pandas, polars and pyarrow.

"""

@@ -41,6 +41,7 @@
from collections.abc import Iterator

import pandas as pd
import polars as pl
import pyarrow as pa
import pyspark
import requests
@@ -1000,7 +1001,7 @@ def query_foundry_sql(
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
) -> pd.DataFrame: ...

@overload
def query_foundry_sql(
@@ -1022,6 +1023,16 @@ def query_foundry_sql(
timeout: int = ...,
) -> pa.Table: ...

@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["polars"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pl.DataFrame: ...

@overload
def query_foundry_sql(
self,
@@ -1040,7 +1051,7 @@
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

def query_foundry_sql(
self,
@@ -1049,7 +1060,7 @@
branch: api_types.Ref = "master",
sql_dialect: api_types.SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Queries the Foundry SQL server with spark SQL dialect.

Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint.
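A hedged usage sketch for the new return type through this self-contained client (the dataset path is a placeholder and credentials are assumed to be configured):

```python
from foundry_dev_tools import FoundryRestClient

client = FoundryRestClient()
# With return_type="polars", the overloads added above let type checkers
# narrow the result to pl.DataFrame.
df = client.query_foundry_sql(
    "SELECT * FROM `/path/to/test_dataset` LIMIT 5",
    return_type="polars",
)
print(df.shape)
```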
28 changes: 16 additions & 12 deletions libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py
@@ -31,7 +31,7 @@

import pandas as pd
import pandas.core.frame
import polars.dataframe.frame
import polars as pl
import pyarrow as pa
import pyspark.sql

@@ -539,7 +539,7 @@ def download_files_temporary(

def save_dataframe(
self,
df: pandas.core.frame.DataFrame | polars.dataframe.frame.DataFrame | pyspark.sql.DataFrame,
df: pandas.core.frame.DataFrame | pl.DataFrame | pyspark.sql.DataFrame,
transaction_type: api_types.FoundryTransaction = "SNAPSHOT",
foundry_schema: api_types.FoundrySchema | None = None,
) -> Self:
@@ -697,7 +697,7 @@ def query_foundry_sql(
return_type: Literal["pandas"],
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
) -> pd.DataFrame: ...

@overload
def query_foundry_sql(
@@ -717,6 +717,15 @@ def query_foundry_sql(
timeout: int = ...,
) -> pa.Table: ...

@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["polars"],
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pl.DataFrame: ...

@overload
def query_foundry_sql(
self,
@@ -733,15 +742,15 @@
return_type: api_types.SQLReturnType = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

def query_foundry_sql(
self,
query: str,
return_type: api_types.SQLReturnType = "pandas",
sql_dialect: api_types.SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.DataFrame | pa.Table | pyspark.sql.DataFrame:
) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Wrapper around :py:meth:`foundry_dev_tools.clients.foundry_sql_server.FoundrySqlServerClient.query_foundry_sql`.

But it automatically prepends the dataset location, so instead of:
@@ -783,17 +792,12 @@ def to_pandas(self) -> pandas.core.frame.DataFrame:
"""
return self.query_foundry_sql("SELECT *", return_type="pandas")

def to_polars(self) -> polars.dataframe.frame.DataFrame:
def to_polars(self) -> pl.DataFrame:
"""Get dataset as a :py:class:`polars.DataFrame`.

Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
"""
try:
import polars as pl
except ImportError as e:
msg = "The optional 'polars' package is not installed. Please install it to use the 'to_polars' method"
raise ImportError(msg) from e
return pl.from_arrow(self.to_arrow())
return self.query_foundry_sql("SELECT *", return_type="polars")

@contextmanager
def transaction_context(
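With `to_polars` now delegating to `query_foundry_sql`, a minimal end-to-end sketch (the path is a placeholder; a configured context is assumed):

```python
from foundry_dev_tools import FoundryContext

ctx = FoundryContext()
ds = ctx.get_dataset_by_path("/path/to/test_dataset")

# Both calls now take the same code path; to_polars is simply
# a "SELECT *" convenience wrapper around return_type="polars".
polars_df = ds.to_polars()
same_df = ds.query_foundry_sql("SELECT *", return_type="polars")
```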
libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py
@@ -95,10 +95,11 @@ def assert_in_literal(option, literal, variable_name) -> None:  # noqa: ANN001
SqlDialect = Literal["ANSI", "SPARK"]
"""The SQL Dialect for Foundry SQL queries."""

SQLReturnType = Literal["pandas", "spark", "arrow", "raw"]
SQLReturnType = Literal["pandas", "polars", "spark", "arrow", "raw"]
"""The return_types for sql queries.

pandas: :external+pandas:py:class:`pandas.DataFrame`
polars: :external+polars:py:class:`polars.DataFrame`
arrow: :external+pyarrow:py:class:`pyarrow.Table`
spark: :external+spark:py:class:`~pyspark.sql.DataFrame`
raw: Tuple of (foundry_schema, data) (can only be used in legacy)
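A sketch of how such a `Literal` can be validated at runtime. `assert_in_literal` appears in the hunk header above; this simplified version is an assumption and only illustrates the idea:

```python
from typing import Literal, get_args

SQLReturnType = Literal["pandas", "polars", "spark", "arrow", "raw"]


def assert_in_literal(option, literal, variable_name) -> None:
    # get_args(Literal[...]) yields the allowed values as a tuple.
    allowed = get_args(literal)
    if option not in allowed:
        msg = f"{variable_name} must be one of {allowed}, got {option!r}"
        raise ValueError(msg)


assert_in_literal("polars", SQLReturnType, "return_type")  # passes silently
```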
11 changes: 11 additions & 0 deletions tests/integration/clients/test_foundry_sql_server.py
@@ -1,3 +1,4 @@
import polars as pl
import pytest

from foundry_dev_tools.errors.dataset import BranchNotFoundError, DatasetHasNoSchemaError, DatasetNotFoundError
@@ -12,6 +13,16 @@ def test_smoke():
assert one_row_one_column.shape == (1, 1)


def test_polars_return_type():
polars_df = TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql(
f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 2",
return_type="polars",
)
assert isinstance(polars_df, pl.DataFrame)
assert polars_df.height == 2
assert polars_df.width == 1


def test_exceptions():
with pytest.raises(BranchNotFoundError) as exc:
TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql(
4 changes: 4 additions & 0 deletions tests/integration/resources/test_dataset.py
@@ -97,6 +97,10 @@ def test_crud_dataset(spark_session, tmp_path):  # noqa: PLR0915
ds_polars_branch = TEST_SINGLETON.ctx.get_dataset(ds.rid, branch="polars")
ds_polars_branch.save_dataframe(polars_df)
pl_assert_frame_equal(polars_df, ds_polars_branch.to_polars())
pl_assert_frame_equal(
polars_df,
ds_polars_branch.query_foundry_sql("SELECT *", return_type="polars"),
)

ds_spark_branch = TEST_SINGLETON.ctx.get_dataset(ds.rid, branch="spark")
ds_spark_branch.save_dataframe(spark_df)