From 7a7274da527802ef84b95e26add4f60f83981ac1 Mon Sep 17 00:00:00 2001 From: pbullhove Date: Fri, 27 Sep 2024 07:37:23 +0000 Subject: [PATCH 1/2] rename gold table to readable format --- databricks/Havvarsel - Ingest as Bronze.py | 2 ++ databricks/Havvarsel - Transform to Gold.py | 29 +++++++++++++++++++++ databricks/helpers/adls_utils.py | 10 +++++++ 3 files changed, 41 insertions(+) create mode 100644 databricks/Havvarsel - Transform to Gold.py diff --git a/databricks/Havvarsel - Ingest as Bronze.py b/databricks/Havvarsel - Ingest as Bronze.py index 8c7d3e6..1f099b9 100644 --- a/databricks/Havvarsel - Ingest as Bronze.py +++ b/databricks/Havvarsel - Ingest as Bronze.py @@ -11,6 +11,8 @@ import requests from pyspark.sql.functions import lit +df_raw_agg = spark.dataframe() + url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}" headers = {"accept": "application/json"} response = requests.get(url, headers=headers) diff --git a/databricks/Havvarsel - Transform to Gold.py b/databricks/Havvarsel - Transform to Gold.py new file mode 100644 index 0000000..05af87a --- /dev/null +++ b/databricks/Havvarsel - Transform to Gold.py @@ -0,0 +1,29 @@ +# Databricks notebook source +from helpers.adls_utils import read_df_as_delta +df_silver = read_df_as_delta("/silver/hav_temperature_projection_latest") +display(df_silver) + +# COMMAND ---------- + +from helpers.adls_utils import save_df_as_csv +save_df_as_csv(df_silver, "/gold/hav_temperature_projection_latest") + +# COMMAND ---------- + +from helpers.adls_utils import get_adls_file_path +from datetime import datetime + +gold_path = f"{get_adls_file_path()}/gold" +hav_gold_folder= f"{gold_path}/hav_temperature_projection_latest" +files = dbutils.fs.ls(hav_gold_folder) + +old_file = [file.path for file in files if file.name.startswith("part-")][0] + +today = datetime.now().strftime("%Y-%m-%d") +destination_path = f"{gold_path}/havtemp-pred-latest-{today}.csv" 
+dbutils.fs.mv(old_file, destination_path) + +# COMMAND ---------- + +dbutils.fs.rm(hav_gold_folder, recurse=True) + diff --git a/databricks/helpers/adls_utils.py b/databricks/helpers/adls_utils.py index 6e77475..5191c2f 100644 --- a/databricks/helpers/adls_utils.py +++ b/databricks/helpers/adls_utils.py @@ -18,6 +18,16 @@ def save_df_as_delta(df, table_name, mode="overwrite", file_path=get_adls_file_p connect_to_adls() df.write.format("delta").mode(mode).save(f"{file_path}/{table_name}") + +def save_df_as_csv(df, table_name, mode="overwrite", file_path=get_adls_file_path()): + connect_to_adls() + df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{file_path}/{table_name}") + +def read_df_as_csv(file_name, file_path=get_adls_file_path()): + connect_to_adls() + df = spark.read.format("csv").load(f"{file_path}/{file_name}") + return df + def read_df_as_delta(file_name, file_path=get_adls_file_path()): connect_to_adls() df = spark.read.format("delta").load(f"{file_path}/{file_name}") From 30aa4f05a480073fbc6594220949f9c91fe60aaf Mon Sep 17 00:00:00 2001 From: pbullhove Date: Fri, 27 Sep 2024 07:42:26 +0000 Subject: [PATCH 2/2] remove code that should not be committed --- databricks/Havvarsel - Ingest as Bronze.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/databricks/Havvarsel - Ingest as Bronze.py b/databricks/Havvarsel - Ingest as Bronze.py index 1f099b9..8c7d3e6 100644 --- a/databricks/Havvarsel - Ingest as Bronze.py +++ b/databricks/Havvarsel - Ingest as Bronze.py @@ -11,8 +11,6 @@ import requests from pyspark.sql.functions import lit -df_raw_agg = spark.dataframe() - url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}" headers = {"accept": "application/json"} response = requests.get(url, headers=headers)