Skip to content

Commit

Permalink
Updated DQR tool to use new DQR web-service.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenkehoe committed Oct 24, 2023
1 parent edd4d53 commit 206da7b
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 106 deletions.
155 changes: 83 additions & 72 deletions act/qc/arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datetime as dt
import numpy as np
import requests
import json

from act.config import DEFAULT_DATASTREAM_NAME

Expand Down Expand Up @@ -71,7 +72,7 @@ def add_dqr_to_qc(
Returns
-------
ds : xarray.Dataset
Xarray dataset containing new quality control variables
Xarray dataset containing new or updated quality control variables
Examples
--------
Expand Down Expand Up @@ -99,93 +100,103 @@ def add_dqr_to_qc(
if cleanup_qc:
ds.clean.cleanup()

# In order to properly flag data, get all variables if None. Exclude QC variables.
if variable is None:
variable = list(set(ds.data_vars) - set(ds.clean.matched_qc_variables))
start_date = ds['time'].values[0].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')
end_date = ds['time'].values[-1].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')

# Clean up assessment to ensure it is a string with no spaces.
if isinstance(assessment, (list, tuple)):
assessment = ','.join(assessment)

    # Not strictly needed, but it makes things better.
assessment = assessment.replace(' ', '')
assessment = assessment.lower()

# Create URL
url = 'https://dqr-web-service.svcs.arm.gov/dqr_full'
url += f"/{datastream}"
url += f"/{start_date}/{end_date}"
url += f"/{assessment}"

# Call web service
req = requests.get(url)

# Check status values and raise error if not successful
status = req.status_code
if status == 400:
raise ValueError('Check parameters')
if status == 500:
raise ValueError('DQR Webservice Temporarily Down')

# Convert from string to dictionary
docs = json.loads(req.text)

# If no DQRs found will not have a key with datastream.
# The status will also be 404.
try:
docs = docs[datastream]
except KeyError:
return ds

dqr_results = {}
for quality_category in docs:
for dqr_number in docs[quality_category]:
if exclude is not None and dqr_number in exclude:
continue

if include is not None and dqr_number not in include:
continue

index = np.array([], dtype=np.int32)
for time_range in docs[quality_category][dqr_number]['dates']:
starttime = np.datetime64(time_range['start_date'])
endtime = np.datetime64(time_range['end_date'])
ind = np.where((ds['time'].values >= starttime) & (ds['time'].values <= endtime))
if ind[0].size > 0:
index = np.append(index, ind[0])

if index.size > 0:
dqr_results[dqr_number] = {
'index': index,
'test_assessment': quality_category.lower().capitalize(),
'test_meaning': f"{dqr_number} : {docs[quality_category][dqr_number]['description']}",
'variables': docs[quality_category][dqr_number]['variables'],
}

if dqr_link:
print(f"{dqr_number} - {quality_category.lower().capitalize()}: "
f"https://adc.arm.gov/ArchiveServices/DQRService?dqrid={dqr_number}")

# Check to ensure variable is list
if not isinstance(variable, (list, tuple)):
if variable and not isinstance(variable, (list, tuple)):
variable = [variable]

# Loop through each variable and call web service for that variable
loc_vars = ['lat', 'lon', 'alt', 'latitude', 'longitude', 'altitude']
for var_name in variable:
if skip_location_vars:
if var_name in loc_vars:
continue
# Create URL
url = 'http://www.archive.arm.gov/dqrws/ARMDQR?datastream='
url += datastream
url += '&varname=' + var_name
url += ''.join(
[
'&searchmetric=',
assessment,
'&dqrfields=dqrid,starttime,endtime,metric,subject',
]
)

# Call web service
req = requests.get(url)

# Check status values and raise error if not successful
status = req.status_code
if status == 400:
raise ValueError('Check parameters')
if status == 500:
raise ValueError('DQR Webservice Temporarily Down')

# Get data and run through each dqr
dqrs = req.text.splitlines()
time = ds['time'].values
dqr_results = {}
for line in dqrs:
line = line.split('|')
dqr_no = line[0]

# Exclude DQRs if in list
if exclude is not None and dqr_no in exclude:
continue
for key, value in dqr_results.items():
for var_name in value['variables']:

# Only include if in include list
if include is not None and dqr_no not in include:
# Do not process on location variables
if skip_location_vars and var_name in loc_vars:
continue

starttime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[1])))
endtime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[2])))
ind = np.where((time >= starttime) & (time <= endtime))

if ind[0].size == 0:
# Only process provided variable names
if variable is not None and var_name not in variable:
continue

if 'time' not in ds[var_name].dims:
ind = np.where((ds[var_name].values == ds[var_name].values) | (np.isnan(ds[var_name].values)))
if np.size(ind) == 1:
ind = ind[0]

if dqr_no in dqr_results.keys():
dqr_results[dqr_no]['index'] = np.append(dqr_results[dqr_no]['index'], ind)
else:
dqr_results[dqr_no] = {
'index': ind,
'test_assessment': line[3],
'test_meaning': ': '.join([dqr_no, line[-1]]),
}
if dqr_link:
print_url = 'https://adc.arm.gov/ArchiveServices/DQRService?dqrid=' + str(dqr_no)
print(dqr_no, '-', line[3], ':', print_url)
for key, value in dqr_results.items():
try:
ds.qcfilter.add_test(
var_name,
index=value['index'],
index=np.unique(value['index']),
test_meaning=value['test_meaning'],
test_assessment=value['test_assessment'],
)
test_assessment=value['test_assessment'])

