Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Valid 0412 #40

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions padocc/cli.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the CLI script

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
__author__ = "Daniel Westwood"
__contact__ = "[email protected]"
__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"

## PADOCC CLI for entrypoint scripts

def main():
    """
    Entry point for the PADOCC command-line interface.

    Placeholder only: no CLI behaviour is implemented yet, so calling
    this does nothing and returns ``None``.
    """
    return None

if __name__ == '__main__':
main()
3 changes: 3 additions & 0 deletions padocc/core/filehandlers.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logging

Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __str__(self) -> str:
def __len__(self) -> int:
    """
    Return the number of entries in the value given by ``self.get()``.

    The length is also reported through the debug logger.
    """
    size = len(self.get())
    self.logger.debug(f'content length: {size}')
    return size

def __iter__(self) -> Generator[str, None, None]:
Expand Down Expand Up @@ -238,11 +239,13 @@ def _get_content(self) -> None:
Open the file to get content if it exists
"""
if self.file_exists():
self.logger.debug('Opening existing file')
with open(self._file) as f:
content = [r.strip() for r in f.readlines()]
self._value = content

else:
self.logger.debug('Creating new file')
self.create_file()
self._value = []

Expand Down
14 changes: 14 additions & 0 deletions padocc/core/mixins.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added properties mixin

Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,17 @@ def _rerun_command(self):
Setup for running this specific component interactively.
"""
return ''

class PropertiesMixin:
    """
    Read-only convenience properties derived from a project's configuration.

    The host class must provide a ``detail_cfg`` attribute that can be
    indexed with the ``'type'`` key.
    """

    @property
    def isparq(self) -> bool:
        """
        Return True if the project is configured to use parquet.
        """
        return self.detail_cfg['type'] == 'parq'

    @property
    def cloud_format(self) -> None:
        """
        Placeholder for the project's cloud format.

        Not implemented yet: always returns ``None`` (the annotation
        reflects this rather than promising a ``bool``).
        """
        return None
82 changes: 63 additions & 19 deletions padocc/core/project.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added info/help plus other minor changes

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging

from .errors import error_handler
from .utils import extract_file, BypassSwitch, apply_substitutions
from .utils import extract_file, BypassSwitch, apply_substitutions, phases
from .logs import reset_file_handler

from .mixins import DirectoryMixin, EvaluationsMixin
Expand Down Expand Up @@ -101,6 +101,11 @@ def __init__(
fh=fh,
logid=logid,
verbose=verbose)

if not os.path.isdir(self.groupdir):
raise ValueError(
f'The group "{groupID}" has not been initialised - not present in the working directory'
)

self.proj_code = proj_code

