refactor(databricks): simpler silver transform
pbullhove committed Nov 1, 2024
1 parent 4282ada commit 08ef709
Showing 2 changed files with 22 additions and 60 deletions.
databricks/Havvarsel - Transform to Gold.py (25 changes: 13 additions & 12 deletions)
@@ -43,23 +43,24 @@ def transform_to_local_timezone(utc_time_str):

 # COMMAND ----------
 
 from pyspark.sql.functions import col
 
-# Assume df_silver has a 'time' column in UTC
-df_silver_time_transformed = df_silver.withColumn(
-    "fetch_timestamp_local",
-    transform_to_local_timezone_udf(col("fetch_timestamp_utc").cast(StringType()))
-)
-
-df_silver_time_transformed = df_silver_time_transformed.withColumn(
-    "forecast_timestamp_local",
-    transform_to_local_timezone_udf(col("forecast_timestamp_utc").cast(StringType()))
-)
+df_silver_time_transformed = df_silver \
+    .withColumn(
+        "fetch_timestamp_local",
+        transform_to_local_timezone_udf(col("fetch_timestamp_utc").cast(StringType()))
+    ) \
+    .withColumn(
+        "forecast_timestamp_local",
+        transform_to_local_timezone_udf(col("forecast_timestamp_utc").cast(StringType()))
+    )

 # COMMAND ----------
 
 display(df_silver_time_transformed)
 
+# save as a managed table for SQL Dashboards
+df_silver_time_transformed.write.format("delta").mode("overwrite").saveAsTable("gold_hav_temperature_projection_latest")
+
 
 # COMMAND ----------
 
 from helpers.adls_utils import save_df_as_csv
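For context, the hunk header above references transform_to_local_timezone, whose definition sits in the collapsed portion of the diff. Below is a minimal sketch of what such a UDF plausibly looks like; the timestamp format and the Europe/Oslo target zone are assumptions (Havvarsel is a Norwegian ocean-forecast service), not taken from the commit:

# Hypothetical sketch; the real definition is collapsed out of this diff.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def transform_to_local_timezone(utc_time_str):
    # Assumes "YYYY-MM-DD HH:MM:SS" strings; the notebook may use another format.
    utc_time = datetime.strptime(utc_time_str, "%Y-%m-%d %H:%M:%S")
    local_time = utc_time.replace(tzinfo=timezone.utc).astimezone(ZoneInfo("Europe/Oslo"))
    return local_time.strftime("%Y-%m-%d %H:%M:%S")

transform_to_local_timezone_udf = udf(transform_to_local_timezone, StringType())

The other addition in this file, saveAsTable, registers the result as a managed Delta table, so SQL dashboards can query gold_hav_temperature_projection_latest by name instead of by storage path.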
databricks/Havvarsel - Transform to Silver.py (57 changes: 9 additions & 48 deletions)
@@ -1,66 +1,27 @@
 # Databricks notebook source
 from helpers.adls_utils import read_df_as_delta
-df_bronze_0 = read_df_as_delta("/bronze/hav_temperature_projection_latest")
-display(df_bronze_0)
+df_bronze = read_df_as_delta("/bronze/hav_temperature_projection_latest")
+display(df_bronze)
 
 # COMMAND ----------
 
-from pyspark.sql.functions import explode, from_unixtime, col
+from pyspark.sql.functions import col, explode, from_unixtime
 
-# Extract lat and lon, and explode variables
-df_bronze_1 = df_bronze_0.select(
+# Extract lat and lon, and select the first element from variables
+df_silver = df_bronze.select(
     col("closestGridPointWithData.lat").alias("lat"),
     col("closestGridPointWithData.lon").alias("lon"),
-    explode(col("variables")).alias("variable"),
-    "depth_meters",
-    "fetch_timestamp_utc"
-)
-
-display(df_bronze_1)
-
-# COMMAND ----------
-
-from pyspark.sql.functions import from_unixtime
-
-
-# Extract fields from variable
-df_bronze_2 = df_bronze_1.select(
-    "lat",
-    "lon",
-    col("variable.variableName").alias("variable_name"),
-    explode(col("variable.data")).alias("data"),
+    explode(col("variables")[0]["data"]).alias("data"),
     col("data.rawTime").alias("forecast_timestamp_utc"),
     col("data.value").alias("ocean_temperature"),
     "depth_meters",
     "fetch_timestamp_utc"
-).drop("data")
-
-display(df_bronze_2)
-
-# COMMAND ----------
-
-from pyspark.sql.functions import lit, col
-
-# Convert time to readable format
-df_bronze_3 = df_bronze_2.withColumn("forecast_timestamp_utc", from_unixtime(col("forecast_timestamp_utc") / 1000))
-
-# Explode dimensions if necessary
-df_silver = df_bronze_3.select(
-    "lat",
-    "lon",
-    "variable_name",
-    "forecast_timestamp_utc",
-    "ocean_temperature",
-    "depth_meters",
-    "fetch_timestamp_utc"
-)
+) \
+    .drop("data") \
+    .withColumn("forecast_timestamp_utc", from_unixtime(col("forecast_timestamp_utc") / 1000))
 
 display(df_silver)
 
 # COMMAND ----------


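The net effect of the Silver refactor: three intermediate DataFrames (df_bronze_1 through df_bronze_3) collapse into one chained statement. Note the semantic narrowing as well: the old version exploded every entry of variables and kept variable_name, while the new one reads only variables[0] and drops that column, which is equivalent only when the payload carries a single variable. A self-contained sketch on mock data; the bronze schema below is inferred from the columns the diff touches, not from the actual Havvarsel payload:

# Mock bronze row; this schema is an assumption reconstructed from the diff.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_unixtime

spark = SparkSession.builder.getOrCreate()

schema = (
    "closestGridPointWithData struct<lat: double, lon: double>, "
    "variables array<struct<variableName: string, data: array<struct<rawTime: long, value: double>>>>, "
    "depth_meters double, "
    "fetch_timestamp_utc string"
)
data = [(
    (60.39, 5.32),
    [("temperature", [(1730419200000, 9.8), (1730422800000, 9.7)])],
    0.0,
    "2024-11-01 00:00:00",
)]
df_bronze = spark.createDataFrame(data, schema)

# Same shape as the committed transform. Referencing the "data" alias inside
# the select that defines it relies on lateral column aliases (Spark 3.4+).
df_silver = df_bronze.select(
    col("closestGridPointWithData.lat").alias("lat"),
    col("closestGridPointWithData.lon").alias("lon"),
    explode(col("variables")[0]["data"]).alias("data"),
    col("data.rawTime").alias("forecast_timestamp_utc"),
    col("data.value").alias("ocean_temperature"),
    "depth_meters",
    "fetch_timestamp_utc",
).drop("data").withColumn(
    "forecast_timestamp_utc", from_unixtime(col("forecast_timestamp_utc") / 1000)
)

df_silver.show()  # two rows, one per forecast timestamp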

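Finally, both notebooks depend on a small helpers.adls_utils module that is not part of this commit. Purely as a hypothetical sketch of what such helpers typically wrap (the base path and option choices below are invented placeholders, not the project's actual code):

# Hypothetical helpers/adls_utils.py; everything here is assumed, not shown in the diff.
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder: the real base path would point at the project's ADLS container.
ADLS_BASE_PATH = "abfss://<container>@<account>.dfs.core.windows.net"

def read_df_as_delta(path: str) -> DataFrame:
    # Load a Delta table from the lake by layer-relative path, e.g. "/bronze/...".
    return spark.read.format("delta").load(ADLS_BASE_PATH + path)

def save_df_as_csv(df: DataFrame, path: str) -> None:
    # Write a single headered CSV for downstream consumers.
    df.coalesce(1).write.mode("overwrite").option("header", True).csv(ADLS_BASE_PATH + path)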