diff --git a/.testdata/create_test_data.py b/.testdata/create_test_data.py index bf053079..4bcf3349 100644 --- a/.testdata/create_test_data.py +++ b/.testdata/create_test_data.py @@ -385,7 +385,7 @@ def create_settings_grid(): "keep_temp_files": True, }, "output": { - "path": "output/event", + "path": "output/event_grid", "grid": {"name": "output.nc"}, }, "hazard": { @@ -415,6 +415,16 @@ def create_settings_grid(): with open(Path(p, "settings_grid.toml"), "wb") as f: tomli_w.dump(doc, f) + doc_r = copy.deepcopy(doc) + doc_r["output"]["path"] = "output/risk_grid" + doc_r["hazard"]["file"] = "hazard/risk_map.nc" + doc_r["hazard"]["return_periods"] = [2, 5, 10, 25] + doc_r["hazard"]["settings"] = {"var_as_band": True} + doc_r["hazard"]["risk"] = True + + with open(Path(p, "settings_grid_risk.toml"), "wb") as f: + tomli_w.dump(doc_r, f) + def create_settings_risk(): """_summary_.""" diff --git a/src/fiat/cfg.py b/src/fiat/cfg.py index a7c37d16..0990302e 100644 --- a/src/fiat/cfg.py +++ b/src/fiat/cfg.py @@ -57,6 +57,10 @@ def __init__( # Create the hidden temporary folder self._create_temp_dir() + # Create risk directory if needed + if self.get("hazard.risk"): + self._create_risk_dir() + # Set the cache size per GDAL object _cache_size = self.get("global.gdal_cache") if _cache_size is not None: @@ -109,6 +113,14 @@ def _create_output_dir( generic_folder_check(_p) self["output.path"] = _p + def _create_risk_dir( + self, + ): + """_summary_.""" + _ph = Path(self["output.path"], "rp_damages") + generic_folder_check(_ph) + self["output.path.risk"] = _ph + def _create_temp_dir( self, ): diff --git a/src/fiat/check.py b/src/fiat/check.py index 89410a84..b6404ace 100644 --- a/src/fiat/check.py +++ b/src/fiat/check.py @@ -210,10 +210,11 @@ def check_hazard_rp_iden( if deter_type(bn_str, l - 1) != 3: return [float(n) for n in bnames] - if len(rp_cfg) == len(bnames): - rp_str = "\n".join([str(n) for n in rp_cfg]).encode() - if deter_type(rp_str, l - 1) != 3: - return rp_cfg + if rp_cfg is not None: + if len(rp_cfg) == len(bnames): + rp_str = "\n".join([str(n) for n in rp_cfg]).encode() + if deter_type(rp_str, l - 1) != 3: + return rp_cfg logger.error( f"'{path.name}': cannot determine the return periods for the risk calculation" diff --git a/src/fiat/io.py b/src/fiat/io.py index 343f6a08..25396cd0 100644 --- a/src/fiat/io.py +++ b/src/fiat/io.py @@ -6,7 +6,6 @@ import weakref from abc import ABCMeta, abstractmethod from io import BufferedReader, BufferedWriter, FileIO, TextIOWrapper -from itertools import product from math import floor, log10 from typing import Any @@ -558,8 +557,6 @@ def __init__( else: ValueError("") - self.create_windows() - def __iter__(self): self.flush() self._reset_chunking() @@ -637,24 +634,6 @@ def shape(self): """_summary_.""" return self._x, self._y - def create_windows(self): - """_summary_.""" - _lu = tuple( - product( - range(0, self._x, self._chunk[0]), - range(0, self._y, self._chunk[1]), - ), - ) - for _l, _u in _lu: - w = min(self._chunk[0], self._x - _l) - h = min(self._chunk[1], self._y - _u) - yield ( - _l, - _u, - w, - h, - ) - def get_metadata_item( self, entry: str, @@ -1269,6 +1248,13 @@ def get_srs(self): """_summary_.""" return self.src.GetSpatialRef() + def set_chunk_size( + self, + chunk: tuple, + ): + """_summary_.""" + self._chunk = chunk + @_BaseIO._check_mode def set_geotransform(self, affine: tuple): """_summary_.""" diff --git a/src/fiat/models/base.py b/src/fiat/models/base.py index 4d1aaaf1..74968239 100644 --- a/src/fiat/models/base.py +++ b/src/fiat/models/base.py @@ -3,7 +3,6 @@ from abc import ABCMeta, abstractmethod from multiprocessing import Manager from os import cpu_count -from pathlib import Path from osgeo import osr @@ -129,11 +128,6 @@ def _read_hazard_grid(self): # Directly calculate the coefficients rp_coef = calc_rp_coef(rp) self.cfg["hazard.rp_coefficients"] = rp_coef - # Set the risk folder for raster calculations - self.cfg["output.path.risk"] = Path( - self.cfg["output.path"], - "rp_damages", - ) # Information for output ns = check_hazard_band_names( diff --git a/src/fiat/models/calc.py b/src/fiat/models/calc.py index cf7e9a5d..18d67c70 100644 --- a/src/fiat/models/calc.py +++ b/src/fiat/models/calc.py @@ -46,11 +46,11 @@ def calc_rp_coef( # Step 1: Compute frequencies associated with T-values. _rp = sorted(rp) idxs = [_rp.index(n) for n in rp] - rp.sort() - rp_l = len(rp) + rp_u = sorted(rp) + rp_l = len(rp_u) - f = [1 / n for n in rp] - lf = [math.log(1 / n) for n in rp] + f = [1 / n for n in rp_u] + lf = [math.log(1 / n) for n in rp_u] if rp_l == 1: return f @@ -79,9 +79,9 @@ def calc_rp_coef( b[0] if idx == 0 else f[idx] + a[idx - 1] - if idx == len(rp) - 1 + if idx == rp_l - 1 else a[idx - 1] + b[idx] - for idx in range(len(rp)) + for idx in range(rp_l) ] return [alpha[idx] for idx in idxs] diff --git a/src/fiat/models/grid.py b/src/fiat/models/grid.py index cbf6be5c..d99a452b 100644 --- a/src/fiat/models/grid.py +++ b/src/fiat/models/grid.py @@ -11,7 +11,7 @@ from fiat.io import open_grid from fiat.log import spawn_logger from fiat.models.base import BaseModel -from fiat.models.util import grid_worker_exact +from fiat.models.util import grid_worker_exact, grid_worker_risk logger = spawn_logger("fiat.model.grid") @@ -59,9 +59,19 @@ def _read_exposure_grid(self): self.exposure_grid = data - def resolve(): + def resolve(self): """_summary_.""" - pass + if self.cfg.get("hazard.risk"): + logger.info("Setting up risk calculations..") + + # Time the function + _s = time.time() + grid_worker_risk( + self.cfg, + self.exposure_grid.chunk, + ) + _e = time.time() - _s + logger.info(f"Risk calculation time: {round(_e, 2)} seconds") def run(self): """_summary_.""" @@ -87,6 +97,7 @@ def run(self): ) futures.append(fs) logger.info("Busy...") + # Wait for the children to finish their calculations wait(futures) else: @@ -106,7 +117,7 @@ def run(self): logger.info("Busy...") p.join() _e = time.time() - _s - logger.info(f"Calculations time: {round(_e, 2)} seconds") + self.resolve() logger.info(f"Output generated in: '{self.cfg['output.path']}'") logger.info("Grid calculation are done!") diff --git a/src/fiat/models/util.py b/src/fiat/models/util.py index 67d79c4a..e22bd2a2 100644 --- a/src/fiat/models/util.py +++ b/src/fiat/models/util.py @@ -1,15 +1,15 @@ """The FIAT model workers.""" -from math import isnan +from math import floor, isnan from pathlib import Path from numpy import full, ravel, unravel_index, where from fiat.gis import geom, overlay -from fiat.io import BufferTextHandler, GridSource +from fiat.io import BufferTextHandler, GridSource, open_grid from fiat.log import LogItem, Sender -from fiat.models.calc import calc_haz -from fiat.util import NEWLINE_CHAR, _pat, replace_empty +from fiat.models.calc import calc_haz, calc_risk +from fiat.util import NEWLINE_CHAR, _pat, create_windows, replace_empty def geom_worker( @@ -120,17 +120,22 @@ def grid_worker_exact( write_bands = [] exp_nds = [] dmfs = [] + band_n = "" + + # Check the band names + if haz.count > 1: + band_n = "_" + cfg.get("hazard.band_names")[idx - 1] # Extract the hazard band as an object haz_band = haz[idx] # Set the output directory _out = cfg.get("output.path") if cfg.get("hazard.risk"): - _out = cfg.get("hazard.path.risk") + _out = cfg.get("output.path.risk") # Create the outgoing netcdf containing every exposure damages - out_src = GridSource( - Path(_out, "output.nc"), + out_src = open_grid( + Path(_out, f"output{band_n}.nc"), mode="w", ) out_src.create( @@ -142,10 +147,10 @@ def grid_worker_exact( out_src.set_srs(exp.get_srs()) out_src.set_geotransform(exp.get_geotransform()) # Create the outgoing total damage grid - td_out = GridSource( + td_out = open_grid( Path( _out, - "total_damages.nc", + f"total_damages{band_n}.nc", ), mode="w", ) @@ -200,7 +205,7 @@ def grid_worker_exact( _coords = _coords[_hcoords] e_ch = e_ch[_hcoords] h_1d = h_1d[_hcoords] - h_1d.clip(min(vul.index), max(vul.index)) + h_1d = h_1d.clip(min(vul.index), max(vul.index)) dmm = [vul[round(float(n), 2), dmfs[idx]] for n in h_1d] e_ch = e_ch * dmm @@ -223,18 +228,17 @@ def grid_worker_exact( # Flush the cache and dereference for _w in write_bands[:]: - w = _w write_bands.remove(_w) - w.flush() - w = None + _w.close() + _w = None # Flush and close all exp_bands = None - td_band.flush() + td_band.close() td_band = None td_out = None - out_src.flush() + out_src.close() out_src = None haz_band = None @@ -243,3 +247,131 @@ def grid_worker_exact( def grid_worker_loose(): """_summary_.""" pass + + +def grid_worker_risk( + cfg: object, + chunk: tuple, +): + """_summary_.""" + _rp_coef = cfg.get("hazard.rp_coefficients") + _out = cfg.get("output.path") + _chunk = [floor(_n / len(_rp_coef)) for _n in chunk] + td = [] + rp = [] + + # TODO this is really fucking bad; fix in the future + # Read the data from the calculations + for _name in cfg.get("hazard.band_names"): + td.append( + open_grid( + Path(cfg.get("output.path.risk"), f"total_damages_{_name}.nc"), + chunk=_chunk, + mode="r", + ) + ) + rp.append( + open_grid( + Path(cfg.get("output.path.risk"), f"total_damages_{_name}.nc"), + chunk=_chunk, + mode="r", + ) + ) + + # Create the estimatied annual damages output file + exp_bands = {} + write_bands = [] + exp_nds = [] + ead_src = open_grid( + Path(_out, "ead.nc"), + mode="w", + ) + ead_src.create( + rp[0].shape, + rp[0].count, + rp[0].dtype, + options=["FORMAT=NC4", "COMPRESS=DEFLATE"], + ) + ead_src.set_srs(rp[0].get_srs()) + ead_src.set_geotransform(rp[0].get_geotransform()) + + # Gather and set information before looping through windows. + for idx in range(rp[0].count): + exp_bands[idx] = [obj[idx + 1] for obj in rp] + write_bands.append(ead_src[idx + 1]) + exp_nds.append(rp[0][idx + 1].nodata) + write_bands[idx].src.SetNoDataValue(exp_nds[idx]) + + # Do the calculation for the EAD + for idx, rpx in exp_bands.items(): + for _w in create_windows(rp[0].shape, _chunk): + ead_ch = write_bands[idx][_w] + # check for one + d_ch = rpx[0][_w] + d_1d = ravel(d_ch) + _coords = where(d_1d != exp_nds[0])[0] + + # Check if something is there + if len(_coords) == 0: + continue + + data = [_data[_w] for _data in rpx] + data = [ravel(_data)[_coords] for _data in data] + data = calc_risk(_rp_coef, data) + idx2d = unravel_index(_coords, *[_chunk]) + ead_ch[idx2d] = data + write_bands[idx].write_chunk(ead_ch, _w[:2]) + + rpx = None + + # Do some cleaning + exp_bands = None + for _w in write_bands[:]: + write_bands.remove(_w) + _w.close() + _w = None + ead_src.close() + ead_src = None + + # Create ead total outgoing dataset + td_src = open_grid( + Path(_out, "ead_total.nc"), + mode="w", + ) + td_src.create( + td[0].shape, + 1, + td[0].dtype, + options=["FORMAT=NC4", "COMPRESS=DEFLATE"], + ) + td_src.set_srs(td[0].get_srs()) + td_src.set_geotransform(td[0].get_geotransform()) + td_band = td_src[1] + td_noval = -0.5 * 2**128 + td_band.src.SetNoDataValue(td_noval) + + # Do the calculations for total damages + for _w in create_windows(td[0].shape, _chunk): + # Get the data + td_ch = td_band[_w] + data = [_data[1][_w] for _data in td] + d_1d = ravel(data[0]) + _coords = where(d_1d != td[0][1].nodata)[0] + + # Check whether there is data to begin with + if len(_coords) == 0: + continue + + # Get data, calc risk and write it. + data = [ravel(_i)[_coords] for _i in data] + data = calc_risk(_rp_coef, data) + idx2d = unravel_index(_coords, *[_chunk]) + td_ch[idx2d] = data + td_band.write_chunk(td_ch, _w[:2]) + + # Cleaning up afterwards + td = None + td_band.close() + td_band = None + td_src.close() + td_src = None diff --git a/src/fiat/util.py b/src/fiat/util.py index 32be5915..4ca0d308 100644 --- a/src/fiat/util.py +++ b/src/fiat/util.py @@ -8,6 +8,7 @@ import sys from collections.abc import MutableMapping from gc import get_referents +from itertools import product from pathlib import Path from types import FunctionType, ModuleType @@ -174,6 +175,29 @@ def _text_chunk_gen( yield _nlines, sd +def create_windows( + shape: tuple, + chunk: tuple, +): + """_summary_.""" + _x, _y = shape + _lu = tuple( + product( + range(0, _x, chunk[0]), + range(0, _y, chunk[1]), + ), + ) + for _l, _u in _lu: + w = min(chunk[0], _x - _l) + h = min(chunk[1], _y - _u) + yield ( + _l, + _u, + w, + h, + ) + + class DoNotCall(type): """_summary_.""" diff --git a/test/conftest.py b/test/conftest.py index d5ad6d98..5669e454 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -22,44 +22,50 @@ def settings_toml_risk(): @pytest.fixture() -def cfg_event(settings_toml_event): +def cfg_geom_event(settings_toml_event): c = ConfigReader(settings_toml_event) return c @pytest.fixture() -def cfg_event_mis(): +def cfg_geom_event_mis(): p = Path(Path.cwd(), ".testdata", "settings_missing.toml") return ConfigReader(p) @pytest.fixture() -def cfg_grid(settings_toml_grid): - c = ConfigReader(settings_toml_grid) +def cfg_geom_risk(settings_toml_risk): + c = ConfigReader(settings_toml_risk) return c @pytest.fixture() -def cfg_risk(settings_toml_risk): - c = ConfigReader(settings_toml_risk) +def cfg_grid_event(settings_toml_grid): + c = ConfigReader(settings_toml_grid) return c @pytest.fixture() -def gm(cfg_event): - d = open_geom(cfg_event.get_path("exposure.geom.file1")) +def cfg_grid_risk(settings_toml_grid): + p = Path(Path.cwd(), ".testdata", "settings_grid_risk.toml") + return ConfigReader(p) + + +@pytest.fixture() +def gm(cfg_geom_event): + d = open_geom(cfg_geom_event.get_path("exposure.geom.file1")) return d @pytest.fixture() -def gr_event(cfg_event): - d = open_grid(cfg_event.get_path("hazard.file")) +def gr_event(cfg_geom_event): + d = open_grid(cfg_geom_event.get_path("hazard.file")) return d @pytest.fixture() -def gr_risk(cfg_risk): - d = open_grid(cfg_risk.get_path("hazard.file"), var_as_band=True) +def gr_risk(cfg_geom_risk): + d = open_grid(cfg_geom_risk.get_path("hazard.file"), var_as_band=True) return d diff --git a/test/test_io.py b/test/test_io.py index eee4c8bb..18b73037 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -1,8 +1,8 @@ from fiat.io import open_csv -def test_tabel(cfg_event): - tb = open_csv(cfg_event.get("vulnerability.file")) +def test_tabel(cfg_geom_event): + tb = open_csv(cfg_geom_event.get("vulnerability.file")) assert len(tb.columns) == 3 assert len(tb.index) == 21 assert tb[9, "struct_2"] == 0.74 diff --git a/test/test_model.py b/test/test_model.py index f4d1d536..c545d643 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -5,9 +5,9 @@ from osgeo import gdal -def test_geom_model(tmpdir, cfg_event): - cfg_event.set_output_dir(str(tmpdir)) - model = GeomModel(cfg_event) +def test_geom_model(tmpdir, cfg_geom_event): + cfg_geom_event.set_output_dir(str(tmpdir)) + model = GeomModel(cfg_geom_event) model.run() # check the output @@ -16,9 +16,9 @@ def test_geom_model(tmpdir, cfg_event): assert float(out[3, "Total Damage"]) == 1038 -def test_geom_missing(tmpdir, cfg_event_mis): - cfg_event_mis.set_output_dir(str(tmpdir)) - model = GeomModel(cfg_event_mis) +def test_geom_missing(tmpdir, cfg_geom_event_mis): + cfg_geom_event_mis.set_output_dir(str(tmpdir)) + model = GeomModel(cfg_geom_event_mis) model.run() assert Path(str(tmpdir), "missing.log").exists() @@ -26,9 +26,9 @@ def test_geom_missing(tmpdir, cfg_event_mis): assert sum(1 for _ in missing) == 1 -def test_geom_risk(tmpdir, cfg_risk): - cfg_risk.set_output_dir(str(tmpdir)) - model = GeomModel(cfg_risk) +def test_geom_risk(tmpdir, cfg_geom_risk): + cfg_geom_risk.set_output_dir(str(tmpdir)) + model = GeomModel(cfg_geom_risk) model.run() out = open_csv(Path(str(tmpdir), "output.csv"), index="Object ID") @@ -37,9 +37,9 @@ def test_geom_risk(tmpdir, cfg_risk): assert float(out[3, "Risk (EAD)"]) == 1022.47 -def test_grid_model(tmpdir, cfg_grid): - cfg_grid.set_output_dir(str(tmpdir)) - model = GridModel(cfg_grid) +def test_grid_model(tmpdir, cfg_grid_event): + cfg_grid_event.set_output_dir(str(tmpdir)) + model = GridModel(cfg_grid_event) model.run() src = gdal.OpenEx( @@ -57,3 +57,25 @@ def test_grid_model(tmpdir, cfg_grid): src = None assert int(arr[2, 4] * 10) == 14091 assert int(arr[7, 3] * 10) == 8700 + + +def test_grid_risk(tmpdir, cfg_grid_risk): + cfg_grid_risk.set_output_dir(str(tmpdir)) + model = GridModel(cfg_grid_risk) + model.run() + + src = gdal.OpenEx( + str(Path(str(tmpdir), "ead.nc")), + ) + arr = src.ReadAsArray() + src = None + assert int(arr[1, 2] * 10) == 10920 + assert int(arr[5, 6] * 10) == 8468 + + src = gdal.OpenEx( + str(Path(str(tmpdir), "ead_total.nc")), + ) + arr = src.ReadAsArray() + src = None + assert int(arr[1, 2] * 10) == 10920 + assert int(arr[5, 6] * 10) == 8468