From bec3a6213db5032e1e81e46d7fd6b3e98307c376 Mon Sep 17 00:00:00 2001 From: BrianOBlanton Date: Wed, 29 Nov 2023 08:42:55 -0500 Subject: [PATCH] more mods for time series processing --- adda/adda.py | 23 +++++++++++++++++++---- harvester/get_adcirc_stations.py | 1 + harvester/get_observations_stations.py | 4 ++-- processing/compute_error_field.py | 16 ++++++++++------ test/test_adda_in_floodwater.sh | 14 ++++++++++---- 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/adda/adda.py b/adda/adda.py index 999b5a0..c4414ad 100755 --- a/adda/adda.py +++ b/adda/adda.py @@ -147,7 +147,10 @@ def main(args): knockout_dict=None, fort63_style=fort63_style ) # Fetch best resolution and no resampling - data_adc,meta_adc=rpl.fetch_station_product(urls, return_sample_min=args.return_sample_min, fort63_style=fort63_style ) + data_adc,meta_adc=rpl.fetch_station_product(urls, return_sample_min=args.return_sample_min, fort63_style=fort63_style ) + + print(data_adc) + sys.exit(1) # Revert Harvester filling of nans to -99999 back to nans data_adc.replace('-99999',np.nan,inplace=True) @@ -243,14 +246,26 @@ def main(args): ## ## compute errors ## - comp_err = compute_error_field.compute_error_field(data_obs_smoothed, data_adc, meta_obs, n_pad=0) # All default params + + comp_err = compute_error_field.compute_error_field(data_obs_smoothed, data_adc, meta_obs, n_pad=0) + + #comp_err.adc.to_csv('check_adc_ispre.csv',float_format='%.3f') comp_err._intersection_stations() + #comp_err.adc.to_csv('check_adc_ispo.csv',float_format='%.3f') + + #comp_err.adc.to_csv('check_adc_itpre.csv',float_format='%.3f') comp_err._intersection_times() + #comp_err.adc.to_csv('check_adc_itpo.csv',float_format='%.3f') + + #comp_err.adc.to_csv('check_adc_ttpre.csv',float_format='%.3f') comp_err._tidal_transform_data() + #comp_err.adc.to_csv('check_adc_ttpo.csv',float_format='%.3f') + #print(f'COMPERR input times {obs_starttime} and {obs_endtime}') comp_err._apply_time_bounds((obs_starttime,obs_endtime)) # redundant but here for illustration comp_err._compute_and_average_errors() - comp_err.obs.to_csv('check_po5.csv',float_format='%.3f') + #comp_err.adc.to_csv('check_adc_po5.csv',float_format='%.3f') + #comp_err.obs.to_csv('check_obs_po5.csv',float_format='%.3f') # Set up IO env #utilities.log.info("Product Level Working in {}.".format(os.getcwd())) @@ -320,7 +335,7 @@ def main(args): adc_plot_grid = interpolate_scaled_offset_field.generic_grid() df_plot_transformed = interpolate_scaled_offset_field.interpolation_model_transform(adc_plot_grid, model=model, input_grid_type='grid',pathpoly=pathpoly) - # Write out the model for posterity + # Write out the model newfilename = io_utilities.get_full_filename_with_subdirectory_prepended(rootdir, iosubdir, 'interpolate_linear_model.h5') try: joblib.dump(model, newfilename) diff --git a/harvester/get_adcirc_stations.py b/harvester/get_adcirc_stations.py index 0e05472..bfad62b 100644 --- a/harvester/get_adcirc_stations.py +++ b/harvester/get_adcirc_stations.py @@ -303,6 +303,7 @@ def fetch_station_product(self, urls, return_sample_min=0, fort63_style=False, v utilities.log.info('Removing knockouts from acquired ADCIRC data') except Exception as e: utilities.log.error('TDS: Broad failure {}'.format(e)) + return data, meta ## Several scenarios may be used for this example main. For now we choose the ADDA style execution diff --git a/harvester/get_observations_stations.py b/harvester/get_observations_stations.py index 318c2af..26b7078 100644 --- a/harvester/get_observations_stations.py +++ b/harvester/get_observations_stations.py @@ -195,7 +195,7 @@ def remove_missingness_stations(self, df_in, max_nan_percentage_cutoff=100)-> pd def fetch_station_product(self, time_range, return_sample_min=0, interval=None): """ - Fetch the desire data. The main information is part of the class (sources, products, etc.). However, one must still specify the return_sample_minutes + Fetch the data. The main information is part of the class (sources, products, etc.). However, one must still specify the return_sample_minutes to sample the data. This harvesting code will read the raw data for the selected product. Perform an interpolation (it doesn't pad nans), and then resample the data at the desired freq (in minutes) @@ -209,7 +209,7 @@ def fetch_station_product(self, time_range, return_sample_min=0, interval=None): """ starttime = time_range[0] endtime=time_range[1] - utilities.log.info(f'Retrieving data for the time range {starttime}-{endtime}') + utilities.log.info(f'Retrieving data for the time range {starttime} to {endtime}') template = "An exception of type {0} occurred." interval=interval diff --git a/processing/compute_error_field.py b/processing/compute_error_field.py index 2cebf61..9cbee91 100644 --- a/processing/compute_error_field.py +++ b/processing/compute_error_field.py @@ -36,19 +36,19 @@ def interpolate_and_sample( diurnal_range, df_in )-> pd.DataFrame: df_out: DataFrame. (datetime64 x station) on a semidiurnal time sequence """ - df_in.to_csv('check_in.csv',float_format='%.3f') + #df_in.to_csv('check_in.csv',float_format='%.3f') df_x = pd.DataFrame(diurnal_range, columns=['TIME']) df_x.set_index('TIME',inplace=True) df_out = df_in.append(df_x) # This merges the two indexes df_out = df_out.loc[~df_out.index.duplicated(keep='first')] #Keep first because real data is first on the list df_out.sort_index(inplace=True) # this is sorted with intervening nans that need to be imputed - df_out.to_csv('check_pre.csv',float_format='%.3f') + #df_out.to_csv('check_pre.csv',float_format='%.3f') #df_out_int = df_out.interpolate(method='linear') df_out_int = df_out.interpolate(method='values') - df_out_int.to_csv('check_po1.csv',float_format='%.3f') + #df_out_int.to_csv('check_po1.csv',float_format='%.3f') df_out_int = df_out_int.loc[diurnal_range] df_out_int.index.name='TIME' - df_out_int.to_csv('check_po2.csv',float_format='%.3f') + #df_out_int.to_csv('check_po2.csv',float_format='%.3f') return df_out_int def combine_data_to_dict(in_adc,in_obs,in_err, product='WL')->dict: @@ -225,7 +225,9 @@ def _tidal_transform_data(self): time_step = int(3600*n_hours_per_tide/n_hours_per_period) # Always scale to an hour (3600s) diurnal_range = pd.date_range(timein, timeout+np.timedelta64(n_pad,'h'), freq=str(time_step)+'S').to_list() + #self.adc.to_csv('check_adc_po3a.csv',float_format='%.3f') self.adc = interpolate_and_sample( diurnal_range, self.adc ) + #self.adc.to_csv('check_adc_po3b.csv',float_format='%.3f') self.obs = interpolate_and_sample( diurnal_range, self.obs ) # Truncate the time ranges to only retain full tidal periods. Update data inplace. @@ -245,7 +247,8 @@ def _tidal_transform_data(self): else: utilities.log.debug('Tidal interpolation: Not enough data to perform a tidal interpolation. Skipping.') - self.obs.to_csv('check_po3.csv',float_format='%.3f') + #self.adc.to_csv('check_adc_po3.csv',float_format='%.3f') + #self.obs.to_csv('check_obs_po3.csv',float_format='%.3f') # MIGHT need something special for Hurricanes ? def _apply_time_bounds(self, time_range): @@ -277,7 +280,8 @@ def _apply_time_bounds(self, time_range): self._intersection_times() # Should not be needed here but just in case utilities.log.debug('New adc time lo {}, New adc time hi {}'.format( min(self.adc.index).strftime(dformat), max(self.adc.index.strftime(dformat)))) - self.obs.to_csv('check_po4.csv',float_format='%.3f') + #self.adc.to_csv('check_adc_po4.csv',float_format='%.3f') + #self.obs.to_csv('check_obs_po4.csv',float_format='%.3f') # Statistical stuff def _remove_station_outliers(self): diff --git a/test/test_adda_in_floodwater.sh b/test/test_adda_in_floodwater.sh index e7b45fb..2e5410c 100644 --- a/test/test_adda_in_floodwater.sh +++ b/test/test_adda_in_floodwater.sh @@ -1,9 +1,12 @@ #!/bin/bash -fw_dir="/scratch/bblanton/ecflow_output/dev/ec95d_dev_da_idalia/" + +#fw_dir="/scratch/bblanton/ecflow_output/dev/ec95d_dev_da_idalia/" #fw_dir="/home/bblanton/storm_surge/ec95d_dev_da_idalia.test1" +fw_dir="/scratch/sbunya/ecflow_output/ncsc123_gfs_da/" ADCIRC_DA_CONFIG_FILE="/home/bblanton/ecfc/da/data_assimilation.yaml" +#ADCIRC_DA_CONFIG_FILE="/home/sbunya/ecflow_configs/da/data_assimilation.yaml" #...Retrieve config variables from yaml venv=`cat $ADCIRC_DA_CONFIG_FILE | shyaml get-value venv` @@ -16,10 +19,13 @@ echo "adda=$path2adda/adda.py" RUN="conda run --no-capture-output" -current_time="--current_time advisory_017" +#current_time="--current_time advisory_017" +current_time="--current_time 20231118/hour_12" -grid="--gridname ec95d" -meteo="--met nhc" +#grid="--gridname ec95d" +grid="--gridname NCSC_SAB_V1.23" +#meteo="--met nhc" +meteo="--met gfs" com="$RUN -n $venv $path2adda/adda.py $current_time $meteo --da_config_file $ADCIRC_DA_CONFIG_FILE $grid --fw_arch_dir $fw_dir" echo $com $com