make bronze get data from multiple localities and depths
pbullhove authored and committed Sep 27, 2024
1 parent 55c9631 commit 5910966
Showing 3 changed files with 59 additions and 29 deletions.
65 changes: 48 additions & 17 deletions databricks/Havvarsel - Ingest as Bronze.py
@@ -1,37 +1,68 @@
# Databricks notebook source
# Query Parameters
-depth_index = 2 # 10m - look it up on the depth index table.
-lat = 5.32
-lon = 60.39
+depth_indices = [0, 1, 2, 3] # [0m, 3m, 10m, 15m] below sea level
+locations = [
+    {"lat": 14.565382891612964, "lon": 68.22784304432557},  # Lofoten, Svolvær
+    {"lat": 13.62931782826355, "lon": 68.08787504064836},   # Lofoten, Buksnesfjorden
+    {"lat": 14.814009616694364, "lon": 68.44104810992098}   # Ofoten, Melbu
+]

# COMMAND ----------

+def fetch_ocean_temperature_preditions(depth_index, lat, lon):
+    url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}"
+    headers = {"accept": "application/json"}
+    response = requests.get(url, headers=headers)
+    return response.json()


# COMMAND ----------

+depth_data = spark.table("havvarsel_depth_index_to_meter_mapping")


# COMMAND ----------

import requests
from pyspark.sql.functions import lit
+from datetime import datetime
+from helpers.adls_utils import save_df_as_delta, get_adls_folder_path

url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}"
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
data = response.json()
df_raw = spark.read.json(sc.parallelize([data]))

fetch_date = datetime.now().strftime("%Y-%m-%d")
bronze_df_file_name = f"bronze/hav_temperature_projection_{fetch_date}" # have a new bronze for each fetch date

dbutils.fs.rm(f"{get_adls_folder_path()}/{bronze_df_file_name}", recurse=True) # delete old in order to remove duplicates


# COMMAND ----------

-from datetime import datetime
-depth_data = spark.table("havvarsel_depth_index_to_meter_mapping")
-depth_m = depth_data.filter(depth_data.depthIndex == depth_index).collect()[0].depthValue
-fetch_date = datetime.now().strftime("%Y-%m-%d")

-df_bronze = df_raw.withColumn("depth_meters", lit(depth_m)).withColumn("fetch_date", lit(fetch_date))

+for loc_nr, location in enumerate(locations):
+    for depth_index in depth_indices:
+        print(f"Fetching data for depth index {depth_index} at location ({location['lat']}, {location['lon']})")
+        data = fetch_ocean_temperature_preditions(depth_index, location["lat"], location["lon"])
+        if ("code" in data and data["code"] == 404):
+            print("-- Error, message: ", data["message"])
+            continue
+
+        df_raw = spark.read.json(sc.parallelize([data]))
+
+        # add depth info and fetch date in order to have metadata in the table
+        depth_m = depth_data.filter(depth_data.depthIndex == depth_index).collect()[0].depthValue
+        df_bronze = df_raw.withColumn("depth_meters", lit(depth_m)).withColumn("fetch_date", lit(fetch_date))
+
+        save_df_as_delta(df_bronze, bronze_df_file_name, "append")




# COMMAND ----------

-from helpers.adls_utils import save_df_as_delta
-save_df_as_delta(df_bronze, f"/bronze/hav_temperature_projection_{fetch_date}")
-save_df_as_delta(df_bronze, "/bronze/hav_temperature_projection_latest")

+latest_path = f"{get_adls_folder_path()}/bronze/hav_temperature_projection_latest"
+dbutils.fs.rm(latest_path, recurse=True) # delete oldest latest before copy
+
+# in order to have an updated latest, we overwrite the latest with the newly fetched dataframe.
+dbutils.fs.cp(f"{get_adls_folder_path()}/{bronze_df_file_name}", latest_path, recurse=True)
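The new ingest flow reduces to a fetch-and-guard loop per (location, depth index). Below is a minimal standalone sketch of that pattern, assuming only what the notebook shows: the temperatureprojection URL format, the 404 error payload with "code" and "message" keys, and the 0m/3m/10m/15m values from the comment; the plain dict is a stand-in for the havvarsel_depth_index_to_meter_mapping Spark table.

import requests

# Stand-in for the havvarsel_depth_index_to_meter_mapping table; the 0/3/10/15 metre
# values are taken from the depth_indices comment above, not read from the real table.
DEPTH_INDEX_TO_METERS = {0: 0, 1: 3, 2: 10, 3: 15}

def fetch_ocean_temperature_preditions(depth_index, lat, lon):
    # Same request the notebook issues: one projection per (position, depth index).
    url = f"https://api.havvarsel.no/apis/duapi/havvarsel/v2/temperatureprojection/{lat}/{lon}?depth={depth_index}"
    response = requests.get(url, headers={"accept": "application/json"})
    return response.json()

if __name__ == "__main__":
    location = {"lat": 14.565382891612964, "lon": 68.22784304432557}  # Lofoten, Svolvær
    for depth_index, depth_m in DEPTH_INDEX_TO_METERS.items():
        data = fetch_ocean_temperature_preditions(depth_index, location["lat"], location["lon"])
        if "code" in data and data["code"] == 404:
            # The API signals missing data with a JSON body carrying code/message.
            print("-- Error, message:", data["message"])
            continue
        print(f"depth index {depth_index} ({depth_m} m): received a projection payload")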

4 changes: 2 additions & 2 deletions databricks/Havvarsel - Transform to Gold.py
@@ -10,10 +10,10 @@

# COMMAND ----------

-from helpers.adls_utils import get_adls_file_path
+from helpers.adls_utils import get_adls_folder_path
from datetime import datetime

-gold_path = f"{get_adls_file_path()}/gold"
+gold_path = f"{get_adls_folder_path()}/gold"
hav_gold_folder= f"{gold_path}/hav_temperature_projection_latest"
files = dbutils.fs.ls(hav_gold_folder)
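The change in this notebook is only which helper composes the folder root. A small sketch of the resulting gold path follows; the storage account name is a placeholder, since the real value comes from STORAGE_ACCOUNT in adls_utils.

# Path composition after the rename; "examplestorage" is a placeholder account name.
def get_adls_folder_path(container="datalake", storage_account="examplestorage"):
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/havvarsel/"

gold_path = f"{get_adls_folder_path()}/gold"
hav_gold_folder = f"{gold_path}/hav_temperature_projection_latest"
print(hav_gold_folder)
# abfss://datalake@examplestorage.dfs.core.windows.net/havvarsel//gold/hav_temperature_projection_latest
# (the helper's trailing slash plus the "/gold" join produces the double slash above)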

19 changes: 9 additions & 10 deletions databricks/helpers/adls_utils.py
@@ -10,25 +10,24 @@ def connect_to_adls(storage_account = STORAGE_ACCOUNT):
f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
dbutils.secrets.get(scope="terraform-created-scope", key="storage-account-key"))

def get_adls_file_path(container = "datalake", storage_account = STORAGE_ACCOUNT):
def get_adls_folder_path(container = "datalake", storage_account = STORAGE_ACCOUNT):
return (f"abfss://{container}@{storage_account}.dfs.core.windows.net/havvarsel/")


def save_df_as_delta(df, table_name, mode="overwrite", file_path=get_adls_file_path()):
def save_df_as_delta(df, table_name, mode="overwrite", folder_path=get_adls_folder_path()):
connect_to_adls()
df.write.format("delta").mode(mode).save(f"{file_path}/{table_name}")
df.write.format("delta").mode(mode).save(f"{folder_path}/{table_name}")


def save_df_as_csv(df, table_name, mode="overwrite", file_path=get_adls_file_path()):
def save_df_as_csv(df, table_name, mode="overwrite", folder_path=get_adls_folder_path()):
connect_to_adls()
df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{file_path}/{table_name}")
df.coalesce(1).write.format("csv").mode(mode).option("header", "true").save(f"{folder_path}/{table_name}")

def read_df_as_csv(file_name, file_path=get_adls_file_path()):
def read_df_as_csv(file_name, folder_path=get_adls_folder_path()):
connect_to_adls()
df = spark.read.format("csv").load(f"{file_path}/{file_name}")
df = spark.read.format("csv").load(f"{folder_path}/{file_name}")
return df

def read_df_as_delta(file_name, file_path=get_adls_file_path()):
def read_df_as_delta(file_name, folder_path=get_adls_folder_path()):
connect_to_adls()
df = spark.read.format("delta").load(f"{file_path}/{file_name}")
df = spark.read.format("delta").load(f"{folder_path}/{file_name}")
return df
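For context, a short usage sketch of the renamed helpers as the ingest notebook calls them; it assumes a Databricks session where spark and dbutils are provided by the runtime and the ADLS secret scope is already configured.

from helpers.adls_utils import save_df_as_delta, read_df_as_delta

# Toy dataframe standing in for one fetched (location, depth) slice; the real schema
# comes from the API JSON, so the temperature column here is only illustrative.
df_bronze = spark.createDataFrame(
    [(10, "2024-09-27", 12.3)], ["depth_meters", "fetch_date", "temperature"]
)

bronze_name = "bronze/hav_temperature_projection_2024-09-27"  # one folder per fetch date
save_df_as_delta(df_bronze, bronze_name, "append")  # appended once per location/depth pair
df_check = read_df_as_delta(bronze_name)             # folder_path defaults to get_adls_folder_path()
df_check.show()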
