Merge remote-tracking branch 'upstream/main' into io_v2
AdamTheisen committed Nov 15, 2023
2 parents afe9ecb + 756acb3 commit 1622cb9
Showing 8 changed files with 186 additions and 149 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package-conda.yml
@@ -43,7 +43,7 @@ jobs:
- name: Setup Conda Environment
uses: mamba-org/setup-micromamba@v1
with:
python-version: ${{ matrix.python-version }}
create-args: python=${{ matrix.python-version }}
environment-file: ./continuous_integration/environment_actions.yml
environment-name: act_env

52 changes: 31 additions & 21 deletions act/io/arm.py
@@ -5,26 +5,26 @@
"""

import copy
import datetime as dt
import glob
import json
import re
import tarfile
import tempfile
import urllib
import warnings
from pathlib import Path, PosixPath
from netCDF4 import Dataset
from os import PathLike
import tarfile
import tempfile
from pathlib import Path, PosixPath

from cftime import num2date
import numpy as np
import xarray as xr
import datetime as dt
from cftime import num2date
from netCDF4 import Dataset

import act
import act.utils as utils
from act.config import DEFAULT_DATASTREAM_NAME
from act.utils.io_utils import unpack_tar, unpack_gzip, cleanup_files, is_gunzip_file
from act.utils.io_utils import cleanup_files, is_gunzip_file, unpack_gzip, unpack_tar


def read_arm_netcdf(
@@ -132,7 +132,8 @@ def read_arm_netcdf(
if 'drop_variables' in kwargs.keys():
drop_variables = kwargs['drop_variables']
kwargs['drop_variables'] = keep_variables_to_drop_variables(
filenames, keep_variables, drop_variables=drop_variables)
filenames, keep_variables, drop_variables=drop_variables
)

# Create an exception tuple to use with try statements. Doing it this way
# so we can add the FileNotFoundError if requested. Can add more error
@@ -173,7 +174,9 @@ def read_arm_netcdf(
# If requested use base_time and time_offset to derive time. Assumes that the units
# of both are in seconds and that the value is number of seconds since epoch.
if use_base_time:
time = num2date(ds['base_time'].values + ds['time_offset'].values, ds['base_time'].attrs['units'])
time = num2date(
ds['base_time'].values + ds['time_offset'].values, ds['base_time'].attrs['units']
)
time = time.astype('datetime64[ns]')

# Need to use a new Dataset creation to correctly index time for use with
@@ -275,10 +278,7 @@ def read_arm_netcdf(
return ds


def keep_variables_to_drop_variables(
filenames,
keep_variables,
drop_variables=None):
def keep_variables_to_drop_variables(filenames, keep_variables, drop_variables=None):
"""
Returns a list of variable names to exclude from reading by passing into
`Xarray.open_dataset` drop_variables keyword. This can greatly help reduce
@@ -342,7 +342,6 @@ def keep_variables_to_drop_variables(
# Use netCDF4 library to extract the variable and dimension names.
rootgrp = Dataset(filename, 'r')
read_variables = list(rootgrp.variables)
dimensions = list(rootgrp.dimensions)
# Loop over the variables to exclude needed coordinate dimention names.
dims_to_keep = []
for var_name in keep_variables:
@@ -395,7 +394,9 @@ def check_arm_standards(ds):
return the_flag


def create_ds_from_arm_dod(proc, set_dims, version='', fill_value=-9999.0, scalar_fill_dim=None, local_file=False):
def create_ds_from_arm_dod(
proc, set_dims, version='', fill_value=-9999.0, scalar_fill_dim=None, local_file=False
):
"""
Queries the ARM DOD api and builds a dataset based on the ARM DOD and
@@ -626,7 +627,9 @@ def write_netcdf(
try:
att_values = write_ds[var_name].attrs[attr_name]
if isinstance(att_values, (list, tuple)):
att_values = [att_value.replace(' ', join_char) for att_value in att_values]
att_values = [
att_value.replace(' ', join_char) for att_value in att_values
]
write_ds[var_name].attrs[attr_name] = ' '.join(att_values)

except KeyError:
@@ -754,9 +757,16 @@ def write_netcdf(
pass
current_time = dt.datetime.now().replace(microsecond=0)
if 'history' in list(write_ds.attrs.keys()):
write_ds.attrs['history'] += ''.join(['\n', str(current_time), ' created by ACT ', str(act.__version__),
' act.io.write.write_netcdf'])

write_ds.attrs['history'] += ''.join(
[
'\n',
str(current_time),
' created by ACT ',
str(act.__version__),
' act.io.write.write_netcdf',
]
)

if hasattr(write_ds, 'time_bounds') and not write_ds.time.encoding:
write_ds.time.encoding.update(write_ds.time_bounds.encoding)

@@ -825,7 +835,7 @@ def read_arm_mmcr(filenames):
# read it in with xarray
multi_ds = []
for f in filenames:
nc = Dataset(f, "a")
nc = Dataset(f, 'a')
# Change heights name to range to read appropriately to xarray
if 'heights' in nc.dimensions:
nc.renameDimension('heights', 'range')
@@ -873,7 +883,7 @@ def read_arm_mmcr(filenames):
data=data,
coords={time_name: ds['time'].values[idx], range_name: range_data[idy]},
dims=[time_name, range_name],
attrs=attrs
attrs=attrs,
)
ds[new_var_name] = da

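For readers following the reformatted read_arm_netcdf changes above, the short sketch below shows how a time coordinate can be built from ARM-style base_time and time_offset variables, both assumed (as the in-code comment states) to hold seconds since the epoch. The values, units string, and variable names are hypothetical stand-ins, not data from this commit.

# Minimal sketch: derive a datetime64 time coordinate from ARM-style
# base_time (seconds since epoch) plus time_offset (seconds relative to it).
# All values below are hypothetical; only the num2date approach mirrors arm.py.
import numpy as np
from cftime import num2date

base_time = 1700000000                              # hypothetical epoch seconds
time_offset = np.arange(0, 60, dtype=np.int64)      # one sample per second
units = 'seconds since 1970-1-1 0:00:00 0:00'       # typical ARM base_time units

time = num2date(
    base_time + time_offset,
    units,
    only_use_cftime_datetimes=False,
    only_use_python_datetimes=True,
)
time = np.array(time, dtype='datetime64[ns]')       # usable as an xarray time index
print(time[0], time[-1])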
155 changes: 83 additions & 72 deletions act/qc/arm.py
@@ -7,6 +7,7 @@
import datetime as dt
import numpy as np
import requests
import json

from act.config import DEFAULT_DATASTREAM_NAME

@@ -71,7 +72,7 @@ def add_dqr_to_qc(
Returns
-------
ds : xarray.Dataset
Xarray dataset containing new quality control variables
Xarray dataset containing new or updated quality control variables
Examples
--------
@@ -99,93 +100,103 @@ def add_dqr_to_qc(
if cleanup_qc:
ds.clean.cleanup()

# In order to properly flag data, get all variables if None. Exclude QC variables.
if variable is None:
variable = list(set(ds.data_vars) - set(ds.clean.matched_qc_variables))
start_date = ds['time'].values[0].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')
end_date = ds['time'].values[-1].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')

# Clean up assessment to ensure it is a string with no spaces.
if isinstance(assessment, (list, tuple)):
assessment = ','.join(assessment)

# Not strictly needed but should make things more better.
assessment = assessment.replace(' ', '')
assessment = assessment.lower()

# Create URL
url = 'https://dqr-web-service.svcs.arm.gov/dqr_full'
url += f"/{datastream}"
url += f"/{start_date}/{end_date}"
url += f"/{assessment}"

# Call web service
req = requests.get(url)

# Check status values and raise error if not successful
status = req.status_code
if status == 400:
raise ValueError('Check parameters')
if status == 500:
raise ValueError('DQR Webservice Temporarily Down')

# Convert from string to dictionary
docs = json.loads(req.text)

# If no DQRs found will not have a key with datastream.
# The status will also be 404.
try:
docs = docs[datastream]
except KeyError:
return ds

dqr_results = {}
for quality_category in docs:
for dqr_number in docs[quality_category]:
if exclude is not None and dqr_number in exclude:
continue

if include is not None and dqr_number not in include:
continue

index = np.array([], dtype=np.int32)
for time_range in docs[quality_category][dqr_number]['dates']:
starttime = np.datetime64(time_range['start_date'])
endtime = np.datetime64(time_range['end_date'])
ind = np.where((ds['time'].values >= starttime) & (ds['time'].values <= endtime))
if ind[0].size > 0:
index = np.append(index, ind[0])

if index.size > 0:
dqr_results[dqr_number] = {
'index': index,
'test_assessment': quality_category.lower().capitalize(),
'test_meaning': f"{dqr_number} : {docs[quality_category][dqr_number]['description']}",
'variables': docs[quality_category][dqr_number]['variables'],
}

if dqr_link:
print(f"{dqr_number} - {quality_category.lower().capitalize()}: "
f"https://adc.arm.gov/ArchiveServices/DQRService?dqrid={dqr_number}")

# Check to ensure variable is list
if not isinstance(variable, (list, tuple)):
if variable and not isinstance(variable, (list, tuple)):
variable = [variable]

# Loop through each variable and call web service for that variable
loc_vars = ['lat', 'lon', 'alt', 'latitude', 'longitude', 'altitude']
for var_name in variable:
if skip_location_vars:
if var_name in loc_vars:
continue
# Create URL
url = 'http://www.archive.arm.gov/dqrws/ARMDQR?datastream='
url += datastream
url += '&varname=' + var_name
url += ''.join(
[
'&searchmetric=',
assessment,
'&dqrfields=dqrid,starttime,endtime,metric,subject',
]
)

# Call web service
req = requests.get(url)

# Check status values and raise error if not successful
status = req.status_code
if status == 400:
raise ValueError('Check parameters')
if status == 500:
raise ValueError('DQR Webservice Temporarily Down')

# Get data and run through each dqr
dqrs = req.text.splitlines()
time = ds['time'].values
dqr_results = {}
for line in dqrs:
line = line.split('|')
dqr_no = line[0]

# Exclude DQRs if in list
if exclude is not None and dqr_no in exclude:
continue
for key, value in dqr_results.items():
for var_name in value['variables']:

# Only include if in include list
if include is not None and dqr_no not in include:
# Do not process on location variables
if skip_location_vars and var_name in loc_vars:
continue

starttime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[1])))
endtime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[2])))
ind = np.where((time >= starttime) & (time <= endtime))

if ind[0].size == 0:
# Only process provided variable names
if variable is not None and var_name not in variable:
continue

if 'time' not in ds[var_name].dims:
ind = np.where((ds[var_name].values == ds[var_name].values) | (np.isnan(ds[var_name].values)))
if np.size(ind) == 1:
ind = ind[0]

if dqr_no in dqr_results.keys():
dqr_results[dqr_no]['index'] = np.append(dqr_results[dqr_no]['index'], ind)
else:
dqr_results[dqr_no] = {
'index': ind,
'test_assessment': line[3],
'test_meaning': ': '.join([dqr_no, line[-1]]),
}
if dqr_link:
print_url = 'https://adc.arm.gov/ArchiveServices/DQRService?dqrid=' + str(dqr_no)
print(dqr_no, '-', line[3], ':', print_url)
for key, value in dqr_results.items():
try:
ds.qcfilter.add_test(
var_name,
index=value['index'],
index=np.unique(value['index']),
test_meaning=value['test_meaning'],
test_assessment=value['test_assessment'],
)
test_assessment=value['test_assessment'])

except KeyError: # Variable name not in Dataset
continue

except IndexError:
print(f"Skipping '{var_name}' DQR application because of IndexError")
continue

if normalize_assessment:
ds.clean.normalize_assessment(variables=var_name)
if normalize_assessment:
ds.clean.normalize_assessment(variables=var_name)

return ds
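To make the new web-service flow in add_dqr_to_qc easier to follow, here is a minimal stand-alone sketch of the same request and response handling. The endpoint, path layout, status checks, and response structure (datastream -> assessment category -> DQR number -> 'dates'/'description'/'variables') come from the diff above; the datastream name, date range, and assessment string are hypothetical examples.

# Minimal sketch of the DQR web-service query used by add_dqr_to_qc above.
# The datastream, dates, and assessment values are hypothetical examples.
import json

import requests

datastream = 'sgpmetE13.b1'            # hypothetical ARM datastream name
start_date, end_date = '20230101', '20230107'
assessment = 'incorrect,suspect'       # lower-case, comma-joined, no spaces

url = (
    'https://dqr-web-service.svcs.arm.gov/dqr_full'
    f'/{datastream}/{start_date}/{end_date}/{assessment}'
)
req = requests.get(url)
if req.status_code == 400:
    raise ValueError('Check parameters')
if req.status_code == 500:
    raise ValueError('DQR Webservice Temporarily Down')

# If no DQRs are found the response has no datastream key (status 404).
docs = {}
if req.ok:
    docs = json.loads(req.text).get(datastream, {})

for quality_category, dqrs in docs.items():
    for dqr_number, info in dqrs.items():
        print(dqr_number, quality_category, info['variables'])
        for time_range in info['dates']:
            print('   ', time_range['start_date'], '->', time_range['end_date'])

Note that this replaces the per-variable calls to the old archive.arm.gov/dqrws endpoint that the diff removes, so the service is now queried once per datastream and date range rather than once per variable.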
24 changes: 12 additions & 12 deletions act/qc/bsrn_tests.py
@@ -433,8 +433,8 @@ def bsrn_comparison_tests(
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
if use_dask and isinstance(self._ds[glb_diffuse_SW_dn_name].data, da.Array):
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].data +
self._ds[direct_normal_SW_dn_name].data * np.cos(np.radians(sza)))
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].data
+ self._ds[direct_normal_SW_dn_name].data * np.cos(np.radians(sza)))
sum_sw_down[sum_sw_down < 50] = np.nan
ratio = self._ds[gbl_SW_dn_name].data / sum_sw_down
index_a = sza < 75
@@ -445,8 +445,8 @@
index_4 = da.where((ratio < 0.85) & index_b, True, False)
index = (index_1 | index_2 | index_3 | index_4).compute()
else:
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].values +
self._ds[direct_normal_SW_dn_name].values * np.cos(np.radians(sza)))
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].values
+ self._ds[direct_normal_SW_dn_name].values * np.cos(np.radians(sza)))
sum_sw_down[sum_sw_down < 50] = np.nan
ratio = self._ds[gbl_SW_dn_name].values / sum_sw_down
index_a = sza < 75
@@ -505,14 +505,14 @@ def bsrn_comparison_tests(
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
if use_dask and isinstance(self._ds[glb_diffuse_SW_dn_name].data, da.Array):
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].data +
self._ds[direct_normal_SW_dn_name].data * np.cos(np.radians(sza)))
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].data
+ self._ds[direct_normal_SW_dn_name].data * np.cos(np.radians(sza)))

sum_sw_down[sum_sw_down < 50] = np.nan
index = da.where(self._ds[glb_SW_up_name].data > sum_sw_down, True, False).compute()
else:
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].values +
self._ds[direct_normal_SW_dn_name].values * np.cos(np.radians(sza)))
sum_sw_down = (self._ds[glb_diffuse_SW_dn_name].values
+ self._ds[direct_normal_SW_dn_name].values * np.cos(np.radians(sza)))
sum_sw_down[sum_sw_down < 50] = np.nan
index = self._ds[glb_SW_up_name].values > sum_sw_down

@@ -577,10 +577,10 @@ def bsrn_comparison_tests(
f'for {test_options[3]} test.')

if use_dask and isinstance(self._ds[glb_LW_dn_name].data, da.Array):
index_1 = da.where(self._ds[glb_LW_dn_name].data >
(self._ds[glb_LW_up_name].data + LWdn_lt_LWup_component), True, False)
index_2 = da.where(self._ds[glb_LW_dn_name].data <
(self._ds[glb_LW_up_name].data - LWdn_gt_LWup_component), True, False)
index_1 = da.where(self._ds[glb_LW_dn_name].data
> (self._ds[glb_LW_up_name].data + LWdn_lt_LWup_component), True, False)
index_2 = da.where(self._ds[glb_LW_dn_name].data
< (self._ds[glb_LW_up_name].data - LWdn_gt_LWup_component), True, False)
index = (index_1 | index_2).compute()
else:
index_1 = self._ds[glb_LW_dn_name].values > (self._ds[glb_LW_up_name].values + LWdn_lt_LWup_component)
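The bsrn_tests.py changes above are formatting-only (binary operators moved to the start of continuation lines), but the underlying comparison may be easier to see in isolation. The sketch below is a rough stand-alone version of the "global over sum of components" check; it uses typical BSRN ratio limits (0.92-1.08 for solar zenith angles below 75 degrees, 0.85-1.15 at larger angles), which are only partially visible in this excerpt, so treat the thresholds and input arrays as illustrative assumptions rather than the exact values in bsrn_tests.py.

# Rough sketch of the BSRN global-over-sum comparison shown above.
# Inputs and ratio limits are illustrative; see bsrn_tests.py for the
# authoritative implementation.
import numpy as np

sza = np.array([30.0, 60.0, 80.0, 85.0])                      # solar zenith angle, degrees
glb_sw_dn = np.array([700.0, 520.0, 90.0, 40.0])              # global SW down, W/m^2
diffuse_sw_dn = np.array([100.0, 90.0, 60.0, 30.0])           # diffuse SW down, W/m^2
direct_normal_sw_dn = np.array([750.0, 700.0, 200.0, 80.0])   # direct normal SW, W/m^2

# Sum of components projected onto the horizontal.
sum_sw_down = diffuse_sw_dn + direct_normal_sw_dn * np.cos(np.radians(sza))
sum_sw_down[sum_sw_down < 50] = np.nan                        # skip the test at low signal
ratio = glb_sw_dn / sum_sw_down

low_sza = sza < 75
high_sza = (sza >= 75) & (sza < 93)
flagged = ((ratio > 1.08) | (ratio < 0.92)) & low_sza
flagged |= ((ratio > 1.15) | (ratio < 0.85)) & high_sza
print(flagged)   # True where the measured global disagrees with the component sum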
