diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9b76cdbff5..7c952c2c56 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -32,29 +32,30 @@ jobs: matrix: os: [ubuntu-latest] python-version: ["3.9"] - pytest_args: [tests] - include: - # Run stability tests on the lowest and highest versions of Python only - # These are temporarily redundant with the current global python-version - # - pytest_args: tests/stability - # python-version: "3.9" - # os: ubuntu-latest - # - pytest_args: tests/stability - # python-version: "3.9" - # os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest - # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - os: macos-latest + # pytest_args: [tests] + pytest_args: [tests/benchmarks/test_deltalake.py] +# include: +# # Run stability tests on the lowest and highest versions of Python only +# # These are temporarily redundant with the current global python-version +# # - pytest_args: tests/stability +# # python-version: "3.9" +# # os: ubuntu-latest +# # - pytest_args: tests/stability +# # python-version: "3.9" +# # os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest +# # Run stability tests on Python Windows and MacOS (latest py39 only) +# - pytest_args: tests/stability +# python-version: "3.9" +# os: windows-latest +# - pytest_args: tests/stability +# python-version: "3.9" +# os: macos-latest steps: - name: Checkout diff --git a/ci/environment.yml b/ci/environment.yml index aadf8a1189..1e2dea97d7 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -41,4 +41,6 @@ dependencies: - gilknocker 
==0.4.1
   - openssl >1.1.0g
   - pyopenssl ==22.1.0 # Pinned by snowflake-connector-python
-  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
\ No newline at end of file
+  - cryptography ==38.0.4 # Pinned by snowflake-connector-python
+  - pip:
+    - git+https://github.com/dask-contrib/dask-deltatable.git # TODO: link to release version
\ No newline at end of file
diff --git a/tests/benchmarks/test_deltalake.py b/tests/benchmarks/test_deltalake.py
new file mode 100644
index 0000000000..d6d726cc1d
--- /dev/null
+++ b/tests/benchmarks/test_deltalake.py
@@ -0,0 +1,136 @@
+import os
+
+import dask.dataframe as dd
+import dask_deltatable as ddt
+import pytest
+
+# h2o-delta benchmark datasets, keyed by their approximate on-disk size.
+DATASETS = {
+    "0.5 GB": "s3://coiled-datasets/h2o-delta/N_1e7_K_1e2/",
+    "5 GB": "s3://coiled-datasets/h2o-delta/N_1e8_K_1e2/",
+}
+
+# H2O_DELTA_DATASETS is a comma-separated subset of DATASETS keys; unset means "run all".
+enabled_datasets = os.getenv("H2O_DELTA_DATASETS")
+if enabled_datasets is not None:
+    enabled_datasets = {k.strip() for k in enabled_datasets.split(",")}
+    if unknown_datasets := enabled_datasets - DATASETS.keys():
+        raise ValueError(f"Unknown h2o-delta dataset(s): {unknown_datasets}")
+else:
+    enabled_datasets = {
+        "0.5 GB",
+        "5 GB",
+    }
+
+
+@pytest.fixture(params=list(DATASETS))
+def uri(request):
+    # pytest.skip() raises Skipped itself; do not `raise` its (None) return value.
+    if request.param not in enabled_datasets:
+        pytest.skip(
+            "Disabled by default config or H2O_DELTA_DATASETS env variable"
+        )
+    return DATASETS[request.param]
+
+
+@pytest.fixture(params=["read_deltalake", "read_parquet"])
+def ddf(request, small_client, uri):
+    # Benchmarks run against both the deltalake reader and plain parquet on the same data.
+    if request.param == "read_deltalake":
+        delta_storage_options = {
+            "AWS_REGION": "us-east-2",
+            "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
+            "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
+        }
+        yield ddt.read_deltalake(uri, delta_storage_options=delta_storage_options)
+    else:
+        # s3fs expects a boolean for anonymous access, not the string "true".
+        yield dd.read_parquet(
+            f"{uri}*/*.parquet", engine="pyarrow", storage_options={"anon": True}
+        )
+
+
+def test_q1(ddf):
+    ddf = ddf[["id1", "v2"]]
+    ddf.groupby("id1", dropna=False, observed=True).agg({"v2": "sum"}).compute()
+
+
+# def test_q2(ddf):
+#     ddf = ddf[["id1", "id2", "v1"]]
+#     (
+#         ddf.groupby(["id1", "id2"], dropna=False, observed=True)
+#         .agg({"v1": "sum"})
+#         .compute()
+#     )
+#
+#
+# def test_q3(ddf):
+#     ddf = ddf[["id3", "v1", "v3"]]
+#     (
+#         ddf.groupby("id3", dropna=False, observed=True)
+#         .agg({"v1": "sum", "v3": "mean"})
+#         .compute()
+#     )
+#
+#
+# def test_q4(ddf):
+#     ddf = ddf[["id4", "v1", "v2", "v3"]]
+#     (
+#         ddf.groupby("id4", dropna=False, observed=True)
+#         .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
+#         .compute()
+#     )
+#
+#
+# def test_q5(ddf):
+#     ddf = ddf[["id6", "v1", "v2", "v3"]]
+#     (
+#         ddf.groupby("id6", dropna=False, observed=True)
+#         .agg(
+#             {"v1": "sum", "v2": "sum", "v3": "sum"},
+#         )
+#         .compute()
+#     )
+#
+#
+# def test_q6(ddf, shuffle_method):
+#     # Median aggregation uses an explicitly-set shuffle
+#     ddf = ddf[["id4", "id5", "v3"]]
+#     (
+#         ddf.groupby(["id4", "id5"], dropna=False, observed=True)
+#         .agg({"v3": ["median", "std"]}, shuffle=shuffle_method)
+#         .compute()  # requires shuffle arg to be set explicitly
+#     )
+#
+#
+# def test_q7(ddf):
+#     ddf = ddf[["id3", "v1", "v2"]]
+#     (
+#         ddf.groupby("id3", dropna=False, observed=True)
+#         .agg({"v1": "max", "v2": "min"})
+#         .assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["range_v1_v2"]]
+#         .compute()
+#     )
+#
+#
+# def test_q8(ddf, configure_shuffling):
+#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
+#     ddf = ddf[["id6", "v1", "v2", "v3"]]
+#     (
+#         ddf[~ddf["v3"].isna()][["id6", "v3"]]
+#         .groupby("id6", dropna=False, observed=True)
+#         .apply(
+#             lambda x: x.nlargest(2, columns="v3"),
+#             meta={"id6": "Int64", "v3": "float64"},
+#         )[["v3"]]
+#         .compute()
+#     )
+#
+#
+# def test_q9(ddf, configure_shuffling):
+#     # .groupby(...).apply(...) uses a shuffle to transfer data before applying the function
+#     ddf = ddf[["id2", "id4", "v1", "v2"]]
+#     (
+#         ddf[["id2", "id4", "v1", "v2"]]
+#         .groupby(["id2", "id4"], dropna=False, observed=True)
+#         .apply(
+#             lambda x: pd.Series({"r2": x.corr(numeric_only=True)["v1"]["v2"] ** 2}),
+#             meta={"r2": "float64"},
+#         )
+#         .compute()
+#     )