Skip to content

Commit

Permalink
Added proj code concatenation, fixed edge case bug with netcdf opening in validate
Browse files Browse the repository at this point in the history
  • Loading branch information
dwest77a committed Feb 15, 2024
1 parent 03e7fee commit 0803490
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
25 changes: 19 additions & 6 deletions assess.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,29 @@ def extract_keys(filepath: str, logger, savetype=None, examine=None, phase=None,
examine_log(log, efile, code, ecode=ecode, phase=phase, groupID=groupID, repeat_id=repeat_id)
return savedcodes, keys, len(listfiles)

def save_sel(codes: list, groupdir: str, label: str, logger, overwrite=0):
    """Save a selection of project codes to a labelled file in the group directory.

    :param codes:     List of (logfile, code) pairs; the second element of each pair
                      is the project code written out (one per line).
    :param groupdir:  Directory belonging to a group; the output file is written here.
    :param label:     Repeat label used to name the output file (proj_codes_<label>.txt).
    :param logger:    Logger used for progress/skip messages.
    :param overwrite: Behaviour when the target file already exists:
                      0 = skip (default), 1 = append to existing file,
                      2 (or more) = replace existing file.
    """
    # An empty selection writes nothing; a single code is still a valid selection.
    if not codes:
        logger.info('No codes identified, no files written')
        return

    outfile = f'{groupdir}/proj_codes_{label}.txt'
    codeset = '\n'.join([code[1].strip() for code in codes])

    if os.path.isfile(outfile):
        if overwrite == 0:
            logger.info(f'Skipped writing {len(codes)} to proj_codes_{label} - file exists and overwrite not set')
        elif overwrite == 1:
            logger.info(f'Adding {len(codes)} to existing proj_codes_{label}')
            # TODO: check for duplicate codes already present in the file.
            with open(outfile, 'a') as f:
                # Leading newline keeps the first appended code off the file's last line.
                f.write('\n' + codeset)
        else:
            # overwrite >= 2: force replacement of the existing selection.
            logger.info(f'Overwriting with {len(codes)} in existing proj_codes_{label} file')
            with open(outfile, 'w') as f:
                f.write(codeset)
    else:
        with open(outfile, 'w') as f:
            f.write(codeset)
        logger.info(f'Written {len(codes)} to proj_codes_{label}')

Expand Down Expand Up @@ -301,7 +313,7 @@ def error_check(args, logger):
print(f'Identified {errs["Warning"]} files with Warnings')

if args.repeat_label and args.write:
save_sel(savedcodes, args.groupdir, args.repeat_label, logger)
save_sel(savedcodes, args.groupdir, args.repeat_label, logger, overwrite=args.overwrite)
elif args.repeat_label:
logger.info(f'Skipped writing {len(savedcodes)} to proj_codes_{args.repeat_label}')
else:
Expand Down Expand Up @@ -441,6 +453,7 @@ def assess_main(args):
parser.add_argument('-E','--examine', dest='examine', action='store_true', help='Examine log outputs individually.')
parser.add_argument('-c','--clean-up', dest='cleanup', default=None, help='Clean up group directory of errors/outputs/labels')

parser.add_argument('-O','--overwrite', dest='overwrite', action='count', help='Force overwrite of steps if previously done')

parser.add_argument('-w','--workdir', dest='workdir', help='Working directory for pipeline')
parser.add_argument('-g','--groupdir', dest='groupdir', help='Group directory for pipeline')
Expand Down
28 changes: 12 additions & 16 deletions pipeline/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def get_netcdf_list(proj_dir: str, logger, thorough=False):
numfiles = int(len(xfiles)/1000)
if numfiles < 3:
numfiles = 3
logger.info(f'Selecting a subset of {numfiles} files')

if numfiles > len(xfiles):
numfiles = len(xfiles)
Expand All @@ -68,6 +67,7 @@ def get_netcdf_list(proj_dir: str, logger, thorough=False):
testindex = random.randint(0,numfiles-1)
indexes.append(testindex)

logger.info(f'Selecting a subset of {len(indexes)}/{len(xfiles)} files')
return indexes, xfiles

def pick_index(nfiles: list, indexes: list):
Expand Down Expand Up @@ -130,8 +130,6 @@ def locate_kerchunk(args, logger, get_str=False):
logger.error(f'No Kerchunk file located at {args.proj_dir} and no in-place validation indicated - exiting')
raise MissingKerchunkError



def open_kerchunk(kfile: str, logger, isparq=False, remote_protocol='file'):
"""Open kerchunk file from JSON/parquet formats"""
if isparq:
Expand Down Expand Up @@ -199,26 +197,25 @@ def open_netcdfs(args, logger, thorough=False):
if len(indexes) == len(xfiles):
thorough = True
xobjs = []
many = len(indexes)
if not thorough:
if not args.bypass.skip_memcheck:
check_memory(xfiles, indexes, args.memory, logger)
else:
logger.warning('Memory checks bypassed')
for one, i in enumerate(indexes):
xobjs.append(xr.open_dataset(xfiles[i]))

if len(xobjs) == 0:
logger.error('No valid timestep objects identified')
raise NoValidTimeSlicesError(message='Kerchunk', verbose=args.verbose)
return xobjs, indexes, len(xfiles)
else:
if not args.bypass.skip_memcheck:
check_memory(xfiles, [i for i in range(len(xfiles))], args.memory, logger)
else:
logger.warning('Memory checks bypassed')
xobjs = xr.concat([xr.open_dataset(fx) for fx in xfiles], dim='time', data_vars='minimal')
indexes = [i for i in range(len(xobjs))]

if len(xobjs) == 0:
logger.error('No valid timestep objects identified')
raise NoValidTimeSlicesError(message='Kerchunk', verbose=args.verbose)
return xobjs, indexes, len(xfiles)
xobj = xr.concat([xr.open_dataset(fx) for fx in xfiles], dim='time', data_vars='minimal')
return xobj, None, len(xfiles)

## 3. Validation Testing

Expand Down Expand Up @@ -449,26 +446,25 @@ def validate_dataset(args):
xobjs, indexes, nfiles = open_netcdfs(args, logger, thorough=args.quality)
if len(xobjs) == 0:
raise NoValidTimeSlicesError(message='Xarray/NetCDF')
if len(indexes) == nfiles:
if indexes == None:
args.quality = True

## Open kerchunk file
kobj, in_place = locate_kerchunk(args, logger)
kobj, _v = locate_kerchunk(args, logger)
if not kobj:
raise MissingKerchunkError

## Set up loop variables
fullset = False
total = len(indexes)

if args.quality:
fullset = True

if not fullset:
logger.info(f"Attempting file subset validation: {len(indexes)}/{total}")
logger.info(f"Attempting file subset validation: {len(indexes)}/{nfiles}")
for step, index in enumerate(indexes):
xobj = xobjs[step]
logger.info(f'Running tests for selected file: {index} ({step+1}/{total})')
logger.info(f'Running tests for selected file: {index} ({step+1}/{len(indexes)})')

try:
validate_timestep(args, xobj, kobj, step+1, nfiles, logger)
Expand Down

0 comments on commit 0803490

Please sign in to comment.