diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml
index 76636c16..c1f4a7df 100644
--- a/.github/workflows/test_s3_minio.yml
+++ b/.github/workflows/test_s3_minio.yml
@@ -71,6 +71,8 @@ jobs:
         which python
         pip install -e .
     - name: Run tests
+      # run tests on a single process to catch potential SegFaults
+      # no SegFaults happen with n procs > 1
       run: |
         pytest
     - name: Run S3 exploratory tests
diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml
index 03908a9c..e4f55ed0 100644
--- a/.github/workflows/test_s3_remote_reductionist.yml
+++ b/.github/workflows/test_s3_remote_reductionist.yml
@@ -56,7 +56,10 @@ jobs:
         python -V
         which python
         pip install -e .
-    - name: Run one single test
+    - name: Run gold test
+      run: |
+        \time pytest tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1
+    - name: Run remote reductionist tests
       run: |
         pytest tests/test_compression_remote_reductionist.py
     - name: Upload HTML report artifact
diff --git a/activestorage/active.py b/activestorage/active.py
index 1cf03eef..7fd89a12 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -18,7 +18,10 @@ from activestorage.storage import reduce_chunk
 from activestorage import netcdf_to_zarr as nz
 
+import time
+import zarr
 
 
+t1 = time.time()
 @contextlib.contextmanager
 def load_from_s3(uri, storage_options=None):
     """
@@ -78,7 +81,7 @@ def __init__(
         uri,
         ncvar,
         storage_type=None,
-        max_threads=100,
+        max_threads=150,
         storage_options=None,
         active_storage_url=None
     ):
@@ -259,19 +262,25 @@ def _via_kerchunk(self, index):
         """
         The objective is to use kerchunk to read the slices ourselves.
         """
+        tx = time.time()
         # FIXME: Order of calls is hardcoded'
         if self.zds is None:
             print(f"Kerchunking file {self.uri} with variable "
                   f"{self.ncvar} for storage type {self.storage_type}")
+            tx1 = time.time()
             ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
                 self.uri,
                 self.ncvar,
                 self.storage_type,
                 self.storage_options,
             )
+            ty1 = time.time()
+            print("Time to load netCDF to Zarr", ty1 - tx1)
             # The following is a hangove from exploration
             # and is needed if using the original doing it ourselves
             # self.zds = make_an_array_instance_active(ds)
+            if isinstance(ds, zarr.hierarchy.Group):
+                ds = ds[self.ncvar]
             self.zds = ds
 
             # Retain attributes and other information
@@ -284,7 +293,9 @@ def _via_kerchunk(self, index):
             # FIXME: We do not get the correct byte order on the Zarr
             # Array's dtype when using S3, so capture it here.
             self._dtype = np.dtype(zarray['dtype'])
-
+
+        ty = time.time()
+        print("Time of _via_kerchunk()", ty - tx)
         return self._get_selection(index)
 
     def _get_selection(self, *args):
@@ -458,13 +469,17 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
 
         Note the need to use counts for some methods
         """
+        t1c = time.time()
         coord = '.'.join([str(c) for c in chunk_coords])
         key = f"{self.ncvar}/{coord}"
-        rfile, offset, size = tuple(fsref[key])
+        try:
+            rfile, offset, size = tuple(fsref[self.ncvar + " /" + key])
+        except KeyError:
+            rfile, offset, size = tuple(fsref[key])
 
         # S3: pass in pre-configured storage options (credentials)
         if self.storage_type == "s3":
-            print("S3 rfile is:", rfile)
+            # print("S3 rfile is:", rfile)
             parsed_url = urllib.parse.urlparse(rfile)
             bucket = parsed_url.netloc
             object = parsed_url.path
@@ -476,8 +491,8 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
             if bucket == "":
                 bucket = os.path.dirname(object)
                 object = os.path.basename(object)
-            print("S3 bucket:", bucket)
-            print("S3 file:", object)
+            # print("S3 bucket:", bucket)
+            # print("S3 file:", object)
             if self.storage_options is None:
                 tmp, count = reductionist.reduce_chunk(session,
                                                        S3_ACTIVE_STORAGE_URL,
@@ -491,9 +506,9 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
                                                        operation=self._method)
             else:
                 # special case for "anon=True" buckets that work only with e.g.
-                # fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
+                # "fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})"
                 # where file uri = bucketX/fileY.mc
-                print("S3 Storage options to Reductionist:", self.storage_options)
+                # print("S3 Storage options:", self.storage_options)
                 if self.storage_options.get("anon", None) == True:
                     bucket = os.path.dirname(parsed_url.path)  # bucketX
                     object = os.path.basename(parsed_url.path)  # fileY
@@ -517,7 +532,8 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
                                missing, self.zds._dtype,
                                self.zds._chunks, self.zds._order,
                                chunk_selection, method=self.method)
-
+        t2c = time.time()
+        print("Chunk processing time via _process_chunk()", t2c - t1c)
         if self.method is not None:
            return tmp, count
        else:
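
Since the chunk-key fallback added to _process_chunk above is easy to misread, here is a small self-contained sketch (not part of the patch; the helper name lookup_chunk_ref and the sample refs dict are made up) of how the two kerchunk key layouts are resolved: plain "varname/i.j.k" keys for a top-level Dataset, and "varname /varname/i.j.k" keys when gen_json has wrapped the variable in the synthetic group named varname + " " (see the netcdf_to_zarr changes below).

def lookup_chunk_ref(fsref, ncvar, chunk_coords):
    coord = ".".join(str(c) for c in chunk_coords)
    key = f"{ncvar}/{coord}"
    try:
        # group-wrapped layout produced by the netcdf_to_zarr changes
        return tuple(fsref[f"{ncvar} /{key}"])
    except KeyError:
        # plain Dataset layout
        return tuple(fsref[key])

refs = {"tas /tas/0.0": ["s3://bnl/file.nc", 20340, 1024]}   # made-up reference entry
print(lookup_chunk_ref(refs, "tas", (0, 0)))                 # ('s3://bnl/file.nc', 20340, 1024)
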
diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py
index 7bdc3d4d..389966b0 100644
--- a/activestorage/netcdf_to_zarr.py
+++ b/activestorage/netcdf_to_zarr.py
@@ -9,6 +9,64 @@ from activestorage.config import *
 from kerchunk.hdf import SingleHdf5ToZarr
 
+import time
+import h5py
+
+
+def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
+    """
+    Correct the compressor type as it comes out of Kerchunk.
+    Also correct the file name, as Kerchunk now prefixes it with "s3://",
+    and for special buckets like Bryan's bnl the correct file is bnl/file.nc,
+    not s3://bnl/file.nc.
+    """
+    new_content = content.copy()
+
+    # preliminary assembly
+    try:
+        new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"])
+        group = False
+    except KeyError:
+        new_zarray = ujson.loads(new_content['refs'][f"{varname} /{varname}/.zarray"])
+        group = True
+
+    # re-add the correct compressor if it's in the "filters" list
+    if new_zarray["compressor"] is None and new_zarray["filters"]:
+        for zfilter in new_zarray["filters"]:
+            if zfilter["id"] == "zlib":
+                new_zarray["compressor"] = zfilter
+                new_zarray["filters"].remove(zfilter)
+
+    if not group:
+        new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray)
+    else:
+        new_content['refs'][f"{varname} /{varname}/.zarray"] = ujson.dumps(new_zarray)
+
+    # FIXME TODO this is an absolute nightmare: the type of bucket on UOR ACES
+    # this is a HACK and it works only with the crazy Bryan S3 bucket "bnl/file.nc"
+    # the problem: filename gets written to JSON as "s3://bnl/file.nc" but Reductionist doesn't
+    # find it since it needs url=bnl/file.nc, with endpoint URL being extracted from the
+    # endpoint_url of storage_options. BAH!
+    if bryan_bucket:
+        for key in new_content['refs'].keys():
+            if varname in key and isinstance(new_content['refs'][key], list) and "s3://" in new_content['refs'][key][0]:
+                new_content['refs'][key][0] = new_content['refs'][key][0].replace("s3://", "")
+
+    return new_content
+
+
+def _return_zcomponents(content, varname):
+    """Return zarr array and attributes."""
+    # account for both Group and Dataset
+    try:
+        zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
+        zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
+    except KeyError:
+        zarray = ujson.loads(content['refs'][f"{varname} /{varname}/.zarray"])
+        zattrs = ujson.loads(content['refs'][f"{varname} /{varname}/.zattrs"])
+
+    return zarray, zattrs
+
 
 def _correct_compressor_and_filename(content, varname, bryan_bucket=False):
     """
@@ -64,8 +122,26 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
         )
         fs2 = fsspec.filesystem('')
         with fs.open(file_url, 'rb') as s3file:
+            # this block allows for Dataset/Group selection but is causing
+            # SegFaults in S3/Minio tests; the h5py backend is very brittle: see below for the reasoning behind this
+            # since this case is only for the S3/Minio tests, it's OK to not have it; the test files are small
+            # with fs.open(file_url, 'rb') as s3file_o_1:  # -> best to have unique name
+            #     s3file_r_1 = h5py.File(s3file_o_1, mode="r")
+            #     s3file_w_1 = h5py.File(s3file_o_1, mode="w")
+            #     if isinstance(s3file_r_1[varname], h5py.Dataset):
+            #         print("Looking only at a single Dataset", s3file_r_1[varname])
+            #         s3file_w_1.create_group(varname + " ")
+            #         s3file_w_1[varname + " "][varname] = s3file_w_1[varname]
+            #         s3file = s3file_w_1[varname + " "]
+            #     elif isinstance(s3file_r_1[varname], h5py.Group):
+            #         print("Looking only at a single Group", s3file_r_1[varname])
+            #         s3file = s3file_r_1[varname]
+            # storage_options = {"key": S3_ACCESS_KEY,
+            #                    "secret": S3_SECRET_KEY,
+            #                    "client_kwargs": {'endpoint_url': S3_URL}}
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0)
+                                        # storage_options=storage_options)
 
             # TODO absolute crap, this needs to go
             # see comments in _correct_compressor_and_filename
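
For reference, a minimal sketch (not part of the patch; the example .zarray dict is made up) of the metadata fix that _correct_compressor_and_filename applies: Kerchunk can emit a .zarray with "compressor": null and zlib listed under "filters", and the function promotes that filter entry to the compressor slot so Zarr decompresses the chunks correctly.

import ujson

zarray = {"compressor": None, "filters": [{"id": "zlib", "level": 1}], "dtype": "<f4"}
if zarray["compressor"] is None and zarray["filters"]:
    for zfilter in list(zarray["filters"]):  # iterate over a copy; the original list is mutated
        if zfilter["id"] == "zlib":
            zarray["compressor"] = zfilter
            zarray["filters"].remove(zfilter)
print(ujson.dumps(zarray))
# {"compressor":{"id":"zlib","level":1},"filters":[],"dtype":"<f4"}
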
@@ -79,6 +155,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                                                            varname,
                                                            bryan_bucket=bryan_bucket)
                 f.write(ujson.dumps(content).encode())
+            zarray, zattrs = _return_zcomponents(content, varname)
+            return outf, zarray, zattrs
 
     # S3 passed-in configuration
     elif storage_type == "s3" and storage_options is not None:
@@ -87,7 +165,36 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
         storage_options['default_cache_type'] = "first"  # best for HDF5
         fs = s3fs.S3FileSystem(**storage_options)
         fs2 = fsspec.filesystem('')
-        with fs.open(file_url, 'rb') as s3file:
+        tk1 = time.time()
+        with fs.open(file_url, 'rb') as s3file_o:
+            # restrict only to the Group/Dataset that the varname belongs to;
+            # this saves 4-5x time in Kerchunk
+            # Restrict the s3file HDF5 file only to the Dataset or Group of interest.
+            # This bit extracts the Dataset or Group of interest
+            # (depending on what type of object the varname is in). It is the best we can do with
+            # the non-breaking h5py API. This is a touchy bit of said API, and depending on the
+            # way things are coded, it can easily throw SegFaults. Explanations:
+            # - an s3fs File object with HDF5 structure is passed in
+            # - h5py allows structural edits (creating/adding a Group)
+            #   only if opening said file in WRITE mode
+            # - clear distinction between said File open in W mode as opposed to
+            #   said file open in R(B) mode
+            # - the reason we open it in R mode is that we can only truncate it (select on key) if in R mode
+            #   and then migrate extracted data to the file open in W mode
+            # - operations like copy or selection/truncating will always throw SegFaults
+            #   if not operating with two open Files: W and R
+            # - this block cannot be extracted into a function because we need to dealloc each instance of
+            #   s3file_o, s3file_r and s3file_w (hence the naming is different in the step above)
+            s3file_r = h5py.File(s3file_o, mode="r")
+            s3file_w = h5py.File(s3file_o, mode="w")
+            if isinstance(s3file_r[varname], h5py.Dataset):
+                print("Looking only at a single Dataset", s3file_r[varname])
+                s3file_w.create_group(varname + " ")
+                s3file_w[varname + " "][varname] = s3file_w[varname]
+                s3file = s3file_w[varname + " "]
+            elif isinstance(s3file_r[varname], h5py.Group):
+                print("Looking only at a single Group", s3file_r[varname])
+                s3file = s3file_r[varname]
 
             # Kerchunk wants the correct file name in S3 format
             if not file_url.startswith("s3://"):
@@ -100,13 +207,21 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                 bryan_bucket = True
 
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
-                                        inline_threshold=0)
+                                        inline_threshold=0,
+                                        storage_options=storage_options)
+            tk2 = time.time()
 
             with fs2.open(outf, 'wb') as f:
                 content = h5chunks.translate()
                 content = _correct_compressor_and_filename(content,
                                                            varname,
                                                            bryan_bucket=bryan_bucket)
                 f.write(ujson.dumps(content).encode())
+            tk3 = time.time()
+            print("Time to Kerchunk and write JSON file", tk3 - tk2)
+
+            zarray, zattrs = _return_zcomponents(content, varname)
+            return outf, zarray, zattrs
+
     # not S3
     else:
         fs = fsspec.filesystem('')
@@ -132,10 +247,8 @@ def gen_json(file_url, varname, outf, storage_type, storage_options):
                                                            bryan_bucket=False)
                 f.write(ujson.dumps(content).encode())
 
-    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
-    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
-
-    return outf, zarray, zattrs
+    zarray, zattrs = _return_zcomponents(content, varname)
+    return outf, zarray, zattrs
 
 
 def open_zarr_group(out_json, varname):
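
To make the downstream effect of the synthetic varname + " " group easier to follow, here is a small self-contained sketch (not part of the patch; a plain group name is used for simplicity, and the data are made up) of the unwrapping that the reworked open_zarr_group and Active._via_kerchunk now perform: when the variable is exposed inside a group rather than at the top level, the opened object is a zarr Group and the Array has to be pulled out by name.

import zarr

root = zarr.group()                    # in-memory store standing in for the kerchunk mapper
wrapper = root.create_group("wrapper")
wrapper.zeros("tas", shape=(4, 4), chunks=(2, 2))

ds = root["wrapper"]
if isinstance(ds, zarr.hierarchy.Group):
    ds = ds["tas"]                     # same unwrapping as in Active._via_kerchunk
print(type(ds), ds.shape)              # <class 'zarr.core.Array'> (4, 4)
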
@@ -150,14 +263,20 @@ def open_zarr_group(out_json, varname):
     mapper = fs.get_mapper("")  # local FS mapper
     #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
     zarr_group = zarr.open_group(mapper)
-
+
+    not_group = False
     try:
-        zarr_array = getattr(zarr_group, varname)
-    except AttributeError as attrerr:
-        print(f"Zarr Group does not contain variable {varname}. "
-              f"Zarr Group info: {zarr_group.info}")
-        raise attrerr
-    #print("Zarr array info:", zarr_array.info)
+        zarr_array = getattr(zarr_group, varname + " ")
+    except AttributeError:
+        not_group = True
+        pass
+    if not_group:
+        try:
+            zarr_array = getattr(zarr_group, varname)
+        except AttributeError:
+            print(f"Zarr Group does not contain variable {varname}. "
+                  f"Zarr Group info: {zarr_group.info}")
+            raise
 
     return zarr_array
@@ -179,17 +298,3 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, bu
     ref_ds = open_zarr_group(out_json.name, varname)
 
     return ref_ds, zarray, zattrs
-
-
-#d = {'version': 1,
-# 'refs': {
-# '.zgroup': '{"zarr_format":2}',
-# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
-# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
-        'kerchunk>=0.2.4',
+        'kerchunk>=0.2.4',  # issues with numcodecs in 0.2.2/3
         'netcdf4',
         'numcodecs>=0.12',  # github/issues/162
         'numpy!=1.24.3',  # severe masking bug
diff --git a/tests/s3_exploratory/test_s3_arrange_files.py b/tests/s3_exploratory/test_s3_arrange_files.py
index 18b1015e..e0358c98 100644
--- a/tests/s3_exploratory/test_s3_arrange_files.py
+++ b/tests/s3_exploratory/test_s3_arrange_files.py
@@ -6,14 +6,9 @@
 import s3fs
 import tempfile
 
-from activestorage.active import Active
 from activestorage.dummy_data import make_vanilla_ncdata
-import activestorage.storage as st
-from activestorage.reductionist import reduce_chunk as reductionist_reduce_chunk
-from activestorage.netcdf_to_zarr import gen_json
 
 from kerchunk.hdf import SingleHdf5ToZarr
-from numpy.testing import assert_allclose, assert_array_equal
 from pathlib import Path
 
 from config_minio import *
diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py
index 4beb957b..9418369e 100644
--- a/tests/test_compression_remote_reductionist.py
+++ b/tests/test_compression_remote_reductionist.py
@@ -2,6 +2,8 @@
 import numpy as np
 import pytest
 
+import utils
+
 from netCDF4 import Dataset
 from pathlib import Path
 
@@ -10,8 +12,6 @@
 from activestorage.dummy_data import make_compressed_ncdata
 from activestorage.reductionist import ReductionistError as RedErr
 
-import utils
-
 
 # Bryan's S3 machine + Bryan's reductionist
 STORAGE_OPTIONS_Bryan = {
@@ -191,3 +191,95 @@ def test_compression_and_filters_cmip6_forced_s3_using_local_Reductionist():
     result = active[0:2,4:6,7:9]
     assert nc_min == result
     assert result == 239.25946044921875
+
+
+# if pytest is run on a single thread, this test throws a PytestUnraisableException
+# followed at the end by a SegFault (the test passes fine, and writes the report etc); when
+# pytest runs in n>=2 threads all is fine. This is definitely due to something in Kerchunk:
+# tests/test_compression_remote_reductionist.py::test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1
+# /home/valeriu/miniconda3/envs/pyactive/lib/python3.12/site-packages/_pytest/unraisableexception.py:80: PytestUnraisableExceptionWarning: Exception ignored in: 'h5py._objects.ObjectID.__dealloc__'
+#
+# Traceback (most recent call last):
+#   File "h5py/_objects.pyx", line 201, in h5py._objects.ObjectID.__dealloc__
+#   File "h5py/h5fd.pyx", line 180, in h5py.h5fd.H5FD_fileobj_truncate
+# AttributeError: 'S3File' object has no attribute 'truncate'
+# For now I am just shutting this up; later we may have to take it up with Kerchunk
+@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
+def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v1():
+    """
+    Test identical to previous
+    test_compression_and_filters_cmip6_forced_s3_from_local_2
+    but using a bigger file, for more relevant performance measures.
+
+    This is for a special anon=True bucket connected to via valid key/secret.
+    Variable standard_name: tendency_of_eastward_wind_due_to_orographic_gravity_wave_drag
+    Variable var_name: m01s06i247_4
+    dims: 30 (time) 39 (plev) 325 (lat) 432 (lon)
+
+    Entire mother file info:
+    mc ls bryan/bnl/da193a_25_day__198808-198808.nc
+    [2024-01-24 10:07:03 GMT] 2.8GiB STANDARD da193a_25_day__198808-198808.nc
+
+
+    NOTE: we used this test as timing reference for performance testing.
+    """
+    storage_options = {
+        'key': "f2d55c6dcfc7618b2c34e00b58df3cef",
+        'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT",
+        'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}
+    }
+    active_storage_url = "https://192.171.169.248:8080"
+    bigger_file = "da193a_25_day__198808-198808.nc"
+
+    test_file_uri = os.path.join(
+        S3_BUCKET,
+        bigger_file
+    )
+    print("S3 Test file path:", test_file_uri)
+    active = Active(test_file_uri, 'm01s06i247_4', storage_type="s3",
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
+
+    active._version = 1
+    active._method = "min"
+
+    result = active[0:2, 0:3, 4:6, 7:9]
+
+
+def test_compression_and_filters_cmip6_forced_s3_from_local_bigger_file_v0():
+    """
+    Test identical to previous
+    test_compression_and_filters_cmip6_forced_s3_from_local_2
+    but using a bigger file, for more relevant performance measures.
+
+    This is for a special anon=True bucket connected to via valid key/secret.
+    Variable standard_name: tendency_of_eastward_wind_due_to_orographic_gravity_wave_drag
+    Variable var_name: m01s06i247_4
+    dims: 30 (time) 39 (plev) 325 (lat) 432 (lon)
+
+    Entire mother file info:
+    mc ls bryan/bnl/da193a_25_day__198808-198808.nc
+    [2024-01-24 10:07:03 GMT] 2.8GiB STANDARD da193a_25_day__198808-198808.nc
+
+    """
+    storage_options = {
+        'key': "f2d55c6dcfc7618b2c34e00b58df3cef",
+        'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT",
+        'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}
+    }
+    active_storage_url = "https://192.171.169.248:8080"
+    bigger_file = "da193a_25_day__198808-198808.nc"
+
+    test_file_uri = os.path.join(
+        S3_BUCKET,
+        bigger_file
+    )
+    print("S3 Test file path:", test_file_uri)
+    active = Active(test_file_uri, 'm01s06i247_4', storage_type="s3",
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
+
+    active._version = 0
+    d = active[0:2, 0:3, 4:6, 7:9]
+    min_result = np.min(d)
+    print(min_result)
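
A note on the filterwarnings marker used for the v1 test above: pytest's unraisable-exception plugin turns exceptions ignored inside C-level teardown hooks (here h5py's ObjectID.__dealloc__) into PytestUnraisableExceptionWarning, and the marker silences that for a single test rather than globally. A minimal, self-contained sketch of the pattern (the test name and body below are placeholders, not the real test):

import pytest


@pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning")
def test_noisy_h5py_teardown():
    # run the code path whose h5py/S3File deallocation emits the unraisable
    # AttributeError, then assert on the actual result as usual
    assert True
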