import intake

data_catalog = intake.open_esm_datastore("../pump-catalog.json")
data_catalog

../pump-catalog catalog with 32 dataset(s) from 32 asset(s):

                   unique
casename               10
stream                  4
path                   32
baseline                2
levels                  2
frequency               2
variables              10
derived_variables       0
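
The same summary can be queried programmatically before reaching for the interactive grid. A minimal sketch, assuming the standard intake-esm interface (the catalog exposes its underlying DataFrame as .df and per-column unique values via .unique()):

# Peek at the catalog contents as a pandas DataFrame
data_catalog.df.head()

# Expand the counts above into the actual unique values,
# e.g. the ten casenames and four streams
data_catalog.unique()[["casename", "stream"]]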

Display interactive catalog

import ipyaggrid


def make_short_name(casename_split):
    # Drop the fixed model/grid prefix (e.g. "gmom.e23.GJRAv3.TL319_t061_zstar_N150")
    # and any trailing "mixpods" suffix, keeping only the experiment identifier.
    trimmed = casename_split[4:]
    if trimmed[-1] == "mixpods":
        trimmed = trimmed[:-1]
    return ".".join(trimmed)


df = data_catalog.df
df = df.assign(shortname=df.casename.str.split(".").map(make_short_name))
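
As a worked example of the shortname derivation, using a casename that appears in the catalog:

# "gmom.e23.GJRAv3.TL319_t061_zstar_N150.kpp.lmd.004.mixpods" splits into
# ['gmom', 'e23', 'GJRAv3', 'TL319_t061_zstar_N150', 'kpp', 'lmd', '004', 'mixpods'];
# keeping parts[4:] and dropping the trailing 'mixpods' yields the short name.
parts = "gmom.e23.GJRAv3.TL319_t061_zstar_N150.kpp.lmd.004.mixpods".split(".")
make_short_name(parts)  # -> 'kpp.lmd.004'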

column_defs = [
    {
        "headerName": "shortname",
        "field": "shortname",
        "rowGroup": False,
        "pinned": True,
    },
    {"headerName": "stream", "field": "stream", "rowGroup": False},
    {"headerName": "baseline", "field": "baseline"},
    {"headerName": "frequency", "field": "frequency", "rowGroup": False},
    {"headerName": "levels", "field": "levels", "rowGroup": False},
    {"headerName": "variables", "field": "variables", "autoHeight": True},
    {"headerName": "casename", "field": "casename", "rowGroup": False},
    {"headerName": "path", "field": "path", "rowGroup": False},
]

grid_options = {
    "columnDefs": column_defs,
    "defaultColDef": {
        "resizable": True,
        "editable": False,
        "filter": True,
        "sortable": True,
    },
    "colResizeDefault": True,
    "rowSelection": "multiple",
    "statusBar": {  # new syntax since 19.0
        "statusPanels": [
            {"statusPanel": "agTotalRowCountComponent", "align": "left"},
            {"statusPanel": "agFilteredRowCountComponent"},
            {"statusPanel": "agSelectedRowCountComponent"},
            {"statusPanel": "agAggregationComponent"},
        ]
    },
    # "enableRangeHandle": True,
}

g = ipyaggrid.Grid(
    grid_data=df,
    grid_options=grid_options,
    quick_filter=True,
    export_csv=False,
    export_excel=False,
    export_mode="buttons",
    export_to_df=True,
    theme="ag-theme-balham",
    # show_toggle_edit=False,
    # show_toggle_delete=False,
    columns_fit="auto",
    # index=False,
    # keep_multiindex=False,
)
g

Subselect by filtering, then select the rows you want and click “Export Rows”. This populates g.grid_data_out["rows"].

selection = g.grid_data_out["rows"]
selection
Index  shortname                 stream    baseline  frequency  levels  variables                                           casename                                            path
2      kpp.lmd.004               combined  old       None       150     ['SSH', 'SSU', 'SSV', 'mlotst', 'oml', 'sos', ...  gmom.e23.GJRAv3.TL319_t061_zstar_N150.kpp.lmd....  /glade/campaign/cgd/oce/projects/pump/cesm/gmo...
19     baseline.kpp.lmd.004      combined  old       None       65      ['KPP_NLT_temp_budget', 'SSH', 'SSU', 'SSV', '...  gmom.e23.GJRAv3.TL319_t061_zstar_N65.baseline....  /glade/campaign/cgd/oce/projects/pump/cesm/gmo...
26     new_baseline.kpp.lmd.004  combined  new       None       65      ['N2_int', 'Rd_dx', 'SSH', 'SSU', 'SSV', 'ages...  gmom.e23.GJRAv3.TL319_t061_zstar_N65.new_basel...  /glade/campaign/cgd/oce/projects/pump/cesm/gmo...
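
If the notebook is run non-interactively, the same selection can be reproduced without the widget. A rough stand-in, assuming the shortnames shown above, using plain pandas filtering of the augmented DataFrame:

# Hypothetical non-interactive equivalent of the grid export
wanted = ["kpp.lmd.004", "baseline.kpp.lmd.004", "new_baseline.kpp.lmd.004"]
selection = df[df.shortname.isin(wanted) & (df.stream == "combined")]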
subcat = data_catalog.search(casename=selection["casename"], stream=selection["stream"])
subcat.to_dataset_dict()
--> The keys in the returned dictionary of datasets are constructed as follows:
	'casename.stream'
0.00% [0/3 00:00<?]
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/source.py:240, in ESMDataSource._open_dataset(self)
    220 datasets = [
    221     _open_dataset(
    222         record[self.path_column_name],
   (...)
    237     for _, record in self.df.iterrows()
    238 ]
--> 240 datasets = dask.compute(*datasets)
    241 if len(datasets) == 1:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
    225 id = get_id()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/dask/utils.py:72, in apply(func, args, kwargs)
     71 if kwargs:
---> 72     return func(*args, **kwargs)
     73 else:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/source.py:73, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format)
     72 else:
---> 73     ds = xr.open_dataset(url, **xarray_open_kwargs)
     74     if preprocess is not None:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/xarray/backends/api.py:541, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, backend_kwargs, **kwargs)
    540 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 541 backend_ds = backend.open_dataset(
    542     filename_or_obj,
    543     drop_variables=drop_variables,
    544     **decoders,
    545     **kwargs,
    546 )
    547 ds = _dataset_from_backend_dataset(
    548     backend_ds,
    549     filename_or_obj,
   (...)
    557     **kwargs,
    558 )

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/xarray/backends/zarr.py:887, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel, zarr_version)
    886 filename_or_obj = _normalize_path(filename_or_obj)
--> 887 store = ZarrStore.open_group(
    888     filename_or_obj,
    889     group=group,
    890     mode=mode,
    891     synchronizer=synchronizer,
    892     consolidated=consolidated,
    893     consolidate_on_close=False,
    894     chunk_store=chunk_store,
    895     storage_options=storage_options,
    896     stacklevel=stacklevel + 1,
    897     zarr_version=zarr_version,
    898 )
    900 store_entrypoint = StoreBackendEntrypoint()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/xarray/backends/zarr.py:425, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version)
    424 else:
--> 425     zarr_group = zarr.open_group(store, **open_kwargs)
    426 return cls(
    427     zarr_group,
    428     mode,
   (...)
    432     safe_chunks,
    433 )

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/zarr/hierarchy.py:1347, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, meta_array)
   1346 # handle polymorphic store arg
-> 1347 store = _normalize_store_arg(
   1348     store, storage_options=storage_options, mode=mode,
   1349     zarr_version=zarr_version)
   1350 if zarr_version is None:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/zarr/hierarchy.py:1220, in _normalize_store_arg(store, storage_options, mode, zarr_version)
   1219     return MemoryStore() if zarr_version == 2 else MemoryStoreV3()
-> 1220 return normalize_store_arg(store,
   1221                            storage_options=storage_options, mode=mode,
   1222                            zarr_version=zarr_version)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/zarr/storage.py:171, in normalize_store_arg(store, storage_options, mode, zarr_version)
    170     normalize_store = _normalize_store_arg_v3
--> 171 return normalize_store(store, storage_options, mode)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/zarr/storage.py:144, in _normalize_store_arg_v2(store, storage_options, mode)
    143 if "://" in store or "::" in store:
--> 144     return FSStore(store, mode=mode, **(storage_options or {}))
    145 elif storage_options:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/zarr/storage.py:1328, in FSStore.__init__(self, url, normalize_keys, key_separator, mode, exceptions, dimension_separator, fs, check, create, missing_exceptions, **storage_options)
   1327     storage_options["auto_mkdir"] = True
-> 1328 self.map = fsspec.get_mapper(url, **{**mapper_options, **storage_options})
   1329 self.fs = self.map.fs  # for direct operations

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/mapping.py:237, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
    236 # Removing protocol here - could defer to each open() on the backend
--> 237 fs, urlpath = url_to_fs(url, **kwargs)
    238 root = alternate_root if alternate_root is not None else urlpath

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/core.py:365, in url_to_fs(url, **kwargs)
    364 urlpath, protocol, _ = chain[0]
--> 365 fs = filesystem(protocol, **inkwargs)
    366 return fs, urlpath

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/registry.py:252, in filesystem(protocol, **storage_options)
    251 cls = get_filesystem_class(protocol)
--> 252 return cls(**storage_options)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/spec.py:76, in _Cached.__call__(cls, *args, **kwargs)
     75 else:
---> 76     obj = super().__call__(*args, **kwargs)
     77     # Setting _fs_token here causes some static linters to complain.

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/implementations/reference.py:164, in ReferenceFileSystem.__init__(self, fo, target, ref_storage_args, target_protocol, target_options, remote_protocol, remote_options, fs, template_overrides, simple_templates, max_gap, max_block, **kwargs)
    163 # text JSON
--> 164 with open(fo, "rb", **dic) as f:
    165     logger.info("Read reference from URL %s", fo)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/core.py:102, in OpenFile.__enter__(self)
    100 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 102 f = self.fs.open(self.path, mode=mode)
    104 self.fobjects = [f]

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/spec.py:1135, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1134 ac = kwargs.pop("autocommit", not self._intrans)
-> 1135 f = self._open(
   1136     path,
   1137     mode=mode,
   1138     block_size=block_size,
   1139     autocommit=ac,
   1140     cache_options=cache_options,
   1141     **kwargs,
   1142 )
   1143 if compression is not None:

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/implementations/local.py:183, in LocalFileSystem._open(self, path, mode, block_size, **kwargs)
    182     self.makedirs(self._parent(path), exist_ok=True)
--> 183 return LocalFileOpener(path, mode, fs=self, **kwargs)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/implementations/local.py:285, in LocalFileOpener.__init__(self, path, mode, autocommit, fs, compression, **kwargs)
    284 self.blocksize = io.DEFAULT_BUFFER_SIZE
--> 285 self._open()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/fsspec/implementations/local.py:290, in LocalFileOpener._open(self)
    289 if self.autocommit or "w" not in self.mode:
--> 290     self.f = open(self.path, mode=self.mode)
    291     if self.compression:

FileNotFoundError: [Errno 2] No such file or directory: '/glade/campaign/cgd/oce/projects/pump/cesm/gmom.e23.GJRAv3.TL319_t061_zstar_N65.baseline.kpp.lmd.004.mixpods/run/jsons/combined.json'

The above exception was the direct cause of the following exception:

ESMDataSourceError                        Traceback (most recent call last)
Cell In[7], line 2
      1 subcat = data_catalog.search(casename=selection["casename"], stream=selection["stream"])
----> 2 subcat.to_dataset_dict()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/pydantic/decorator.py:40, in pydantic.decorator.validate_arguments.validate.wrapper_function()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/pydantic/decorator.py:134, in pydantic.decorator.ValidatedFunction.call()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/pydantic/decorator.py:206, in pydantic.decorator.ValidatedFunction.execute()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/core.py:651, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    649         except Exception as exc:
    650             if not skip_on_error:
--> 651                 raise exc
    652 self.datasets = self._create_derived_variables(datasets, skip_on_error)
    653 return self.datasets

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/core.py:647, in esm_datastore.to_dataset_dict(self, xarray_open_kwargs, xarray_combine_by_coords_kwargs, preprocess, storage_options, progressbar, aggregate, skip_on_error, **kwargs)
    645 for task in gen:
    646     try:
--> 647         key, ds = task.result()
    648         datasets[key] = ds
    649     except Exception as exc:

File ~/mambaforge/envs/pump/lib/python3.10/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)
    443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/mambaforge/envs/pump/lib/python3.10/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/mambaforge/envs/pump/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/core.py:789, in _load_source(key, source)
    788 def _load_source(key, source):
--> 789     return key, source.to_dask()

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/source.py:273, in ESMDataSource.to_dask(self)
    271 def to_dask(self):
    272     """Return xarray object (which will have chunks)"""
--> 273     self._load_metadata()
    274     return self._ds

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake/source/base.py:285, in DataSourceBase._load_metadata(self)
    283 """load metadata only if needed"""
    284 if self._schema is None:
--> 285     self._schema = self._get_schema()
    286     self.dtype = self._schema.dtype
    287     self.shape = self._schema.shape

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/source.py:205, in ESMDataSource._get_schema(self)
    202 def _get_schema(self) -> Schema:
    204     if self._ds is None:
--> 205         self._open_dataset()
    206         metadata = {'dims': {}, 'data_vars': {}, 'coords': ()}
    207         self._schema = Schema(
    208             datashape=None,
    209             dtype=None,
   (...)
    212             extra_metadata=metadata,
    213         )

File ~/mambaforge/envs/pump/lib/python3.10/site-packages/intake_esm/source.py:265, in ESMDataSource._open_dataset(self)
    262     self._ds.attrs[OPTIONS['dataset_key']] = self.key
    264 except Exception as exc:
--> 265     raise ESMDataSourceError(
    266         f"""Failed to load dataset with key='{self.key}'
    267          You can use `cat['{self.key}'].df` to inspect the assets/files for this key.
    268          """
    269     ) from exc

ESMDataSourceError: Failed to load dataset with key='gmom.e23.GJRAv3.TL319_t061_zstar_N65.baseline.kpp.lmd.004.mixpods.combined'
                 You can use `cat['gmom.e23.GJRAv3.TL319_t061_zstar_N65.baseline.kpp.lmd.004.mixpods.combined'].df` to inspect the assets/files for this key.
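
The root cause is the FileNotFoundError above: the reference JSON for the N65 baseline case is missing on disk. Two ways to proceed, following the hint in the error message and the skip_on_error keyword visible in the to_dataset_dict signature in the traceback (a sketch, not run here):

# Inspect the assets behind the failing key, as the error message suggests
key = "gmom.e23.GJRAv3.TL319_t061_zstar_N65.baseline.kpp.lmd.004.mixpods.combined"
subcat[key].df

# Or load whatever can be opened and skip the broken asset
dsets = subcat.to_dataset_dict(skip_on_error=True)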