Skip to content

Commit

Permalink
Merge branch '302-new-data-source-for-gdac-from-amazon-s3' into suppo…
Browse files Browse the repository at this point in the history
…rt-zarr
  • Loading branch information
gmaze committed Jan 21, 2025
2 parents 1ff6fff + 14bec87 commit b8e7964
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 28 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/pytests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ jobs:
continue-on-error: true

- name: Upload coverage to Codecov
uses: codecov/[email protected].1
uses: codecov/[email protected].2
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./cov.xml
files: ./cov.xml
flags: unittests, core, pinned, py${{matrix.python-version}}
name: codecov-github
fail_ci_if_error: false
Expand Down Expand Up @@ -261,10 +261,10 @@ jobs:
continue-on-error: true

- name: Upload coverage to Codecov
uses: codecov/[email protected].1
uses: codecov/[email protected].2
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./cov.xml
files: ./cov.xml
flags: unittests, all, pinned, py${{matrix.python-version}}
name: codecov-github
fail_ci_if_error: false
Expand Down
121 changes: 99 additions & 22 deletions argopy/stores/filesystems.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,32 +379,104 @@ def open_json(self, url, **kwargs):
js = None
return js

def open_dataset(self, path, *args, **kwargs):
"""Return a xarray.dataset from a path.
def open_dataset(
self,
path,
errors: Literal["raise", "ignore", "silent"] = "raise",
lazy: bool = False,
xr_opts: dict = {},
**kwargs,
) -> xr.Dataset:
"""Create a :class:`xarray.Dataset` from a local path pointing to a netcdf file
Parameters
----------
path: str
Path to resources passed to xarray.open_dataset
The local path of the netcdf file to open
lazy: bool, default=False
Define if we should try to open the netcdf dataset lazily or not
*args, **kwargs:
Other arguments are passed to :func:`xarray.open_dataset`
Returns
-------
:class:`xarray.DataSet`
:class:`xarray.Dataset`
"""
xr_opts = {}
if "xr_opts" in kwargs:
xr_opts.update(kwargs["xr_opts"])

with self.open(path) as of:
# log.debug("Opening dataset: '%s'" % path) # Redundant with fsspec logger
ds = xr.open_dataset(of, *args, **xr_opts)
ds.load()
if "source" not in ds.encoding:
if isinstance(path, str):
ds.encoding["source"] = path
return ds.copy()
def load_in_memory(path, errors="raise", xr_opts={}):
"""
Returns
-------
tuple: (data, _) or (None, _) if errors == "ignore"
"""
try:
data = self.fs.cat_file(path)
return data, None
except FileNotFoundError as e:
if errors == "raise":
raise e
elif errors == "ignore":
log.error("FileNotFoundError raised from: %s" % path)
return None, None

def load_lazily(path, errors="raise", xr_opts={}, akoverwrite: bool = False):
from . import ArgoKerchunker

if "ak" not in kwargs:
self.ak = ArgoKerchunker(
store="local", root=Path(OPTIONS["cachedir"]).joinpath("kerchunk")
)
else:
self.ak = kwargs["ak"]

if self.ak.supported(path):
xr_opts = {
"engine": "zarr",
"backend_kwargs": {
"consolidated": False,
"storage_options": {
"fo": self.ak.to_kerchunk(path, overwrite=akoverwrite),
"remote_protocol": fsspec.core.split_protocol(path)[0],
},
},
}
return "reference://", xr_opts
else:
warnings.warn(
"This path does not support byte range requests so we cannot load it lazily, falling back on "
"loading in memory."
)
log.debug("This path does not support byte range requests: %s" % path)
return load_in_memory(path, errors=errors, xr_opts=xr_opts)

if not lazy:
target, _ = load_in_memory(path, errors=errors, xr_opts=xr_opts)
else:
target, xr_opts = load_lazily(
path,
errors=errors,
xr_opts=xr_opts,
akoverwrite=kwargs.get("akoverwrite", False),
)

if target is not None:
ds = xr.open_dataset(target, **xr_opts)

if "source" not in ds.encoding:
if isinstance(path, str):
ds.encoding["source"] = path

self.register(path)
return ds

elif errors == "raise":
raise DataNotFound(path)

elif errors == "ignore":
log.error("DataNotFound from: %s" % path)
return None

def _mfprocessor(
self,
Expand Down Expand Up @@ -803,7 +875,7 @@ def make_request(
def open_dataset(
self,
url: str,
errors: Literal['raise', 'ignore', 'silent'] = "raise",
errors: Literal["raise", "ignore", "silent"] = "raise",
lazy: bool = False,
dwn_opts: dict = {},
xr_opts: dict = {},
Expand Down Expand Up @@ -1814,7 +1886,7 @@ class ftpstore(httpstore):
protocol = "ftp"

def open_dataset(self, url, *args, **kwargs):
"""Open and decode a xarray dataset from an ftp url
"""Open and decode a xarray dataset from a ftp url
Parameters
----------
Expand All @@ -1824,6 +1896,9 @@ def open_dataset(self, url, *args, **kwargs):
-------
:class:`xarray.Dataset`
"""
if 'lazy' in kwargs and kwargs['lazy']:
warnings.warn("FTP store does not support lazy dataset opening")

