Merge pull request #4 from cedadev/localK
v0.2.0
dwest77a authored Nov 6, 2024
2 parents 1397478 + b660d25 commit ce917d8
Showing 5 changed files with 89 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1 +1,2 @@
.local
.local
**/__pycache__/**
19 changes: 12 additions & 7 deletions ceda_datapoint/core/client.py
@@ -107,7 +107,7 @@ def info(self) -> None:
for term, searched in self._search_terms.items():
print(f' - {term}: {searched}')

def open_cluster(
def collect_cloud_assets(
self,
mode='xarray',
combine=False,
@@ -250,29 +250,34 @@ def __getitem__(self, collection):
"""
return DataPointSearch(self.search(collections=[collection]))

def list_query_terms(self, collection=None):
def list_query_terms(self, collection=None) -> dict | None:
"""
List the possible query terms for all or
a particular collection.
"""

def search_terms(search, coll):
def search_terms(search, coll, display : bool = False):


item = search[0]
if item is not None:
print(f'{coll}: {list(item.attributes.keys())}')
if display:
print(f'{coll}: {list(item.attributes.keys())}')
return { coll : list(item.attributes.keys())}
else:
print(f'{coll}: < No Items >')
if display:
print(f'{coll}: < No Items >')
return {coll : None}

if collection is not None:
dps = self.search(collections=[collection], max_items=1)
search_terms(dps, collection)
return search_terms(dps, collection)

else:
for coll in self._client.get_collections():
c = self.search(collections=[coll.id], max_items=1)
search_terms(c, coll.id)
_ = search_terms(c, coll.id, display=True)
return

def list_collections(self):
"""
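A minimal usage sketch of the client.py changes above: `list_query_terms` now returns a dict of queryable terms rather than only printing them, and the search-side `open_cluster` method is renamed to `collect_cloud_assets`. The `DataPointClient` entry point and the `'cmip6'` collection id below are assumptions for illustration only.

```python
from ceda_datapoint import DataPointClient  # assumed import path

client = DataPointClient()

# For a single collection, list_query_terms now returns a dict,
# e.g. {'cmip6': ['experiment_id', ...]} or {'cmip6': None} if it has no items.
terms = client.list_query_terms(collection='cmip6')

# With no collection given, terms are printed per collection and None is returned.
client.list_query_terms()

# __getitem__ returns a DataPointSearch; its cluster-building method is now
# collect_cloud_assets (formerly open_cluster), keeping the same keywords.
cluster = client['cmip6'].collect_cloud_assets(mode='xarray', combine=False)
```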
80 changes: 69 additions & 11 deletions ceda_datapoint/core/cloud.py
@@ -5,6 +5,8 @@
import fsspec
import xarray as xr
import logging
import requests
import json

from ceda_datapoint.mixins import UIMixin, PropertiesMixin
from .utils import hash_id
@@ -21,10 +23,13 @@ def __init__(
self,
products: list,
parent_id: str = None,
meta: dict = None):
meta: dict = None,
local_only: bool = False):

self._id = f'{parent_id}-{hash_id(parent_id)}'

self._local_only = local_only

meta = meta or {}

self._products = {}
@@ -43,6 +48,10 @@ def __str__(self):
return f'<DataPointCluster: {self._id} (Datasets: {len(self._products)})>'

def __getitem__(self, index):

if isinstance(index, int):
index = list(self._products.keys())[index]

if index not in self._products:
logger.warning(
f'"{index}" not found in available products.'
@@ -75,6 +84,7 @@ def open_dataset(
self,
id,
mode: str = 'xarray',
local_only: bool = False,
**kwargs,
) -> xr.Dataset:

@@ -83,6 +93,8 @@
'Only "xarray" mode currently implemented - cf-python is a future option'
)

local_only = local_only or self._local_only

if isinstance(id, int):
id = list(self._products.keys())[id]

@@ -93,7 +105,7 @@
return None

product = self._products[id]
return product.open_dataset(**kwargs)
return product.open_dataset(local_only=local_only, **kwargs)

def open_datasets(self):
raise NotImplementedError(
@@ -110,6 +122,8 @@ def __init__(
order: int = None,
mode: str = 'xarray',
meta: dict = None,
stac_attrs: dict = None,
properties: dict = None,
):

if mode != 'xarray':
@@ -127,6 +141,9 @@ def __init__(
'cloud_format': cf
}

self._stac_attrs = stac_attrs
self._properties = properties

@property
def cloud_format(self):
return self._cloud_format
@@ -145,7 +162,7 @@ def info(self):
for k, v in self._meta.items():
print(f' - {k}: {v}')

def open_dataset(self, **kwargs):
def open_dataset(self, local_only: bool = False, **kwargs):
"""
Open the dataset for this product (in xarray).
Specific methods to open cloud formats are private since
@@ -157,17 +174,26 @@ def open_dataset(self, **kwargs):
'No cloud format given for this dataset'
)

if self._cloud_format == 'kerchunk':
return self._open_kerchunk(**kwargs)
elif self._cloud_format == 'CFA':
return self._open_cfa(**kwargs)
else:
raise ValueError(
'Cloud format not recognised - must be one of ("kerchunk", "CFA")'
try:
if self._cloud_format == 'kerchunk':
return self._open_kerchunk(local_only=local_only, **kwargs)
elif self._cloud_format == 'CFA':
return self._open_cfa(**kwargs)
else:
raise ValueError(
'Cloud format not recognised - must be one of ("kerchunk", "CFA")'
)
except ValueError as err:
raise err
except FileNotFoundError:
raise FileNotFoundError(
'The requested resource could not be located: '
f'{self._asset_meta["href"]}'
)

def _open_kerchunk(
self,
local_only: bool = False,
**kwargs,
) -> xr.Dataset:

@@ -182,6 +208,9 @@

mapper_kwargs = self._asset_meta.get('mapper_kwargs') or {}
open_zarr_kwargs = self._asset_meta.get('open_zarr_kwargs') or {}

if local_only:
href = _fetch_kerchunk_make_local(href)

mapper = fsspec.get_mapper(
'reference://',
@@ -222,4 +251,33 @@ def _zarr_kwargs_default(add_kwargs={}):
defaults = {
'consolidated':False,
}
return defaults | add_kwargs
return defaults | add_kwargs

def _fetch_kerchunk_make_local(href: str):
"""
Fetch a kerchunk file, open as json content and do find/replace
to access local files only.
"""
attempts = 0
success = False
while attempts < 3 and not success:
resp = requests.get(href)
if resp.status_code == 200:
success = True
attempts += 1
if attempts >= 3 and not success:
raise ValueError(
f'File {href}: Download unsuccessful - '
'could not download the file successfully (tried 3 times)'
)

refs = json.loads(resp.text)

for key in refs['refs'].keys():
v = refs['refs'][key]
if isinstance(v, list) and len(v) == 3:
# First element of the reference triple is the remote URL
if 'https://' in v[0]:
refs['refs'][key][0] = v[0].replace('https://dap.ceda.ac.uk/','/')
return refs
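A short sketch of the reference rewrite that `_fetch_kerchunk_make_local` performs, with the download and retry step omitted and a made-up refs dict standing in for the fetched kerchunk file:

```python
refs = {
    'version': 1,
    'refs': {
        '.zgroup': '{"zarr_format": 2}',  # inline metadata entries are left untouched
        'tas/0.0.0': ['https://dap.ceda.ac.uk/badc/some/file.nc', 0, 1024],
    },
}

# Same find/replace as the helper: each [url, offset, length] triple pointing at
# the CEDA DAP service is rewritten to the equivalent local archive path.
for key, v in refs['refs'].items():
    if isinstance(v, list) and len(v) == 3 and 'https://' in v[0]:
        refs['refs'][key][0] = v[0].replace('https://dap.ceda.ac.uk/', '/')

print(refs['refs']['tas/0.0.0'][0])  # -> /badc/some/file.nc
```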

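A hypothetical end-to-end sketch of the new `local_only` option. The client entry point and collection id are placeholders; the keywords, integer indexing and error behaviour come from the cloud.py diff above.

```python
from ceda_datapoint import DataPointClient  # assumed import path

client = DataPointClient()
cluster = client['cmip6'].collect_cloud_assets(mode='xarray')

# DataPointCluster now accepts integer indexing alongside product ids.
product = cluster[0]

# local_only=True downloads the kerchunk reference file (up to three attempts)
# and rewrites its https://dap.ceda.ac.uk/ references to local paths before
# passing the mapper to xarray.
ds = cluster.open_dataset(0, local_only=True)
```

A cluster constructed with `local_only=True` applies the same behaviour to every `open_dataset` call, and a kerchunk asset whose href cannot be opened now raises `FileNotFoundError` naming that href.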
5 changes: 4 additions & 1 deletion ceda_datapoint/core/item.py
@@ -209,7 +209,10 @@ def _load_cloud_assets(
# Register this asset as a DataPointCloudProduct
order = priority.index(cf)
asset_id = f'{self._id}-{id}'
a = DataPointCloudProduct(asset, id=asset_id, cf=cf, order=order, meta=self._meta)
a = DataPointCloudProduct(
asset,
id=asset_id, cf=cf, order=order, meta=self._meta,
stac_attrs=self._stac_attrs, properties=self._properties)
asset_list.append(a)

if len(asset_list) == 0:
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "ceda-datapoint"
version = "0.1.1"
version = "0.2.0"
description = "DataPoint provides python-based search/access tools for using data primarily from the CEDA Archive."
authors = ["Daniel Westwood <[email protected]>"]
license = "{file = \"LICENSE\"}"
@@ -12,7 +12,7 @@ fsspec = "2024.9.0"
xarray = "2024.6.0"
pystac-client = "0.8.3"
kerchunk = "0.2.6"
cfapyx = "2024.10.11"
cfapyx = "2024.11.6"
requests = "^2.32.3"
aiohttp = "^3.10.10"
sphinx = "7.1.2"