except KeyError: # Variable name not in Dataset
continue

except IndexError:
print(f"Skipping '{var_name}' DQR application because of IndexError")
continue

if normalize_assessment:
ds.clean.normalize_assessment(variables=var_name)
if normalize_assessment:
ds.clean.normalize_assessment(variables=var_name)

return ds
85 changes: 51 additions & 34 deletions act/tests/test_qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,45 +90,61 @@ def test_qc_test_errors():
def test_arm_qc():
# Test DQR Webservice using known DQR
variable = 'wspd_vec_mean'
qc_variable = 'qc_' + variable
ds = read_netcdf(EXAMPLE_METE40)
ds_org = copy.deepcopy(ds)
qc_variable = ds.qcfilter.check_for_ancillary_qc(variable)

# DQR webservice does go down, so ensure it
# properly runs first before testing
# DQR webservice does go down, so ensure it properly runs first before testing
try:
ds = add_dqr_to_qc(ds, variable=variable)
ran = True
ds.attrs['_datastream'] = ds.attrs['datastream']
del ds.attrs['datastream']
ds2 = add_dqr_to_qc(ds, variable=variable)
ds3 = add_dqr_to_qc(ds)
add_dqr_to_qc(ds, variable=variable, exclude=['D190529.4'])
add_dqr_to_qc(ds, variable=variable, include=['D400101.1'])
with np.testing.assert_raises(ValueError):
del ds.attrs['_datastream']
add_dqr_to_qc(ds, variable=variable)
ds = add_dqr_to_qc(ds)

except ValueError:
ran = False

if ran:
assert qc_variable in ds
dqr = [True for d in ds[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
assert dqr[0] is True
assert 'Suspect' not in ds[qc_variable].attrs['flag_assessments']
assert 'Incorrect' not in ds[qc_variable].attrs['flag_assessments']

assert qc_variable in ds2
dqr = [True for d in ds2[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
assert dqr[0] is True
assert 'Suspect' not in ds2[qc_variable].attrs['flag_assessments']
assert 'Incorrect' not in ds2[qc_variable].attrs['flag_assessments']

assert qc_variable in ds3
dqr = [True for d in ds3[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
assert dqr[0] is True
assert 'Suspect' not in ds3[qc_variable].attrs['flag_assessments']
assert 'Incorrect' not in ds3[qc_variable].attrs['flag_assessments']
return

assert 'Suspect' not in ds[qc_variable].attrs['flag_assessments']
assert 'Incorrect' not in ds[qc_variable].attrs['flag_assessments']
assert 'Bad' in ds[qc_variable].attrs['flag_assessments']
assert 'Indeterminate' in ds[qc_variable].attrs['flag_assessments']

    # Check that the default will update all variables in the DQR
for var_name in ['wdir_vec_mean', 'wdir_vec_std', 'wspd_arith_mean', 'wspd_vec_mean']:
qc_var = ds.qcfilter.check_for_ancillary_qc(var_name)
assert ds[qc_var].attrs['flag_meanings'][-1].startswith('D190529.4')

# Check that variable keyword works as expected.
ds = copy.deepcopy(ds_org)
add_dqr_to_qc(ds, variable=variable)
qc_var = ds.qcfilter.check_for_ancillary_qc(variable)
assert ds[qc_var].attrs['flag_meanings'][-1].startswith('D190529.4')
qc_var = ds.qcfilter.check_for_ancillary_qc('wdir_vec_std')
assert len(ds[qc_var].attrs['flag_masks']) == 0

# Check that include and exclude keywords work as expected
ds = copy.deepcopy(ds_org)
add_dqr_to_qc(ds, variable=variable, exclude=['D190529.4'])
assert len(ds[qc_variable].attrs['flag_meanings']) == 4
add_dqr_to_qc(ds, variable=variable, include=['D400101.1'])
assert len(ds[qc_variable].attrs['flag_meanings']) == 4
add_dqr_to_qc(ds, variable=variable, include=['D190529.4'])
assert len(ds[qc_variable].attrs['flag_meanings']) == 5
add_dqr_to_qc(ds, variable=variable, assessment='Incorrect')
assert len(ds[qc_variable].attrs['flag_meanings']) == 5

# Test additional keywords
add_dqr_to_qc(ds, variable=variable, assessment='Suspect', cleanup_qc=False,
dqr_link=True, skip_location_vars=True)
assert len(ds[qc_variable].attrs['flag_meanings']) == 6

# Default is to normalize assessment terms. Check that we can turn off.
add_dqr_to_qc(ds, variable=variable, normalize_assessment=False)
assert 'Suspect' in ds[qc_variable].attrs['flag_assessments']

# Test that an error is raised when no datastream global attributes
with np.testing.assert_raises(ValueError):
ds4 = copy.deepcopy(ds)
del ds4.attrs['datastream']
del ds4.attrs['_datastream']
add_dqr_to_qc(ds4, variable=variable)


def test_qcfilter():
Expand Down Expand Up @@ -1454,6 +1470,7 @@ def test_scalar_dqr():

if ran:
assert 'qc_lat' in ds
assert np.size(ds['qc_lon'].values) == 1
assert np.size(ds['qc_lat'].values) == 1
assert np.size(ds['qc_alt'].values) == 1
assert np.size(ds['base_time'].values) == 1

0 comments on commit 206da7b

Please sign in to comment.