diff --git a/docs/dev/contribute.md b/docs/dev/contribute.md
index 648e2fa6..e21dae67 100644
--- a/docs/dev/contribute.md
+++ b/docs/dev/contribute.md
@@ -19,10 +19,12 @@ cd foundry-dev-tools
 ````{tab} Conda/Mamba
 First create an environment specifically for foundry-dev-tools with pdm and activate it
 ```shell
-mamba create -n foundry-dev-tools pdm openjdk===17
+mamba create -n foundry-dev-tools python=3.12 pdm openjdk=17
 mamba activate foundry-dev-tools
 ```
 ````
+Note that Python versions newer than 3.12 cause `pdm install` to fail with the error `configured Python interpreter version (3.14) is newer than PyO3's maximum supported version (3.12)`.
+
 ````{tab} Without Conda/Mamba
 If you don't want to use conda or mamba, you'll need to install pdm through other means.
 It is available in most Linux package managers, Homebrew, Scoop and also via pip.
diff --git a/docs/examples/dataset.md b/docs/examples/dataset.md
index 1d037e77..f8687b8f 100644
--- a/docs/examples/dataset.md
+++ b/docs/examples/dataset.md
@@ -260,9 +260,7 @@
 import polars as pl
 ctx = FoundryContext()
 ds = ctx.get_dataset_by_path("/path/to/test_dataset")
-arrow_table = ds.query_foundry_sql("SELECT *",return_type="arrow")
-
-df = pl.from_arrow(arrow_table)
+df = ds.query_foundry_sql("SELECT *", return_type="polars")
 print(df)
 ```
 ````
diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/data_proxy.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/data_proxy.py
index 77ce84b0..c682ada9 100644
--- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/data_proxy.py
+++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/data_proxy.py
@@ -27,6 +27,7 @@
     from pathlib import Path
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
     import pyspark
     import requests
@@ -111,7 +112,17 @@ def query_foundry_sql_legacy(
         branch: Ref = ...,
         sql_dialect: SqlDialect = ...,
         timeout: int = ...,
-    ) -> pd.core.frame.DataFrame: ...
+    ) -> pd.DataFrame: ...
+
+    @overload
+    def query_foundry_sql_legacy(
+        self,
+        query: str,
+        return_type: Literal["polars"],
+        branch: Ref = ...,
+        sql_dialect: SqlDialect = ...,
+        timeout: int = ...,
+    ) -> pl.DataFrame: ...
 
     @overload
     def query_foundry_sql_legacy(
@@ -151,7 +162,7 @@
         branch: Ref = ...,
         sql_dialect: SqlDialect = ...,
         timeout: int = ...,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
 
     def query_foundry_sql_legacy(
         self,
@@ -160,7 +171,7 @@
         query: str,
         return_type: SQLReturnType = "raw",
         branch: Ref = "master",
         sql_dialect: SqlDialect = "SPARK",
         timeout: int = 600,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
         """Queries the dataproxy query API with spark SQL.
 
         Example:
@@ -206,6 +217,9 @@
         response_json = response.json()
         if return_type == "raw":
             return response_json["foundrySchema"], response_json["rows"]
+        # The arrow, pandas and polars return types rely on the FakeModule implementation
+        # in their _optional packages. FakeModule raises an ImportError as soon as an
+        # attribute of the module is accessed, so there is no need to catch ImportError explicitly.
         if return_type == "pandas":
             from foundry_dev_tools._optional.pandas import pd
@@ -213,13 +227,19 @@
                 data=response_json["rows"],
                 columns=[e["name"] for e in response_json["foundrySchema"]["fieldSchemaList"]],
             )
-        if return_type == "arrow":
+        if return_type in {"arrow", "polars"}:
             from foundry_dev_tools._optional.pyarrow import pa
-            return pa.table(
+            table = pa.table(
                 data=response_json["rows"],
                 names=[e["name"] for e in response_json["foundrySchema"]["fieldSchemaList"]],
             )
+            if return_type == "arrow":
+                return table
+
+            from foundry_dev_tools._optional.polars import pl
+
+            return pl.from_arrow(table)
         if return_type == "spark":
             from foundry_dev_tools.utils.converter.foundry_spark import (
                 foundry_schema_to_spark_schema,
diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py
index 5a1cb706..26a6d853 100644
--- a/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py
+++ b/libs/foundry-dev-tools/src/foundry_dev_tools/clients/foundry_sql_server.py
@@ -17,6 +17,7 @@
 if TYPE_CHECKING:
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
     import pyspark
     import requests
@@ -35,7 +36,7 @@ def query_foundry_sql(
         branch: Ref = ...,
         sql_dialect: SqlDialect = ...,
         timeout: int = ...,
-    ) -> pd.core.frame.DataFrame: ...
+    ) -> pd.DataFrame: ...
 
     @overload
     def query_foundry_sql(
@@ -57,6 +58,16 @@ def query_foundry_sql(
         timeout: int = ...,
     ) -> pa.Table: ...
 
+    @overload
+    def query_foundry_sql(
+        self,
+        query: str,
+        return_type: Literal["polars"],
+        branch: Ref = ...,
+        sql_dialect: SqlDialect = ...,
+        timeout: int = ...,
+    ) -> pl.DataFrame: ...
+
     @overload
     def query_foundry_sql(
         self,
@@ -75,16 +86,16 @@
         branch: Ref = ...,
         sql_dialect: SqlDialect = ...,
         timeout: int = ...,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
 
-    def query_foundry_sql(
+    def query_foundry_sql(  # noqa: C901
         self,
         query: str,
         return_type: SQLReturnType = "pandas",
         branch: Ref = "master",
         sql_dialect: SqlDialect = "SPARK",
         timeout: int = 600,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
         """Queries the Foundry SQL server with spark SQL dialect.
 
         Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint.
@@ -105,9 +116,9 @@ def query_foundry_sql(
             timeout: Query Timeout, default value is 600 seconds
 
         Returns:
-            :external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:
+            :external+pandas:py:class:`~pandas.DataFrame` | :external+polars:py:class:`~polars.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:
 
-            A pandas DataFrame, Spark DataFrame or pyarrow.Table with the result.
+            A pandas, polars, Spark DataFrame or pyarrow.Table with the result.
 
         Raises:
             ValueError: Only direct read eligible queries can be returned as arrow Table.
@@ -139,6 +150,15 @@
             arrow_stream_reader = self.read_fsql_query_results_arrow(query_id=query_id)
             if return_type == "pandas":
                 return arrow_stream_reader.read_pandas()
+            if return_type == "polars":
+                # The FakeModule implementation used in the _optional packages
+                # raises an ImportError as soon as an attribute of the module is accessed.
+                # That ImportError is caught below to fall back to query_foundry_sql_legacy,
+                # which raises an ImportError again if polars is not installed.
+                from foundry_dev_tools._optional.polars import pl
+
+                arrow_table = arrow_stream_reader.read_all()
+                return pl.from_arrow(arrow_table)
             if return_type == "spark":
                 from foundry_dev_tools.utils.converter.foundry_spark import (
@@ -147,16 +167,12 @@
                     arrow_stream_to_spark_dataframe,
                 )
                 return arrow_stream_to_spark_dataframe(arrow_stream_reader)
             return arrow_stream_reader.read_all()
-
-            return self._query_fsql(
-                query=query,
-                branch=branch,
-                return_type=return_type,
-            )
         except (
             FoundrySqlSerializationFormatNotImplementedError,
             ImportError,
         ) as exc:
+            # Swallow the exception when return_type != 'arrow'
+            # so that we can fall back to query_foundry_sql_legacy.
             if return_type == "arrow":
                 msg = (
                     "Only direct read eligible queries can be returned as arrow Table. Consider using setting"
@@ -166,6 +182,8 @@
                     msg,
                 ) from exc
 
+            # This fallback is not only used when return_type is 'raw', but also when one of
+            # the exceptions above was caught and return_type != 'arrow'.
             warnings.warn("Falling back to query_foundry_sql_legacy!")
             return self.context.data_proxy.query_foundry_sql_legacy(
                 query=query,
diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/foundry_api_client.py b/libs/foundry-dev-tools/src/foundry_dev_tools/foundry_api_client.py
index 337591e2..cf0064ba 100644
--- a/libs/foundry-dev-tools/src/foundry_dev_tools/foundry_api_client.py
+++ b/libs/foundry-dev-tools/src/foundry_dev_tools/foundry_api_client.py
@@ -2,7 +2,7 @@
 
 One of the goals of this module is to be self-contained so that it can be
 dropped into any python installation with minimal dependency to 'requests'
 
-Optional dependencies for the SQL functionality to work are pandas and pyarrow.
+Optional dependencies for the SQL functionality to work are pandas, polars and pyarrow.
 """
@@ -41,6 +41,7 @@
     from collections.abc import Iterator
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
     import pyspark
     import requests
@@ -1000,7 +1001,7 @@
         branch: api_types.Ref = ...,
         sql_dialect: api_types.SqlDialect = ...,
         timeout: int = ...,
-    ) -> pd.core.frame.DataFrame: ...
+    ) -> pd.DataFrame: ...
 
     @overload
     def query_foundry_sql(
@@ -1022,6 +1023,16 @@
         timeout: int = ...,
     ) -> pa.Table: ...
 
+    @overload
+    def query_foundry_sql(
+        self,
+        query: str,
+        return_type: Literal["polars"],
+        branch: api_types.Ref = ...,
+        sql_dialect: api_types.SqlDialect = ...,
+        timeout: int = ...,
+    ) -> pl.DataFrame: ...
+
     @overload
     def query_foundry_sql(
         self,
@@ -1040,7 +1051,7 @@
         branch: api_types.Ref = ...,
         sql_dialect: api_types.SqlDialect = ...,
         timeout: int = ...,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
 
     def query_foundry_sql(
         self,
@@ -1049,7 +1060,7 @@
         branch: api_types.Ref = "master",
         sql_dialect: api_types.SqlDialect = "SPARK",
         timeout: int = 600,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
         """Queries the Foundry SQL server with spark SQL dialect.
 
         Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint.
diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py b/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py
index f78d2b25..8a731878 100644
--- a/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py
+++ b/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py
@@ -31,7 +31,7 @@
     import pandas as pd
     import pandas.core.frame
-    import polars.dataframe.frame
+    import polars as pl
     import pyarrow as pa
     import pyspark.sql
@@ -539,7 +539,7 @@ def download_files_temporary(
 
     def save_dataframe(
         self,
-        df: pandas.core.frame.DataFrame | polars.dataframe.frame.DataFrame | pyspark.sql.DataFrame,
+        df: pandas.core.frame.DataFrame | pl.DataFrame | pyspark.sql.DataFrame,
         transaction_type: api_types.FoundryTransaction = "SNAPSHOT",
         foundry_schema: api_types.FoundrySchema | None = None,
     ) -> Self:
@@ -697,7 +697,7 @@
         return_type: Literal["pandas"],
         sql_dialect: api_types.SqlDialect = ...,
         timeout: int = ...,
-    ) -> pd.core.frame.DataFrame: ...
+    ) -> pd.DataFrame: ...
 
     @overload
     def query_foundry_sql(
@@ -717,6 +717,15 @@
         timeout: int = ...,
     ) -> pa.Table: ...
 
+    @overload
+    def query_foundry_sql(
+        self,
+        query: str,
+        return_type: Literal["polars"],
+        sql_dialect: api_types.SqlDialect = ...,
+        timeout: int = ...,
+    ) -> pl.DataFrame: ...
+
     @overload
     def query_foundry_sql(
         self,
@@ -733,7 +742,7 @@
         return_type: api_types.SQLReturnType = ...,
         sql_dialect: api_types.SqlDialect = ...,
         timeout: int = ...,
-    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
 
     def query_foundry_sql(
         self,
@@ -741,7 +750,7 @@
         query: str,
         return_type: api_types.SQLReturnType = "pandas",
         sql_dialect: api_types.SqlDialect = "SPARK",
         timeout: int = 600,
-    ) -> tuple[dict, list[list]] | pd.DataFrame | pa.Table | pyspark.sql.DataFrame:
+    ) -> tuple[dict, list[list]] | pd.DataFrame | pl.DataFrame | pa.Table | pyspark.sql.DataFrame:
         """Wrapper around :py:meth:`foundry_dev_tools.clients.foundry_sql_server.FoundrySqlServerClient.query_foundry_sql`.
 
         But it automatically prepends the dataset location, so instead of:
@@ -783,17 +792,12 @@ def to_pandas(self) -> pandas.core.frame.DataFrame:
         """
         return self.query_foundry_sql("SELECT *", return_type="pandas")
 
-    def to_polars(self) -> polars.dataframe.frame.DataFrame:
+    def to_polars(self) -> pl.DataFrame:
         """Get dataset as a :py:class:`polars.DataFrame`.
 
         Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
         """
-        try:
-            import polars as pl
-        except ImportError as e:
-            msg = "The optional 'polars' package is not installed. Please install it to use the 'to_polars' method"
-            raise ImportError(msg) from e
-        return pl.from_arrow(self.to_arrow())
+        return self.query_foundry_sql("SELECT *", return_type="polars")
 
     @contextmanager
     def transaction_context(
diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py
index f262e97a..0eb3a9c6 100644
--- a/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py
+++ b/libs/foundry-dev-tools/src/foundry_dev_tools/utils/api_types.py
@@ -95,10 +95,11 @@ def assert_in_literal(option, literal, variable_name) -> None:  # noqa: ANN001
 
 SqlDialect = Literal["ANSI", "SPARK"]
 """The SQL Dialect for Foundry SQL queries."""
 
-SQLReturnType = Literal["pandas", "spark", "arrow", "raw"]
+SQLReturnType = Literal["pandas", "polars", "spark", "arrow", "raw"]
 """The return_types for sql queries.
 
 pandas: :external+pandas:py:class:`pandas.DataFrame`
+polars: :external+polars:py:class:`polars.DataFrame`
 arrow: :external+pyarrow:py:class:`pyarrow.Table`
 spark: :external+spark:py:class:`~pyspark.sql.DataFrame`
 raw: Tuple of (foundry_schema, data) (can only be used in legacy)
diff --git a/tests/integration/clients/test_foundry_sql_server.py b/tests/integration/clients/test_foundry_sql_server.py
index 7dd079dc..929cddde 100644
--- a/tests/integration/clients/test_foundry_sql_server.py
+++ b/tests/integration/clients/test_foundry_sql_server.py
@@ -1,3 +1,4 @@
+import polars as pl
 import pytest
 
 from foundry_dev_tools.errors.dataset import BranchNotFoundError, DatasetHasNoSchemaError, DatasetNotFoundError
@@ -12,6 +13,16 @@ def test_smoke():
     assert one_row_one_column.shape == (1, 1)
 
 
+def test_polars_return_type():
+    polars_df = TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql(
+        f"SELECT sepal_length FROM `{TEST_SINGLETON.iris_new.rid}` LIMIT 2",
+        return_type="polars",
+    )
+    assert isinstance(polars_df, pl.DataFrame)
+    assert polars_df.height == 2
+    assert polars_df.width == 1
+
+
 def test_exceptions():
     with pytest.raises(BranchNotFoundError) as exc:
         TEST_SINGLETON.ctx.foundry_sql_server.query_foundry_sql(
diff --git a/tests/integration/resources/test_dataset.py b/tests/integration/resources/test_dataset.py
index a764d0d8..6e202395 100644
--- a/tests/integration/resources/test_dataset.py
+++ b/tests/integration/resources/test_dataset.py
@@ -97,6 +97,10 @@ def test_crud_dataset(spark_session, tmp_path):  # noqa: PLR0915
     ds_polars_branch = TEST_SINGLETON.ctx.get_dataset(ds.rid, branch="polars")
     ds_polars_branch.save_dataframe(polars_df)
     pl_assert_frame_equal(polars_df, ds_polars_branch.to_polars())
+    pl_assert_frame_equal(
+        polars_df,
+        ds_polars_branch.query_foundry_sql("SELECT *", return_type="polars"),
+    )
 
     ds_spark_branch = TEST_SINGLETON.ctx.get_dataset(ds.rid, branch="spark")
     ds_spark_branch.save_dataframe(spark_df)
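
For reference, a minimal end-to-end sketch of the `return_type="polars"` flow this diff introduces. The dataset path is a placeholder, and polars must be installed; otherwise the FakeModule in the `_optional` packages raises an `ImportError` on first attribute access:

```python
import polars as pl

from foundry_dev_tools import FoundryContext

ctx = FoundryContext()

# Placeholder path -- point this at a dataset you can read.
ds = ctx.get_dataset_by_path("/path/to/test_dataset")

# New in this diff: results materialize directly as a polars DataFrame,
# replacing the manual arrow -> polars round-trip from the old docs example.
df = ds.query_foundry_sql("SELECT *", return_type="polars")
assert isinstance(df, pl.DataFrame)
print(df.height, df.width)

# Dataset.to_polars() is now a thin wrapper around the same code path.
df_all = ds.to_polars()
```

If the Foundry SQL server path cannot serve the query (e.g. it is not direct-read eligible), `query_foundry_sql` warns and falls back to `query_foundry_sql_legacy`, which builds the same polars frame from a pyarrow table via `pl.from_arrow`.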