Merge pull request #6 from ddasilva/metadata-writer
Work in progress code for Metadata Manager and NetCDF writer classes
MSKirk authored Nov 20, 2024
2 parents 930628f + 095c118 commit 1096ad0
Showing 4 changed files with 262 additions and 42 deletions.
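Taken together, the two new classes split the work: MetadataManager loads the metadata definition CSVs from a run directory, and Level3NetCDFWriter consults it to resolve NetCDF variable names and attributes while writing output. A minimal sketch of that interaction, mirroring the run() method added in make_level3.py below (the run name and internal variable name are placeholders, and the commit is explicitly work in progress — note the dimension handling in write_variable is still flagged TODO — so details may change):

from pathlib import Path

import numpy as np

from suncet_processing_pipeline import metadata_mgr
from suncet_processing_pipeline.make_level3 import Level3NetCDFWriter

run_dir = Path('processing_runs') / 'my_run'      # hypothetical run name
metadata = metadata_mgr.MetadataManager(run_dir)  # reads the metadata definition CSVs

writer = Level3NetCDFWriter(run_dir / 'level3' / 'suncet_level3.nc', metadata)
writer.write_variable('carring_lat', np.zeros(100))  # internal name used in run() below
writer.close()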
3 changes: 1 addition & 2 deletions environment.yml
@@ -1,7 +1,6 @@
name: suncet
channels:
- conda-forge
# - defaults

dependencies:
- pip >=22.1
@@ -21,5 +20,5 @@ dependencies:
- gnuradio-satellites=5.2.0
- termcolor==2.4.0
- pytest==7.1.3
- h5netcdf==1.1.0
- pykdtree==1.3.13
#prefix: ~/anaconda3
157 changes: 118 additions & 39 deletions suncet_processing_pipeline/make_level3.py
@@ -2,66 +2,145 @@
This is the code to make the Level 3 data product.
"""
import argparse
from pathlib import Path
from pprint import pprint

import h5netcdf
import numpy as np
from termcolor import cprint

from . import config_parser
from . import metadata_mgr

class Level3:
"""Class for applying the Level2 -> Level3 processing stage.

The key method is `run()`, which acts like a main() method for
this stage.
"""
def __init__(self, config):
class Level3:
"""Class for applying the Level2 -> Level3 processing stage."""
def __init__(self, run_name, config):
"""
Parameters
----------
level2_data : dict, str -> array
Level 2 data, mapping internal variable names to their values
(generally numpy arrays)
config : config_parser.Config
SunCET Data Processing Pipeline configuration object
Args
run_name: string, name of run we are processing
config, config_parser.Config, SunCET Data Processing Pipeline
configuration object
"""
self.run_name = run_name
self.run_dir = Path('processing_runs') / run_name
self.config = config

if not self.run_dir.exists():
raise RuntimeError(f'Could not find directory {self.run_dir}')

def run(self):
"""Main method to process the level2 -> level3 stage."""
# Parse command line arguments
parser = self._get_parser()
args = parser.parse_args()
# Load metadata
metadata = metadata_mgr.MetadataManager(self.run_dir)

def _get_parser(self):
"""Get command line ArgumentParser object with options defined.
Returns
-------
parser : argparse.ArgumentParser
object which can be used to parse command line arguments
# Start NetCDF File
nc_output_path = self.run_dir / 'level3' / 'suncet_level3.nc'
nc = Level3NetCDFWriter(nc_output_path, metadata)

# Write some blank values
nc.write_variable('carring_lat', np.zeros(100))
nc.write_variable('carring_long', np.ones(100))
nc.close()


class Level3NetCDFWriter:
"""Class for writing Level3 NetCDF Output."""
def __init__(self, output_path, metadata):
self._output_path = output_path
self._metadata = metadata
self._nc_file = h5netcdf.File(self._output_path, 'w')

def write_variable(self, internal_name, variable_value):
"""Write a variable and its associated metadata to the file.
This function is passed the internal name of the variable, and uses
the metadata manager to look up the NetCDF4 name and associated
attributes.
Args
internal_name: Internal name of variable (within code)
variable_value: Value for the variable in the file
"""
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', help='Print more debugging output')
variable_name = self._metadata.get_netcdf4_variable_name(internal_name)

# Write variable data
print(f'Writing internal variable ', end='')
cprint(internal_name, 'yellow', end='')
print(f' NetCDF variable ', end='')
cprint(variable_name, 'yellow')

return parser
# TODO: this is broken
self._nc_file.dimensions[variable_name + '_dim'] = variable_value.shape

nc_variable = self._nc_file.create_variable(
name=variable_name,
dimensions=(variable_name + '_dim',),
dtype=variable_value.dtype
)

nc_variable[:] = variable_value

# Write variable attributes
attrs = self._metadata.get_netcdf4_attrs(internal_name)

print('attributes:')
pprint(attrs)

for key, value in attrs.items():
nc_variable.attrs[key] = value

print()

def close(self):
"""Close the NetCDF file, commiting all changes."""
self._nc_file.close()


def final_shdr_compositing_fix(level2_data, config):
"""Fix any lingaring SHDR Compositing Issues.
Parameters
----------
level2_data : dict, str -> array
Level 2 data, mapping internal variable names to their values
(generally numpy arrays)
config : config_parser.Config
SunCET Data Processing Pipeline configuration object
Args
level2_data : dict, str -> array
Level 2 data, mapping internal variable names to their values
(generally numpy arrays)
config : config_parser.Config
SunCET Data Processing Pipeline configuration object
Returns
-------
level2_data_fixed : dict, str -> array
Copy of level2 data with the fix applied.
Returns
level2_data_fixed : dict, str -> array
Copy of level2 data with the fix applied.
"""
raise NotImplementedError()


def _get_parser():
"""Get command line ArgumentParser object with options defined.
Returns
object which can be used to parse command line arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--run-name', type=str, required=True,
help='String name of the run')
parser.add_argument('-v', '--verbose', action='store_true', help='Print more debugging output')

return parser

if __name__ == '__main__':
level3 = Level3()

def main():
"""Main method when running this script directly."""
args = _get_parser().parse_args()

# Load config
config_filename = Path('processing_runs') / args.run_name / 'config.ini'
config = config_parser.ConfigParser()
config.read(config_filename)

# Call run() method on Level3 class
level3 = Level3(args.run_name, config)
level3.run()


if __name__ == '__main__':
main()
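With the new module-level main(), this stage is presumably launched as a module (for example `python -m suncet_processing_pipeline.make_level3 -n my_run`, since the relative imports prevent running the file directly); that is an inference from this diff rather than documented usage. Before it can run, the run directory has to contain the files that the code above and metadata_mgr.py below look for. A rough sketch of the expected layout (the exact file set may grow, since this is work in progress):

processing_runs/
    my_run/                                     # hypothetical run name, passed via --run-name
        config.ini                              # read by main()
        suncet_metadata_definition.csv          # read by MetadataManager
        suncet_metadata_definition_version.csv  # read by MetadataManager
        level3/                                 # destination for suncet_level3.nc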
135 changes: 135 additions & 0 deletions suncet_processing_pipeline/metadata_mgr.py
@@ -0,0 +1,135 @@
import pandas as pd
from pathlib import Path


class MetadataManager:
"""Class for interacting with SunCET Metadata files.
This expects the metadata to be downloaded into the run directory. To do that,
see: setup_minimum_required_folders_files.py
"""
def __init__(self, run_dir):
"""Initialize a metadata manager from a run directory, which
is expected to have the required files.
Args
run_dir: Path to run directory
"""
# Set paths and check they exist
self._metadata_path = Path(run_dir) / 'suncet_metadata_definition.csv'
self._metadata_ver_path = Path(run_dir) / 'suncet_metadata_definition_version.csv'

if not self._metadata_path.exists():
raise FileNotFoundError(
f"Error: could not find metadata at {self._metadata_path}"
)

if not self._metadata_ver_path.exists():
raise FileNotFoundError(
f"Error: could not find metadata version at {self._metadata_path}"
)

# Load metadata CSV using Pandas
print(f'Reading metadata from {self._metadata_path}')
self._metadata_df = pd.read_csv(self._metadata_path)
self._metadata_df = _clean_metadata_comments(self._metadata_df)

# Load metadata version (just read string from text file)
with open(self._metadata_ver_path) as fh:
self._metadata_ver = fh.read().strip()

print(f'Found metadata version "{self._metadata_ver}"')

# Convert metadata df to dictionary mapping internal name to dictionary
# of columns -> values
self._metadata_dict = _get_metadata_dict(self._metadata_df)

def get_netcdf4_variable_name(self, internal_name):
"""Get name of variable for writing to a NetCDF4 file
Args
internal_name: Internal name of variable (within code)
Returns
what that internal name should be called in a NetCDF4 file
"""
# Ensure variable is in the metadata dictionary
if internal_name not in self._metadata_dict:
raise RuntimeError(
f"Could not find metadata for variable with internal name '{internal_name}'"
)

# Get the variable name, raising an exception if it's not filled out in the
# table
var_name = self._metadata_dict[internal_name]['netCDF variable name']

if not var_name:
raise RuntimeError(
f'Needed NetCDF variable name for internal name "{internal_name}", but missing'
)

# Return good result
return var_name

def get_netcdf4_attrs(self, internal_name):
"""Get dictionary of static NetCDF4 attributes for a given variable.
Args
internal_name: Internal name of variable (within code)
Returns
dictionary of attribute keys to values
"""
# Ensure variable is in the metadata dictionary
if internal_name not in self._metadata_dict:
raise RuntimeError(
f"Could not find metadata for variable with internal name "
f"'{internal_name}'."
)
# Load variable dict and return subset of keys that are relevant
var_dict = self._metadata_dict[internal_name]

return {
"units": var_dict["units (human)"]
}


def _get_metadata_dict(metadata_df):
"""Convert metadata dataframe to dictinoary mapping internal name
to dictionary of cols to values.
Args
metadata_df: Metadata dataframe as loaded from file with comments
cleaned
Returns
dictionary mapping internal names to dictionaries holding the
row information.
"""
metadata_dict = {}

for _, row in metadata_df.iterrows():
cur_dict = {col: row[col] for col in metadata_df.columns}
cur_key = row['Internal Variable Name']

metadata_dict[cur_key] = cur_dict

return metadata_dict


def _clean_metadata_comments(metadata_df):
"""Remove comment rows from the metadata Data Frame.
A command has the work "COMMENT" in the first column
Args
metadata_df: dataframe as loaded directly from the CSV file
Returns
dataframe with comment rows dropped
"""
collected_rows = []
first_col = metadata_df.columns[0]

for _, row in metadata_df.iterrows():
if 'COMMENT' not in row[first_col].upper():
collected_rows.append(row)

return pd.DataFrame(collected_rows)
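For reference, MetadataManager can also be exercised on its own. A small sketch, assuming a run directory that already holds the two CSV files and that the metadata definition uses the column names referenced above ('Internal Variable Name', 'netCDF variable name', 'units (human)'); the run directory and internal name here are illustrative placeholders:

from suncet_processing_pipeline.metadata_mgr import MetadataManager

metadata = MetadataManager('processing_runs/my_run')         # hypothetical run directory
nc_name = metadata.get_netcdf4_variable_name('carring_lat')  # internal name from make_level3.py
attrs = metadata.get_netcdf4_attrs('carring_lat')            # e.g. {'units': ...}
print(nc_name, attrs)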

9 changes: 8 additions & 1 deletion suncet_processing_pipeline/tests/make_level3_test.py
@@ -1,4 +1,5 @@
import os
import tempfile
from .. import config_parser, make_level3


@@ -7,6 +8,12 @@ def test_Level3_object_instantiates():
os.path.dirname(__file__), '..', 'config_files',
'config_default.ini'
)

config = config_parser.Config(default_config)
make_level3.Level3(config)
temp_dir = tempfile.TemporaryDirectory()

try:
make_level3.Level3(temp_dir.name, config)
finally:
temp_dir.cleanup()
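Since tempfile.TemporaryDirectory also works as a context manager, the same check could be written without the explicit try/finally; an equivalent sketch:

with tempfile.TemporaryDirectory() as temp_dir:
    make_level3.Level3(temp_dir, config)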