Expand Down Expand Up @@ -149,14 +154,41 @@ def __init__(
self._outfile = None

def __str__(self):
return f'<PADOCC Project: {self.groupID}>'
return f'<PADOCC Project: {self.proj_code} ({self.groupID})>'

def __repr__(self):
return str(self)

def info(self, fn=print):
    """
    Report a short summary of this project: phase, file count and version.

    :param fn: callable given each output line in turn (defaults to ``print``).
    """
    # Only mention the group in the header when one has been set.
    header = self.proj_code
    if self.groupID is not None:
        header = f'{header} ({self.groupID})'
    fn(f'{header}:')

    fn(f' > Phase: {self._get_phase()}')
    fn(f' > Files: {len(self.allfiles)}')
    fn(f' > Version: {self.get_version()}')

def help(self, fn=print):
    """
    List the public functions and properties available on this project.

    :param fn: callable given each output line in turn (defaults to ``print``).
    """
    lines = (
        str(self),
        ' > project.info() - Get some information about this project',
        ' > project.get_version() - Get the version number for the output product',
        ' > project.save_files() - Save all open files related to this project',
        'Properties:',
        ' > project.proj_code - code for this project.',
        ' > project.groupID - group to which this project belongs.',
        ' > project.dir - directory containing the projects files.',
        ' > project.cfa_path - path to the CFA file.',
        ' > project.outfile - path to the output product (Kerchunk/Zarr)',
    )
    for line in lines:
        fn(line)

def run(
self,
mode: str = None,
mode: str = 'kerchunk',
subset_bypass: bool = False,
forceful : bool = None,
thorough : bool = None,
Expand All @@ -181,12 +213,12 @@ def run(
self.save_files()
return status
except Exception as err:

return error_handler(
err, self.logger, self.phase,
jobid=self._logid, dryrun=self._dryrun,
subset_bypass=subset_bypass,
status_fh=self.status_log)
print(err)
#return error_handler(
#err, self.logger, self.phase,
#jobid=self._logid, dryrun=self._dryrun,
##subset_bypass=subset_bypass,
#status_fh=self.status_log)

def _run(self, **kwargs):
# Default project operation run.
Expand All @@ -209,6 +241,13 @@ def get_version(self):
"""
return self.detail_cfg['version_no'] or 1

@property
def dir(self):
    """
    Path of this project's directory under ``<workdir>/in_progress``.

    Projects with no (or an empty) ``groupID`` are placed under ``general``.
    """
    group = self.groupID or 'general'
    return f'{self.workdir}/in_progress/{group}/{self.proj_code}'

@property
def cfa_path(self):
    """
    Path to this project's CFA file: ``<dir>/<proj_code>.nca``.
    """
    base = self.dir
    return f'{base}/{self.proj_code}.nca'
Expand All @@ -225,9 +264,6 @@ def outfile(self):
def outfile(self, value : str):
self._outfile = value

def __str__(self):
return self.proj_code

def dir_exists(self, checkdir : str = None):
if not checkdir:
checkdir = self.dir
Expand Down Expand Up @@ -258,6 +294,21 @@ def save_files(self):
self.allfiles.close()
self.status_log.close()

def _get_phase(self):
    """
    Get the highest phase this project has currently undertaken successfully.

    Each row of ``self.status_log`` is read as ``row[0]`` = status and
    ``row[1]`` = phase name; only rows whose status is ``'Success'`` count.
    Phase names that are not listed in ``phases`` are ignored, so an
    unrecognised label in the log cannot raise ``ValueError`` here.

    :returns: the entry of ``phases`` with the highest index among the
        successful rows, or ``phases[0]`` when there is no such row.
    """
    # NOTE(review): assumes every row is an indexable record with the status
    # first and the phase second - confirm against how status_log stores rows.
    max_sid = 0
    for row in self.status_log:
        if row[0] != 'Success':
            continue

        phase = row[1]
        if phase not in phases:
            # Unknown label: phases.index() would raise ValueError.
            continue

        max_sid = max(phases.index(phase), max_sid)
    return phases[max_sid]

def _configure_filelist(self):
pattern = self.base_cfg['pattern']

Expand Down Expand Up @@ -303,13 +354,6 @@ def _setup_config(
config['substitutions'] = substitutions
self.base_cfg.set(config)

@property
def dir(self):
if self.groupID:
return f'{self.workdir}/in_progress/{self.groupID}/{self.proj_code}'
else:
return f'{self.workdir}/in_progress/general/{self.proj_code}'

def _create_dirs(self, first_time : bool = None):
if not self.dir_exists():
if self._dryrun:
Expand Down
7 changes: 7 additions & 0 deletions padocc/core/utils.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added phases

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
'validate':'30:00' # From CMIP experiments - no reliable prediction mechanism possible
}

# Names of the pipeline phases. `_get_phase` in padocc/core/project.py ranks
# phases by their index in this list, so a later entry counts as "higher".
phases = [
'scan',
'compute',
'validate',
'catalog'
]

class BypassSwitch:
"""Class to represent all bypass switches throughout the pipeline.
Requires a switch string which is used to enable/disable specific pipeline
Expand Down
3 changes: 2 additions & 1 deletion padocc/operations/group.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added subset bypass as known kwarg for .run

Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def _compute_config(
self,
proj_code,
mode=None,
subset_bypass=False,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -306,7 +307,7 @@ def _compute_config(
version_no=version,
**kwargs
)
status = proj_op.run()
status = proj_op.run(subset_bypass=subset_bypass)
proj_op.save_files()
return status

Expand Down
2 changes: 2 additions & 0 deletions padocc/operations/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def _open_json(file):
pattern = pattern[0]
if status:
self.logger.warning(status)
else:
pattern = os.path.abspath(pattern)

if substitutions:
cfg_values['substitutions'] = substitutions
Expand Down
21 changes: 13 additions & 8 deletions padocc/phases/compute.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added compute labels for multiple processes

Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def _convert_kerchunk(self, nfile: str, ctype, **kwargs) -> None:
def _hdf5_to_zarr(self, nfile: str, **kwargs) -> dict:
"""Wrapper for converting NetCDF4/HDF5 type files to Kerchunk"""
from kerchunk.hdf import SingleHdf5ToZarr
return SingleHdf5ToZarr(nfile, **kwargs).translate()
return SingleHdf5ToZarr(nfile,**kwargs).translate()

def _ncf3_to_zarr(self, nfile: str, **kwargs) -> dict:
"""Wrapper for converting NetCDF3 type files to Kerchunk"""
Expand Down Expand Up @@ -193,6 +193,7 @@ def __init__(
limiter : int = None,
skip_concat : bool = False,
new_version : bool = None,
label : str = 'compute',
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -233,6 +234,7 @@ def __init__(
workdir,
groupID=groupID,
thorough=thorough,
label=label,
**kwargs)

self.logger.debug('Starting variable definitions')
Expand All @@ -243,7 +245,8 @@ def __init__(
self.skip_concat = skip_concat

self.stage = stage
self._identify_mode()
self.mode = self.detail_cfg['mode'] or 'kerchunk'
self.fmt = self.detail_cfg['type'] or 'JSON'

self.validate_time = None
self.concat_time = None
Expand Down Expand Up @@ -284,14 +287,20 @@ def __init__(
self.temp_zattrs.set({})

self.combine_kwargs = {} # Now using concat_dims and identical dims finders.
self.create_kwargs = {'inline_threshold':1}
self.create_kwargs = {'inline_threshold':0}
self.pre_kwargs = {}

self.special_attrs = {}
self.var_shapes = {}

self.logger.debug('Finished all setup steps')

def help(self, fn=print):
    """
    Show the parent class help, followed by the compute-specific options.

    :param fn: callable given each output line in turn (defaults to ``print``).
    """
    super().help(fn=fn)
    for line in (
        '',
        'Compute Options:',
        ' > project.run() - Run compute for this project',
    ):
        fn(line)

def _run(self, mode: str = 'kerchunk'):
"""
Default _run hook for compute operations. A user should aim to use the
Expand Down Expand Up @@ -597,20 +606,16 @@ def _determine_dim_specs(self, objs: list) -> None:
# Calculate Partial Validation Estimate here
t1 = datetime.now()
self.logger.info("Determining concatenation dimensions")
print()
self._find_concat_dims(objs)
if self.combine_kwargs['concat_dims'] == []:
self.logger.info("No concatenation dimensions available - virtual dimension will be constructed.")
else:
self.logger.info(f"Found {self.combine_kwargs['concat_dims']} concatenation dimensions.")
print()

# Identical (Variables) Dimensions
self.logger.info("Determining identical variables")
print()
self._find_identical_dims(objs)
self.logger.info(f"Found {self.combine_kwargs['identical_dims']} identical variables.")
print()

# This one only happens for two files so don't need to take a mean
self.validate_time = (datetime.now()-t1).total_seconds()
Expand Down Expand Up @@ -749,7 +754,7 @@ def _combine_and_save(self, refs: dict) -> None:
])

t1 = datetime.now()
if self.fmt == 'json':
if self.fmt == 'JSON':
self.logger.info('Concatenating to JSON format Kerchunk file')
self._data_to_json(refs)
else:
Expand Down
19 changes: 14 additions & 5 deletions padocc/phases/scan.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logging/error handling

Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(
proj_code : str,
workdir : str,
groupID : str = None,
label : str = None,
label : str = 'scan',
**kwargs,
) -> None:

Expand All @@ -139,7 +139,13 @@ def __init__(
label = 'scan-operation'

super().__init__(
proj_code, workdir, groupID=groupID, **kwargs)
proj_code, workdir, groupID=groupID, label=label,**kwargs)

def help(self, fn=print):
    """
    Show the parent class help, followed by the scan-specific options.

    :param fn: callable given each output line in turn (defaults to ``print``).
    """
    super().help(fn=fn)
    for line in (
        '',
        'Scan Options:',
        ' > project.run() - Run a scan for this project',
    ):
        fn(line)

def _run(self, mode: str = 'kerchunk') -> None:
"""Main process handler for scanning phase"""
Expand All @@ -150,22 +156,25 @@ def _run(self, mode: str = 'kerchunk') -> None:

if nfiles < 3:
self.detail_cfg = {'skipped':True}
self.logger.info('Skip scanning phase >> proceed directly to compute')
self.logger.info(f'Skip scanning phase (only found {nfiles} files) >> proceed directly to compute')
return None


# Create all files in mini-kerchunk set here. Then try an assessment.
limiter = min(100, max(2, int(nfiles/20)))

self.logger.info(f'Determined {limiter} files to scan (out of {nfiles})')
self.logger.debug(f'Using {mode} scan operations')

if mode == 'zarr':
self._scan_zarr(limiter=limiter)
elif mode == 'kerchunk':
self._scan_kerchunk(limiter=limiter)
else:
self.logger.error('Unrecognised mode - must be one of ["kerchunk","zarr","CFA"]')
return 'Failed'
self.update_status('scan','ValueError',jobid=self._logid, dryrun=self._dryrun)
raise ValueError(
f'Unrecognised mode: {mode} - must be one of ["kerchunk","zarr","CFA"]'
)

self.update_status('scan','Success',jobid=self._logid, dryrun=self._dryrun)
return 'Success'
Expand Down
13 changes: 0 additions & 13 deletions padocc/phases/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,6 @@
from ujson import JSONDecodeError
from dask.distributed import LocalCluster

class CloudValidator:
"""
Encapsulate all validation testing into a single class. Instantiate for a specific project,
the object could then contain all project info (from detail-cfg) opened only once. Also a
copy of the total datasets (from native and cloud sources). Subselections can be passed
between class methods along with a variable index (class variables: variable list, dimension list etc.)

Class logger attribute so this doesn't need to be passed between functions.
Bypass switch contained here with all switches.
"""
def __init__(self):
pass

## 1. Array Selection Tools

def find_dimensions(dimlen: int, divisions: int) -> int:
Expand Down
Loading
Loading