diff --git a/databricks/Havvarsel - Transform to Gold.py b/databricks/Havvarsel - Transform to Gold.py
index a6c6969..751ecc3 100644
--- a/databricks/Havvarsel - Transform to Gold.py
+++ b/databricks/Havvarsel - Transform to Gold.py
@@ -43,23 +43,24 @@ def transform_to_local_timezone(utc_time_str):
 
 # COMMAND ----------
 
-from pyspark.sql.functions import col
-
-# Assume df_silver has a 'time' column in UTC
-df_silver_time_transformed = df_silver.withColumn(
-    "fetch_timestamp_local",
-    transform_to_local_timezone_udf(col("fetch_timestamp_utc").cast(StringType()))
-)
-
-df_silver_time_transformed = df_silver_time_transformed.withColumn(
-    "forecast_timestamp_local",
-    transform_to_local_timezone_udf(col("forecast_timestamp_utc").cast(StringType()))
-)
+df_silver_time_transformed = df_silver \
+    .withColumn(
+        "fetch_timestamp_local",
+        transform_to_local_timezone_udf(col("fetch_timestamp_utc").cast(StringType()))
+    ) \
+    .withColumn(
+        "forecast_timestamp_local",
+        transform_to_local_timezone_udf(col("forecast_timestamp_utc").cast(StringType()))
+    )
 
 # COMMAND ----------
 
 display(df_silver_time_transformed)
 
+# save as a managed table for SQL Dashboards
+df_silver_time_transformed.write.format("delta").mode("overwrite").saveAsTable("gold_hav_temperature_projection_latest")
+
+
 # COMMAND ----------
 
 from helpers.adls_utils import save_df_as_csv
diff --git a/databricks/Havvarsel - Transform to Silver.py b/databricks/Havvarsel - Transform to Silver.py
index d6950f2..5081dee 100644
--- a/databricks/Havvarsel - Transform to Silver.py
+++ b/databricks/Havvarsel - Transform to Silver.py
@@ -1,66 +1,27 @@
 # Databricks notebook source
 from helpers.adls_utils import read_df_as_delta
 
-df_bronze_0 = read_df_as_delta("/bronze/hav_temperature_projection_latest")
-display(df_bronze_0)
+df_bronze = read_df_as_delta("/bronze/hav_temperature_projection_latest")
+display(df_bronze)
 
 # COMMAND ----------
 
-from pyspark.sql.functions import explode, from_unixtime, col
+from pyspark.sql.functions import col, explode, from_unixtime
 
-# Extract lat and lon, and explode variables
-df_bronze_1 = df_bronze_0.select(
+# Extract lat and lon, and select the first element from variables
+df_silver = df_bronze.select(
     col("closestGridPointWithData.lat").alias("lat"),
     col("closestGridPointWithData.lon").alias("lon"),
-    explode(col("variables")).alias("variable"),
-    "depth_meters",
-    "fetch_timestamp_utc"
-)
-
-display(df_bronze_1)
-
-# COMMAND ----------
-
-from pyspark.sql.functions import from_unixtime
-
-
-# Extract fields from variable
-df_bronze_2 = df_bronze_1.select(
-    "lat",
-    "lon",
-    col("variable.variableName").alias("variable_name"),
-    explode(col("variable.data")).alias("data"),
+    explode(col("variables")[0]["data"]).alias("data"),
     col("data.rawTime").alias("forecast_timestamp_utc"),
     col("data.value").alias("ocean_temperature"),
     "depth_meters",
-    "fetch_timestamp_utc"
-).drop("data")
-
-
-
-display(df_bronze_2)
-
-# COMMAND ----------
-
-from pyspark.sql.functions import lit, col
-
-# Convert time to readable format
-df_bronze_3 = df_bronze_2.withColumn("forecast_timestamp_utc", from_unixtime(col("forecast_timestamp_utc") / 1000))
-
-
-# Explode dimensions if necessary
-df_silver = df_bronze_3.select(
-    "lat",
-    "lon",
-    "variable_name",
-    "forecast_timestamp_utc",
-    "ocean_temperature",
-    "depth_meters",
     "fetch_timestamp_utc"
-)
+) \
+.drop("data") \
+.withColumn("forecast_timestamp_utc", from_unixtime(col("forecast_timestamp_utc") / 1000))
 
 display(df_silver)
 
-
 # COMMAND ----------
 
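
Note on the Gold timezone step (reviewer suggestion, not part of the patch): the chained .withColumn calls round-trip every row through a Python UDF. If transform_to_local_timezone_udf only shifts UTC into one fixed local zone, the built-in from_utc_timestamp does the same work natively. A minimal sketch, assuming the target zone is "Europe/Oslo" (a guess based on the Havvarsel data source; the UDF body is not shown in this diff, and from_utc_timestamp returns a timestamp rather than a string):

    from pyspark.sql.functions import col, from_utc_timestamp

    # Assumed UDF-free equivalent; "Europe/Oslo" is a placeholder zone.
    df_silver_time_transformed = (
        df_silver
        .withColumn("fetch_timestamp_local",
                    from_utc_timestamp(col("fetch_timestamp_utc"), "Europe/Oslo"))
        .withColumn("forecast_timestamp_local",
                    from_utc_timestamp(col("forecast_timestamp_utc"), "Europe/Oslo"))
    )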
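Note on the managed table write: saveAsTable registers the Delta table in the metastore, which is what makes it addressable by name from Databricks SQL dashboards. A quick smoke test from any notebook attached to the same metastore (table name taken from the diff above):

    # Confirm the Gold table resolves by name after the write
    spark.sql("SELECT * FROM gold_hav_temperature_projection_latest LIMIT 10").show()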
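Note on the Silver refactor: the collapsed select references the data alias created by explode inside the same projection (col("data.rawTime"), col("data.value")), which relies on lateral column alias resolution (Spark 3.4+); it also keeps only variables[0] and drops the variable_name column the old code carried. A more portable two-step sketch of the same transformation, under the same first-variable-only assumption:

    from pyspark.sql.functions import col, explode, from_unixtime

    # Step 1: one row per data point of the first variable
    df_exploded = df_bronze.select(
        col("closestGridPointWithData.lat").alias("lat"),
        col("closestGridPointWithData.lon").alias("lon"),
        explode(col("variables")[0]["data"]).alias("data"),
        "depth_meters",
        "fetch_timestamp_utc",
    )

    # Step 2: unpack the struct and convert epoch milliseconds to a timestamp
    df_silver = df_exploded.select(
        "lat",
        "lon",
        from_unixtime(col("data.rawTime") / 1000).alias("forecast_timestamp_utc"),
        col("data.value").alias("ocean_temperature"),
        "depth_meters",
        "fetch_timestamp_utc",
    )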