Finished docstring updates
dwest77a committed Mar 28, 2024
1 parent 2b1aa00 commit 416197a
Showing 1 changed file with 90 additions and 155 deletions.
245 changes: 90 additions & 155 deletions pipeline/scan.py
@@ -18,12 +18,14 @@
 import numpy as np
 
 from pipeline.logs import init_logger
-from pipeline.utils import get_attribute, BypassSwitch
+from pipeline.utils import get_attribute, BypassSwitch, get_codes, get_proj_dir, get_proj_file, set_codes
 from pipeline.errors import *
-from pipeline.compute.serial_process import Converter, Indexer
+from pipeline.compute import KerchunkConverter, KerchunkDSProcessor
 
-def format_float(value: int, logger):
-    """Format byte-value with proper units"""
+def format_float(value: int, logger) -> str | None:
+    """
+    Format byte-value with proper units
+    """
     logger.debug(f'Formatting value {value} in bytes')
     if value:
         unit_index = 0
@@ -35,71 +37,23 @@ def format_float(value: int, logger):
     else:
         return None
 
-def safe_format(value: int, fstring: str):
-    """Attempt to format a string given some fstring template."""
+def safe_format(value: int, fstring: str) -> str:
+    """Attempt to format a string given some fstring template.
+    - Handles issues by returning '', usually when value is None initially."""
     try:
         return fstring.format(value=value)
     except:
         return ''
 
-def trial_kerchunk(args, nfile: str, ctype: str, logger):
-    """Perform Kerchunk reading on specific file"""
-    logger.info(f'Running Kerchunk reader for {nfile}')
-
-    quickConvert = Converter(logger, bypass_driver=args.bypass.skip_driver)
-
-    kwargs = {}
-    supported_extensions = ['ncf3','hdf5','tif']
-
-    usetype = ctype
-
-    logger.debug(f'Attempting conversion for 1 {ctype} extension')
-    t1 = datetime.now()
-    tdict = quickConvert.convert_to_zarr(nfile, ctype, **kwargs)
-    t_len = (datetime.now()-t1).total_seconds()
-    ext_index = 0
-    while not tdict and ext_index < len(supported_extensions)-1:
-        # Try the other ones
-        extension = supported_extensions[ext_index]
-        logger.debug(f'Attempting conversion for {extension} extension')
-
-        if extension != ctype:
-            t1 = datetime.now()
-            tdict = quickConvert.convert_to_zarr(nfile, extension, **kwargs)
-            t_len = (datetime.now()-t1).total_seconds()
-            usetype = extension
-        ext_index += 1
-
-    if not tdict:
-        logger.error('Scanning failed for all drivers, file type is not Kerchunkable')
-        raise KerchunkDriverFatalError
-    else:
-        logger.info(f'Scan successful with {usetype} driver')
-        return tdict, usetype, t_len
-
-def load_from_previous(args, cache_id, logger):
-    cachefile = f'{args.proj_dir}/cache/{cache_id}.json'
-    if os.path.isfile(cachefile):
-        logger.info(f"Found existing cached file {cache_id}.json")
-        with open(cachefile) as f:
-            refs = json.load(f)
-        return refs
-    else:
-        return None
-
-def perform_scan(args, testfile: str, ctype: str, logger, savecache=True, cache_id=None, thorough=False):
-    """Map to kerchunk data and perform calculations on test netcdf file."""
-    if cache_id and not thorough:
-        refs = load_from_previous(args, cache_id, logger)
-        time = 0
-        if not refs:
-            refs, ctype, time = trial_kerchunk(args, testfile, ctype, logger)
-    else:
-        refs, ctype, time = trial_kerchunk(args, testfile, ctype, logger)
+def summarise_json(args, count: int, ctype: str, logger) -> tuple:
+    """
+    Open previously written JSON cached files and perform analysis.
+    """
+    refs = get_proj_file(args.proj_dir, f'cache/{count}.json')
     if not refs:
-        return None, None, None, None, None
+        return None, None, None, None
 
-    logger.debug('Starting Analysis of references')
+    logger.debug(f'Starting Analysis of references for {count}')
 
     # Perform summations, extract chunk attributes
     sizes = []
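A minimal usage sketch of the two string helpers above (not part of the commit), assuming the module is importable as pipeline.scan; format_float's unit loop is folded out of the diff, so the exact output string shown is an assumption:

    from pipeline.scan import format_float, safe_format
    import logging
    logger = logging.getLogger('scan')

    safe_format(3.14159, '{value:.1f}')   # -> '3.1'
    safe_format(None, '{value:.1f}')      # -> '' (formatting None raises, so the except branch returns '')
    format_float(2_500_000, logger)       # -> a unit-scaled string such as '2.5 MB' (exact form depends on the folded loop)
    format_float(None, logger)            # -> None (falsy input short-circuits)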
@@ -123,36 +77,45 @@ def perform_scan(args, testfile: str, ctype: str, logger, savecache=True, cache_
             chunksize = dict(kdict[chunkkey])['chunks']
             vars[var] = chunksize
 
-    # Save refs individually within cache.
-    if savecache:
-        cachedir = f'{args.proj_dir}/cache'
-        if not os.path.isdir(cachedir):
-            os.makedirs(cachedir)
-        with open(f'{cachedir}/{cache_id}.json','w') as f:
-            f.write(json.dumps(refs))
+    return np.sum(sizes), chunks, vars, ctype
 
-    return np.sum(sizes), chunks, vars, ctype, time
-
-def eval_sizes(files: list):
+def eval_sizes(files: list) -> list:
     """Get a list of file sizes on disk from a list of filepaths"""
     return [os.stat(files[count]).st_size for count in range(len(files))]
 
-def get_seconds(time_allowed: str):
+def get_seconds(time_allowed: str) -> int:
     """Convert time in MM:SS to seconds"""
     if not time_allowed:
         return 10000000000
     mins, secs = time_allowed.split(':')
     return int(secs) + 60*int(mins)
 
-def format_seconds(seconds: int):
+def format_seconds(seconds: int) -> str:
     """Convert time in seconds to MM:SS"""
     mins = int(seconds/60) + 1
     if mins < 10:
         mins = f'0{mins}'
     return f'{mins}:00'
 
-def perform_safe_calculations(std_vars: list, cpf: list, volms: list, files: list, times: list, logger):
-    """Perform all calculations safely to mitigate errors that come through during data collation."""
+def perform_safe_calculations(std_vars: list, cpf: list, volms: list, files: list, logger) -> tuple:
+    """
+    Perform all calculations safely to mitigate errors that arise during data collation.
+
+    :param std_vars:    (list) A list of the variables collected, which should be the same across
+        all input files.
+
+    :param cpf:         (list) The chunks per file recorded for each input file.
+
+    :param volms:       (list) The total data size recorded for each input file.
+
+    :param files:       (list) A list of the paths to each file.
+
+    :param logger:      (obj) Logging object for info/debug/error messages.
+
+    :returns:   Average values of: chunks per file (cpf), number of variables (num_vars), chunk size (avg_chunk),
+        spatial resolution of each chunk assuming 2:1 ratio lat/lon (spatial_res), totals of NetCDF and Kerchunk estimate
+        data amounts, number of files, total number of chunks and the addition percentage.
+    """
+    kchunk_const = 167 # Bytes per Kerchunk ref (standard/typical)
     if std_vars:
         num_vars = len(std_vars)
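A quick sketch (not part of the commit) of the MM:SS helpers above; note that format_seconds deliberately rounds up to the next whole minute so an estimate never under-allocates time:

    from pipeline.scan import get_seconds, format_seconds

    get_seconds('02:30')    # -> 150 (30 + 60*2)
    get_seconds(None)       # -> 10000000000, effectively 'unlimited'
    format_seconds(150)     # -> '03:00' (int(150/60) + 1 = 3, zero-padded)
    format_seconds(590)     # -> '10:00'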
@@ -179,10 +142,10 @@ def perform_safe_calculations(std_vars: list, cpf: list, volms: list, files: lis
         spatial_res = None
 
     if files and avg_vol:
-        data_represented = avg_vol*len(files)
+        netcdf_data = avg_vol*len(files)
         num_files = len(files)
     else:
-        data_represented = None
+        netcdf_data = None
         num_files = None
 
     if files and avg_cpf:
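For scale, a rough worked example of the two dataset-size estimates this function now returns (netcdf_data above, kerchunk_data in the next hunk); the input values are invented for illustration:

    avg_vol, avg_cpf, num_files = 2e9, 1000, 500    # hypothetical averages from a scan
    netcdf_data = avg_vol * num_files               # 1e12 B, i.e. ~1 TB of source NetCDF
    kerchunk_data = avg_cpf * num_files * 167       # 83,500,000 B, i.e. ~83.5 MB of Kerchunk refs at 167 B each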
@@ -195,60 +158,68 @@ def perform_safe_calculations(std_vars: list, cpf: list, volms: list, files: lis
     else:
         addition = None
 
-    if files and len(times) > 0:
-        estm_time = int(np.mean(times)*len(files))
+    if avg_cpf and num_files:
+        kerchunk_data = avg_cpf * num_files * kchunk_const
     else:
-        estm_time = 0
+        kerchunk_data = None
 
-    return avg_cpf, num_vars, avg_chunk, spatial_res, data_represented, num_files, total_chunks, addition, estm_time
+    return avg_cpf, num_vars, avg_chunk, spatial_res, netcdf_data, kerchunk_data, num_files, total_chunks, addition
 
-def write_skip(proj_dir, proj_code, logger):
+def write_skip(proj_dir: str, proj_code: str, logger) -> None:
+    """
+    Quick function to write a 'skipped' detail file.
+    """
     details = {'skipped':True}
     with open(f'{proj_dir}/detail-cfg.json','w') as f:
         f.write(json.dumps(details))
     logger.info(f'Skipped scanning - {proj_code}/detail-cfg.json blank file created')
 
-def scan_dataset(args, files: list, logger):
+def scan_dataset(args, files: list, logger) -> None:
     """Main process handler for scanning phase"""
     proj_code = args.proj_code
     proj_dir = args.proj_dir
 
     detailfile = f'{proj_dir}/detail-cfg.json'
+    cfgfile = f'{proj_dir}/base-cfg.json'
     logger.debug(f'Assessment for {proj_code}')
 
     # Set up conditions, skip for small file count < 5
     escape, is_varwarn, is_skipwarn = False, False, False
-    cpf, volms, times = [],[],[]
-    trial_files = 5
+    cpf, volms = [],[]
 
-    if len(files) < 5:
+    if len(files) < 3:
         write_skip(proj_dir, proj_code, logger)
         return None
     else:
         logger.info(f'Identified {len(files)} files for scanning')
 
-    # Perform scans for sample (max 5) files
-    count = 0
     std_vars = None
     std_chunks = None
-    ctypes = []
+    ctype = None
 
-    scanfile = files[0]
-    if '.' in scanfile:
-        ctype = f'.{scanfile.split(".")[-1]}'
-    else:
-        ctype = 'ncf3'
+    # Create all files in mini-kerchunk set here. Then try an assessment.
+    limiter = int(len(files)/20)
+    limiter = max(2, limiter)
+    limiter = min(100, limiter)
 
-    filecap = min(100,len(files))
-    while not escape and len(cpf) < trial_files:
-        logger.info(f'Attempting scan for file {count+1} (min 5, max 100)')
-        # Add random file selector here
-        scanfile = files[count]
+    logger.info(f'Determined {limiter} files to scan')
+
+    mini_ds = KerchunkDSProcessor(
+        args.proj_code,
+        cfg_file=cfgfile, detail_file=detailfile, workdir=args.workdir,
+        thorough=True, forceful=True, # Always run from scratch forcefully to get best time estimates.
+        version_no='trial-', verb=args.verbose, logid='0',
+        groupID=args.groupID, limiter=limiter)
+
+    mini_ds.create_refs()
+
+    logger.info(f'Summarising scan results for {limiter} files')
+    for count in range(limiter):
         try:
-            # Measure time and ensure job will not overrun if it can be prevented.
-            volume, chunks_per_file, varchunks, ctype, time = perform_scan(args, scanfile, ctype, logger,
-                                                                           savecache=True, cache_id=str(count),
-                                                                           thorough=args.quality)
+            volume, chunks_per_file, varchunks, ctype = summarise_json(args, count, ctype, logger)
            vars = sorted(list(varchunks.keys()))
 
+            # Keeping the below options although may be redundant as have already processed the files
             if not std_vars:
                 std_vars = vars
             if vars != std_vars:
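The new limiter logic above samples roughly 5% of the dataset, clamped to between 2 and 100 files. A small sketch of the arithmetic (file counts hypothetical):

    for n in (10, 100, 1000, 5000):
        print(n, min(100, max(2, int(n/20))))   # 10 -> 2, 100 -> 5, 1000 -> 50, 5000 -> 100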
@@ -261,13 +232,9 @@ def scan_dataset(args, files: list, logger):
                 if std_chunks[var] != varchunks[var]:
                     raise ConcatFatalError(var=var, chunk1=std_chunks[var], chunk2=varchunks[var])
 
-            if count == 0 and time > get_seconds(args.time_allowed)/trial_files:
-                raise ExpectTimeoutError(required=format_seconds(time*5), current=args.time_allowed)
 
             cpf.append(chunks_per_file)
             volms.append(volume)
-            ctypes.append(ctype)
-            times.append(time)
 
             logger.info(f'Data recorded for file {count+1}')
         except ExpectTimeoutError as err:
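The chunk-consistency check above guards concatenation: every sampled file must declare identical chunk shapes per variable. A sketch with hypothetical values (ConcatFatalError comes from pipeline.errors, star-imported at the top of this module):

    from pipeline.errors import ConcatFatalError

    std_chunks = {'tas': [1, 180, 360]}   # chunk shape seen in the first sampled file
    varchunks  = {'tas': [1, 90, 180]}    # hypothetical mismatch in a later file
    for var in std_chunks:
        if std_chunks[var] != varchunks[var]:
            raise ConcatFatalError(var=var, chunk1=std_chunks[var], chunk2=varchunks[var])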
@@ -276,28 +243,30 @@ def scan_dataset(args, files: list, logger):
             raise err
         except Exception as err:
             raise err
-        count += 1
-        if count >= filecap:
-            escape = True
-    if escape:
-        raise FilecapExceededError(filecap)
 
-    logger.info('Scan complete, compiling outputs')
+    logger.info('Summary complete, compiling outputs')
     (avg_cpf, num_vars, avg_chunk,
-     spatial_res, data_represented, num_files,
-     total_chunks, addition, estm_time) = perform_safe_calculations(std_vars, cpf, volms, files, times, logger)
+     spatial_res, netcdf_data, kerchunk_data, num_files,
+     total_chunks, addition) = perform_safe_calculations(std_vars, cpf, volms, files, logger)
 
-    c2m = 167 # Memory for each chunk in kerchunk in B
-
     details = {
-        'netcdf_data' : format_float(data_represented, logger),
-        'kerchunk_data' : format_float(avg_cpf * num_files * c2m, logger),
+        'netcdf_data' : format_float(netcdf_data, logger),
+        'kerchunk_data' : format_float(kerchunk_data, logger),
         'num_files' : num_files,
         'chunks_per_file' : safe_format(avg_cpf,'{value:.1f}'),
         'total_chunks' : safe_format(total_chunks,'{value:.2f}'),
         'estm_chunksize' : format_float(avg_chunk,logger),
         'estm_spatial_res' : safe_format(spatial_res,'{value:.2f}') + ' deg',
-        'estm_time' : format_seconds(estm_time),
+        'timings' : {
+            'convert_estm' : mini_ds.convert_time,
+            'concat_estm' : mini_ds.concat_time,
+            'validate_estm' : mini_ds.validate_time,
+            'convert_actual' : None,
+            'concat_actual' : None,
+            'validate_actual': None,
+        },
         'variable_count' : num_vars,
         'addition' : safe_format(addition,'{value:.3f}') + ' %',
         'var_err' : is_varwarn,
@@ -324,51 +293,17 @@ def scan_dataset(args, files: list, logger):
         # Replace with dumping dictionary
         f.write(json.dumps(details))
     logger.info(f'Written output file {proj_code}/detail-cfg.json')
-    logger.info('Performing concatenation attempt with minimal files')
-    try:
-        assemble_trial_concatenation(args, ctype, logger)
-    except Exception as err:
-        logger.error('Error in concatenating files')
-        raise err
-
-def assemble_trial_concatenation(args, ctype, logger):
-
-    cfg_file = f'{args.proj_dir}/base-cfg.json'
-    detail_file = f'{args.proj_dir}/detail-cfg.json'
-
-    idx_trial = Indexer(args.proj_code, cfg_file=cfg_file, detail_file=detail_file,
-                        workdir=args.workdir, issave_meta=True, thorough=False, forceful=args.forceful,
-                        verb=args.verbose, mode=args.mode,
-                        bypass=args.bypass, groupID=args.groupID, limiter=2, ctype=ctype)
-
-    idx_trial.create_refs()
-    with open(detail_file,'w') as f:
-        f.write(json.dumps(idx_trial.collect_details()))
-    logger.debug('Collected new details into detail-cfg.json')
-
-
-def scan_config(args, fh=None, logid=None, **kwargs):
+def scan_config(args, fh=None, logid=None, **kwargs) -> None:
     """Configure scanning and access main section"""
 
     logger = init_logger(args.verbose, args.mode, 'scan',fh=fh, logid=logid)
     logger.debug(f'Setting up scanning process')
 
-    cfg_file = f'{args.proj_dir}/base-cfg.json'
-    if os.path.isfile(cfg_file):
-        with open(cfg_file) as f:
-            cfg = json.load(f)
-    else:
-        os.system(f'ls {args.proj_dir}')
-        logger.error(f'cfg file missing or not provided - {cfg_file}')
-        return None
 
     args.workdir = get_attribute('WORKDIR', args, 'workdir')
     args.groupdir = get_attribute('GROUPDIR', args, 'groupdir')
 
-    if args.groupID:
-        args.proj_dir = f'{args.workdir}/in_progress/{args.groupID}/{args.proj_code}'
-    else:
-        args.proj_dir = f'{args.workdir}/in_progress/{args.proj_code}'
+    args.proj_dir = get_proj_dir(args.proj_code, args.workdir, args.groupID)
 
     logger.debug(f"""Extracted attributes: {args.proj_code},
                                            {args.workdir},
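The removed branching at the end of scan_config is now delegated to the get_proj_dir helper imported from pipeline.utils. Its presumed behaviour, inferred from the deleted lines (the actual implementation lives in pipeline/utils.py):

    def get_proj_dir(proj_code: str, workdir: str, groupID: str) -> str:
        # Presumed equivalent of the removed branch above.
        if groupID:
            return f'{workdir}/in_progress/{groupID}/{proj_code}'
        return f'{workdir}/in_progress/{proj_code}'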
