Skip to content

Commit

Permalink
Adding keep_variables to reader (ARM-DOE#467)
Browse files Browse the repository at this point in the history
* Adding keep_variables to reader

* Updated description of keep_variables_to_drop_variables() function

* Fixed issue with missing datastream global attribute
  • Loading branch information
kenkehoe authored May 23, 2022
1 parent d358119 commit e0eaf7a
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 2 deletions.
13 changes: 13 additions & 0 deletions act/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""
Configuration file for the Python Atmospheric data Community Toolkit (ACT)
The values for a number of ACT parameters and the default metadata created
when reading files, correcting fields, etc. is controlled by this single
Python configuration file.
Examples:
---------
from act.config import DEFAULT_DATASTREAM_NAME
"""

# Fallback value written to the '_datastream' global attribute when a file
# being read does not conform to ARM standards (no 'datastream' attribute).
DEFAULT_DATASTREAM_NAME = 'act_datastream'
114 changes: 113 additions & 1 deletion act/io/armfiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import urllib
import warnings
from pathlib import Path, PosixPath
from netCDF4 import Dataset

import numpy as np
import xarray as xr

import act.utils as utils
from act.config import DEFAULT_DATASTREAM_NAME


def read_netcdf(
Expand All @@ -27,6 +29,7 @@ def read_netcdf(
cftime_to_datetime64=True,
combine_attrs='override',
cleanup_qc=False,
keep_variables=None,
**kwargs,
):
"""
Expand Down Expand Up @@ -63,6 +66,11 @@ def read_netcdf(
Call clean.cleanup() method to convert to standardized ancillary quality control
variables. This will not allow any keyword options, so if non-default behavior is
desired will need to call clean.cleanup() method on the object after reading the data.
keep_variables : str or list of str
Variable names to read from data file. Works by creating a list of variable names
to exclude from reading and passing into open_mfdataset() via drop_variables keyword.
Still allows use of drop_variables keyword for variables not listed in first file to
read.
**kwargs : keywords
Keywords to pass through to xarray.open_mfdataset().
Expand Down Expand Up @@ -93,6 +101,14 @@ def read_netcdf(
kwargs['use_cftime'] = use_cftime
kwargs['combine_attrs'] = combine_attrs

# Check if keep_variables is set. If so determine correct drop_variables
if keep_variables is not None:
drop_variables = None
if 'drop_variables' in kwargs.keys():
drop_variables = kwargs['drop_variables']
kwargs['drop_variables'] = keep_variables_to_drop_variables(
filenames, keep_variables, drop_variables=drop_variables)

# Create an exception tuple to use with try statements. Doing it this way
# so we can add the FileNotFoundError if requested. Can add more error
# handling in the future.
Expand Down Expand Up @@ -242,7 +258,7 @@ def read_netcdf(
# Ensure that we have _datastream set whether or no there's
# a datastream attribute already.
if is_arm_file_flag == 0:
ds.attrs['_datastream'] = 'act_datastream'
ds.attrs['_datastream'] = DEFAULT_DATASTREAM_NAME
else:
ds.attrs['_datastream'] = ds.attrs['datastream']

Expand All @@ -254,6 +270,94 @@ def read_netcdf(
return ds


def keep_variables_to_drop_variables(
        filenames,
        keep_variables,
        drop_variables=None):
    """
    Returns a list of variable names to exclude from reading by passing into
    `Xarray.open_dataset` drop_variables keyword. This can greatly help reduce
    loading time and disk space use of the Dataset.

    When passed a netCDF file name, will open the file using the netCDF4 library to get
    list of variable names. There is less overhead reading the variable names using
    netCDF4 library than Xarray. If more than one filename is provided or string is
    used for shell syntax globbing, will use the first file in the list.

    Parameters
    ----------
    filenames : str, pathlib.Path or list of str
        Name of file(s) to read.
    keep_variables : str or list of str
        Variable names desired to keep. Do not need to list associated dimension
        names. These will be automatically kept as well.
    drop_variables : str or list of str
        Variable names to explicitly add to returned list. May be helpful if a variable
        exists in a file that is not in the first file in the list.

    Returns
    -------
    drop_variables : list of str
        Variable names to exclude from returned Dataset by using drop_variables keyword
        when calling Xarray.open_dataset().

    Raises
    ------
    TypeError
        If `filenames` is not a str, pathlib.Path, list or tuple.

    Examples
    --------
    .. code-block :: python

        import act
        filename = '/data/datastream/hou/houkasacrcfrM1.a1/houkasacrcfrM1.a1.20220404.*.nc'
        drop_vars = act.io.armfiles.keep_variables_to_drop_variables(
            filename, ['lat','lon','alt','crosspolar_differential_phase'],
            drop_variables='variable_name_that_only_exists_in_last_file_of_the_day')

    """
    return_variables = []

    # Normalize scalar arguments to lists so set operations below work.
    if isinstance(keep_variables, str):
        keep_variables = [keep_variables]

    if isinstance(drop_variables, str):
        drop_variables = [drop_variables]

    # Reduce filenames to a single file name to inspect. Only the first file
    # is opened; its variables are assumed representative of the set.
    if isinstance(filenames, (list, tuple)):
        filename = str(filenames[0])
    elif isinstance(filenames, Path):
        # Previously a bare Path fell through both branches and raised
        # NameError on the unbound 'filename' local.
        filename = str(filenames)
    elif isinstance(filenames, str):
        # A string may contain shell globbing syntax; expand it and use the
        # first match. No matches means nothing to drop.
        matched = sorted(glob.glob(filenames))
        if not matched:
            return return_variables
        filename = matched[0]
    else:
        raise TypeError(
            'filenames must be a str, pathlib.Path, list or tuple, '
            f'not {type(filenames).__name__}')

    # Use netCDF4 library to extract the variable and dimension names.
    # Context manager guarantees the file handle is released on error.
    with Dataset(filename, 'r') as rootgrp:
        read_variables = list(rootgrp.variables)
        # Collect the dimension names used by the kept variables so their
        # coordinate variables are not dropped.
        dims_to_keep = []
        for var_name in keep_variables:
            try:
                dims_to_keep.extend(list(rootgrp[var_name].dimensions))
            except IndexError:
                # netCDF4 raises IndexError when the variable name is not
                # found in the file; skip it.
                pass

    # Remove names matching keep_variables and their coordinate dimensions.
    return_variables = set(read_variables) - set(keep_variables) - set(dims_to_keep)

    # Add explicitly requested drop_variables to the returned list.
    if drop_variables is not None:
        return_variables |= set(drop_variables)

    return list(return_variables)


def check_arm_standards(ds):
"""
Expand All @@ -273,6 +377,14 @@ def check_arm_standards(ds):
if 'datastream' not in ds.attrs.keys():
the_flag = 0

# Check if the historical global attribute name is
# used instead of updated name of 'datastream'. If so
# correct the global attributes and flip flag.
if 'zeb_platform' in ds.attrs.keys():
ds.attrs['datastream'] = copy.copy(ds.attrs['zeb_platform'])
del ds.attrs['zeb_platform']
the_flag = 1 << 0

return the_flag


Expand Down
7 changes: 6 additions & 1 deletion act/qc/arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"""

import datetime as dt

import numpy as np
import requests

from act.config import DEFAULT_DATASTREAM_NAME


def add_dqr_to_qc(
obj,
Expand Down Expand Up @@ -61,6 +62,10 @@ def add_dqr_to_qc(
else:
raise ValueError('Object does not have datastream attribute')

if datastream == DEFAULT_DATASTREAM_NAME:
raise ValueError("'datastream' name required for DQR service set to default value "
f"{datastream}. Unable to perform DQR service query.")

# Clean up QC to conform to CF conventions
obj.clean.cleanup()

Expand Down
42 changes: 42 additions & 0 deletions act/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,48 @@ def test_io():
sonde_ds.close()


def test_keep_variables():
    """Test the keep_variables keyword of read_netcdf() and the
    keep_variables_to_drop_variables() helper.

    NOTE: the original assertions used list.sort(), which returns None, so
    every comparison was 'None == None' and the test could never fail.
    Replaced with sorted() so real values are compared.
    """

    var_names = ['temp_mean', 'rh_mean', 'wdir_vec_mean', 'tbrg_precip_total_corr',
                 'atmos_pressure', 'wspd_vec_mean', 'pwd_pw_code_inst', 'pwd_pw_code_15min',
                 'pwd_mean_vis_10min', 'logger_temp', 'pwd_precip_rate_mean_1min',
                 'pwd_cumul_snow', 'pwd_mean_vis_1min', 'pwd_pw_code_1hr', 'org_precip_rate_mean',
                 'tbrg_precip_total', 'pwd_cumul_rain']
    var_names = var_names + ['qc_' + ii for ii in var_names]
    drop_variables = act.io.armfiles.keep_variables_to_drop_variables(
        act.tests.EXAMPLE_MET1, var_names)

    expected_drop_variables = [
        'wdir_vec_std', 'base_time', 'alt', 'qc_wspd_arith_mean', 'pwd_err_code', 'logger_volt',
        'temp_std', 'lon', 'qc_logger_volt', 'time_offset', 'wspd_arith_mean', 'lat', 'vapor_pressure_std',
        'vapor_pressure_mean', 'rh_std', 'qc_vapor_pressure_mean']
    assert sorted(drop_variables) == sorted(expected_drop_variables)

    # A single string keep_variables should yield exactly that one variable.
    ds_object = act.io.armfiles.read_netcdf(act.tests.EXAMPLE_MET1, keep_variables='temp_mean')
    assert list(ds_object.data_vars) == ['temp_mean']
    del ds_object

    # drop_variables naming a variable absent from the file is still allowed.
    var_names = ['temp_mean', 'qc_temp_mean']
    ds_object = act.io.armfiles.read_netcdf(act.tests.EXAMPLE_MET1, keep_variables=var_names,
                                            drop_variables='nonsense')
    assert sorted(ds_object.data_vars) == sorted(var_names)
    del ds_object

    # drop_variables removes a name even when it is also in keep_variables.
    var_names = ['temp_mean', 'qc_temp_mean', 'alt', 'lat', 'lon']
    ds_object = act.io.armfiles.read_netcdf(act.tests.EXAMPLE_MET_WILDCARD, keep_variables=var_names,
                                            drop_variables=['lon'])
    var_names = list(set(var_names) - set(['lon']))
    assert sorted(ds_object.data_vars) == sorted(var_names)
    del ds_object

    # keep_variables also works when filenames is a list of Path objects.
    filenames = Path(act.tests.EXAMPLE_MET_WILDCARD).parent
    filenames = list(filenames.glob(Path(act.tests.EXAMPLE_MET_WILDCARD).name))
    var_names = ['temp_mean', 'qc_temp_mean', 'alt', 'lat', 'lon']
    ds_object = act.io.armfiles.read_netcdf(filenames, keep_variables=var_names)
    assert sorted(ds_object.data_vars) == sorted(var_names)
    del ds_object


def test_io_mfdataset():
met_ds = act.io.armfiles.read_netcdf(act.tests.EXAMPLE_MET_WILDCARD)
met_ds.load()
Expand Down

0 comments on commit e0eaf7a

Please sign in to comment.