diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index 53a7684e..e7b7336e 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion] + solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion, dask] name: Regression Tests solo solutions runs-on: ubuntu-20.04 env: @@ -46,6 +46,10 @@ jobs: shell: bash run: rm time.csv logs.csv + - name: Update virtualenv + shell: bash + run: python3 -m pip install virtualenv + - name: Install all solutions shell: bash run: source path.env && python3 _utils/install_all_solutions.py ${{ matrix.solution }} @@ -167,4 +171,3 @@ jobs: name: all-out.zip path: all-out.zip if-no-files-found: error - diff --git a/_utils/prep_solutions.py b/_utils/prep_solutions.py index cd11f7b7..98f4ddfc 100755 --- a/_utils/prep_solutions.py +++ b/_utils/prep_solutions.py @@ -5,7 +5,7 @@ SOLUTIONS_FILENAME = "_control/solutions.csv" RUN_CONF_FILENAME = "run.conf" -SKIPPED_SOLUTIONS = ["clickhouse", "dask"] +SKIPPED_SOLUTIONS = ["clickhouse"] def print_usage(): diff --git a/dask/groupby-dask2.py b/dask/groupby-dask2.py index 41c0f231..4c90e314 100755 --- a/dask/groupby-dask2.py +++ b/dask/groupby-dask2.py @@ -23,8 +23,6 @@ from dask import distributed # we use process-pool instead of thread-pool due to GIL cost client = distributed.Client(processes=True, silence_logs=logging.ERROR) -# since we are running on local cluster of processes, we would prefer to keep the communication between workers to relative minimum, thus it's better to trade some tasks granularity for better processing locality -dk.config.set({"optimization.fuse.ave-width": 20}) data_name = os.environ['SRC_DATANAME'] on_disk = False #data_name.split("_")[1] == "1e9" # on-disk data storage #126 @@ -38,8 +36,12 @@ exit(0) # not yet implemented #171, currently groupby's dropna=False argument is ignored print("using disk memory-mapped data storage" if on_disk else "using in-memory data storage", flush=True) -#x = dd.read_parquet(src_grp, engine="fastparquet") if on_disk else -x = dd.read_csv(src_grp, dtype={"id1":"category","id2":"category","id3":"category","id4":"Int32","id5":"Int32","id6":"Int32","v1":"Int32","v2":"Int32","v3":"float64"}) +#x = dd.read_parquet(src_grp, engine="pyarrow") if on_disk else +x = dd.read_csv( + src_grp, + dtype={"id1":"category","id2":"category","id3":"category","id4":"Int32","id5":"Int32","id6":"Int32","v1":"Int32","v2":"Int32","v3":"float64"}, + engine="pyarrow" +) x = x.persist() @@ -189,147 +191,166 @@ print(ans.tail(3), flush=True) del ans -#question = "median v3 sd v3 by id4 id5" # q6 # median function not yet implemented: https://github.com/dask/dask/issues/4362 -#gc.collect() -#t_start = timeit.default_timer() -#ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}).compute() -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#del ans -#gc.collect() -#t_start = timeit.default_timer() -#ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}).compute() -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#print(ans.head(3), flush=True) -#print(ans.tail(3), flush=True) -#del ans +question = "median v3 sd v3 by id4 id5" # q6 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}, shuffle='p2p').compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}, shuffle='p2p').compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans -# question = "max v1 - min v2 by id3" # q7 -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute() -# ans.reset_index(inplace=True) -# print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['range_v1_v2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# del ans -# gc.collect() -# t_start = timeit.default_timer() -# ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute() -# ans.reset_index(inplace=True) -# print(ans.shape, flush=True) -# t = timeit.default_timer() - t_start -# m = memory_usage() -# t_start = timeit.default_timer() -# chk = [ans['range_v1_v2'].sum()] -# chkt = timeit.default_timer() - t_start -# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -# print(ans.head(3), flush=True) -# print(ans.tail(3), flush=True) -# del ans +question = "max v1 - min v2 by id3" # q7 +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['range_v1_v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['range_v1_v2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans -#question = "largest two v3 by id6" # q8 -#gc.collect() -#t_start = timeit.default_timer() -#ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute() -#ans.reset_index(level='id6', inplace=True) -#ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['v3'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#del ans -#gc.collect() -#t_start = timeit.default_timer() -#ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute() -#ans.reset_index(level='id6', inplace=True) -#ans.reset_index(drop=True, inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['v3'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#print(ans.head(3), flush=True) -#print(ans.tail(3), flush=True) -#del ans +question = "largest two v3 by id6" # q8 +gc.collect() +t_start = timeit.default_timer() +ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute() +ans.reset_index(level='id6', inplace=True) +ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute() +ans.reset_index(level='id6', inplace=True) +ans.reset_index(drop=True, inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['v3'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans -#question = "regression v1 v2 by id2 id4" # q9 -#gc.collect() -#t_start = timeit.default_timer() -#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute() -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['r2'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#del ans -#gc.collect() -#t_start = timeit.default_timer() -#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute() -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans['r2'].sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#print(ans.head(3), flush=True) -#print(ans.tail(3), flush=True) -#del ans +question = "regression v1 v2 by id2 id4" # q9 +gc.collect() +t_start = timeit.default_timer() +ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['r2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute() +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans['r2'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans -#question = "sum v3 count by id1:id6" # q10 -#gc.collect() -#t_start = timeit.default_timer() -#ans = x.groupby(['id1','id2','id3','id4','id5','id6'], dropna=False, observed=True).agg({'v3':'sum', 'v1':'size'}).compute() # column name different than expected, ignore it because: ValueError: Metadata inference failed in `rename`: Original error is below: ValueError('Level values must be unique: [nan, nan] on level 0',) -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans.v3.sum(), ans.v1.sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#del ans -#gc.collect() -#t_start = timeit.default_timer() -#ans = x.groupby(['id1','id2','id3','id4','id5','id6'], dropna=False, observed=True).agg({'v3':'sum', 'v1':'size'}).compute() -#ans.reset_index(inplace=True) -#print(ans.shape, flush=True) -#t = timeit.default_timer() - t_start -#m = memory_usage() -#t_start = timeit.default_timer() -#chk = [ans.v3.sum(), ans.v1.sum()] -#chkt = timeit.default_timer() - t_start -#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) -#print(ans.head(3), flush=True) -#print(ans.tail(3), flush=True) -#del ans +question = "sum v3 count by id1:id6" # q10 +print(question) +gc.collect() +t_start = timeit.default_timer() +ans = ( + x.groupby( + ['id1', 'id2', 'id3', 'id4', 'id5', 'id6'], + dropna=False, + observed=True, + ) + .agg({'v3': 'sum', 'v1': 'size'}, split_out=x.npartitions) + .rename(columns={"v1": "count"}) + .compute() +) +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v3.sum(), ans["count"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = ( + x.groupby( + ['id1', 'id2', 'id3', 'id4', 'id5', 'id6'], + dropna=False, + observed=True, + ) + .agg({'v3': 'sum', 'v1': 'size'}, split_out=x.npartitions) + .rename(columns={"v1": "count"}) + .compute() +) +ans.reset_index(inplace=True) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.v3.sum(), ans["count"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans print("grouping finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) diff --git a/dask/setup-dask.sh b/dask/setup-dask.sh index f22e3148..c6fac985 100755 --- a/dask/setup-dask.sh +++ b/dask/setup-dask.sh @@ -1,13 +1,11 @@ #!/bin/bash set -e -virtualenv dask/py-dask --python=python3 +virtualenv dask/py-dask --python=python3.10 source dask/py-dask/bin/activate # install binaries python3 -m pip install "dask[complete]" -python3 -m pip install pandas psutil -python3 -m pip install distributed # check # python3