0810 release (#1202)
* Initial Commit

* Revert "added hotfix for warehouse_spec_silver (#1154)" (#1171)

This reverts commit 54b2549.

* increase to 60-day hard limit for clusters/events (#1170)

* removing 30-day hard limit for clusters/events

* changing 30-day to 60-day hard limit for clusters/events

* wasbs check added (#1176)

* 1059 incorrect dbu (#1181)

* adding code for automated clusters

* adding code to calculate accurate cost for parallel task jobs

* lag added (#1186)

* readtimeout increased (#1189)

* 1180 rebuild cluster snapshot bronze using clusters/get api call instead of clusters/list api call (#1185)

* Rebuild Cluster_Snapshot_bronze using cluster/get

* Added makeParallelApiCalls for Parallel API Calls

* Changed Variable name in getClusterSnapshotBronze

* Change Parallelize API call

* Removed Unnecessary Print Statements

* Removed fetchClusterDetails Function

* Added clusters_snapshot_error_bronze for Error Snapshot Records

* Added Logger in prepClusterSnapshot

* Removed the duplicate snapshot table

* Removed Println Statements

* Removed print Statement

* Removed New Schema in Pipeline Target Table

* Removed Redundant Spaces

* Change in PipelineTargets.scala

* Change DataframeColumn from cluster_id to cluster_name

* added setBatchPersist in ClusterGetApi

* Removed extra fields from error table

* Removed clusterIDs for Negative Test Case

* Added Missing Job Clusters

* Added try catch in JobClusterDF in prepClusterSnapshot

* Added cluster List clusterIDs in Snapshot

* Removed Select Clause from PrepClusterSnapshot

* Removed clusterSnapSchema

* Changed Print Statement with Logger statement

* Removed duplicated clusterSnapSchema

* Added getParallelAPIParams trait in ApiMeta.scala

* Removed multiple usage of getParallelAPIParams

* Removed Unused Variables

* Added minor changes on logging messages

* Added Generic Functions to get cluster_id for Service_name as clusters and jobs

* Added some minor changes as per comment

* Added single msg variable

* async api call error resolved (#1163)

* async api call error resolved

* async api call error resolved

* removing 30-day hard limit for clusters/events

* exception handling done

* review comments implemented

* review comments implemented

* Removed the redundant Column names in Snapshot Bronze (#1193)

* Removed the redundant Column names in Snapshot Bronze

* Removed Print Statements

* Change the logic for renaming KeepAlive

* Added comments for better understanding

* Removed unused Functions

* Initializer calling fixed for multiworkspace (#1194)

* Initializer calling fixed for multiworkspace

* review comment implemented

* Added change missed in Rebase (#1198)

---------

Co-authored-by: Sriram Mohanty <[email protected]>
Co-authored-by: Aman <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
4 people authored May 8, 2024
1 parent 4cf9e37 commit b3a4c2a
Showing 12 changed files with 420 additions and 78 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.0.1"
version := "0.8.1.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
64 changes: 54 additions & 10 deletions src/main/scala/com/databricks/labs/overwatch/Optimizer.scala
@@ -2,9 +2,9 @@ package com.databricks.labs.overwatch

import com.databricks.labs.overwatch.env.Workspace
import com.databricks.labs.overwatch.pipeline._
import com.databricks.labs.overwatch.utils.{BadConfigException, JsonUtils, OverwatchParams, SparkSessionWrapper}
import com.databricks.labs.overwatch.utils.{BadConfigException, DataTarget, Helpers, JsonUtils, OverwatchParams, SparkSessionWrapper}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

@@ -31,22 +31,66 @@ object Optimizer extends SparkSessionWrapper{
.orderBy('Pipeline_SnapTS.desc)
}

/**
* This method is used to get the API URL from config table.
*
* @param orgID The organization ID as a string.
* @param dataTarget The data target object.
* @return The API URL as a string.
*
* The method works as follows:
* 1. It retrieves the ETL storage prefix from the data target and trims any leading or trailing spaces.
* 2. It constructs the report location by dropping the last 12 characters of the ETL storage prefix (the global prefix segment) and appending "report/configTable".
* 3. It defines a window function that partitions by workspace_id and orders by snapTS in descending order.
* 4. It reads the data from the report location and selects the workspace_id, api_url, and snapTS columns.
* 5. It filters the data to only include rows where the workspace_id matches the provided organization ID.
* 6. It adds a new column "rnk" which is the rank of each row within its partition, as defined by the window function.
* 7. It filters the data to only include rows where "rnk" is 1, i.e., the row with the latest snapTS for each workspace_id.
* 8. It returns the api_url of the first row in the resulting DataFrame.
*/
private def getApiURL(orgID: String,dataTarget: DataTarget): String = {
val etlStoragePrefix = dataTarget.etlDataPathPrefix.get.trim
val reportLocation = etlStoragePrefix.substring(0,etlStoragePrefix.length-12)+"report/configTable" //Removing global prefix and adding report location
if(Helpers.pathExists(reportLocation)) { //If the config table is found at the specified location, proceed with legacy optimization.
val latestRunWindow = Window.partitionBy('workspace_id).orderBy('snapTS.desc)
spark.read.load(reportLocation).select("workspace_id", "api_url", "snapTS")
.filter('workspace_id === orgID)
.withColumn("rnk", rank.over(latestRunWindow))
.filter('rnk === 1)
.first().getString(1)
} else { // If the config table is not found at the specified location, throw an exception.
throw new BadConfigException(s"Config table not found at $reportLocation")
}

}
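
// Illustrative worked example (hypothetical values): if dataTarget.etlDataPathPrefix is
// "abfss://overwatch@mystorage.dfs.core.windows.net/ow/global_share" (that is, it ends in a
// 12-character segment such as "global_share"), then reportLocation resolves to
// "abfss://overwatch@mystorage.dfs.core.windows.net/ow/report/configTable", and the call
//   getApiURL("1234567890123456", dataTarget)
// returns the api_url recorded by the most recent run (latest snapTS) for that workspace_id,
// e.g. "https://adb-1234567890123456.12.azuredatabricks.net".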


/**
* Derive the workspace from the config supplied in the last run
* @param overwatchETLDB name of the Overwatch ETL database
* @return workspace object
*/
private def getLatestWorkspace(overwatchETLDB: String,orgId: String): Workspace = {
val params = getLatestSuccessState(overwatchETLDB,orgId)
.selectExpr("inputConfig.*")
.as[OverwatchParams]

val ( orgID: Option[String], params: OverwatchParams) = getLatestSuccessState(overwatchETLDB,orgId)
.select("organization_id", "inputConfig")
.as[(Option[String], OverwatchParams)]
.first

val args = JsonUtils.objToJson(params).compactString
val prettyArgs = JsonUtils.objToJson(params).prettyString
logger.log(Level.INFO, s"ARGS: Identified config string is: \n$prettyArgs")
Initializer(args, debugFlag = true)
}
val args = JsonUtils.objToJson(params)
logger.log(Level.INFO, s"ARGS: Identified config string is: \n${args.prettyString}")
try{
val apiURL = getApiURL(orgID.get,params.dataTarget.get)
logger.log(Level.INFO,"Running optimization for Multiworkspace deployment")
Initializer(args.compactString,apiURL=Some(apiURL), organizationID = Some(orgID.get))
}catch {
case e: Exception => { // Unable to retrieve the API URL; falling back to legacy optimization
logger.log(Level.INFO,"Running optimization for single workspace deployment")
Initializer(args.compactString)
}
}

}

/**
* pass in the overwatch ETL database name to optimize overwatch in parallel
59 changes: 34 additions & 25 deletions src/main/scala/com/databricks/labs/overwatch/api/ApiCallV2.scala
@@ -143,7 +143,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
private var _apiMeta: ApiMeta = null //Metadata for the API call.
private var _allowUnsafeSSL: Boolean = false //Flag to make the unsafe ssl.
private val readTimeoutMS = 60000 //Read timeout.
private val connTimeoutMS = 10000 //Connection timeout.
private val connTimeoutMS = 60000 //Connection timeout.
private var _printFlag: Boolean = true
private var _totalSleepTime: Int = 0
private var _apiSuccessCount: Int = 0
@@ -343,9 +343,12 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
execute()
} else {
if(writeTraceApiFlag()){
PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get, apiMeta.enrichAPIResponse(response,jsonQuery,queryMap))
val enrichedResponse = apiMeta.enrichAPIResponse(response.body,response.code,jsonQuery,queryMap)
PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get,enrichedResponse)
}
throw new ApiCallFailure(response, buildGenericErrorMessage, debugFlag = false)
val apiErrorDetails = jsonQueryToApiErrorDetail(response.body)
val responseMeta = apiMeta.enrichAPIResponse(apiErrorDetails,response.code,jsonQuery,queryMap)
throw new ApiCallFailure(response, buildGenericErrorMessage, responseWithMeta = responseMeta , debugFlag = false)
}

}
@@ -395,7 +398,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
private def reqOptions: Seq[HttpOptions.HttpOption] = {
val baseOptions = Seq(
HttpOptions.connTimeout(connTimeoutMS),
HttpOptions.connTimeout(readTimeoutMS)
HttpOptions.readTimeout(readTimeoutMS)
)
if (_allowUnsafeSSL) baseOptions :+ HttpOptions.allowUnsafeSSL else baseOptions

@@ -553,19 +556,24 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {



private def jsonQueryToApiErrorDetail(e: ApiCallFailure): String = {
val mapper = new ObjectMapper()
val jsonObject = mapper.readTree(jsonQuery);
val clusterId = jsonObject.get("cluster_id").toString.replace("\"", "")
val start_time = jsonObject.get("start_time").asLong()
val end_time = jsonObject.get("end_time").asLong()
val errorObj = mapper.readTree(e.getMessage);
val newJsonObject = new JSONObject();
newJsonObject.put("cluster_id", clusterId)
newJsonObject.put("from_epoch", start_time)
newJsonObject.put("until_epoch", end_time)
newJsonObject.put("error", errorObj.get("error_code").toString.replace("\"", "") + " " + errorObj.get("message").toString.replace("\"", ""))
newJsonObject.toString
private def jsonQueryToApiErrorDetail(msg: String): String = {
try {
val mapper = new ObjectMapper()
val jsonObject = mapper.readTree(jsonQuery);
val clusterId = jsonObject.get("cluster_id").toString.replace("\"", "")
val start_time = jsonObject.get("start_time").asLong()
val end_time = jsonObject.get("end_time").asLong()
val errorObj = mapper.readTree(msg);
val newJsonObject = new JSONObject();
newJsonObject.put("cluster_id", clusterId)
newJsonObject.put("from_epoch", start_time)
newJsonObject.put("until_epoch", end_time)
newJsonObject.put("error", errorObj.get("error_code").toString.replace("\"", "") + " " + errorObj.get("message").toString.replace("\"", ""))
newJsonObject.toString
} catch {
case e: Throwable =>
msg
}
}
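
// Illustrative sketch (hypothetical values): for a failed clusters/events call whose jsonQuery is
// {"cluster_id":"0123-456789-abcdefgh","start_time":1714521600000,"end_time":1714607999000,"limit":"500"}
// and whose error body carries error_code and message fields, this helper would return roughly
// {"cluster_id":"0123-456789-abcdefgh","from_epoch":1714521600000,"until_epoch":1714607999000,
//  "error":"INVALID_PARAMETER_VALUE Cluster 0123-456789-abcdefgh does not exist"}
// and, if either payload cannot be parsed, it simply returns the raw msg string unchanged.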

/**
@@ -603,7 +611,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
@tailrec def executeThreadedHelper(): util.ArrayList[String] = {
val response = getResponse
responseCodeHandler(response)
_apiResponseArray.add(apiMeta.enrichAPIResponse(response,jsonQuery,queryMap))//for GET request we have to convert queryMap to Json
_apiResponseArray.add(apiMeta.enrichAPIResponse(response.body,response.code,jsonQuery,queryMap))//for GET request we have to convert queryMap to Json
if (apiMeta.batchPersist && successTempPath.nonEmpty) {
accumulator.add(1)
if (apiEnv.successBatchSize <= _apiResponseArray.size()) { //Checking if its right time to write the batches into persistent storage
@@ -634,7 +642,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
throw e
}
logger.log(Level.ERROR, excMsg, e)
throw new ApiCallFailureV2(jsonQueryToApiErrorDetail(e))
throw e
}
case e: Throwable => {
val excMsg = "Got the exception while performing get request "
@@ -655,7 +663,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
@tailrec def executeHelper(): this.type = {
val response = getResponse
responseCodeHandler(response)
_apiResponseArray.add(apiMeta.enrichAPIResponse(response,jsonQuery,queryMap))
_apiResponseArray.add(apiMeta.enrichAPIResponse(response.body,response.code,jsonQuery,queryMap))
if (apiMeta.batchPersist && successTempPath.nonEmpty) {
if (apiEnv.successBatchSize <= _apiResponseArray.size()) { //Checking if its right time to write the batches into persistent storage
val responseFlag = PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get, _apiResponseArray.toString)
@@ -771,18 +779,19 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
case Success(_) =>
apiResponseCounter.add(1)

case Failure(e) =>
if (e.isInstanceOf[ApiCallFailureV2]) {
case Failure(e:ApiCallFailure) =>
synchronized {
apiErrorArray.add(e.getMessage)
apiErrorArray.add(e.responseMeta)
if (apiErrorArray.size() >= config.apiEnv.errorBatchSize) {
PipelineFunctions.writeMicroBatchToTempLocation(tmpErrorPath, apiErrorArray.toString)
apiErrorArray = Collections.synchronizedList(new util.ArrayList[String]())
}
apiResponseCounter.add(1)
}
logger.log(Level.ERROR, "Future failure message: " + e.getMessage, e)
}
apiResponseCounter.add(1)
case Failure(_) =>
apiResponseCounter.add(1)

}
startValue = startValue + incrementCounter
}
55 changes: 33 additions & 22 deletions src/main/scala/com/databricks/labs/overwatch/api/ApiMeta.scala
@@ -160,9 +160,18 @@ trait ApiMeta {
Map[String, String]()
}

private[overwatch] def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
logger.log(Level.INFO, s"""Needs to be overridden by the specific API for initializing the parallel API call function""")
Map[String, String]()
// private[overwatch] def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
//     logger.log(Level.INFO, s"""Needs to be overridden by the specific API for initializing the parallel API call function""")
// Map[String, String]()
// }

private[overwatch] def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
Map(
"start_value" -> s"""${jsonInput.get("start_value").get.toLong}""",
"end_value" -> s"""${jsonInput.get("end_value").get.toLong}""",
"increment_counter" -> s"""${jsonInput.get("increment_counter").get.toLong}""",
"final_response_count" -> s"""${jsonInput.get("final_response_count").get.toLong}"""
)
}

/**
@@ -173,7 +182,7 @@ trait ApiMeta {
* @param queryMap
* @return a string containing the api response and the meta for the api call.
*/
private[overwatch] def enrichAPIResponse(response: HttpResponse[String], jsonQuery: String, queryMap: Map[String, String]): String = {
private[overwatch] def enrichAPIResponse(response: String, responseCode: Int, jsonQuery: String, queryMap: Map[String, String]): String = {
val filter: String = if (apiCallType.equals("POST")) jsonQuery else {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
Expand All @@ -184,9 +193,9 @@ trait ApiMeta {
apiTraceabilityMeta.put("endPoint", apiName)
apiTraceabilityMeta.put("type", apiCallType)
apiTraceabilityMeta.put("apiVersion", apiV)
apiTraceabilityMeta.put("responseCode", response.code)
apiTraceabilityMeta.put("responseCode", resopnseCode)
apiTraceabilityMeta.put("batchKeyFilter", filter)
jsonObject.put("rawResponse", response.body.trim)
jsonObject.put("rawResponse", response.trim)
jsonObject.put("apiTraceabilityMeta", apiTraceabilityMeta)
jsonObject.toString
}
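
// Illustrative sketch (hypothetical values): for a GET call such as clusters/get with
// queryMap = Map("cluster_id" -> "0123-456789-abcdefgh"), the enriched record looks roughly like
// {
//   "rawResponse": "<trimmed API response body>",
//   "apiTraceabilityMeta": {
//     "endPoint": "clusters/get",
//     "type": "GET",
//     "apiVersion": "2.0",
//     "responseCode": 200,
//     "batchKeyFilter": "{\"cluster_id\":\"0123-456789-abcdefgh\"}"
//   }
// }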
@@ -204,6 +213,7 @@ class ApiMetaFactory {
val meta = _apiName match {
case "jobs/list" => new JobListApi
case "clusters/list" => new ClusterListApi
case "clusters/get" => new ClusterGetApi
case "clusters/events" => new ClusterEventsApi
case "dbfs/list" => new DbfsListApi
case "instance-pools/list" => new InstancePoolsListApi
@@ -279,14 +289,8 @@ class SqlQueryHistoryApi extends ApiMeta {
"filter_by.query_start_time_range.end_time_ms" -> s"$endTime"
)
}

private[overwatch] override def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
Map(
"start_value" -> s"""${jsonInput.get("start_value").get.toLong}""",
"end_value" -> s"""${jsonInput.get("end_value").get.toLong}""",
"increment_counter" -> s"""${jsonInput.get("increment_counter").get.toLong}""",
"final_response_count" -> s"""${jsonInput.get("final_response_count").get.toLong}"""
)
super.getParallelAPIParams(jsonInput)
}
}

@@ -316,6 +320,20 @@ class ClusterListApi extends ApiMeta {
setApiCallType("GET")
}

class ClusterGetApi extends ApiMeta {
setDataframeColumn("cluster_name")
setApiCallType("GET")
setBatchPersist(true)
private[overwatch] override def getAPIJsonQuery(startValue: Long, endValue: Long, jsonInput: Map[String, String]): Map[String, String] = {
val clusterIDs = jsonInput.get("cluster_ids").get.split(",").map(_.trim).toArray
Map("cluster_id" -> s"""${clusterIDs(startValue.toInt)}"""
)
}

private[overwatch] override def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
super.getParallelAPIParams(jsonInput)
}
}
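
// Illustrative sketch (hypothetical cluster IDs): the jsonInput driving parallel clusters/get calls
// is expected to carry a comma-separated cluster_ids list plus the parallel-call window, and each
// call selects one ID by index, e.g.
//   val jsonInput = Map(
//     "cluster_ids" -> "0123-456789-abcdefgh, 0456-123456-ijklmnop",
//     "start_value" -> "0", "end_value" -> "2",
//     "increment_counter" -> "1", "final_response_count" -> "2")
//   new ClusterGetApi().getAPIJsonQuery(0L, 2L, jsonInput)  // => Map("cluster_id" -> "0123-456789-abcdefgh")
//   new ClusterGetApi().getParallelAPIParams(jsonInput)     // => the four window values as strings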

class JobListApi extends ApiMeta {
setDataframeColumn("jobs")
@@ -334,7 +352,7 @@ class JobListApi extends ApiMeta {
}
}

class ClusterEventsApi extends ApiMeta {
class ClusterEventsApi extends ApiMeta{
setPaginationKey("next_page")
setPaginationToken("next_page")
setDataframeColumn("events")
@@ -352,16 +370,9 @@ class ClusterEventsApi extends ApiMeta {
"limit" -> "500"
)
}

private[overwatch] override def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
Map(
"start_value" -> s"""${jsonInput.get("start_value").get.toLong}""",
"end_value" -> s"""${jsonInput.get("end_value").get.toLong}""",
"increment_counter" -> s"""${jsonInput.get("increment_counter").get.toLong}""",
"final_response_count" -> s"""${jsonInput.get("final_response_count").get.toLong}"""
)
super.getParallelAPIParams(jsonInput)
}

}

class JobRunsApi extends ApiMeta {
@@ -2,17 +2,21 @@ package com.databricks.labs.overwatch.env

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.api.ApiCallV2
import com.databricks.labs.overwatch.pipeline.PipelineFunctions
import com.databricks.labs.overwatch.pipeline.{PipelineFunctions, Schema}
import com.databricks.labs.overwatch.utils.Helpers.deriveRawApiResponseDF
import com.databricks.labs.overwatch.utils._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import com.databricks.labs.overwatch.pipeline.TransformFunctions._

import java.util
import java.util.Collections
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.Future
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global


/**
16 changes: 13 additions & 3 deletions src/main/scala/com/databricks/labs/overwatch/pipeline/Bronze.scala
@@ -120,8 +120,17 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
lazy private val appendClustersAPIProcess: () => ETLDefinition = {
() =>
ETLDefinition(
workspace.getClustersDF(deriveApiTempDir(config.tempWorkingDir,clustersSnapshotModule.moduleName,pipelineSnapTime)),
Seq(cleanseRawClusterSnapDF),
BronzeTargets.auditLogsTarget.asIncrementalDF(clustersSnapshotModule, BronzeTargets.auditLogsTarget.incrementalColumns, additionalLagDays = 1), // 1 lag day to get laggard records,
Seq(prepClusterSnapshot(
workspace,
pipelineSnapTime,
config.apiEnv,
database,
BronzeTargets.clusterSnapshotErrorsTarget,
config,
clustersSnapshotModule.moduleName
)
),
append(BronzeTargets.clustersSnapshotTarget)
)
}
@@ -155,7 +164,7 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
)
}

lazy private[overwatch] val clusterEventLogsModule = Module(1005, "Bronze_ClusterEventLogs", this, Array(1004), 0.0, Some(30))
lazy private[overwatch] val clusterEventLogsModule = Module(1005, "Bronze_ClusterEventLogs", this, Array(1004), 0.0, Some(60))
lazy private val appendClusterEventLogsProcess: () => ETLDefinition = {
() =>
ETLDefinition(
@@ -286,6 +295,7 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
)
}


// TODO -- convert and merge this into audit's ETLDefinition
private def landAzureAuditEvents(): Unit = {
