diff --git a/databricks/Havvarsel - Ingest as Bronze.py b/databricks/Havvarsel - Ingest as Bronze.py
index 8c7d3e6..6819867 100644
--- a/databricks/Havvarsel - Ingest as Bronze.py
+++ b/databricks/Havvarsel - Ingest as Bronze.py
@@ -1,37 +1,68 @@
 # Databricks notebook source
-# Query Parameters
-depth_index = 2 # 10m - look it up on the depth index table.
-lat = 5.32
-lon = 60.39
+depth_indices = [0, 1, 2, 3]  # [0m, 3m, 10m, 15m] below sea level
+locations = [
+    {"lat": 14.565382891612964, "lon": 68.22784304432557},  # Lofoten, Svolvær
+    {"lat": 13.62931782826355, "lon": 68.08787504064836},   # Lofoten, Buksnesfjorden
+    {"lat": 14.814009616694364, "lon": 68.44104810992098}   # Ofoten, Melbu
+]
+
+# COMMAND ----------
+
+def fetch_ocean_temperature_predictions(depth_index, lat, lon):
+    url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}"
+    headers = {"accept": "application/json"}
+    response = requests.get(url, headers=headers)
+    return response.json()
+
+# COMMAND ----------
+
+depth_data = spark.table("havvarsel_depth_index_to_meter_mapping")
 
 # COMMAND ----------
 
 import requests
 from pyspark.sql.functions import lit
+from datetime import datetime
+from helpers.adls_utils import save_df_as_delta, get_adls_folder_path
 
-url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}"
-headers = {"accept": "application/json"}
-response = requests.get(url, headers=headers)
-data = response.json()
-df_raw = spark.read.json(sc.parallelize([data]))
+fetch_date = datetime.now().strftime("%Y-%m-%d")
+bronze_df_file_name = f"bronze/hav_temperature_projection_{fetch_date}"  # a new bronze table for each fetch date
+dbutils.fs.rm(f"{get_adls_folder_path()}/{bronze_df_file_name}", recurse=True)  # delete any existing data for this date to avoid duplicates
 
 # COMMAND ----------
 
-from datetime import datetime
-depth_data = spark.table("havvarsel_depth_index_to_meter_mapping")
-depth_m = depth_data.filter(depth_data.depthIndex == depth_index).collect()[0].depthValue
-fetch_date = datetime.now().strftime("%Y-%m-%d")
-df_bronze = df_raw.withColumn("depth_meters", lit(depth_m)).withColumn("fetch_date", lit(fetch_date))
+for loc_nr, location in enumerate(locations):
+    for depth_index in depth_indices:
+        print(f"Fetching data for depth index {depth_index} at location ({location['lat']}, {location['lon']})")
+        data = fetch_ocean_temperature_predictions(depth_index, location["lat"], location["lon"])
+        if "code" in data and data["code"] == 404:
+            print("-- Error, message: ", data["message"])
+            continue
+
+        df_raw = spark.read.json(sc.parallelize([data]))
+
+        # add depth info and fetch date so the table carries this metadata
+        depth_m = depth_data.filter(depth_data.depthIndex == depth_index).collect()[0].depthValue
+        df_bronze = df_raw.withColumn("depth_meters", lit(depth_m)).withColumn("fetch_date", lit(fetch_date))
+
+        save_df_as_delta(df_bronze, bronze_df_file_name, "append")
 
 # COMMAND ----------
 
-from helpers.adls_utils import save_df_as_delta
-save_df_as_delta(df_bronze, f"/bronze/hav_temperature_projection_{fetch_date}")
-save_df_as_delta(df_bronze, "/bronze/hav_temperature_projection_latest")
+latest_path = f"{get_adls_folder_path()}/bronze/hav_temperature_projection_latest"
+dbutils.fs.rm(latest_path, recurse=True)  # delete the old "latest" before copying
+
+# keep "latest" up to date by overwriting it with the newly fetched dataframe
+dbutils.fs.cp(f"{get_adls_folder_path()}/{bronze_df_file_name}", latest_path, recurse=True)
diff --git a/databricks/Havvarsel - Transform to Gold.py b/databricks/Havvarsel - Transform to Gold.py
index 05af87a..ed5676b 100644
--- a/databricks/Havvarsel - Transform to Gold.py
+++ b/databricks/Havvarsel - Transform to Gold.py
@@ -10,10 +10,10 @@
 
 # COMMAND ----------
 
-from helpers.adls_utils import get_adls_file_path
+from helpers.adls_utils import get_adls_folder_path
 from datetime import datetime
 
-gold_path = f"{get_adls_file_path()}/gold"
+gold_path = f"{get_adls_folder_path()}/gold"
 hav_gold_folder= f"{gold_path}/hav_temperature_projection_latest"
 
 files = dbutils.fs.ls(hav_gold_folder)
diff --git a/databricks/helpers/adls_utils.py b/databricks/helpers/adls_utils.py
index 5191c2f..7a7d787 100644
--- a/databricks/helpers/adls_utils.py
+++ b/databricks/helpers/adls_utils.py
@@ -10,25 +10,24 @@ def connect_to_adls(storage_account = STORAGE_ACCOUNT):
         f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
         dbutils.secrets.get(scope="terraform-created-scope", key="storage-account-key"))
 
-def get_adls_file_path(container = "datalake", storage_account = STORAGE_ACCOUNT):
+def get_adls_folder_path(container = "datalake", storage_account = STORAGE_ACCOUNT):
     return (f"abfss://{container}@{storage_account}.dfs.core.windows.net/havvarsel/")
-
-def save_df_as_delta(df, table_name, mode="overwrite", file_path=get_adls_file_path()):
+def save_df_as_delta(df, table_name, mode="overwrite", folder_path=get_adls_folder_path()):
     connect_to_adls()
-    df.write.format("delta").mode(mode).save(f"{file_path}/{table_name}")
+    df.write.format("delta").mode(mode).save(f"{folder_path}/{table_name}")
 
-def save_df_as_csv(df, table_name, mode="overwrite", file_path=get_adls_file_path()):
+def save_df_as_csv(df, table_name, mode="overwrite", folder_path=get_adls_folder_path()):
     connect_to_adls()
-    df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{file_path}/{table_name}")
+    df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{folder_path}/{table_name}")
 
-def read_df_as_csv(file_name, file_path=get_adls_file_path()):
+def read_df_as_csv(file_name, folder_path=get_adls_folder_path()):
     connect_to_adls()
-    df = spark.read.format("csv").load(f"{file_path}/{file_name}")
+    df = spark.read.format("csv").load(f"{folder_path}/{file_name}")
     return df
 
-def read_df_as_delta(file_name, file_path=get_adls_file_path()):
+def read_df_as_delta(file_name, folder_path=get_adls_folder_path()):
     connect_to_adls()
-    df = spark.read.format("delta").load(f"{file_path}/{file_name}")
+    df = spark.read.format("delta").load(f"{folder_path}/{file_name}")
     return df
\ No newline at end of file