Skip to content

Commit

Permalink
Added dependent modules for docs testing
Browse files Browse the repository at this point in the history
  • Loading branch information
dwest77a committed Mar 26, 2024
1 parent a57c87c commit 85c7859
Showing 1 changed file with 108 additions and 10 deletions.
118 changes: 108 additions & 10 deletions pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,74 @@
__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"

import os
import xarray as xr
import json
import fsspec

from pipeline.errors import MissingVariableError
from pipeline.errors import MissingVariableError, MissingKerchunkError, ChunkDataError

def open_kerchunk(kfile: str, logger, isparq=False, remote_protocol='file'):
"""Open kerchunk file from JSON/parquet formats"""
if isparq:
logger.debug('Opening Kerchunk Parquet store')
from fsspec.implementations.reference import ReferenceFileSystem
fs = ReferenceFileSystem(
kfile,
remote_protocol='file',
target_protocol="file",
lazy=True)
return xr.open_dataset(
fs.get_mapper(),
engine="zarr",
backend_kwargs={"consolidated": False, "decode_times": False}
)
else:
logger.debug('Opening Kerchunk JSON file')
try:
mapper = fsspec.get_mapper('reference://',fo=kfile, target_options={"compression":None}, remote_protocol=remote_protocol)
except json.JSONDecodeError as err:
logger.error(f"Kerchunk file {kfile} appears to be empty")
raise MissingKerchunkError
# Need a safe repeat here
ds = None
attempts = 0
while attempts < 3 and not ds:
attempts += 1
try:
ds = xr.open_zarr(mapper, consolidated=False, decode_times=True)
except OverflowError:
ds = None
except Exception as err:
raise err #MissingKerchunkError(message=f'Failed to open kerchunk file {kfile}')
if not ds:
raise ChunkDataError
logger.debug('Successfully opened Kerchunk with virtual xarray ds')
return ds

def get_attribute(env: str, args, var: str):
"""Assemble environment variable or take from passed argument.
Finds value of variable from Environment or ParseArgs object, or reports failure
"""
if getattr(args, var):
return getattr(args, var)
elif os.getenv(env):
try:
if getattr(args, var):
return getattr(args, var)
except AttributeError:
pass
if os.getenv(env):
return os.getenv(env)
else:
print(var)
raise MissingVariableError(type=var)
def format_str(string: str, length: int):

def format_str(string: str, length: int, concat=False):
"""Simple function to format a string to a correct length"""
string = str(string)
while len(string) < length:
string += ' '
if len(string) >= length and concat:
string = string[:length-3] + '...'
else:
while len(string) < length:
string += ' '
return string[:length]

class BypassSwitch:
Expand Down Expand Up @@ -71,5 +118,56 @@ def mem_to_val(value):

def get_codes(group, workdir, filename):
"""Returns a list of the project codes given a filename (repeat id)"""
with open(f'{workdir}/groups/{group}/{filename}.txt') as f:
return [r.strip() for r in f.readlines()]
if workdir:
codefile = f'{workdir}/groups/{group}/{filename}.txt'
else:
codefile = f'{group}/{filename}.txt'
if os.path.isfile(codefile):
with open(codefile) as f:
return [r.strip() for r in f.readlines()]
else:
return []

def set_codes(group, workdir, filename, contents, overwrite=0):
codefile = f'{group}/{filename}.txt'
if workdir:
codefile = f'{workdir}/groups/{group}/{filename}.txt'

ow = 'w'
if overwrite == 1:
ow = 'w+'

with open(codefile, ow) as f:
f.write(contents)

def get_proj_file(proj_dir, proj_file):
projfile = f'{proj_dir}/{proj_file}'
if os.path.isfile(projfile):
try:
with open(projfile) as f:
contents = json.load(f)
return contents
except:
with open(projfile) as f:
print(f.readlines())
return None
else:
return None

def set_proj_file(proj_dir, proj_file, contents, logger):
projfile = f'{proj_dir}/{proj_file}'
if not os.path.isfile(projfile):
os.system(f'touch {projfile}')
try:
with open(projfile,'w') as f:
f.write(json.dumps(contents))
logger.debug(f'{proj_file} updated')
except Exception as err:
logger.error(f'{proj_file} unable to update - {err}')

def get_proj_dir(proj_code, workdir, groupID):
if groupID:
return f'{workdir}/in_progress/{groupID}/{proj_code}'
else:
return f'{workdir}/in_progress/{proj_code}'

0 comments on commit 85c7859

Please sign in to comment.