diff --git a/.gitignore b/.gitignore
index 565b35d..f83d6ae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-.local
\ No newline at end of file
+.local
+**/__pycache__/**
diff --git a/ceda_datapoint/core/client.py b/ceda_datapoint/core/client.py
index 022cb6e..e1f4007 100644
--- a/ceda_datapoint/core/client.py
+++ b/ceda_datapoint/core/client.py
@@ -107,7 +107,7 @@ def info(self) -> None:
         for term, searched in self._search_terms.items():
             print(f' - {term}: {searched}')
 
-    def open_cluster(
+    def collect_cloud_assets(
             self,
             mode='xarray',
             combine=False,
@@ -250,29 +250,34 @@ def __getitem__(self, collection):
         """
         return DataPointSearch(self.search(collections=[collection]))
 
-    def list_query_terms(self, collection=None):
+    def list_query_terms(self, collection=None) -> dict | None:
         """
         List the possible query terms for all or a particular collection.
         """
 
-        def search_terms(search, coll):
+        def search_terms(search, coll, display : bool = False):
             item = search[0]
             if item is not None:
-                print(f'{coll}: {list(item.attributes.keys())}')
+                if display:
+                    print(f'{coll}: {list(item.attributes.keys())}')
+                return { coll : list(item.attributes.keys())}
             else:
-                print(f'{coll}: < No Items >')
+                if display:
+                    print(f'{coll}: < No Items >')
+                return {coll : None}
 
         if collection is not None:
             dps = self.search(collections=[collection], max_items=1)
-            search_terms(dps, collection)
+            return search_terms(dps, collection)
         else:
             for coll in self._client.get_collections():
                 c = self.search(collections=[coll.id], max_items=1)
-                search_terms(c, coll.id)
+                _ = search_terms(c, coll.id, display=True)
+        return
 
     def list_collections(self):
        """
diff --git a/ceda_datapoint/core/cloud.py b/ceda_datapoint/core/cloud.py
index 9604213..faf5d3b 100644
--- a/ceda_datapoint/core/cloud.py
+++ b/ceda_datapoint/core/cloud.py
@@ -5,6 +5,8 @@
 import fsspec
 import xarray as xr
 import logging
+import requests
+import json
 
 from ceda_datapoint.mixins import UIMixin, PropertiesMixin
 from .utils import hash_id
@@ -21,10 +23,13 @@ def __init__(
             self,
             products: list,
             parent_id: str = None,
-            meta: dict = None):
+            meta: dict = None,
+            local_only: bool = False):
 
         self._id = f'{parent_id}-{hash_id(parent_id)}'
 
+        self._local_only = local_only
+
         meta = meta or {}
 
         self._products = {}
@@ -43,6 +48,10 @@ def __str__(self):
         return f''
 
     def __getitem__(self, index):
+
+        if isinstance(index, int):
+            index = list(self._products.keys())[index]
+
         if index not in self._products:
             logger.warning(
                 f'"{index}" not found in available products.'
@@ -75,6 +84,7 @@ def open_dataset(
             self,
             id,
             mode: str = 'xarray',
+            local_only: bool = False,
             **kwargs,
         ) -> xr.Dataset:
 
@@ -83,6 +93,8 @@ def open_dataset(
                 'Only "xarray" mode currently implemented - cf-python is a future option'
             )
 
+        local_only = local_only or self._local_only
+
         if isinstance(id, int):
             id = list(self._products.keys())[id]
 
@@ -93,7 +105,7 @@ def open_dataset(
             return None
 
         product = self._products[id]
-        return product.open_dataset(**kwargs)
+        return product.open_dataset(local_only=local_only, **kwargs)
 
     def open_datasets(self):
         raise NotImplementedError(
@@ -110,6 +122,8 @@ def __init__(
             self,
             order: int = None,
             mode: str = 'xarray',
             meta: dict = None,
+            stac_attrs: dict = None,
+            properties: dict = None,
         ):
         if mode != 'xarray':
@@ -127,6 +141,9 @@ def __init__(
             'cloud_format': cf
         }
 
+        self._stac_attrs = stac_attrs
+        self._properties = properties
+
     @property
     def cloud_format(self):
         return self._cloud_format
@@ -145,7 +162,7 @@ def info(self):
         for k, v in self._meta.items():
             print(f' - {k}: {v}')
 
-    def open_dataset(self, **kwargs):
+    def open_dataset(self, local_only: bool = False, **kwargs):
         """
         Open the dataset for this product (in xarray).
         Specific methods to open cloud formats are private since
@@ -157,17 +174,26 @@ def open_dataset(self, **kwargs):
                 'No cloud format given for this dataset'
            )
 
-        if self._cloud_format == 'kerchunk':
-            return self._open_kerchunk(**kwargs)
-        elif self._cloud_format == 'CFA':
-            return self._open_cfa(**kwargs)
-        else:
-            raise ValueError(
-                'Cloud format not recognised - must be one of ("kerchunk", "CFA")'
+        try:
+            if self._cloud_format == 'kerchunk':
+                return self._open_kerchunk(local_only=local_only, **kwargs)
+            elif self._cloud_format == 'CFA':
+                return self._open_cfa(**kwargs)
+            else:
+                raise ValueError(
+                    'Cloud format not recognised - must be one of ("kerchunk", "CFA")'
+                )
+        except ValueError as err:
+            raise err
+        except FileNotFoundError:
+            raise FileNotFoundError(
+                'The requested resource could not be located: '
+                f'{self._asset_meta["href"]}'
             )
 
     def _open_kerchunk(
             self,
+            local_only: bool = False,
             **kwargs,
         ) -> xr.Dataset:
 
@@ -182,6 +208,9 @@ def _open_kerchunk(
 
         mapper_kwargs = self._asset_meta.get('mapper_kwargs') or {}
         open_zarr_kwargs = self._asset_meta.get('open_zarr_kwargs') or {}
+
+        if local_only:
+            href = _fetch_kerchunk_make_local(href)
 
         mapper = fsspec.get_mapper(
             'reference://',
@@ -222,4 +251,33 @@ def _zarr_kwargs_default(add_kwargs={}):
     defaults = {
         'consolidated':False,
     }
-    return defaults | add_kwargs
\ No newline at end of file
+    return defaults | add_kwargs
+
+def _fetch_kerchunk_make_local(href: str):
+    """
+    Fetch a kerchunk file, open as json content and do find/replace
+    to access local files only.
+ """ + attempts = 0 + success = False + while attempts < 3 and not success: + resp = requests.get(href) + if resp.status_code == 200: + success = True + attempts += 1 + if attempts >= 3 and not success: + raise ValueError( + f'File {href}: Download unsuccessful - ' + 'could not download the file successfully (tried 3 times)' + ) + + refs = json.loads(resp.text) + + for key in refs['refs'].keys(): + v = refs['refs'][key] + if isinstance(v, list) and len(v) == 3: + # First character + if 'https://' in v[0]: + refs['refs'][key][0] = v[0].replace('https://dap.ceda.ac.uk/','/') + return refs + diff --git a/ceda_datapoint/core/item.py b/ceda_datapoint/core/item.py index e64f3ac..43e2a44 100644 --- a/ceda_datapoint/core/item.py +++ b/ceda_datapoint/core/item.py @@ -209,7 +209,10 @@ def _load_cloud_assets( # Register this asset as a DataPointCloudProduct order = priority.index(cf) asset_id = f'{self._id}-{id}' - a = DataPointCloudProduct(asset, id=asset_id, cf=cf, order=order, meta=self._meta) + a = DataPointCloudProduct( + asset, + id=asset_id, cf=cf, order=order, meta=self._meta, + stac_attrs=self._stac_attrs, properties=self._properties) asset_list.append(a) if len(asset_list) == 0: diff --git a/pyproject.toml b/pyproject.toml index 25bcc6f..c831978 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "ceda-datapoint" -version = "0.1.1" +version = "0.2.0" description = "DataPoint provides python-based search/access tools for using data primarily from the CEDA Archive." authors = ["Daniel Westwood "] license = "{file = \"LICENSE\"}" @@ -12,7 +12,7 @@ fsspec = "2024.9.0" xarray = "2024.6.0" pystac-client = "0.8.3" kerchunk = "0.2.6" -cfapyx = "2024.10.11" +cfapyx = "2024.11.6" requests = "^2.32.3" aiohttp = "^3.10.10" sphinx = "7.1.2"