07221 Release (#1078)
* Initial commit

* Hotfix for Audit_log_bronze and NotebookCommands_gold (#1065)

* Hotfix for Audit_log_bronze and NotebookCommands_gold

* Removed unnecessary print statements

* Added Hotfix

* Added requiredColumns in getAuditLogsDF

* 26-Oct-31: Changed lag day in AuditLogBronze to 1 day

---------

Co-authored-by: Sourav Banerjee <[email protected]>

* Hotfix for warehouse performance issue (#1074)

* Hotfix for performance issue

* Removed commented code

* Add support to extract fields from cluster name as well as the Spark JobGroupId (#1077)

---------

Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Aman <[email protected]>
4 people authored Nov 1, 2023
1 parent 6b62553 commit ceab555
Showing 4 changed files with 109 additions and 40 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.7.2.2"
version := "0.7.2.2.1"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
@@ -339,6 +339,25 @@ trait BronzeTransforms extends SparkSessionWrapper {
val azureAuditSourceFilters = 'Overwatch_RunID === lit(overwatchRunID) && 'organization_id === organizationId
val rawBodyLookup = auditRawLand.asDF
.filter(azureAuditSourceFilters)

val requiredColumns : Array[Column] = Array(
col("category"),
col("version"),
col("timestamp"),
col("date"),
col("identity").alias("userIdentity"),
col("organization_id"),
col("properties.actionName"),
col("properties.logId"),
col("properties.requestId"),
col("properties.requestParams"),
col("properties.response"),
col("properties.serviceName"),
col("properties.sessionId"),
col("properties.sourceIPAddress"),
col("properties.userAgent"),
)

val schemaBuilders = auditRawLand.asDF
.filter(azureAuditSourceFilters)
.withColumn("parsedBody", structFromJson(spark, rawBodyLookup, "deserializedBody"))
@@ -348,9 +367,7 @@
.withColumn("time", 'time.cast("timestamp"))
.withColumn("timestamp", unix_timestamp('time) * 1000)
.withColumn("date", 'time.cast("date"))
.select('category, 'version, 'timestamp, 'date, 'properties, 'identity.alias("userIdentity"), 'organization_id)
.selectExpr("*", "properties.*").drop("properties")

.select(requiredColumns: _*)

val baselineAuditLogs = auditRawLand.asDF
.filter(azureAuditSourceFilters)
@@ -361,17 +378,18 @@
.withColumn("time", 'time.cast("timestamp"))
.withColumn("timestamp", unix_timestamp('time) * 1000)
.withColumn("date", 'time.cast("date"))
.select('category, 'version, 'timestamp, 'date, 'properties, 'identity.alias("userIdentity"), 'organization_id)
.select(requiredColumns: _*)
.withColumn("userIdentity", structFromJson(spark, schemaBuilders, "userIdentity"))
.selectExpr("*", "properties.*").drop("properties")
.withColumn("requestParams", structFromJson(spark, schemaBuilders, "requestParams"))

PipelineFunctions.cleanseCorruptAuditLogs(spark, baselineAuditLogs)
val auditDF = PipelineFunctions.cleanseCorruptAuditLogs(spark, baselineAuditLogs)
.withColumn("response", structFromJson(spark, schemaBuilders, "response"))
.withColumn("requestParamsJson", to_json('requestParams))
.withColumn("hashKey", xxhash64('organization_id, 'timestamp, 'serviceName, 'actionName, 'requestId, 'requestParamsJson))
.drop("logId", "requestParamsJson")

auditDF

} else {

// inclusive from exclusive to
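Note on the BronzeTransforms hunks above: the Azure audit projection is now declared once as requiredColumns: Array[Column] and applied to both reads with select(requiredColumns: _*), lifting the needed properties.* fields out of the struct directly instead of the earlier selectExpr("*", "properties.*") expansion. A minimal sketch of the same pattern, using toy column names rather than Overwatch's actual audit schema:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

object RequiredColumnsSketch {
  // Declare the projection once; selecting a nested struct field lifts it to a
  // top-level column whose name is the leaf field name.
  val requiredColumns: Array[Column] = Array(
    col("category"),
    col("timestamp"),
    col("identity").alias("userIdentity"),
    col("properties.actionName"),
    col("properties.requestId")
  )

  // Varargs expansion reuses the exact same column list for every read.
  def project(events: DataFrame): DataFrame =
    events.select(requiredColumns: _*)
}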
@@ -31,29 +31,21 @@ object DbsqlTransforms extends SparkSessionWrapper {
.orderBy('timestamp).rowsBetween(Window.unboundedPreceding, 1000)

val warehouseRaw = auditRawDF
.select(
'timestamp,
'date,
'organization_id,
'serviceName,
'actionName,
'userEmail,
'requestId,
'response,
'warehouse_id,
PipelineFunctions.fillForward("warehouse_name",warehouse_name_gen_w),
PipelineFunctions.fillForward("cluster_size",warehouse_name_gen_w),
PipelineFunctions.fillForward("min_num_clusters",warehouse_name_gen_w),
PipelineFunctions.fillForward("max_num_clusters",warehouse_name_gen_w),
PipelineFunctions.fillForward("auto_stop_mins",warehouse_name_gen_w),
PipelineFunctions.fillForward("spot_instance_policy",warehouse_name_gen_w),
PipelineFunctions.fillForward("enable_photon",warehouse_name_gen_w),
PipelineFunctions.fillForward("channel",warehouse_name_gen_w),
PipelineFunctions.fillForward("tags",warehouse_name_gen_w),
PipelineFunctions.fillForward("enable_serverless_compute",warehouse_name_gen_w),
PipelineFunctions.fillForward("warehouse_type",warehouse_name_gen_w)
)
.withColumn("warehouse_name",PipelineFunctions.fillForward("warehouse_name",warehouse_name_gen_w))
.withColumn("cluster_size",PipelineFunctions.fillForward("cluster_size",warehouse_name_gen_w))
.withColumn("min_num_clusters",PipelineFunctions.fillForward("min_num_clusters",warehouse_name_gen_w))
.withColumn("max_num_clusters",PipelineFunctions.fillForward("max_num_clusters",warehouse_name_gen_w))
.withColumn("auto_stop_mins",PipelineFunctions.fillForward("auto_stop_mins",warehouse_name_gen_w))
.withColumn("spot_instance_policy",PipelineFunctions.fillForward("spot_instance_policy",warehouse_name_gen_w))
.withColumn("enable_photon",PipelineFunctions.fillForward("enable_photon",warehouse_name_gen_w))
.withColumn("channel",PipelineFunctions.fillForward("channel",warehouse_name_gen_w))
.withColumn("tags",PipelineFunctions.fillForward("tags",warehouse_name_gen_w))
.withColumn("enable_serverless_compute",PipelineFunctions.fillForward("enable_serverless_compute",warehouse_name_gen_w))
.withColumn("warehouse_type",PipelineFunctions.fillForward("warehouse_type",warehouse_name_gen_w))

warehouseRaw
.filter('source_table === "audit_log_bronze")
.drop("source_table")
}
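The hunk above swaps the single wide select of fill-forwarded warehouse fields for per-column withColumn calls over the same warehouse_name_gen_w window and adds a provenance filter on source_table. PipelineFunctions.fillForward itself is not part of this diff; a common way to express fill-forward in plain Spark looks like the sketch below, offered as an assumption rather than Overwatch's actual helper (the partition columns are guesses):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sketch only: carry the most recent non-null value forward per warehouse.
// The real window above also looks up to 1000 rows ahead
// (rowsBetween(Window.unboundedPreceding, 1000)), so behaviour may differ.
def fillForwardSketch(df: DataFrame, colName: String): DataFrame = {
  val w = Window
    .partitionBy("organization_id", "warehouse_id")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  df.withColumn(colName, last(col(colName), ignoreNulls = true).over(w))
}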

/**
@@ -68,7 +60,7 @@
*/
def deriveWarehouseBaseFilled(isFirstRun: Boolean, bronzeWarehouseSnapUntilCurrent: DataFrame)
(warehouseBaseWMetaDF: DataFrame): DataFrame = {
if (isFirstRun) {
val result = if (isFirstRun) {
val firstRunMsg = "Silver_WarehouseSpec -- First run detected, will impute warehouse state from bronze to derive " +
"current initial state for all existing warehouses."
logger.log(Level.INFO, firstRunMsg)
@@ -113,9 +105,42 @@
(unix_timestamp('Pipeline_SnapTS) * 1000).alias("timestamp"),
'Pipeline_SnapTS.cast("date").alias("date"),
'creator_name.alias("createdBy")
).unionByName(warehouseBaseWMetaDF, allowMissingColumns = true)
)
unionWithMissingAsNull(warehouseBaseWMetaDF, missingWareHouseBaseFromSnap)
} else warehouseBaseWMetaDF
} else
warehouseBaseWMetaDF

result.select(
'organization_id,
'warehouse_id,
'serviceName,
'actionName,
'warehouse_name,
'cluster_size,
'userEmail,
'requestId,
'response,
'min_num_clusters,
'max_num_clusters,
'auto_stop_mins,
'spot_instance_policy,
'enable_photon,
'channel,
'tags,
'enable_serverless_compute,
'warehouse_type,
'timestamp,
'date,
'createdBy,
'warehouse_state,
'size,
'auto_resume,
'creator_id,
'num_clusters,
'num_active_sessions,
'jdbc_url,
'odbc_params
)
}
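Two things changed in deriveWarehouseBaseFilled: the inline unionByName(..., allowMissingColumns = true) became a call to unionWithMissingAsNull, and the function now ends with an explicit projection so the first-run and incremental branches return the same column set. The helper is not shown in this diff; its observable behaviour here matches Spark's unionByName with missing columns allowed, roughly:

import org.apache.spark.sql.DataFrame

// Assumption: a thin wrapper over Spark 3.1+ unionByName; the real Overwatch
// helper may add schema reconciliation beyond this.
def unionWithMissingAsNullSketch(left: DataFrame, right: DataFrame): DataFrame =
  left.unionByName(right, allowMissingColumns = true)

Columns present on only one side come back as null on the other, which is what lets the snapshot-derived warehouses merge with warehouseBaseWMetaDF before the final select.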

def deriveInputForWarehouseBase(auditLogDf: DataFrame, warehouseSpecSilver: PipelineTable
@@ -151,6 +176,7 @@ object DbsqlTransforms extends SparkSessionWrapper {

val filteredAuditLogDf = auditLogDfWithStructsToMap
.withColumn("tags", SchemaTools.structToMap(auditLogDfWithStructsToMap, "tags"))
.withColumn("source_table",lit("audit_log_bronze"))

val filteredDf = if(warehouseSpecSilver.exists(dataValidation = true)) {
filteredAuditLogDf
@@ -175,8 +201,18 @@
'channel,
'tags,
'enable_serverless_compute,
'warehouse_type
'warehouse_type,
'warehouse_state,
'size,
'auto_resume,
'creator_id,
'num_clusters,
'num_active_sessions,
'jdbc_url,
'odbc_params,
'createdBy
)
.withColumn("source_table",lit("warehouse_spec_silver")),true
)
}
else
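The source_table column added in the two hunks above is a provenance tag: rows coming straight from the audit log are marked "audit_log_bronze", rows replayed from the existing silver table are marked "warehouse_spec_silver", and deriveWarehouseBase keeps only the audit rows (filter('source_table === "audit_log_bronze")) before dropping the tag; the silver rows are presumably only there to seed the fill-forward window with previously persisted values. The pattern in isolation, with plain unionByName standing in for the union call whose trailing boolean is only partly visible in this hunk:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// auditDF and silverDF stand in for the audit_log_bronze and
// warehouse_spec_silver inputs; overlapping column names are assumed.
def tagSources(auditDF: DataFrame, silverDF: DataFrame): DataFrame =
  auditDF.withColumn("source_table", lit("audit_log_bronze"))
    .unionByName(
      silverDF.withColumn("source_table", lit("warehouse_spec_silver")),
      allowMissingColumns = true
    )
// Downstream: .filter(col("source_table") === "audit_log_bronze").drop("source_table")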
@@ -553,7 +553,7 @@ trait GoldTransforms extends SparkSessionWrapper {
.drop("cluster_name", "custom_tags", "node_type_id")


val joinedDF = clsfDF.join(auditDF, Seq("clusterId", "organization_id"), "left")
val joinedDF = clsfDF.join(auditDF, Seq("clusterId", "organization_id"), "inner")

// Cluster_state started before cmd start time and ended before command end time
val state_before_before = 'unixTimeMS_state_start < 'unixTimeMSStart && 'unixTimeMS_state_end < 'unixTimeMSEnd
@@ -625,6 +625,14 @@
}
}

private[overwatch] def extractDBJobId(column: Column): Column = {
split(regexp_extract(column, "(job-\\d+)", 1), "-")(1)
}

private[overwatch] def extractDBIdInJob(column: Column): Column = {
split(regexp_extract(column, "(-run-\\d+)", 1), "-")(2)
}
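The two helpers above pull numeric IDs out of strings of the form "...job-<jobId>-run-<runId>...": regexp_extract isolates the "job-123" or "-run-456" fragment and split("-") indexes into the digits. A standalone check with an illustrative value (the literal below is made up, not a real job group ID):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("id-extract-check").getOrCreate()
import spark.implicits._

val df = Seq("1698796800000_job-123-run-456-action-789").toDF("raw")

df.select(
  split(regexp_extract($"raw", "(job-\\d+)", 1), "-")(1).alias("db_job_id"),     // "123"
  split(regexp_extract($"raw", "(-run-\\d+)", 1), "-")(2).alias("db_id_in_job")  // "456"
).show(false)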

protected def buildSparkJob(
cloudProvider: String
)(df: DataFrame): DataFrame = {
@@ -667,6 +675,7 @@
'ExecutionID.alias("execution_id"),
'StageIDs.alias("stage_ids"),
'clusterId.alias("cluster_id"),
$"PowerProperties.ClusterDetails.Name".alias("cluster_name"),
$"PowerProperties.NotebookID".alias("notebook_id"),
$"PowerProperties.NotebookPath".alias("notebook_path"),
$"PowerProperties.SparkDBJobID".alias("db_job_id"),
@@ -685,21 +694,27 @@
val sparkContextW = Window.partitionBy('organization_id, 'spark_context_id)

val isDatabricksJob = 'job_group_id.like("%job-%-run-%")
val isAutomatedCluster = 'cluster_name.like("%job-%-run-%")

sparkJobsWImputedUser
.select(sparkJobCols: _*)
.withColumn("cluster_id", first('cluster_id, ignoreNulls = true).over(sparkContextW))
.withColumn("jobGroupAr", split('job_group_id, "_")(2))
.withColumn("db_job_id",
when(isDatabricksJob && 'db_job_id.isNull,
split(regexp_extract('jobGroupAr, "(job-\\d+)", 1), "-")(1))
.otherwise('db_job_id)
when(isDatabricksJob && 'db_job_id.isNull, extractDBJobId('jobGroupAr))
.otherwise(
when(isAutomatedCluster && 'db_job_id.isNull, extractDBJobId('cluster_name))
.otherwise('db_job_id)
)
)
.withColumn("db_id_in_job",
when(isDatabricksJob && 'db_run_id.isNull,
split(regexp_extract('jobGroupAr, "(-run-\\d+)", 1), "-")(2))
.otherwise('db_run_id)
when(isDatabricksJob && 'db_run_id.isNull, extractDBIdInJob('jobGroupAr))
.otherwise(
when(isAutomatedCluster && 'db_run_id.isNull, extractDBJobId('cluster_name))
.otherwise('db_run_id)
)
)
.drop("cluster_name")
}
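Net effect of the new branches in buildSparkJob: db_job_id and db_id_in_job are kept when already populated, otherwise recovered from the Spark job_group_id, and otherwise from an automated cluster_name of the form "job-<id>-run-<id>" (the new cluster_name column is pulled from PowerProperties.ClusterDetails.Name and dropped again at the end). The same precedence written as a compact coalesce-style sketch over assumed column names:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Sketch of the fallback order only; the real transform also imputes the user,
// fills cluster_id across the spark context window, and parses job_group_id
// through the intermediate jobGroupAr column.
def resolveDbJobId(df: DataFrame): DataFrame = {
  val fromJobGroup = when(col("job_group_id").like("%job-%-run-%"),
    split(regexp_extract(col("job_group_id"), "(job-\\d+)", 1), "-")(1))
  val fromClusterName = when(col("cluster_name").like("%job-%-run-%"),
    split(regexp_extract(col("cluster_name"), "(job-\\d+)", 1), "-")(1))
  df.withColumn("db_job_id", coalesce(col("db_job_id"), fromJobGroup, fromClusterName))
}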

protected def buildSparkStage()(df: DataFrame): DataFrame = {
