diff --git a/build.sbt b/build.sbt
index 3d39282e1..38eeaa40f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,7 +2,7 @@
 name := "overwatch"
 organization := "com.databricks.labs"
 
-version := "0.7.2.2"
+version := "0.7.2.2.1"
 
 scalaVersion := "2.12.12"
 scalacOptions ++= Seq("-Xmax-classfile-name", "78")
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
index a68f64fdc..74aea67d8 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
@@ -339,6 +339,25 @@ trait BronzeTransforms extends SparkSessionWrapper {
       val azureAuditSourceFilters = 'Overwatch_RunID === lit(overwatchRunID) && 'organization_id === organizationId
       val rawBodyLookup = auditRawLand.asDF
         .filter(azureAuditSourceFilters)
+
+      val requiredColumns: Array[Column] = Array(
+        col("category"),
+        col("version"),
+        col("timestamp"),
+        col("date"),
+        col("identity").alias("userIdentity"),
+        col("organization_id"),
+        col("properties.actionName"),
+        col("properties.logId"),
+        col("properties.requestId"),
+        col("properties.requestParams"),
+        col("properties.response"),
+        col("properties.serviceName"),
+        col("properties.sessionId"),
+        col("properties.sourceIPAddress"),
+        col("properties.userAgent")
+      )
+
       val schemaBuilders = auditRawLand.asDF
         .filter(azureAuditSourceFilters)
         .withColumn("parsedBody", structFromJson(spark, rawBodyLookup, "deserializedBody"))
@@ -348,9 +367,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
         .withColumn("time", 'time.cast("timestamp"))
         .withColumn("timestamp", unix_timestamp('time) * 1000)
         .withColumn("date", 'time.cast("date"))
-        .select('category, 'version, 'timestamp, 'date, 'properties, 'identity.alias("userIdentity"), 'organization_id)
-        .selectExpr("*", "properties.*").drop("properties")
-
+        .select(requiredColumns: _*)
 
       val baselineAuditLogs = auditRawLand.asDF
         .filter(azureAuditSourceFilters)
@@ -361,17 +378,18 @@ trait BronzeTransforms extends SparkSessionWrapper {
         .withColumn("time", 'time.cast("timestamp"))
         .withColumn("timestamp", unix_timestamp('time) * 1000)
         .withColumn("date", 'time.cast("date"))
-        .select('category, 'version, 'timestamp, 'date, 'properties, 'identity.alias("userIdentity"), 'organization_id)
+        .select(requiredColumns: _*)
         .withColumn("userIdentity", structFromJson(spark, schemaBuilders, "userIdentity"))
-        .selectExpr("*", "properties.*").drop("properties")
         .withColumn("requestParams", structFromJson(spark, schemaBuilders, "requestParams"))
 
-      PipelineFunctions.cleanseCorruptAuditLogs(spark, baselineAuditLogs)
+      val auditDF = PipelineFunctions.cleanseCorruptAuditLogs(spark, baselineAuditLogs)
         .withColumn("response", structFromJson(spark, schemaBuilders, "response"))
         .withColumn("requestParamsJson", to_json('requestParams))
        .withColumn("hashKey", xxhash64('organization_id, 'timestamp, 'serviceName, 'actionName, 'requestId, 'requestParamsJson))
         .drop("logId", "requestParamsJson")
 
+      auditDF
+
     } else {
 
       // inclusive from exclusive to
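Note on the projection change above: replacing selectExpr("*", "properties.*") with the explicit requiredColumns list pins the audit table to a fixed column set, so a new field appearing under properties can no longer shift the output schema. The structFromJson calls are Overwatch's own helper; below is a minimal plain-Spark sketch of the pattern it appears to implement, parsing a JSON string column against a schema inferred from sample rows. The helper name parseJsonCol and its signature are ours for illustration, not Overwatch's API.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._

    def parseJsonCol(spark: SparkSession, df: DataFrame, jsonCol: String): DataFrame = {
      import spark.implicits._
      // Infer a struct schema from the column's own JSON strings...
      val schema = spark.read.json(df.select(jsonCol).as[String]).schema
      // ...then parse the string column into a typed struct in place.
      df.withColumn(jsonCol, from_json(col(jsonCol), schema))
    }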
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/DbsqlTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/DbsqlTransforms.scala
index d47d22923..27e15db20 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/DbsqlTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/DbsqlTransforms.scala
@@ -31,29 +31,21 @@ object DbsqlTransforms extends SparkSessionWrapper {
       .orderBy('timestamp).rowsBetween(Window.unboundedPreceding, 1000)
 
     val warehouseRaw = auditRawDF
-      .select(
-        'timestamp,
-        'date,
-        'organization_id,
-        'serviceName,
-        'actionName,
-        'userEmail,
-        'requestId,
-        'response,
-        'warehouse_id,
-        PipelineFunctions.fillForward("warehouse_name", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("cluster_size", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("min_num_clusters", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("max_num_clusters", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("auto_stop_mins", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("spot_instance_policy", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("enable_photon", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("channel", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("tags", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("enable_serverless_compute", warehouse_name_gen_w),
-        PipelineFunctions.fillForward("warehouse_type", warehouse_name_gen_w)
-      )
+      .withColumn("warehouse_name", PipelineFunctions.fillForward("warehouse_name", warehouse_name_gen_w))
+      .withColumn("cluster_size", PipelineFunctions.fillForward("cluster_size", warehouse_name_gen_w))
+      .withColumn("min_num_clusters", PipelineFunctions.fillForward("min_num_clusters", warehouse_name_gen_w))
+      .withColumn("max_num_clusters", PipelineFunctions.fillForward("max_num_clusters", warehouse_name_gen_w))
+      .withColumn("auto_stop_mins", PipelineFunctions.fillForward("auto_stop_mins", warehouse_name_gen_w))
+      .withColumn("spot_instance_policy", PipelineFunctions.fillForward("spot_instance_policy", warehouse_name_gen_w))
+      .withColumn("enable_photon", PipelineFunctions.fillForward("enable_photon", warehouse_name_gen_w))
+      .withColumn("channel", PipelineFunctions.fillForward("channel", warehouse_name_gen_w))
+      .withColumn("tags", PipelineFunctions.fillForward("tags", warehouse_name_gen_w))
+      .withColumn("enable_serverless_compute", PipelineFunctions.fillForward("enable_serverless_compute", warehouse_name_gen_w))
+      .withColumn("warehouse_type", PipelineFunctions.fillForward("warehouse_type", warehouse_name_gen_w))
+
+    warehouseRaw
+      .filter('source_table === "audit_log_bronze")
+      .drop("source_table")
   }
 
   /**
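Note on the rewrite above: moving the fill-forward columns out of a single select(...) and into withColumn(...) calls preserves every other column on auditRawDF, which the 'source_table filter introduced above (and the wider column set carried through from warehouse_spec_silver) relies on. PipelineFunctions.fillForward is Overwatch's helper; a minimal sketch of the usual fill-forward idiom it presumably wraps follows -- the reduction to last-non-null-over-a-window is our assumption, not the library's documented behavior.

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.expressions.WindowSpec
    import org.apache.spark.sql.functions._

    // Keep the current value when present; otherwise take the most recent
    // non-null value within the window.
    def fillForwardSketch(colName: String, w: WindowSpec): Column =
      coalesce(col(colName), last(col(colName), ignoreNulls = true).over(w))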
@@ -68,7 +60,7 @@ object DbsqlTransforms extends SparkSessionWrapper {
    */
   def deriveWarehouseBaseFilled(isFirstRun: Boolean, bronzeWarehouseSnapUntilCurrent: DataFrame)
                                (warehouseBaseWMetaDF: DataFrame): DataFrame = {
-    if (isFirstRun) {
+    val result = if (isFirstRun) {
       val firstRunMsg = "Silver_WarehouseSpec -- First run detected, will impute warehouse state from bronze to derive " +
         "current initial state for all existing warehouses."
       logger.log(Level.INFO, firstRunMsg)
@@ -113,9 +105,42 @@ object DbsqlTransforms extends SparkSessionWrapper {
         (unix_timestamp('Pipeline_SnapTS) * 1000).alias("timestamp"),
         'Pipeline_SnapTS.cast("date").alias("date"),
         'creator_name.alias("createdBy")
-      ).unionByName(warehouseBaseWMetaDF, allowMissingColumns = true)
+      )
+      unionWithMissingAsNull(warehouseBaseWMetaDF, missingWareHouseBaseFromSnap)
-    } else warehouseBaseWMetaDF
+    } else
+      warehouseBaseWMetaDF
+
+    result.select(
+      'organization_id,
+      'warehouse_id,
+      'serviceName,
+      'actionName,
+      'warehouse_name,
+      'cluster_size,
+      'userEmail,
+      'requestId,
+      'response,
+      'min_num_clusters,
+      'max_num_clusters,
+      'auto_stop_mins,
+      'spot_instance_policy,
+      'enable_photon,
+      'channel,
+      'tags,
+      'enable_serverless_compute,
+      'warehouse_type,
+      'timestamp,
+      'date,
+      'createdBy,
+      'warehouse_state,
+      'size,
+      'auto_resume,
+      'creator_id,
+      'num_clusters,
+      'num_active_sessions,
+      'jdbc_url,
+      'odbc_params
+    )
   }
 
   def deriveInputForWarehouseBase(auditLogDf: DataFrame, warehouseSpecSilver: PipelineTable
@@ -151,6 +176,7 @@ object DbsqlTransforms extends SparkSessionWrapper {
 
     val filteredAuditLogDf = auditLogDfWithStructsToMap
       .withColumn("tags", SchemaTools.structToMap(auditLogDfWithStructsToMap, "tags"))
+      .withColumn("source_table", lit("audit_log_bronze"))
 
     val filteredDf = if(warehouseSpecSilver.exists(dataValidation = true)) {
       filteredAuditLogDf
@@ -175,8 +201,18 @@ object DbsqlTransforms extends SparkSessionWrapper {
           'channel,
           'tags,
           'enable_serverless_compute,
-          'warehouse_type
+          'warehouse_type,
+          'warehouse_state,
+          'size,
+          'auto_resume,
+          'creator_id,
+          'num_clusters,
+          'num_active_sessions,
+          'jdbc_url,
+          'odbc_params,
+          'createdBy
         )
+          .withColumn("source_table", lit("warehouse_spec_silver")), true
       )
     } else
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala
index b1c9cbccc..ac2420b5c 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala
@@ -553,7 +553,7 @@ trait GoldTransforms extends SparkSessionWrapper {
       .drop("cluster_name", "custom_tags", "node_type_id")
 
-    val joinedDF = clsfDF.join(auditDF, Seq("clusterId", "organization_id"), "left")
+    val joinedDF = clsfDF.join(auditDF, Seq("clusterId", "organization_id"), "inner")
 
     // Cluster_state started before cmd start time and ended before command end time
     val state_before_before = 'unixTimeMS_state_start < 'unixTimeMSStart && 'unixTimeMS_state_end < 'unixTimeMSEnd
@@ -625,6 +625,14 @@ trait GoldTransforms extends SparkSessionWrapper {
     }
   }
 
+  private[overwatch] def extractDBJobId(column: Column): Column = {
+    split(regexp_extract(column, "(job-\\d+)", 1), "-")(1)
+  }
+
+  private[overwatch] def extractDBIdInJob(column: Column): Column = {
+    split(regexp_extract(column, "(-run-\\d+)", 1), "-")(2)
+  }
+
   protected def buildSparkJob(
                                cloudProvider: String
                              )(df: DataFrame): DataFrame = {
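Worked example for the two extractors above: Databricks encodes the job and run ids in job-group ids and automated-cluster names of the form "job-<jobId>-run-<runId>", which is the shape the like("%job-%-run-%") predicates below test for. A spark-shell sketch (the sample value is illustrative only):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    Seq("job-123-run-456").toDF("name").select(
      // "(job-\\d+)" captures "job-123"; split on "-" and take index 1 -> "123"
      split(regexp_extract($"name", "(job-\\d+)", 1), "-")(1).alias("db_job_id"),
      // "(-run-\\d+)" captures "-run-456"; the leading "-" makes split yield
      // Array("", "run", "456"), so index 2 -> "456"
      split(regexp_extract($"name", "(-run-\\d+)", 1), "-")(2).alias("db_id_in_job")
    ).show()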
@@ -667,6 +675,7 @@ trait GoldTransforms extends SparkSessionWrapper {
         'ExecutionID.alias("execution_id"),
         'StageIDs.alias("stage_ids"),
         'clusterId.alias("cluster_id"),
+        $"PowerProperties.ClusterDetails.Name".alias("cluster_name"),
         $"PowerProperties.NotebookID".alias("notebook_id"),
         $"PowerProperties.NotebookPath".alias("notebook_path"),
         $"PowerProperties.SparkDBJobID".alias("db_job_id"),
@@ -685,21 +694,27 @@ trait GoldTransforms extends SparkSessionWrapper {
     val sparkContextW = Window.partitionBy('organization_id, 'spark_context_id)
 
     val isDatabricksJob = 'job_group_id.like("%job-%-run-%")
+    val isAutomatedCluster = 'cluster_name.like("%job-%-run-%")
 
     sparkJobsWImputedUser
       .select(sparkJobCols: _*)
       .withColumn("cluster_id", first('cluster_id, ignoreNulls = true).over(sparkContextW))
       .withColumn("jobGroupAr", split('job_group_id, "_")(2))
       .withColumn("db_job_id",
-        when(isDatabricksJob && 'db_job_id.isNull,
-          split(regexp_extract('jobGroupAr, "(job-\\d+)", 1), "-")(1))
-          .otherwise('db_job_id)
+        when(isDatabricksJob && 'db_job_id.isNull, extractDBJobId('jobGroupAr))
+          .otherwise(
+            when(isAutomatedCluster && 'db_job_id.isNull, extractDBJobId('cluster_name))
+              .otherwise('db_job_id)
+          )
       )
       .withColumn("db_id_in_job",
-        when(isDatabricksJob && 'db_run_id.isNull,
-          split(regexp_extract('jobGroupAr, "(-run-\\d+)", 1), "-")(2))
-          .otherwise('db_run_id)
+        when(isDatabricksJob && 'db_run_id.isNull, extractDBIdInJob('jobGroupAr))
+          .otherwise(
+            when(isAutomatedCluster && 'db_run_id.isNull, extractDBIdInJob('cluster_name))
+              .otherwise('db_run_id)
+          )
       )
+      .drop("cluster_name")
   }
 
   protected def buildSparkStage()(df: DataFrame): DataFrame = {
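Side note on the nested when/otherwise chains above: because when(...) without an otherwise evaluates to null, the same fallback order can be written as a single coalesce. A sketch, assuming the surrounding trait's implicits for the symbol column syntax; it is equivalent except that coalesce keeps falling through when an extraction itself returns null, where the chain above stops:

    import org.apache.spark.sql.functions._

    // Take db_job_id if present, else parse it from the job group id,
    // else from the automated cluster's name.
    val dbJobIdFilled = coalesce(
      'db_job_id,
      when(isDatabricksJob, extractDBJobId('jobGroupAr)),
      when(isAutomatedCluster, extractDBJobId('cluster_name))
    )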