try:
this_url = self.fs._strip_protocol(url)
data = self.fs.cat_file(this_url)
Expand Down Expand Up @@ -2053,8 +2128,7 @@ def open_mfdataset(


class httpstore_erddap_auth(httpstore):
"""Argo http file system """

"""Argo http file system"""

async def get_auth_client(self, **kwargs):
session = aiohttp.ClientSession(**kwargs)
Expand Down Expand Up @@ -2276,6 +2350,7 @@ class gdacfs:
:meth:`argopy.utils.check_gdac_path`, :meth:`argopy.utils.list_gdac_servers`
"""

protocol2fs = {"file": filestore, "http": httpstore, "ftp": ftpstore, "s3": s3store}
"""Dictionary mapping path protocol to Argo file system to instantiate"""

Expand All @@ -2295,7 +2370,9 @@ def path2protocol(path: Union[str, Path]) -> str:
elif "s3" in split:
return "s3"
else:
raise GdacPathError("Unknown protocol for an Argo GDAC host: %s" % split)
raise GdacPathError(
"Unknown protocol for an Argo GDAC host: %s" % split
)

def __new__(cls, path: Union[str, Path, None] = None):
"""Create a file system for any Argo GDAC compliant path"""
Expand All @@ -2309,8 +2386,8 @@ def __new__(cls, path: Union[str, Path, None] = None):
if protocol == "ftp":
ftp_host = urlparse(path).hostname
ftp_port = 0 if urlparse(path).port is None else urlparse(path).port
fs_args['host'] = ftp_host
fs_args['port'] = ftp_port
fs_args["host"] = ftp_host
fs_args["port"] = ftp_port

try:
fs = fs(**fs_args)
Expand Down
4 changes: 2 additions & 2 deletions argopy/stores/kerchunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ def to_kerchunk(self, ncfile: Union[str, Path], overwrite: bool = False):
self.translate(ncfile)
elif str(ncfile) not in self.kerchunk_references:
if self.fs.exists(self._ncfile2jsfile(ncfile)):
self.kerchunk_references.update({ncfile: self._ncfile2jsfile(ncfile)})
self.kerchunk_references.update({str(ncfile): self._ncfile2jsfile(ncfile)})
else:
self.translate(ncfile)

# Read and load the kerchunk JSON file:
kerchunk_jsfile = self.kerchunk_references[ncfile]
kerchunk_jsfile = self.kerchunk_references[str(ncfile)]
with self.fs.open(kerchunk_jsfile, "r") as file:
kerchunk_data = json.load(file)

Expand Down

0 comments on commit b8e7964

Please sign in to comment.