diff --git a/src/egon/data/datasets/DSM_cts_ind.py b/src/egon/data/datasets/DSM_cts_ind.py index a52279497..88af8459b 100644 --- a/src/egon/data/datasets/DSM_cts_ind.py +++ b/src/egon/data/datasets/DSM_cts_ind.py @@ -778,12 +778,11 @@ def delete_dsm_entries(carrier): db.execute_sql(sql) def dsm_cts_ind( - con=db.engine(), + con=None, cts_cool_vent_ac_share=0.22, ind_cool_vent_share=0.039, ind_vent_share=0.017, ): - """ Execute methodology to create and implement components for DSM considering a) CTS per osm-area: combined potentials of cooling, ventilation and air conditioning @@ -805,6 +804,9 @@ def dsm_cts_ind( """ + if con is None: + con = db.engine() + # CTS per osm-area: cooling, ventilation and air conditioning print(" ") diff --git a/src/egon/data/datasets/chp/__init__.py b/src/egon/data/datasets/chp/__init__.py index 9749d7286..3f35bbbe4 100644 --- a/src/egon/data/datasets/chp/__init__.py +++ b/src/egon/data/datasets/chp/__init__.py @@ -3,14 +3,16 @@ (CHP) plants. """ +from pathlib import Path + from geoalchemy2 import Geometry from shapely.ops import nearest_points from sqlalchemy import Boolean, Column, Float, Integer, Sequence, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import geopandas as gpd import pandas as pd +import pypsa from egon.data import config, db from egon.data.datasets import Dataset @@ -19,6 +21,7 @@ assign_use_case, existing_chp_smaller_10mw, extension_per_federal_state, + extension_to_areas, select_target, ) from egon.data.datasets.power_plants import ( @@ -27,9 +30,6 @@ filter_mastr_geometry, scale_prox2now, ) -import pypsa -from egon.data.datasets.chp.small_chp import extension_to_areas -from pathlib import Path Base = declarative_base() @@ -130,7 +130,8 @@ def nearest( return value -def assign_heat_bus(scenario="eGon2035"): +@db.session_scoped +def assign_heat_bus(scenario="eGon2035", session=None): """Selects heat_bus for chps used in district heating. Parameters @@ -138,6 +139,10 @@ def assign_heat_bus(scenario="eGon2035"): scenario : str, optional Name of the corresponding scenario. The default is 'eGon2035'. + session : sqlalchemy.orm.Session + The session used in this function. Can be ignored because it will be + supplied automatically. + Returns ------- None. @@ -192,7 +197,6 @@ def assign_heat_bus(scenario="eGon2035"): ) # Insert district heating CHP with heat_bus_id - session = sessionmaker(bind=db.engine())() for i, row in chp.iterrows(): if row.carrier != "biomass": entry = EgonChp( @@ -226,10 +230,10 @@ def assign_heat_bus(scenario="eGon2035"): geom=f"SRID=4326;POINT({row.geom.x} {row.geom.y})", ) session.add(entry) - session.commit() -def insert_biomass_chp(scenario): +@db.session_scoped +def insert_biomass_chp(scenario, session=None): """Insert biomass chp plants of future scenario Parameters @@ -237,6 +241,10 @@ def insert_biomass_chp(scenario): scenario : str Name of scenario. + session : sqlalchemy.orm.Session + The session used in this function. Can be ignored because it will be + supplied automatically. + Returns ------- None. 
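Throughout this diff, functions that previously created their own session via `sessionmaker(bind=db.engine())()` are converted to a new `@db.session_scoped` decorator. Its definition is not part of the hunks shown here; a minimal sketch, assuming it builds on `db.session_scope()` and injects a session only when the caller does not supply one:

```python
# Sketch only -- the real `db.session_scoped` is not shown in this diff.
# Assumed behaviour: reuse `session_scope()` (which commits on success and
# rolls back on error) and pass the session into the wrapped function's
# `session` keyword argument.
import functools

from egon.data.db import session_scope


def session_scoped(function):
    @functools.wraps(function)
    def wrapped(*args, **kwargs):
        if kwargs.get("session") is not None:
            # A caller-supplied session is used as-is and not committed here.
            return function(*args, **kwargs)
        with session_scope() as session:
            return function(*args, session=session, **kwargs)

    return wrapped
```

Under that assumption, the `session.commit()` calls removed below are no longer needed: committing (or rolling back) happens when the decorator's session scope exits.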
@@ -283,7 +291,6 @@ def insert_biomass_chp(scenario): mastr_loc = assign_use_case(mastr_loc, cfg["sources"]) # Insert entries with location - session = sessionmaker(bind=db.engine())() for i, row in mastr_loc.iterrows(): if row.ThermischeNutzleistung > 0: entry = EgonChp( @@ -303,7 +310,6 @@ def insert_biomass_chp(scenario): geom=f"SRID=4326;POINT({row.Laengengrad} {row.Breitengrad})", ) session.add(entry) - session.commit() def insert_chp_egon2035(): diff --git a/src/egon/data/datasets/chp/match_nep.py b/src/egon/data/datasets/chp/match_nep.py index 341ed24d2..b5258e584 100755 --- a/src/egon/data/datasets/chp/match_nep.py +++ b/src/egon/data/datasets/chp/match_nep.py @@ -2,7 +2,6 @@ The module containing all code dealing with large chp from NEP list. """ -from sqlalchemy.orm import sessionmaker import geopandas import pandas as pd @@ -312,7 +311,8 @@ def match_nep_chp( ################################################### Final table ################################################### -def insert_large_chp(sources, target, EgonChp): +@db.session_scoped +def insert_large_chp(sources, target, EgonChp, session=None): # Select CHP from NEP list chp_NEP = select_chp_from_nep(sources) @@ -516,7 +516,6 @@ def insert_large_chp(sources, target, EgonChp): ) # Insert into target table - session = sessionmaker(bind=db.engine())() for i, row in insert_chp.iterrows(): entry = EgonChp( sources={ @@ -536,6 +535,5 @@ def insert_large_chp(sources, target, EgonChp): geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", ) session.add(entry) - session.commit() return MaStR_konv diff --git a/src/egon/data/datasets/chp/small_chp.py b/src/egon/data/datasets/chp/small_chp.py index 486f55e2a..eea5175c2 100755 --- a/src/egon/data/datasets/chp/small_chp.py +++ b/src/egon/data/datasets/chp/small_chp.py @@ -1,7 +1,6 @@ """ The module containing all code dealing with chp < 10MW. """ -from sqlalchemy.orm import sessionmaker import geopandas as gpd import numpy as np @@ -13,7 +12,8 @@ ) -def insert_mastr_chp(mastr_chp, EgonChp): +@db.session_scoped +def insert_mastr_chp(mastr_chp, EgonChp, session=None): """Insert MaStR data from exising CHPs into database table Parameters @@ -22,6 +22,9 @@ def insert_mastr_chp(mastr_chp, EgonChp): List of existing CHPs in MaStR. EgonChp : class Class definition of daabase table for CHPs + session : sqlalchemy.orm.Session + The session inside which this function operates. Ignore this, because + it will be supplied automatically. Returns ------- @@ -29,7 +32,6 @@ def insert_mastr_chp(mastr_chp, EgonChp): """ - session = sessionmaker(bind=db.engine())() for i, row in mastr_chp.iterrows(): entry = EgonChp( sources={ @@ -49,7 +51,6 @@ def insert_mastr_chp(mastr_chp, EgonChp): geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", ) session.add(entry) - session.commit() def existing_chp_smaller_10mw(sources, MaStR_konv, EgonChp): @@ -100,6 +101,7 @@ def existing_chp_smaller_10mw(sources, MaStR_konv, EgonChp): insert_mastr_chp(mastr_chp, EgonChp) +@db.session_scoped def extension_to_areas( areas, additional_capacity, @@ -108,6 +110,7 @@ def extension_to_areas( EgonChp, district_heating=True, scenario="eGon2035", + session=None, ): """Builds new CHPs on potential industry or district heating areas. @@ -151,14 +154,15 @@ def extension_to_areas( ORM-class definition of CHP database-table. district_heating : boolean, optional State if the areas are district heating areas. The default is True. + session : sqlalchemy.orm.Session + The session inside which this function operates. 
Ignore this, because + it will be supplied automatically. Returns ------- None. """ - session = sessionmaker(bind=db.engine())() - np.random.seed(seed=config.settings()["egon-data"]["--random-seed"]) # Add new CHP as long as the additional capacity is not reached diff --git a/src/egon/data/datasets/electrical_neighbours.py b/src/egon/data/datasets/electrical_neighbours.py index b1fa1ccb3..735782f50 100755 --- a/src/egon/data/datasets/electrical_neighbours.py +++ b/src/egon/data/datasets/electrical_neighbours.py @@ -3,17 +3,17 @@ import zipfile -import geopandas as gpd -import pandas as pd from shapely.geometry import LineString from sqlalchemy.orm import sessionmaker +import geopandas as gpd +import pandas as pd -import egon.data.datasets.etrago_setup as etrago -import egon.data.datasets.scenario_parameters.parameters as scenario_parameters from egon.data import config, db from egon.data.datasets import Dataset from egon.data.datasets.fill_etrago_gen import add_marginal_costs from egon.data.datasets.scenario_parameters import get_sector_parameters +import egon.data.datasets.etrago_setup as etrago +import egon.data.datasets.scenario_parameters.parameters as scenario_parameters class ElectricalNeighbours(Dataset): @@ -961,6 +961,7 @@ def insert_generators(capacities): session.add(entry) session.commit() + session.close() # assign generators time-series data renew_carriers_2035 = ["wind_onshore", "wind_offshore", "solar"] @@ -1020,9 +1021,11 @@ def insert_generators(capacities): session.add(entry) session.commit() + session.close() -def insert_storage(capacities): +@db.session_scoped +def insert_storage(capacities, session=None): """Insert storage units for foreign countries based on TYNDP-data Parameters @@ -1052,7 +1055,8 @@ def insert_storage(capacities): """ ) - # Add missing information suitable for eTraGo selected from scenario_parameter table + # Add missing information suitable for eTraGo selected from + # scenario_parameter table parameters_pumped_hydro = scenario_parameters.electricity("eGon2035")[ "efficiency" ]["pumped_hydro"] @@ -1078,9 +1082,12 @@ def insert_storage(capacities): ) # Add columns for additional parameters to df - store["dispatch"], store["store"], store["standing_loss"], store[ - "max_hours" - ] = (None, None, None, None) + ( + store["dispatch"], + store["store"], + store["standing_loss"], + store["max_hours"], + ) = (None, None, None, None) # Insert carrier specific parameters @@ -1093,7 +1100,6 @@ def insert_storage(capacities): ] = parameters_pumped_hydro[x] # insert data - session = sessionmaker(bind=db.engine())() for i, row in store.iterrows(): entry = etrago.EgonPfHvStorage( scn_name="eGon2035", @@ -1153,7 +1159,8 @@ def tyndp_generation(): insert_storage(capacities) -def tyndp_demand(): +@db.session_scoped +def tyndp_demand(session=None): """Copy load timeseries data from TYNDP 2020. According to NEP 2021, the data for 2030 and 2040 is interpolated linearly. 
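The most frequent change in the remaining files replaces `pd.read_sql(query.statement, query.session.bind, ...)` with a `DataFrame` built from fully materialized records. Stripped of context, the pattern reads as follows (`SomeTable` is a hypothetical stand-in; `db.asdict` is the helper added to `egon/data/db.py` at the end of this diff):

```python
import pandas as pd

from egon.data import db


def fetch_some_table():
    with db.session_scope() as session:
        # `.all()` materializes the rows *inside* the session scope ...
        rows = session.query(SomeTable).all()

    # ... so the frame is built without touching `session.bind`, which is
    # no longer guaranteed to be usable once the `with` block has exited.
    return pd.DataFrame.from_records([db.asdict(row) for row in rows])
```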
@@ -1182,10 +1189,6 @@ def tyndp_demand(): """ ) - # Connect to database - engine = db.engine() - session = sessionmaker(bind=engine)() - nodes = [ "AT00", "BE00", diff --git a/src/egon/data/datasets/electricity_demand/__init__.py b/src/egon/data/datasets/electricity_demand/__init__.py index 067ca90f6..15cb12d86 100644 --- a/src/egon/data/datasets/electricity_demand/__init__.py +++ b/src/egon/data/datasets/electricity_demand/__init__.py @@ -21,7 +21,6 @@ # will be later imported from another file ### Base = declarative_base() -engine = db.engine() class HouseholdElectricityDemand(Dataset): @@ -100,10 +99,11 @@ def get_annual_household_el_demand_cells(): == HouseholdElectricityProfilesInCensusCells.cell_id ) .order_by(HouseholdElectricityProfilesOfBuildings.id) + .all() ) - df_buildings_and_profiles = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="id" + df_buildings_and_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="id" ) # Read demand profiles from egon-data-bundle diff --git a/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py b/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py index 9d75d0d45..bc0b801ee 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py @@ -290,12 +290,17 @@ def amenities_without_buildings(): EgonDemandRegioZensusElectricity.sector == "service", EgonDemandRegioZensusElectricity.scenario == "eGon2035", ) + .all() ) - df_amenities_without_buildings = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom_amenity", + df_amenities_without_buildings = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_amenity": to_shape}) + for row in cells_query + ] + ), + geometry="geom_amenity", ) return df_amenities_without_buildings @@ -451,9 +456,9 @@ def buildings_with_amenities(): EgonDemandRegioZensusElectricity.sector == "service", EgonDemandRegioZensusElectricity.scenario == "eGon2035", ) - ) - df_amenities_in_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + ).all() + df_amenities_in_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_amenities_in_buildings["geom_building"] = df_amenities_in_buildings[ @@ -530,11 +535,16 @@ def buildings_with_amenities(): df_lost_cells["zensus_population_id"] ) ) - - df_lost_cells = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom", + cells_query = cells_query.all() + + df_lost_cells = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # place random amenity in cell @@ -678,13 +688,18 @@ def buildings_without_amenities(): q_cts_without_amenities ) ) + cells_query = cells_query.all() # df_buildings_without_amenities = pd.read_sql( # cells_query.statement, cells_query.session.bind, index_col=None) - df_buildings_without_amenities = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom_building", + df_buildings_without_amenities = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_building": to_shape}) + for row in cells_query + ] + ), + geometry="geom_building", ) df_buildings_without_amenities = df_buildings_without_amenities.rename( @@ -772,13 +787,17 @@ def 
cells_with_cts_demand_only(df_buildings_without_amenities): EgonDemandRegioZensusElectricity.zensus_population_id == DestatisZensusPopulationPerHa.id ) + .all() ) - df_cts_cell_without_amenities = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom", - index_col=None, + df_cts_cell_without_amenities = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # TODO remove after #722 @@ -829,6 +848,7 @@ def calc_census_cell_share(scenario, sector): EgonDemandRegioZensusElectricity.zensus_population_id == MapZensusGridDistricts.zensus_population_id ) + .all() ) elif sector == "heat": @@ -841,12 +861,12 @@ def calc_census_cell_share(scenario, sector): EgonPetaHeat.zensus_population_id == MapZensusGridDistricts.zensus_population_id ) + .all() ) - df_demand = pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col="zensus_population_id", + df_demand = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], + index="zensus_population_id", ) # get demand share of cell per bus @@ -992,23 +1012,24 @@ def calc_cts_building_profiles( egon_building_ids ) ) + .all() ) - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) # Get substation cts electricity load profiles of selected bus_ids with db.session_scope() as session: cells_query = ( - session.query(EgonEtragoElectricityCts).filter( - EgonEtragoElectricityCts.scn_name == scenario - ) - ).filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids)) + session.query(EgonEtragoElectricityCts) + .filter(EgonEtragoElectricityCts.scn_name == scenario) + .filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids)) + .all() + ) - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1029,23 +1050,24 @@ def calc_cts_building_profiles( egon_building_ids ) ) + .all() ) - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) # Get substation cts heat load profiles of selected bus_ids with db.session_scope() as session: cells_query = ( - session.query(EgonEtragoHeatCts).filter( - EgonEtragoHeatCts.scn_name == scenario - ) - ).filter(EgonEtragoHeatCts.bus_id.in_(bus_ids)) + session.query(EgonEtragoHeatCts) + .filter(EgonEtragoHeatCts.scn_name == scenario) + .filter(EgonEtragoHeatCts.bus_id.in_(bus_ids)) + .all() + ) - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1097,12 +1119,10 @@ def remove_double_bus_id(df_cts_buildings): cells_query = session.query( MapZensusGridDistricts.zensus_population_id, MapZensusGridDistricts.bus_id, - ) + ).all() - df_egon_map_zensus_buildings_buses = pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col=None, + df_egon_map_zensus_buildings_buses = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_buildings = pd.merge( 
left=df_cts_buildings, @@ -1330,10 +1350,10 @@ def cts_electricity(): """ log.info("Start logging!") with db.session_scope() as session: - cells_query = session.query(CtsBuildings) + cells_query = session.query(CtsBuildings).all() - df_cts_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_cts_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info("CTS buildings from DB imported!") df_demand_share_2035 = calc_building_demand_profile_share( @@ -1369,10 +1389,10 @@ def cts_heat(): """ log.info("Start logging!") with db.session_scope() as session: - cells_query = session.query(CtsBuildings) + cells_query = session.query(CtsBuildings).all() - df_cts_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_cts_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info("CTS buildings from DB imported!") @@ -1425,19 +1445,20 @@ def get_cts_electricity_peak_load(): ).filter( EgonCtsElectricityDemandBuildingShare.scenario == scenario ) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) with db.session_scope() as session: cells_query = session.query(EgonEtragoElectricityCts).filter( EgonEtragoElectricityCts.scn_name == scenario ) + cells_query = cells_query.all() - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1498,9 +1519,10 @@ def get_cts_heat_peak_load(): ).filter( EgonCtsElectricityDemandBuildingShare.scenario == scenario ) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info(f"Retrieved demand share for scenario: {scenario}") @@ -1508,10 +1530,10 @@ def get_cts_heat_peak_load(): cells_query = session.query(EgonEtragoHeatCts).filter( EgonEtragoHeatCts.scn_name == scenario ) + cells_query = cells_query.all() - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info(f"Retrieved substation profiles for scenario: {scenario}") diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py index 268d8ad97..9edb61cf5 100755 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py @@ -131,7 +131,6 @@ ) import egon.data.config -engine = db.engine() Base = declarative_base() data_config = egon.data.config.datasets() @@ -243,18 +242,17 @@ def match_osm_and_zensus_data( schema="society", ) # get table metadata from db by name and schema - inspect(engine).reflecttable(egon_destatis_building_count, None) + inspect(db.engine()).reflecttable(egon_destatis_building_count, None) with db.session_scope() as session: cells_query = session.query( egon_destatis_building_count.c.zensus_population_id, egon_destatis_building_count.c.building_count, - ) + ).all() - egon_destatis_building_count 
= pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col="zensus_population_id", + egon_destatis_building_count = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], + index="zensus_population_id", ) egon_destatis_building_count = egon_destatis_building_count.dropna() @@ -332,7 +330,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): schema="society", ) # get table metadata from db by name and schema - inspect(engine).reflecttable( + inspect(db.engine()).reflecttable( destatis_zensus_population_per_ha_inside_germany, None ) @@ -346,7 +344,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): ) destatis_zensus_population_per_ha_inside_germany = gpd.read_postgis( - cells_query.statement, cells_query.session.bind, index_col="id" + cells_query.statement, db.engine(), index_col="id" ) # add geom data of zensus cell @@ -385,7 +383,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): # get table metadata from db by name and schema buildings = Table("osm_buildings", Base.metadata, schema="openstreetmap") - inspect(engine).reflecttable(buildings, None) + inspect(db.engine()).reflecttable(buildings, None) # get max number of building ids from non-filtered building table with db.session_scope() as session: @@ -589,7 +587,7 @@ def reduce_synthetic_buildings( buildings = Table("osm_buildings", Base.metadata, schema="openstreetmap") # get table metadata from db by name and schema - inspect(engine).reflecttable(buildings, None) + inspect(db.engine()).reflecttable(buildings, None) # total number of buildings with db.session_scope() as session: @@ -649,10 +647,11 @@ def get_building_peak_loads(): == HouseholdElectricityProfilesInCensusCells.cell_id ) .order_by(HouseholdElectricityProfilesOfBuildings.id) + .all() ) - df_buildings_and_profiles = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="id" + df_buildings_and_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="id" ) # Read demand profiles from egon-data-bundle @@ -705,10 +704,10 @@ def ve(s): df_building_peak_loads["sector"] = "residential" BuildingElectricityPeakLoads.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) BuildingElectricityPeakLoads.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) df_building_peak_loads = df_building_peak_loads.melt( @@ -752,19 +751,23 @@ def map_houseprofiles_to_buildings(): schema="boundaries", ) # get table metadata from db by name and schema - inspect(engine).reflecttable(egon_map_zensus_buildings_residential, None) + inspect(db.engine()).reflecttable( + egon_map_zensus_buildings_residential, None + ) with db.session_scope() as session: cells_query = session.query(egon_map_zensus_buildings_residential) - egon_map_zensus_buildings_residential = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + cells_query = cells_query.all() + egon_map_zensus_buildings_residential = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) with db.session_scope() as session: cells_query = session.query(HouseholdElectricityProfilesInCensusCells) - egon_hh_profile_in_zensus_cell = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None - ) # index_col="cell_id") + cells_query = cells_query.all() + egon_hh_profile_in_zensus_cell = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] # , index="cell_id") + ) # Match OSM and zensus 
data to define missing buildings missing_buildings = match_osm_and_zensus_data( @@ -800,13 +803,13 @@ def map_houseprofiles_to_buildings(): # synthetic_buildings = synthetic_buildings.drop(columns=["grid_id"]) synthetic_buildings["n_amenities_inside"] = 0 - OsmBuildingsSynthetic.__table__.drop(bind=engine, checkfirst=True) - OsmBuildingsSynthetic.__table__.create(bind=engine, checkfirst=True) + OsmBuildingsSynthetic.__table__.drop(bind=db.engine(), checkfirst=True) + OsmBuildingsSynthetic.__table__.create(bind=db.engine(), checkfirst=True) # Write new buildings incl coord into db synthetic_buildings.to_postgis( "osm_buildings_synthetic", - con=engine, + con=db.engine(), if_exists="append", schema="openstreetmap", dtype={ @@ -821,10 +824,10 @@ def map_houseprofiles_to_buildings(): ) HouseholdElectricityProfilesOfBuildings.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) HouseholdElectricityProfilesOfBuildings.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) # Write building mapping into db diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py index fc12de792..466fa2a4f 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py @@ -139,7 +139,6 @@ import egon.data.config Base = declarative_base() -engine = db.engine() # Get random seed from config @@ -270,13 +269,13 @@ def write_hh_profiles_to_db(hh_profiles): hh_profiles = hh_profiles.groupby("type").load_in_wh.apply(tuple) hh_profiles = hh_profiles.reset_index() - IeeHouseholdLoadProfiles.__table__.drop(bind=engine, checkfirst=True) - IeeHouseholdLoadProfiles.__table__.create(bind=engine) + IeeHouseholdLoadProfiles.__table__.drop(bind=db.engine(), checkfirst=True) + IeeHouseholdLoadProfiles.__table__.create(bind=db.engine()) hh_profiles.to_sql( name=IeeHouseholdLoadProfiles.__table__.name, schema=IeeHouseholdLoadProfiles.__table__.schema, - con=engine, + con=db.engine(), if_exists="append", method="multi", chunksize=100, @@ -1443,10 +1442,10 @@ def get_load_timeseries( def write_refinded_households_to_db(df_census_households_grid_refined): # Write allocation table into database EgonDestatisZensusHouseholdPerHaRefined.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) EgonDestatisZensusHouseholdPerHaRefined.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) with db.session_scope() as session: @@ -1558,10 +1557,10 @@ def gen_profile_names(n): # Write allocation table into database HouseholdElectricityProfilesInCensusCells.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) HouseholdElectricityProfilesInCensusCells.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) with db.session_scope() as session: @@ -1587,8 +1586,8 @@ def get_houseprofiles_in_census_cells(): with db.session_scope() as session: q = session.query(HouseholdElectricityProfilesInCensusCells) - census_profile_mapping = pd.read_sql( - q.statement, q.session.bind, index_col="cell_id" + census_profile_mapping = pd.DataFrame.from_records( + [db.asdict(row) for row in q.all()], index="cell_id" ) return census_profile_mapping @@ -1668,9 +1667,10 @@ def get_cell_demand_metadata_from_db(attribute, list_of_identifiers): list_of_identifiers ) ) + cells_query = cells_query.all() - 
cell_demand_metadata = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="cell_id" + cell_demand_metadata = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="cell_id" ) return cell_demand_metadata @@ -1699,9 +1699,10 @@ def get_hh_profiles_from_db(profile_ids): cells_query = session.query( IeeHouseholdLoadProfiles.load_in_wh, IeeHouseholdLoadProfiles.type ).filter(IeeHouseholdLoadProfiles.type.in_(profile_ids)) + cells_query = cells_query.all() - df_profile_loads = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="type" + df_profile_loads = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="type" ) # convert array to Dataframe @@ -1754,9 +1755,10 @@ def tuple_format(x): HouseholdElectricityProfilesInCensusCells.cell_id == MapZensusGridDistricts.zensus_population_id, ) + cells_query = cells_query.all() - cells = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="cell_id" + cells = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="cell_id" ) # convert profile ids to tuple (type, id) format @@ -1792,16 +1794,16 @@ def tuple_format(x): if drop_table: EgonEtragoElectricityHouseholds.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) EgonEtragoElectricityHouseholds.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) # Insert data into respective database table mvgd_profiles.to_sql( name=EgonEtragoElectricityHouseholds.__table__.name, schema=EgonEtragoElectricityHouseholds.__table__.schema, - con=engine, + con=db.engine(), if_exists="append", method="multi", chunksize=10000, diff --git a/src/egon/data/datasets/electricity_demand_timeseries/tools.py b/src/egon/data/datasets/electricity_demand_timeseries/tools.py index 4ccabe603..aa6b90d03 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/tools.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/tools.py @@ -8,8 +8,6 @@ from egon.data import db -engine = db.engine() - def random_point_in_square(geom, tol): """ @@ -66,7 +64,7 @@ def random_ints_until_sum(s_sum, m_max): return list_r -def write_table_to_postgis(gdf, table, engine=db.engine(), drop=True): +def write_table_to_postgis(gdf, table, engine=None, drop=True): """ Helper function to append df data to table in db. Only predefined columns are passed. Error will raise if column is missing. Dtype of columns are @@ -85,6 +83,9 @@ def write_table_to_postgis(gdf, table, engine=db.engine(), drop=True): """ + if engine is None: + engine = db.engine() + # Only take in db table defined columns columns = [column.key for column in table.__table__.columns] gdf = gdf.loc[:, columns] @@ -139,7 +140,7 @@ def psql_insert_copy(table, conn, keys, data_iter): def write_table_to_postgres( - df, db_table, engine=db.engine(), drop=False, index=False, if_exists="append" + df, db_table, engine=None, drop=False, index=False, if_exists="append" ): """ Helper function to append df data to table in db. Fast string-copy is used. 
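The helpers in `tools.py` get the same deferred-default treatment as `dsm_cts_ind` and `run_methodology`: a default like `engine=db.engine()` is evaluated once, at import time, so merely importing the module already opened a database connection. A minimal sketch of the replacement pattern, with `write_table` as a hypothetical stand-in:

```python
from egon.data import db


def write_table(df, table_name, engine=None):
    # `engine=db.engine()` as a default would run at import time; `None`
    # defers engine creation until the function is actually called.
    if engine is None:
        engine = db.engine()
    df.to_sql(table_name, con=engine, if_exists="append", index=False)
```

Deferring the call also lets each process pick up its own cached engine from the reworked `db.engine()` shown at the end of this diff.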
@@ -165,6 +166,9 @@ def write_table_to_postgres( """ + if engine is None: + engine = db.engine() + # Only take in db table defined columns and dtypes columns = { column.key: column.type for column in db_table.__table__.columns diff --git a/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py b/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py index 7611db043..bd2fe76b3 100644 --- a/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py +++ b/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py @@ -163,8 +163,11 @@ def read_hgv_h2_demand(scenario: str = "eGon2035"): EgonHeavyDutyTransportVoronoi.scenario, EgonHeavyDutyTransportVoronoi.hydrogen_consumption, ).filter(EgonHeavyDutyTransportVoronoi.scenario == scenario) + query = query.all() - df = pd.read_sql(query.statement, query.session.bind, index_col="nuts3") + df = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index="nuts3" + ) sql_vg250 = """ SELECT nuts as nuts3, geometry as geom diff --git a/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py b/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py index a64ed1c50..7ac19be63 100644 --- a/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py +++ b/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py @@ -205,10 +205,10 @@ def calc_evs_per_municipality(ev_data, rs7_data): Vg250GemPopulation.ags_0.label("ags"), Vg250GemPopulation.gen, Vg250GemPopulation.population_total.label("pop"), - ) + ).all() - muns = pd.read_sql( - query.statement, query.session.bind, index_col=None + muns = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index=None ).astype({"ags": "int64"}) muns["ags_district"] = ( @@ -312,12 +312,11 @@ def calc_evs_per_grid_district(ev_data_muns): ) .group_by(MvGridDistricts.bus_id, Vg250Gem.ags) .order_by(Vg250Gem.ags) + .all() ) - mvgd_pop_per_mun = pd.read_sql( - query_pop_per_mvgd.statement, - query_pop_per_mvgd.session.bind, - index_col=None, + mvgd_pop_per_mun = pd.DataFrame.from_records( + [db.asdict(row) for row in query_pop_per_mvgd] ).astype({"bus_id": "int64", "pop": "int64", "ags": "int64"}) # Calc population share of each municipality in MVGD @@ -547,11 +546,13 @@ def get_random_evs(row): # Load EVs per grid district print("Loading EV counts for grid districts...") with db.session_scope() as session: - query = session.query(EgonEvCountMvGridDistrict).filter( - EgonEvCountMvGridDistrict.scenario == scenario_name + query = ( + session.query(EgonEvCountMvGridDistrict) + .filter(EgonEvCountMvGridDistrict.scenario == scenario_name) + .all() ) - ev_per_mvgd = pd.read_sql( - query.statement, query.session.bind, index_col=None + ev_per_mvgd = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) # Convert EV types' wide to long format @@ -569,11 +570,8 @@ def get_random_evs(row): query = session.query(EgonEvPool).filter( EgonEvPool.scenario == scenario_name ) - ev_pool = pd.read_sql( - query.statement, - query.session.bind, - index_col=None, - ) + query = query.all() + ev_pool = pd.DataFrame.from_records([db.asdict(row) for row in query]) # Draw EVs randomly for each grid district from pool print(" Draw EVs from pool for grid districts...") diff --git a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py index 8ef1ca408..90938674c 100644 --- 
a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py +++ b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py @@ -473,10 +473,11 @@ def load_evs_trips( .order_by( EgonEvTrip.egon_ev_pool_ev_id, EgonEvTrip.simbev_event_id ) + .all() ) - trip_data = pd.read_sql( - query.statement, query.session.bind, index_col=None + trip_data = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).astype( { "ev_id": "int", @@ -562,10 +563,11 @@ def calc_initial_ev_soc(bus_id: int, scenario_name: str) -> pd.DataFrame: EgonEvTrip.simbev_event_id == 0, ) .group_by(EgonEvPool.type) + .all() ) - initial_soc_per_ev_type = pd.read_sql( - query_ev_soc.statement, query_ev_soc.session.bind, index_col="type" + initial_soc_per_ev_type = pd.DataFrame.from_records( + [db.asdict(row) for row in query_ev_soc], index="type" ) initial_soc_per_ev_type[ @@ -631,7 +633,6 @@ def write_link(scenario_name: str) -> None: terrain_factor=1, ) ) - with db.session_scope() as session: session.add( EgonPfHvLinkTimeseries( scn_name=scenario_name, @@ -670,7 +671,6 @@ def write_store(scenario_name: str) -> None: standing_loss=0, ) ) - with db.session_scope() as session: session.add( EgonPfHvStoreTimeseries( scn_name=scenario_name, @@ -697,7 +697,6 @@ def write_load( sign=-1, ) ) - with db.session_scope() as session: session.add( EgonPfHvLoadTimeseries( scn_name=scenario_name, @@ -905,9 +904,9 @@ def delete_model_data_from_db(): def load_grid_district_ids() -> pd.Series: """Load bus IDs of all grid districts""" with db.session_scope() as session: - query_mvgd = session.query(MvGridDistricts.bus_id) - return pd.read_sql( - query_mvgd.statement, query_mvgd.session.bind, index_col=None + query_mvgd = session.query(MvGridDistricts.bus_id).all() + return pd.DataFrame.from_records( + [db.asdict(row) for row in query_mvgd] ).bus_id.sort_values() @@ -1014,9 +1013,10 @@ def generate_model_data_bunch(scenario_name: str, bunch: range) -> None: ) .filter(EgonEvMvGridDistrict.bus_id.in_(mvgd_bus_ids)) .filter(EgonEvMvGridDistrict.egon_ev_pool_ev_id.isnot(None)) + .all() ) - evs_grid_district = pd.read_sql( - query.statement, query.session.bind, index_col=None + evs_grid_district = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).astype({"ev_id": "int"}) mvgd_bus_ids = evs_grid_district.bus_id.unique() diff --git a/src/egon/data/datasets/power_plants/__init__.py b/src/egon/data/datasets/power_plants/__init__.py index ab7cd4d83..e2f640c48 100755 --- a/src/egon/data/datasets/power_plants/__init__.py +++ b/src/egon/data/datasets/power_plants/__init__.py @@ -1,15 +1,7 @@ """The central module containing all code dealing with power plant data. 
""" from geoalchemy2 import Geometry -from sqlalchemy import ( - BigInteger, - Boolean, - Column, - Float, - Integer, - Sequence, - String, -) +from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker @@ -19,6 +11,7 @@ from egon.data import db from egon.data.datasets import Dataset +from egon.data.datasets.power_plants import assign_weather_data from egon.data.datasets.power_plants.conventional import ( match_nep_no_chp, select_nep_power_plants, @@ -26,7 +19,6 @@ ) from egon.data.datasets.power_plants.pv_rooftop import pv_rooftop_per_mv_grid import egon.data.config -import egon.data.datasets.power_plants.assign_weather_data as assign_weather_data import egon.data.datasets.power_plants.pv_ground_mounted as pv_ground_mounted import egon.data.datasets.power_plants.wind_farms as wind_onshore import egon.data.datasets.power_plants.wind_offshore as wind_offshore @@ -289,6 +281,7 @@ def insert_biomass_plants(scenario): session.add(entry) session.commit() + session.close() def insert_hydro_plants(scenario): @@ -376,6 +369,7 @@ def insert_hydro_plants(scenario): session.add(entry) session.commit() + session.close() def assign_voltage_level(mastr_loc, cfg): @@ -723,6 +717,7 @@ def allocate_conventional_non_chp_power_plants(): ) session.add(entry) session.commit() + session.close() def allocate_other_power_plants(): @@ -796,10 +791,12 @@ def allocate_other_power_plants(): # Select power plants representing carrier 'others' from MaStR files mastr_sludge = pd.read_csv(cfg["sources"]["mastr_gsgk"]).query( - """EinheitBetriebsstatus=='InBetrieb'and Energietraeger=='Klaerschlamm'""" + "EinheitBetriebsstatus=='InBetrieb' and Energietraeger=='Klaerschlamm'" ) mastr_geothermal = pd.read_csv(cfg["sources"]["mastr_gsgk"]).query( - """EinheitBetriebsstatus=='InBetrieb' and Energietraeger=='Geothermie' and Technologie == 'ORCOrganicRankineCycleAnlage'""" + "EinheitBetriebsstatus=='InBetrieb'" + " and Energietraeger=='Geothermie'" + " and Technologie == 'ORCOrganicRankineCycleAnlage'" ) mastr_sg = mastr_sludge.append(mastr_geothermal) @@ -881,3 +878,4 @@ def allocate_other_power_plants(): ) session.add(entry) session.commit() + session.close() diff --git a/src/egon/data/datasets/power_plants/pv_ground_mounted.py b/src/egon/data/datasets/power_plants/pv_ground_mounted.py index e9b92b5b6..fa46d09c1 100644 --- a/src/egon/data/datasets/power_plants/pv_ground_mounted.py +++ b/src/egon/data/datasets/power_plants/pv_ground_mounted.py @@ -1,8 +1,6 @@ -from shapely import wkb import geopandas as gpd import numpy as np import pandas as pd -import psycopg2 from egon.data import db @@ -36,7 +34,7 @@ def mastr_existing_pv(path, pow_per_area): ) df = df[df["Lage"] == "Freiflaeche"] - ### examine data concerning geographical locations and drop NaNs + # examine data concerning geographical locations and drop NaNs x1 = df["Laengengrad"].isnull().sum() x2 = df["Breitengrad"].isnull().sum() print(" ") @@ -104,7 +102,7 @@ def mastr_existing_pv(path, pow_per_area): v_l.loc[index] = np.NaN mastr["voltage_level"] = v_l - ### examine data concerning voltage level + # examine data concerning voltage level x1 = mastr["voltage_level"].isnull().sum() print(" ") print("Examination of voltage levels in MaStR data set:") @@ -127,7 +125,7 @@ def mastr_existing_pv(path, pow_per_area): x3 = len(index_names) mastr.drop(index_names, inplace=True) - ### further examination + # further 
examination
     print("Number of PVs in low voltage level: " + str(x2))
     print("Number of PVs in LVMV level: " + str(x3))
     print(
@@ -173,7 +171,7 @@ def potential_areas(con, join_buffer):
 
     # roads and railways
 
-    ### counting variable for examination
+    # counting variable for examination
     before = len(potentials_rora)
 
     # get small areas and create buffer for joining around them
@@ -199,7 +197,7 @@ def potential_areas(con, join_buffer):
         join = gpd.GeoSeries(data=[x, y])
         potentials_rora["geom"].loc[index_potentials] = join.unary_union
 
-    ### examination of joining of areas
+    # examination of joining of areas
     count_small = len(small_buffers)
     count_join = len(o)
     count_delete = count_small - count_join
@@ -216,7 +214,7 @@ def potential_areas(con, join_buffer):
 
     # agriculture
 
-    ### counting variable for examination
+    # counting variable for examination
     before = len(potentials_agri)
 
     # get small areas and create buffer for joining around them
@@ -242,7 +240,7 @@ def potential_areas(con, join_buffer):
         join = gpd.GeoSeries(data=[x, y])
         potentials_agri["geom"].loc[index_potentials] = join.unary_union
 
-    ### examination of joining of areas
+    # examination of joining of areas
     count_small = len(small_buffers)
     count_join = len(o)
     count_delete = count_small - count_join
@@ -261,7 +259,7 @@ def potential_areas(con, join_buffer):
 
     # check intersection of potential areas
 
-    ### counting variable
+    # counting variable
     agri_vorher = len(potentials_agri)
 
     # if areas intersect, keep road & railway potential areas and drop agricultural ones
@@ -272,7 +270,7 @@ def potential_areas(con, join_buffer):
             index = o.iloc[i]
             potentials_agri.drop([index], inplace=True)
 
-    ### examination of intersection of areas
+    # examination of intersection of areas
     print(" ")
     print("Review function to avoid intersection of potential areas:")
     print("Initial length potentials_agri: " + str(agri_vorher))
@@ -323,7 +321,7 @@ def select_pot_areas(mastr, potentials_pot):
             # get voltage level of existing PVs
             index_pv = o.index[i]
             pot_sel["voltage_level"] = mastr["voltage_level"].loc[index_pv]
-    pot_sel = pot_sel[pot_sel["selected"] == True]
+    pot_sel = pot_sel[pot_sel["selected"]]
     pot_sel.drop("selected", axis=1, inplace=True)
 
     # drop selected existing pv parks from mastr
@@ -471,7 +469,7 @@ def build_additional_pv(potentials, pv, pow_per_area, con):
 
     overlay = gpd.sjoin(centroids, distr)
 
-    ### examine potential area per grid district
+    # examine potential area per grid district
     anz = len(overlay)
     anz_distr = len(overlay["index_right"].unique())
     size = 137500  # m2 Fläche für > 5,5 MW: (5500 kW / (0,04 kW/m2))
@@ -738,7 +736,7 @@ def keep_existing_pv(mastr, con):
     return pv_exist
 
 
 def run_methodology(
-    con=db.engine(),
+    con=None,
     path="",
     pow_per_area=0.04,
     join_buffer=10,
@@ -765,6 +763,8 @@ def run_methodology(
 
     """
+    if con is None:
+        con = db.engine()
     ###
     print(" ")
     print("MaStR-Data")
@@ -900,7 +900,7 @@ def run_methodology(
         if len(distr_i) > 0:
             distr_i["nuts"] = target[target["nuts"] == i]["nuts"].iloc[0]
 
-    ### examination of built PV parks per state
+    # examination of built PV parks per state
     rora_i_mv = rora_i[rora_i["voltage_level"] == 5]
     rora_i_hv = rora_i[rora_i["voltage_level"] == 4]
     agri_i_mv = agri_i[agri_i["voltage_level"] == 5]
@@ -981,8 +981,8 @@ def run_methodology(
                 con,
             )
 
-    ### create map to show distribution of installed capacity
-    if show_map == True:
+    # create map to show distribution of installed capacity
+    if show_map:
 
         # 1) eGon2035
 
@@ -1027,7 +1027,7 @@ def run_methodology(
             cmap="magma_r",
             legend=True,
             legend_kwds={
-                "label": 
f"Installed capacity in MW", + "label": "Installed capacity in MW", "orientation": "vertical", }, ) @@ -1078,7 +1078,7 @@ def run_methodology( cmap="magma_r", legend=True, legend_kwds={ - "label": f"Installed capacity in MW", + "label": "Installed capacity in MW", "orientation": "vertical", }, ) @@ -1149,7 +1149,7 @@ def insert_pv_parks( sql = "SELECT MAX(id) FROM supply.egon_power_plants" max_id = pd.read_sql(sql, con) max_id = max_id["max"].iat[0] - if max_id == None: + if max_id is None: max_id = 1 pv_park_id = max_id + 1 @@ -1212,7 +1212,7 @@ def insert_pv_parks( show_map=False, ) - ### examination of results + # examination of results if len(pv_per_distr) > 0: pv_per_distr_mv = pv_per_distr[pv_per_distr["voltage_level"] == 5] pv_per_distr_hv = pv_per_distr[pv_per_distr["voltage_level"] == 4] diff --git a/src/egon/data/datasets/renewable_feedin.py b/src/egon/data/datasets/renewable_feedin.py index c9122d6ef..0bc5f0114 100644 --- a/src/egon/data/datasets/renewable_feedin.py +++ b/src/egon/data/datasets/renewable_feedin.py @@ -2,6 +2,7 @@ Central module containing all code dealing with processing era5 weather data. """ +from geoalchemy2.shape import to_shape from sqlalchemy import Column, ForeignKey, Integer from sqlalchemy.ext.declarative import declarative_base import geopandas as gpd @@ -36,7 +37,6 @@ def __init__(self, dependencies): Base = declarative_base() -engine = db.engine() class MapZensusWeatherCell(Base): @@ -491,7 +491,7 @@ def heat_pump_cop(): # Calculate coefficient of performance for air sourced heat pumps # according to Brown et. al - cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t ** 2 + cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t**2 df = pd.DataFrame( index=temperature.to_pandas().index, @@ -582,22 +582,30 @@ def mapping_zensus_weather(): ), DestatisZensusPopulationPerHaInsideGermany.geom_point, ) + cells_query = cells_query.all() - gdf_zensus_population = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - index_col=None, - geom_col="geom_point", + gdf_zensus_population = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_point": to_shape}) + for row in cells_query + ] + ), + geometry="geom_point", ) with db.session_scope() as session: cells_query = session.query(EgonEra5Cells.w_id, EgonEra5Cells.geom) + cells_query = cells_query.all() - gdf_weather_cell = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - index_col=None, - geom_col="geom", + gdf_weather_cell = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # CRS is 4326 gdf_weather_cell = gdf_weather_cell.to_crs(epsg=3035) @@ -606,8 +614,8 @@ def mapping_zensus_weather(): gdf_weather_cell, how="left", predicate="within" ) - MapZensusWeatherCell.__table__.drop(bind=engine, checkfirst=True) - MapZensusWeatherCell.__table__.create(bind=engine, checkfirst=True) + MapZensusWeatherCell.__table__.drop(bind=db.engine(), checkfirst=True) + MapZensusWeatherCell.__table__.create(bind=db.engine(), checkfirst=True) # Write mapping into db with db.session_scope() as session: diff --git a/src/egon/data/datasets/sanity_checks.py b/src/egon/data/datasets/sanity_checks.py index b74101f58..8bfbb285a 100644 --- a/src/egon/data/datasets/sanity_checks.py +++ b/src/egon/data/datasets/sanity_checks.py @@ -620,9 +620,10 @@ def cts_electricity_demand_share(rtol=1e-5): with db.session_scope() as session: cells_query = 
session.query(EgonCtsElectricityDemandBuildingShare) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) np.testing.assert_allclose( @@ -646,10 +647,10 @@ def cts_heat_demand_share(rtol=1e-5): to all buildings.""" with db.session_scope() as session: - cells_query = session.query(EgonCtsHeatDemandBuildingShare) + cells_query = session.query(EgonCtsHeatDemandBuildingShare).all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) np.testing.assert_allclose( @@ -711,9 +712,10 @@ def check_ev_allocation(): table.scenario == scenario_name, table.scenario_variation == scenario_var_name, ) + query = query.all() - ev_counts = pd.read_sql( - query.statement, query.session.bind, index_col=None + ev_counts = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) ev_counts_dict[level] = ev_counts.iloc[0].ev_count print( @@ -744,8 +746,9 @@ def check_ev_allocation(): EgonEvMvGridDistrict.scenario == scenario_name, EgonEvMvGridDistrict.scenario_variation == scenario_var_name, ) + query = query.all() ev_count_alloc = ( - pd.read_sql(query.statement, query.session.bind, index_col=None) + pd.DataFrame.from_records([db.asdict(row) for row in query]) .iloc[0] .ev_count ) @@ -789,8 +792,9 @@ def check_trip_data(): ), EgonEvTrip.scenario == scenario_name, ) - invalid_trips = pd.read_sql( - query.statement, query.session.bind, index_col=None + query = query.all() + invalid_trips = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, @@ -820,8 +824,9 @@ def check_trip_data(): < cast(EgonEvTrip.charging_demand, Numeric), EgonEvTrip.scenario == scenario_name, ) - invalid_trips = pd.read_sql( - query.statement, query.session.bind, index_col=None + query = query.all() + invalid_trips = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, @@ -848,9 +853,10 @@ def check_model_data(): == scenario_var_name, ) .group_by(EgonEvMvGridDistrict.bus_id) + .all() ) mvgds_with_ev = ( - pd.read_sql(query.statement, query.session.bind, index_col=None) + pd.DataFrame.from_records([db.asdict(row) for row in query]) .bus_id.sort_values() .to_list() ) @@ -884,9 +890,10 @@ def check_model_data(): EgonPfHvLink.bus1 == EgonPfHvLoad.bus, EgonPfHvLink.bus1 == EgonPfHvStore.bus, ) + .all() ) - model_components = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_components = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) # Check number of buses with model components connected @@ -966,10 +973,9 @@ def check_model_data(): ), attrs["table_ts"].scn_name == scenario_name, ) - attrs["ts"] = pd.read_sql( - query.statement, - query.session.bind, - index_col=attrs["column_id"], + attrs["ts"] = pd.DataFrame.from_records( + [db.asdict(row) for row in query.all()], + index=attrs["column_id"], ) # Check if all timeseries have 8760 steps @@ -1024,9 +1030,10 @@ def check_model_data(): EgonPfHvStore.scn_name == scenario_name, EgonPfHvStore.carrier == "battery storage", ) + query = query.all() storage_capacity_model = ( - pd.read_sql( - query.statement, query.session.bind, index_col=None + pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).e_nom.sum() / 1e3 ) @@ 
-1057,9 +1064,10 @@ def check_model_data(): EgonEvPool.scenario == scenario_name, ) .group_by(EgonEvMvGridDistrict.bus_id, EgonEvPool.type) + .all() ) - count_per_ev_all = pd.read_sql( - query.statement, query.session.bind, index_col="bus_id" + count_per_ev_all = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index="bus_id" ) count_per_ev_all["bat_cap"] = count_per_ev_all.type.map( meta_tech_data.battery_capacity @@ -1126,9 +1134,10 @@ def check_model_data_lowflex_eGon2035(): EgonPfHvLoad.scn_name == "eGon2035", EgonPfHvLoadTimeseries.scn_name == "eGon2035", ) + .all() ) - model_driving_load = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_driving_load = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) driving_load = np.array(model_driving_load.p_set.to_list()).sum(axis=0) @@ -1151,9 +1160,10 @@ def check_model_data_lowflex_eGon2035(): EgonPfHvLoad.scn_name == "eGon2035_lowflex", EgonPfHvLoadTimeseries.scn_name == "eGon2035_lowflex", ) + .all() ) - model_charging_load_lowflex = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_charging_load_lowflex = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) charging_load = np.array( model_charging_load_lowflex.p_set.to_list() diff --git a/src/egon/data/datasets/scenario_capacities.py b/src/egon/data/datasets/scenario_capacities.py index 9067c36e5..240af9ecc 100755 --- a/src/egon/data/datasets/scenario_capacities.py +++ b/src/egon/data/datasets/scenario_capacities.py @@ -6,7 +6,6 @@ from sqlalchemy import Column, Float, Integer, String from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import numpy as np import pandas as pd import yaml @@ -16,7 +15,7 @@ from egon.data.datasets import Dataset import egon.data.config -### will be later imported from another file ### +# will be later imported from another file # Base = declarative_base() @@ -85,7 +84,6 @@ def create_table(): def nuts_mapping(): - nuts_mapping = { "BW": "DE1", "NW": "DEA", @@ -158,23 +156,26 @@ def insert_capacities_per_federal_state_nep(): df_windoff = pd.read_excel( target_file, sheet_name="WInd_Offshore_NEP", - ).dropna(subset=['Bundesland', 'Netzverknuepfungspunkt']) + ).dropna(subset=["Bundesland", "Netzverknuepfungspunkt"]) # Remove trailing whitespace from column Bundesland - df_windoff['Bundesland']= df_windoff['Bundesland'].str.strip() + df_windoff["Bundesland"] = df_windoff["Bundesland"].str.strip() # Group and sum capacities per federal state - df_windoff_fs = df_windoff[['Bundesland', 'C 2035']].groupby(['Bundesland']).sum() + df_windoff_fs = ( + df_windoff[["Bundesland", "C 2035"]].groupby(["Bundesland"]).sum() + ) # List federal state with an assigned wind offshore capacity index_list = list(df_windoff_fs.index.values) - # Overwrite capacities in df_windoff with more accurate values from df_windoff_fs + # Overwrite capacities in df_windoff with more accurate values from + # df_windoff_fs for state in index_list: - - df.at['Wind offshore', state] = df_windoff_fs.at[state, 'C 2035']/1000 - + df.at["Wind offshore", state] = ( + df_windoff_fs.at[state, "C 2035"] / 1000 + ) # sort NEP-carriers: rename_carrier = { @@ -194,7 +195,7 @@ def insert_capacities_per_federal_state_nep(): "Haushaltswaermepumpen": "residential_rural_heat_pump", "KWK < 10 MW": "small_chp", } - #'Elektromobilitaet gesamt': 'transport', + # 'Elektromobilitaet gesamt': 'transport', # 'Elektromobilitaet privat': 'transport'} # nuts1 to federal state in Germany @@ 
-216,7 +217,6 @@ def insert_capacities_per_federal_state_nep(): ] for bl in map_nuts.index: - data = pd.DataFrame(df[bl]) # if distribution to federal states is not provided, @@ -266,7 +266,8 @@ def insert_capacities_per_federal_state_nep(): # Filter by carrier updated = insert_data[insert_data["carrier"].isin(carriers)] - # Merge to replace capacities for carriers "oil", "other_non_renewable" and "pumped_hydro" + # Merge to replace capacities for carriers "oil", + # "other_non_renewable" and "pumped_hydro" updated = ( updated.merge(capacities_list, on=["carrier", "nuts"], how="left") .fillna(0) @@ -494,7 +495,8 @@ def insert_nep_list_powerplants(export=True): return kw_liste_nep -def district_heating_input(): +@db.session_scoped +def district_heating_input(session=None): """Imports data for district heating networks in Germany Returns @@ -523,10 +525,6 @@ def district_heating_input(): pd.IndexSlice[:, "Fernwaermeerzeugung"], "Wert" ] *= population_share() - # Connect to database - engine = db.engine() - session = sessionmaker(bind=engine)() - # insert heatpumps and resistive heater as link for c in ["Grosswaermepumpe", "Elektrodenheizkessel"]: entry = EgonScenarioCapacities( @@ -562,8 +560,6 @@ def district_heating_input(): session.add(entry) - session.commit() - def insert_data_nep(): """Overall function for importing scenario input data for eGon2035 scenario @@ -691,7 +687,9 @@ def eGon100_capacities(): df.p_nom[f"residential_{merge_carrier}"] + df.p_nom[f"services_{merge_carrier}"] ), - "component": df.component[f"residential_{merge_carrier}"], + "component": df.component[ + f"residential_{merge_carrier}" + ], }, ) ) @@ -708,7 +706,9 @@ def eGon100_capacities(): "OCGT": "gas", "rural_ground_heat_pump": "residential_rural_heat_pump", "urban_central_air_heat_pump": "urban_central_heat_pump", - "urban_central_solar_thermal": "urban_central_solar_thermal_collector", + "urban_central_solar_thermal": ( + "urban_central_solar_thermal_collector" + ), }, inplace=True, ) diff --git a/src/egon/data/datasets/scenario_parameters/__init__.py b/src/egon/data/datasets/scenario_parameters/__init__.py index 19ff71316..e99d62946 100755 --- a/src/egon/data/datasets/scenario_parameters/__init__.py +++ b/src/egon/data/datasets/scenario_parameters/__init__.py @@ -8,7 +8,6 @@ from sqlalchemy import VARCHAR, Column, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import pandas as pd from egon.data import db @@ -45,7 +44,8 @@ def create_table(): EgonScenario.__table__.create(bind=engine, checkfirst=True) -def insert_scenarios(): +@db.session_scoped +def insert_scenarios(session=None): """Insert scenarios and their parameters to scenario table Returns @@ -54,9 +54,8 @@ def insert_scenarios(): """ - db.execute_sql("DELETE FROM scenario.egon_scenario_parameters CASCADE;") - - session = sessionmaker(bind=db.engine())() + session.execute("DELETE FROM scenario.egon_scenario_parameters CASCADE;") + session.commit() # Scenario eGon2035 egon2035 = EgonScenario(name="eGon2035") @@ -175,7 +174,10 @@ def download_pypsa_technology_data(): sources = egon.data.config.datasets()["pypsa-technology-data"]["sources"][ "zenodo" ] - url = f"""https://zenodo.org/record/{sources['deposit_id']}/files/{sources['file']}""" + url = ( + f"https://zenodo.org/record/{sources['deposit_id']}/files/" + f"{sources['file']}" + ) target_file = egon.data.config.datasets()["pypsa-technology-data"][ "targets" ]["file"] diff --git 
diff --git a/src/egon/data/datasets/storages/__init__.py b/src/egon/data/datasets/storages/__init__.py
index d6cb666c2..269069a37 100755
--- a/src/egon/data/datasets/storages/__init__.py
+++ b/src/egon/data/datasets/storages/__init__.py
@@ -1,25 +1,25 @@
 """The central module containing all code dealing with power plant data.
 """
-from geoalchemy2 import Geometry
 from pathlib import Path
+
+from geoalchemy2 import Geometry
 from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
-from egon.data.datasets.storages.pumped_hydro import (
-    select_mastr_pumped_hydro,
-    select_nep_pumped_hydro,
-    match_storage_units,
-    get_location,
-    apply_voltage_level_thresholds,
-)
-from egon.data.datasets.power_plants import assign_voltage_level
 import geopandas as gpd
 import pandas as pd
 
-from egon.data import db, config
+from egon.data import config, db
 from egon.data.datasets import Dataset
-
+from egon.data.datasets.power_plants import assign_voltage_level
+from egon.data.datasets.storages.pumped_hydro import (
+    apply_voltage_level_thresholds,
+    get_location,
+    match_storage_units,
+    select_mastr_pumped_hydro,
+    select_nep_pumped_hydro,
+)
 
 Base = declarative_base()
 
@@ -247,6 +247,7 @@ def allocate_pumped_hydro_eGon2035(export=True):
         )
         session.add(entry)
         session.commit()
+        session.close()
 
     else:
         return power_plants
@@ -292,8 +293,8 @@ def allocate_pumped_hydro_eGon100RE():
     else:
         raise ValueError(f"'{boundary}' is not a valid dataset boundary.")
 
-    # Get allocation of pumped_hydro plants in eGon2035 scenario as the reference
-    # for the distribution in eGon100RE scenario
+    # Get allocation of pumped_hydro plants in eGon2035 scenario as the
+    # reference for the distribution in eGon100RE scenario
     allocation = allocate_pumped_hydro_eGon2035(export=False)
 
     scaling_factor = capacity_phes / allocation.el_capacity.sum()
@@ -317,6 +318,7 @@ def allocate_pumped_hydro_eGon100RE():
     )
     session.add(entry)
     session.commit()
+    session.close()
 
 
 def home_batteries_per_scenario(scenario):
@@ -351,9 +353,9 @@ def home_batteries_per_scenario(scenario):
             sheet_name="1.Entwurf_NEP2035_V2021",
             index_col="Unnamed: 0",
         )
-
-        # Select target value in MW
-        target = capacities_nep.Summe["PV-Batteriespeicher"]*1000
+
+        # Select target value in MW
+        target = capacities_nep.Summe["PV-Batteriespeicher"] * 1000
 
     else:
         target = db.select_dataframe(
@@ -410,6 +412,7 @@ def home_batteries_per_scenario(scenario):
     )
     session.add(entry)
     session.commit()
+    session.close()
 
 
 def allocate_pv_home_batteries():
diff --git a/src/egon/data/datasets/storages/pumped_hydro.py b/src/egon/data/datasets/storages/pumped_hydro.py
index cae9eea88..75a451f92 100755
--- a/src/egon/data/datasets/storages/pumped_hydro.py
+++ b/src/egon/data/datasets/storages/pumped_hydro.py
@@ -4,7 +4,6 @@
 """
 from geopy.geocoders import Nominatim
-from sqlalchemy.orm import sessionmaker
 import geopandas as gpd
 import pandas as pd
diff --git a/src/egon/data/db.py b/src/egon/data/db.py
index 962e4ff61..9e66f1374 100755
--- a/src/egon/data/db.py
+++ b/src/egon/data/db.py
@@ -1,6 +1,8 @@
 from contextlib import contextmanager
+from types import SimpleNamespace
 import codecs
 import functools
+import os
 import time
 
 from psycopg2.errors import DeadlockDetected, UniqueViolation
@@ -13,6 +15,63 @@
 from egon.data import config
 
 
+def asdict(row, conversions=None):
+    """Convert a result row of an SQLAlchemy query to a dictionary.
+
+    This helper unifies the conversion of two types of query result rows,
+    namely instances of mapped classes and keyed tuples, to dictionaries.
+    That way it's suitable for massaging query results into a format which
+    can easily be converted to a `pandas` `DataFrame` like this:
+
+    ```python
+    df = pandas.DataFrame.from_records(
+        [asdict(row) for row in session.query(*columns).all()]
+    )
+    ```
+
+    Parameters
+    ----------
+    row : SQLAlchemy query result row
+    conversions : dict
+        Dictionary mapping column names to functions applied to the values
+        of that column. The default is `None` which means no conversion is
+        applied.
+
+    Returns
+    -------
+    dict
+        The argument converted to a dictionary with column names as keys and
+        column values potentially converted by calling
+        `conversions[column_name](column_value)`.
+    """
+    result = None
+    if hasattr(row, "_asdict"):
+        result = row._asdict()
+    if hasattr(row, "__table__"):
+        result = {
+            column.name: getattr(row, column.name)
+            for column in row.__table__.columns
+        }
+    if (result is not None) and (conversions is None):
+        return result
+    if (result is not None) and (conversions is not None):
+        return {
+            k: conversions[k](v) if k in conversions else v
+            for k, v in result.items()
+        }
+    raise TypeError(
+        "Don't know how to convert `row` argument to dict because it has"
+        " neither an `_asdict` nor a `__table__` attribute."
+    )
+
+
+@contextmanager
+def access():
+    """Provide a context with a session and an associated connection."""
+    with session_scope() as session, session.connection() as c, c.begin():
+        yield SimpleNamespace(session=session, connection=c)
+
+
 def credentials():
     """Return local database connection parameters.
 
@@ -40,13 +99,19 @@ def credentials():
 
 def engine():
     """Engine for local database."""
+    if not hasattr(engine, "cache"):
+        engine.cache = {}
+    pid = os.getpid()
+    if pid in engine.cache:
+        return engine.cache[pid]
     db_config = credentials()
-    return create_engine(
+    engine.cache[pid] = create_engine(
         f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:"
         f"{db_config['POSTGRES_PASSWORD']}@{db_config['HOST']}:"
         f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
         echo=False,
     )
+    return engine.cache[pid]
 
 
 def execute_sql(sql_string):
@@ -61,10 +126,8 @@ def execute_sql(sql_string):
         SQL expression
 
     """
-    engine_local = engine()
-
-    with engine_local.connect().execution_options(autocommit=True) as con:
-        con.execute(text(sql_string))
+    with access() as database:
+        database.connection.execute(text(sql_string))
 
 
 def submit_comment(json, schema, table):
@@ -128,7 +191,7 @@ def session_scope():
     try:
         yield session
         session.commit()
-    except:
+    except:  # noqa: E722 (This is ok because we immediately reraise.)
         session.rollback()
         raise
     finally:
@@ -179,7 +242,8 @@ def select_dataframe(sql, index_col=None, warning=True):
 
     """
 
-    df = pd.read_sql(sql, engine(), index_col=index_col)
+    with access() as database:
+        df = pd.read_sql(sql, database.connection, index_col=index_col)
 
     if df.size == 0 and warning is True:
         print(f"WARNING: No data returned by statement: \n {sql}")
@@ -208,9 +272,10 @@ def select_geodataframe(sql, index_col=None, geom_col="geom", epsg=3035):
 
     """
 
-    gdf = gpd.read_postgis(
-        sql, engine(), index_col=index_col, geom_col=geom_col
-    )
+    with access() as database:
+        gdf = gpd.read_postgis(
+            sql, database.connection, index_col=index_col, geom_col=geom_col
+        )
 
     if gdf.size == 0:
         print(f"WARNING: No data returned by statement: \n {sql}")
@@ -354,7 +419,11 @@ def commit(*args, **kwargs):
 
 
 def assign_gas_bus_id(dataframe, scn_name, carrier):
-    """Assigns bus_ids to points (contained in a dataframe) according to location
+    """Assigns bus_ids to points according to location.
+
+    The points are taken from the given `dataframe` and the geometries by
+    which the `bus_id`s are assigned to them are taken from the
+    `grid.egon_gas_voronoi` table.
 
     Parameters
     ----------
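
A note on the recurring pattern in the model-data checks above: instead of handing a live `Query` to `pandas.read_sql`, each query is now materialised with `.all()` and its rows are converted through `db.asdict`. The following is a minimal, self-contained sketch of that pattern; the `Widget` model and the in-memory SQLite engine are hypothetical stand-ins, and the local `asdict` merely mirrors the two branches of `db.asdict` without the `conversions` argument.

```python
# Sketch of the query-to-DataFrame pattern; `Widget` and the in-memory
# SQLite engine are hypothetical stand-ins for the real tables.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import pandas as pd

Base = declarative_base()


class Widget(Base):
    __tablename__ = "widgets"
    id = Column(Integer, primary_key=True)
    name = Column(String)


def asdict(row):
    """Mirror the two branches of `db.asdict` (sans `conversions`)."""
    if hasattr(row, "_asdict"):  # keyed tuple from query(*columns)
        return row._asdict()
    return {  # instance of a mapped class from query(Model)
        column.name: getattr(row, column.name)
        for column in row.__table__.columns
    }


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([Widget(id=1, name="a"), Widget(id=2, name="b")])
session.commit()

# Materialise the query with `.all()`, then build the frame, exactly as
# the `check_model_data*` hunks now do via `db.asdict`.
rows = session.query(Widget).all()
df = pd.DataFrame.from_records([asdict(row) for row in rows], index="id")
print(df)
```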
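
The new `db.access()` context manager hands call sites a `SimpleNamespace` carrying both the session and its underlying connection, so ORM code and Core/pandas code can share a single transaction. Usage would look roughly like the sketch below; it assumes a configured egon-data database, and the SQL is illustrative only.

```python
# Sketch only: assumes `egon.data` is installed and a local database is
# configured, otherwise `db.access()` cannot build an engine.
import pandas as pd

from egon.data import db

with db.access() as database:
    # One context, two interfaces: the ORM session and the raw connection
    # run inside the same transaction.
    database.session.execute("SELECT 1")
    df = pd.read_sql("SELECT 1 AS x", database.connection)
```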
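
`engine()` now memoises one engine per process id instead of building a fresh engine on every call. Keying the cache on `os.getpid()` rather than using a plain module-level singleton plausibly matters because an engine's pooled connections must not be shared across `fork`ed worker processes; that rationale is my reading, not stated in the diff. A stripped-down stand-in showing just the caching behaviour (the `make_engine` factory is hypothetical):

```python
import os


def per_pid_cached(factory):
    """Memoise `factory` per process id, mimicking `db.engine`'s cache."""
    cache = {}

    def wrapper():
        pid = os.getpid()
        if pid not in cache:
            cache[pid] = factory()
        return cache[pid]

    return wrapper


@per_pid_cached
def make_engine():
    # Hypothetical stand-in for `create_engine(...)`; each process that
    # calls this gets exactly one object of its own.
    return object()


assert make_engine() is make_engine()  # same object within one process
```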
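
Finally, the `@db.session_scoped` decorator applied throughout these files is used but never defined in the hunks shown. Consistent with the call sites, where every decorated function gains a `session=None` keyword and drops its own `sessionmaker`/`commit` calls, it presumably opens a `session_scope()` and injects the session when the caller did not supply one. The following is a hypothetical reconstruction, with a fake session so the sketch runs without a database; the real decorator may differ.

```python
# Hypothetical reconstruction of `db.session_scoped`; `FakeSession` is a
# stand-in so the sketch runs without a database.
from contextlib import contextmanager
import functools


class FakeSession:
    def commit(self):
        print("commit")

    def rollback(self):
        print("rollback")

    def close(self):
        print("close")


@contextmanager
def session_scope():
    session = FakeSession()
    try:
        yield session
        session.commit()
    except:  # noqa: E722 (ok, we immediately reraise)
        session.rollback()
        raise
    finally:
        session.close()


def session_scoped(function):
    """Inject a session into `function` unless the caller supplied one."""

    @functools.wraps(function)
    def wrapped(*args, **kwargs):
        if kwargs.get("session") is not None:
            return function(*args, **kwargs)
        with session_scope() as session:
            return function(*args, session=session, **kwargs)

    return wrapped


@session_scoped
def insert_rows(values, session=None):
    # The body only uses `session`; commit/rollback/close are handled by
    # the decorator's context manager, as in the refactored functions.
    print(f"inserting {values} via {session}")


insert_rows([1, 2, 3])
```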