diff --git a/act/qc/arm.py b/act/qc/arm.py
index 91e760b1d8..0fb84597d4 100644
--- a/act/qc/arm.py
+++ b/act/qc/arm.py
@@ -7,6 +7,7 @@
 import datetime as dt
 
 import numpy as np
 import requests
+import json
 
 from act.config import DEFAULT_DATASTREAM_NAME
@@ -71,7 +72,7 @@ def add_dqr_to_qc(
     Returns
     -------
     ds : xarray.Dataset
-        Xarray dataset containing new quality control variables
+        Xarray dataset containing new or updated quality control variables
 
     Examples
     --------
@@ -99,93 +100,103 @@ def add_dqr_to_qc(
     if cleanup_qc:
         ds.clean.cleanup()
 
-    # In order to properly flag data, get all variables if None. Exclude QC variables.
-    if variable is None:
-        variable = list(set(ds.data_vars) - set(ds.clean.matched_qc_variables))
+    start_date = ds['time'].values[0].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')
+    end_date = ds['time'].values[-1].astype('datetime64[s]').astype(dt.datetime).strftime('%Y%m%d')
+
+    # Clean up assessment to ensure it is a string with no spaces.
+    if isinstance(assessment, (list, tuple)):
+        assessment = ','.join(assessment)
+
+    # Not strictly needed, but it keeps the assessment string consistent.
+    assessment = assessment.replace(' ', '')
+    assessment = assessment.lower()
+
+    # Create URL
+    url = 'https://dqr-web-service.svcs.arm.gov/dqr_full'
+    url += f"/{datastream}"
+    url += f"/{start_date}/{end_date}"
+    url += f"/{assessment}"
+
+    # Call web service
+    req = requests.get(url)
+
+    # Check status values and raise error if not successful
+    status = req.status_code
+    if status == 400:
+        raise ValueError('Check parameters')
+    if status == 500:
+        raise ValueError('DQR Webservice Temporarily Down')
+
+    # Convert from string to dictionary
+    docs = json.loads(req.text)
+
+    # If no DQRs are found, the returned dictionary will not have a key matching
+    # the datastream name. The status will also be 404.
+    try:
+        docs = docs[datastream]
+    except KeyError:
+        return ds
+
+    dqr_results = {}
+    for quality_category in docs:
+        for dqr_number in docs[quality_category]:
+            if exclude is not None and dqr_number in exclude:
+                continue
+
+            if include is not None and dqr_number not in include:
+                continue
+
+            index = np.array([], dtype=np.int32)
+            for time_range in docs[quality_category][dqr_number]['dates']:
+                starttime = np.datetime64(time_range['start_date'])
+                endtime = np.datetime64(time_range['end_date'])
+                ind = np.where((ds['time'].values >= starttime) & (ds['time'].values <= endtime))
+                if ind[0].size > 0:
+                    index = np.append(index, ind[0])
+
+            if index.size > 0:
+                dqr_results[dqr_number] = {
+                    'index': index,
+                    'test_assessment': quality_category.lower().capitalize(),
+                    'test_meaning': f"{dqr_number} : {docs[quality_category][dqr_number]['description']}",
+                    'variables': docs[quality_category][dqr_number]['variables'],
+                }
+
+                if dqr_link:
+                    print(f"{dqr_number} - {quality_category.lower().capitalize()}: "
+                          f"https://adc.arm.gov/ArchiveServices/DQRService?dqrid={dqr_number}")
 
     # Check to ensure variable is list
-    if not isinstance(variable, (list, tuple)):
+    if variable and not isinstance(variable, (list, tuple)):
         variable = [variable]
 
-    # Loop through each variable and call web service for that variable
     loc_vars = ['lat', 'lon', 'alt', 'latitude', 'longitude', 'altitude']
-    for var_name in variable:
-        if skip_location_vars:
-            if var_name in loc_vars:
-                continue
-        # Create URL
-        url = 'http://www.archive.arm.gov/dqrws/ARMDQR?datastream='
-        url += datastream
-        url += '&varname=' + var_name
-        url += ''.join(
-            [
-                '&searchmetric=',
-                assessment,
-                '&dqrfields=dqrid,starttime,endtime,metric,subject',
-            ]
-        )
-
-        # Call web service
-        req = requests.get(url)
-
-        # Check status values and raise error if not successful
-        status = req.status_code
-        if status == 400:
-            raise ValueError('Check parameters')
-        if status == 500:
-            raise ValueError('DQR Webservice Temporarily Down')
-
-        # Get data and run through each dqr
-        dqrs = req.text.splitlines()
-        time = ds['time'].values
-        dqr_results = {}
-        for line in dqrs:
-            line = line.split('|')
-            dqr_no = line[0]
-
-            # Exclude DQRs if in list
-            if exclude is not None and dqr_no in exclude:
-                continue
+    for key, value in dqr_results.items():
+        for var_name in value['variables']:
 
-            # Only include if in include list
-            if include is not None and dqr_no not in include:
+            # Do not process on location variables
+            if skip_location_vars and var_name in loc_vars:
                 continue
 
-            starttime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[1])))
-            endtime = np.datetime64(dt.datetime.utcfromtimestamp(int(line[2])))
-            ind = np.where((time >= starttime) & (time <= endtime))
-
-            if ind[0].size == 0:
+            # Only process provided variable names
+            if variable is not None and var_name not in variable:
                 continue
 
-            if 'time' not in ds[var_name].dims:
-                ind = np.where((ds[var_name].values == ds[var_name].values) | (np.isnan(ds[var_name].values)))
-                if np.size(ind) == 1:
-                    ind = ind[0]
-
-            if dqr_no in dqr_results.keys():
-                dqr_results[dqr_no]['index'] = np.append(dqr_results[dqr_no]['index'], ind)
-            else:
-                dqr_results[dqr_no] = {
-                    'index': ind,
-                    'test_assessment': line[3],
-                    'test_meaning': ': '.join([dqr_no, line[-1]]),
-                }
-                if dqr_link:
-                    print_url = 'https://adc.arm.gov/ArchiveServices/DQRService?dqrid=' + str(dqr_no)
-                    print(dqr_no, '-', line[3], ':', print_url)
-        for key, value in dqr_results.items():
             try:
                 ds.qcfilter.add_test(
                     var_name,
-                    index=value['index'],
+                    index=np.unique(value['index']),
                     test_meaning=value['test_meaning'],
-                    test_assessment=value['test_assessment'],
-                )
+                    test_assessment=value['test_assessment'])
+
+            except KeyError:  # Variable name not in Dataset
+                continue
+
             except IndexError:
                 print(f"Skipping '{var_name}' DQR application because of IndexError")
+                continue
 
-        if normalize_assessment:
-            ds.clean.normalize_assessment(variables=var_name)
+            if normalize_assessment:
+                ds.clean.normalize_assessment(variables=var_name)
 
     return ds
diff --git a/act/tests/test_qc.py b/act/tests/test_qc.py
index ea6d6ffa60..bfeaaab667 100644
--- a/act/tests/test_qc.py
+++ b/act/tests/test_qc.py
@@ -90,45 +90,61 @@ def test_qc_test_errors():
 def test_arm_qc():
     # Test DQR Webservice using known DQR
     variable = 'wspd_vec_mean'
-    qc_variable = 'qc_' + variable
     ds = read_netcdf(EXAMPLE_METE40)
+    ds_org = copy.deepcopy(ds)
+    qc_variable = ds.qcfilter.check_for_ancillary_qc(variable)
 
-    # DQR webservice does go down, so ensure it
-    # properly runs first before testing
+    # DQR webservice does go down, so ensure it properly runs first before testing
     try:
-        ds = add_dqr_to_qc(ds, variable=variable)
-        ran = True
-        ds.attrs['_datastream'] = ds.attrs['datastream']
-        del ds.attrs['datastream']
-        ds2 = add_dqr_to_qc(ds, variable=variable)
-        ds3 = add_dqr_to_qc(ds)
-        add_dqr_to_qc(ds, variable=variable, exclude=['D190529.4'])
-        add_dqr_to_qc(ds, variable=variable, include=['D400101.1'])
-        with np.testing.assert_raises(ValueError):
-            del ds.attrs['_datastream']
-            add_dqr_to_qc(ds, variable=variable)
+        ds = add_dqr_to_qc(ds)
     except ValueError:
-        ran = False
-
-    if ran:
-        assert qc_variable in ds
-        dqr = [True for d in ds[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
-        assert dqr[0] is True
-        assert 'Suspect' not in ds[qc_variable].attrs['flag_assessments']
-        assert 'Incorrect' not in ds[qc_variable].attrs['flag_assessments']
-
-        assert qc_variable in ds2
-        dqr = [True for d in ds2[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
-        assert dqr[0] is True
-        assert 'Suspect' not in ds2[qc_variable].attrs['flag_assessments']
-        assert 'Incorrect' not in ds2[qc_variable].attrs['flag_assessments']
-
-        assert qc_variable in ds3
-        dqr = [True for d in ds3[qc_variable].attrs['flag_meanings'] if 'D190529.4' in d]
-        assert dqr[0] is True
-        assert 'Suspect' not in ds3[qc_variable].attrs['flag_assessments']
-        assert 'Incorrect' not in ds3[qc_variable].attrs['flag_assessments']
+        return
+
+    assert 'Suspect' not in ds[qc_variable].attrs['flag_assessments']
+    assert 'Incorrect' not in ds[qc_variable].attrs['flag_assessments']
+    assert 'Bad' in ds[qc_variable].attrs['flag_assessments']
+    assert 'Indeterminate' in ds[qc_variable].attrs['flag_assessments']
+
+    # Check that the default will update all variables listed in the DQR
+    for var_name in ['wdir_vec_mean', 'wdir_vec_std', 'wspd_arith_mean', 'wspd_vec_mean']:
+        qc_var = ds.qcfilter.check_for_ancillary_qc(var_name)
+        assert ds[qc_var].attrs['flag_meanings'][-1].startswith('D190529.4')
+
+    # Check that the variable keyword works as expected.
+    ds = copy.deepcopy(ds_org)
+    add_dqr_to_qc(ds, variable=variable)
+    qc_var = ds.qcfilter.check_for_ancillary_qc(variable)
+    assert ds[qc_var].attrs['flag_meanings'][-1].startswith('D190529.4')
+    qc_var = ds.qcfilter.check_for_ancillary_qc('wdir_vec_std')
+    assert len(ds[qc_var].attrs['flag_masks']) == 0
+
+    # Check that include and exclude keywords work as expected
+    ds = copy.deepcopy(ds_org)
+    add_dqr_to_qc(ds, variable=variable, exclude=['D190529.4'])
+    assert len(ds[qc_variable].attrs['flag_meanings']) == 4
+    add_dqr_to_qc(ds, variable=variable, include=['D400101.1'])
+    assert len(ds[qc_variable].attrs['flag_meanings']) == 4
+    add_dqr_to_qc(ds, variable=variable, include=['D190529.4'])
+    assert len(ds[qc_variable].attrs['flag_meanings']) == 5
+    add_dqr_to_qc(ds, variable=variable, assessment='Incorrect')
+    assert len(ds[qc_variable].attrs['flag_meanings']) == 5
+
+    # Test additional keywords
+    add_dqr_to_qc(ds, variable=variable, assessment='Suspect', cleanup_qc=False,
+                  dqr_link=True, skip_location_vars=True)
+    assert len(ds[qc_variable].attrs['flag_meanings']) == 6
+
+    # Default is to normalize assessment terms. Check that we can turn that off.
+    add_dqr_to_qc(ds, variable=variable, normalize_assessment=False)
+    assert 'Suspect' in ds[qc_variable].attrs['flag_assessments']
+
+    # Test that an error is raised when the datastream global attributes are missing
+    with np.testing.assert_raises(ValueError):
+        ds4 = copy.deepcopy(ds)
+        del ds4.attrs['datastream']
+        del ds4.attrs['_datastream']
+        add_dqr_to_qc(ds4, variable=variable)
 
 
 def test_qcfilter():
@@ -1454,6 +1470,7 @@ def test_scalar_dqr():
 
     if ran:
         assert 'qc_lat' in ds
+        assert np.size(ds['qc_lon'].values) == 1
        assert np.size(ds['qc_lat'].values) == 1
        assert np.size(ds['qc_alt'].values) == 1
        assert np.size(ds['base_time'].values) == 1
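
For context, a minimal usage sketch of the updated add_dqr_to_qc() call path follows. This is only an illustration, not part of the patch: the data file name is hypothetical, the calls require network access to the new DQR web service, and the reader act.io.armfiles.read_netcdf plus the keywords shown (variable, exclude, dqr_link) are the same ones exercised by the test above.

# Minimal usage sketch of add_dqr_to_qc() after this change.
# The file name is hypothetical; network access to
# https://dqr-web-service.svcs.arm.gov is required.
import act
from act.qc.arm import add_dqr_to_qc

# Read an ARM-formatted netCDF file. The datastream name is taken from the
# file's 'datastream' (or '_datastream') global attribute.
ds = act.io.armfiles.read_netcdf('sgpmetE13.b1.20190101.000000.cdf')

# Query the web service once for the file's full date range and apply the
# returned DQRs to every variable listed in each DQR.
ds = add_dqr_to_qc(ds, dqr_link=True)

# Restrict application to a single variable and skip a specific DQR number.
ds = add_dqr_to_qc(ds, variable='wspd_vec_mean', exclude=['D190529.4'])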