Skip to content

Commit

Permalink
Merge pull request #8 from miles-no/feat/databricks-storage-account
Browse files Browse the repository at this point in the history
rename gold table to readable format
  • Loading branch information
pbullhove authored Sep 27, 2024
2 parents bd7b61d + 30aa4f0 commit 55c9631
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
29 changes: 29 additions & 0 deletions databricks/Havvarsel - Transform to Gold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Databricks notebook source
from helpers.adls_utils import read_df_as_delta

# Load the latest silver-layer sea-temperature projection as a Delta table.
df_silver = read_df_as_delta("/silver/hav_temperature_projection_latest")
display(df_silver)

# COMMAND ----------

from helpers.adls_utils import save_df_as_csv

# Write the gold-layer output as a single CSV (the helper coalesces to one
# partition, so Spark emits exactly one part-* file inside this folder).
save_df_as_csv(df_silver, "/gold/hav_temperature_projection_latest")

# COMMAND ----------

from helpers.adls_utils import get_adls_file_path
from datetime import datetime

# Spark writes CSV output as a folder containing part-* files plus metadata
# (_SUCCESS etc.). Rename the single part file to a readable, dated file name
# placed directly under /gold.
gold_path = f"{get_adls_file_path()}/gold"
hav_gold_folder = f"{gold_path}/hav_temperature_projection_latest"
files = dbutils.fs.ls(hav_gold_folder)

# Fail with a clear message if the write produced no part file, instead of
# the opaque IndexError that [ ... ][0] would raise on an empty list.
old_file = next((file.path for file in files if file.name.startswith("part-")), None)
if old_file is None:
    raise FileNotFoundError(f"No part-* CSV file found in {hav_gold_folder}")

today = datetime.now().strftime("%Y-%m-%d")
destination_path = f"{gold_path}/havtemp-pred-latest-{today}.csv"
dbutils.fs.mv(old_file, destination_path)

# COMMAND ----------

# Clean up the temporary Spark output folder (remaining part files, _SUCCESS).
dbutils.fs.rm(hav_gold_folder, recurse=True)

10 changes: 10 additions & 0 deletions databricks/helpers/adls_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ def save_df_as_delta(df, table_name, mode="overwrite", file_path=get_adls_file_p
connect_to_adls()
df.write.format("delta").mode(mode).save(f"{file_path}/{table_name}")


def save_df_as_csv(df, table_name, mode="overwrite", file_path=None):
    """Write *df* as a single headered CSV folder at ``{file_path}/{table_name}``.

    Coalesces to one partition so Spark emits exactly one part-* file.

    Args:
        df: Spark DataFrame to write.
        table_name: Path segment under *file_path* (e.g. ``/gold/my_table``).
        mode: Spark save mode; defaults to ``"overwrite"``.
        file_path: ADLS base path; defaults to ``get_adls_file_path()``,
            resolved at call time.
    """
    # Resolve the default here, not in the signature: a call placed in the
    # default argument is evaluated once at import time and frozen forever.
    if file_path is None:
        file_path = get_adls_file_path()
    connect_to_adls()
    df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{file_path}/{table_name}")

def read_df_as_csv(file_name, file_path=None):
    """Read a CSV at ``{file_path}/{file_name}`` into a Spark DataFrame.

    Args:
        file_name: Path segment under *file_path*.
        file_path: ADLS base path; defaults to ``get_adls_file_path()``,
            resolved at call time.

    Returns:
        A Spark DataFrame with the CSV contents.
    """
    # Resolve the default here, not in the signature: a call placed in the
    # default argument is evaluated once at import time and frozen forever.
    if file_path is None:
        file_path = get_adls_file_path()
    connect_to_adls()
    # NOTE(review): no .option("header", "true") here, although the paired
    # writer (save_df_as_csv) writes a header row — the header would be read
    # back as a data row. Confirm with callers before changing behavior.
    df = spark.read.format("csv").load(f"{file_path}/{file_name}")
    return df

def read_df_as_delta(file_name, file_path=get_adls_file_path()):
connect_to_adls()
df = spark.read.format("delta").load(f"{file_path}/{file_name}")
Expand Down

0 comments on commit 55c9631

Please sign in to comment.