Issue 197 (#201)
* bump version

* added support for EH connection strings to be redacted

* externalized the EH connection-string parsing function so that it is not redacted by the JSON parser (see the sketch below the change summary)

* optimized imports
GeekSheikh authored Aug 16, 2021
1 parent a6b7df4 commit fda884a
Showing 24 changed files with 64 additions and 71 deletions.
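The substantive change in this commit is the new PipelineFunctions.parseEHConnectionString helper (diffed below): the Azure audit-log Event Hub connection string can now be supplied either as a plain string or as a Databricks secret reference of the form {{secrets/scope/key}}, resolved through dbutils.secrets at the point of use instead of being stored in plain text. A minimal, self-contained sketch of the resolution pattern, with hypothetical scope and key names and the dbutils lookup stubbed out (dbutils only exists on a Databricks cluster):

object SecretRefSketch extends App {
  // Same pattern the new helper matches: {{secrets/<scope>/<key>}}
  val secretsRE = "\\{\\{secrets/([^/]+)/([^}]+)\\}\\}".r

  def resolve(connectionString: String): String =
    secretsRE.findFirstMatchIn(connectionString) match {
      // On Databricks this branch would call dbutils.secrets.get(m.group(1), m.group(2))
      case Some(m) => s"<secret ${m.group(1)}/${m.group(2)}>"
      // Anything that is not a secret reference passes through unchanged
      case None => connectionString
    }

  println(resolve("{{secrets/overwatch/eh-conn}}"))       // hypothetical secret reference
  println(resolve("Endpoint=sb://example/;EntityPath=x")) // plain string, returned as-is
}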
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

-version := "0.5.0.3"
+version := "0.5.0.4"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
2 changes: 1 addition & 1 deletion src/main/scala/com/databricks/labs/overwatch/ApiCall.scala
@@ -1,6 +1,6 @@
package com.databricks.labs.overwatch

-import com.databricks.labs.overwatch.utils.{ApiCallFailure, ApiEnv, Config, JsonUtils, NoNewDataException, SchemaTools, SparkSessionWrapper, TokenError}
+import com.databricks.labs.overwatch.utils._
import com.fasterxml.jackson.databind.JsonMappingException
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
@@ -7,9 +7,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.{DeserializationContext, JsonNode}

import java.io.IOException
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
+import collection.JavaConverters._

/**
 * Custom deserializer to convert json string coming from jobs main class into validated, strongly typed object
@@ -2,7 +2,7 @@ package com.databricks.labs.overwatch.env

import com.databricks.labs.overwatch.ApiCall
import com.databricks.labs.overwatch.utils.{ApiEnv, Config, SparkSessionWrapper}
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.Logger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
@@ -1,7 +1,7 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
-import com.databricks.labs.overwatch.utils.{Config, Layer, OverwatchScope}
+import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
import org.apache.log4j.{Level, Logger}

src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala

@@ -1,6 +1,5 @@
package com.databricks.labs.overwatch.pipeline

-import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.ApiCall
import com.databricks.labs.overwatch.env.Database
import com.databricks.labs.overwatch.utils.{SparkSessionWrapper, _}
@@ -10,14 +9,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.log4j.{Level, Logger}
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
-import org.apache.spark.sql.catalyst.expressions.Slice
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.util.SerializableConfiguration

-import java.io.FileNotFoundException
import java.time.{Duration, LocalDateTime}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
@@ -149,7 +146,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
    runID: String
  ): DataFrame = {

-    val connectionString = ConnectionStringBuilder(ehConfig.connectionString)
+    val connectionString = ConnectionStringBuilder(PipelineFunctions.parseEHConnectionString(ehConfig.connectionString))
      .setEventHubName(ehConfig.eventHubName)
      .build
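For context, the Bronze Event Hub reader now routes the configured value through the new helper before building the connection; a hedged sketch of the resulting call path, using a hypothetical secret reference and hub name (ConnectionStringBuilder is the org.apache.spark.eventhubs builder imported above):

// Sketch only, mirroring the call site above with hypothetical values
val raw = "{{secrets/overwatch/eh-conn}}"                     // as stored in ehConfig.connectionString
val resolved = PipelineFunctions.parseEHConnectionString(raw) // secret looked up at runtime
val ehConnString = ConnectionStringBuilder(resolved)
  .setEventHubName("overwatch-evhub")                         // ehConfig.eventHubName in the real code
  .build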
src/main/scala/com/databricks/labs/overwatch/pipeline/Gold.scala

@@ -1,7 +1,7 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
-import com.databricks.labs.overwatch.utils.{Config, Layer, OverwatchScope}
+import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
import org.apache.log4j.Logger

class Gold(_workspace: Workspace, _database: Database, _config: Config)
src/main/scala/com/databricks/labs/overwatch/pipeline/GoldTransforms.scala

@@ -4,7 +4,6 @@ import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.utils.{BadConfigException, SparkSessionWrapper, TimeTypes}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.{Column, DataFrame}

trait GoldTransforms extends SparkSessionWrapper {
src/main/scala/com/databricks/labs/overwatch/pipeline/Module.scala

@@ -1,13 +1,10 @@
package com.databricks.labs.overwatch.pipeline

-import com.databricks.labs.overwatch.pipeline.Pipeline.deriveLocalDate
import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.utils._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame

-import java.time.{Instant, LocalDate}
-
class Module(
  val moduleId: Int,
  val moduleName: String,
@@ -3,7 +3,6 @@ package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
import com.databricks.labs.overwatch.pipeline.Pipeline.{deriveLocalDate, systemZoneId, systemZoneOffset}
import com.databricks.labs.overwatch.utils._
-import io.delta.tables.DeltaTable
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineFunctions.scala

@@ -1,19 +1,38 @@
package com.databricks.labs.overwatch.pipeline

+import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.utils.Frequency.Frequency
-import com.databricks.labs.overwatch.utils.{Config, Frequency, IncrementalFilter}
+import com.databricks.labs.overwatch.utils.{Config, IncrementalFilter}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}

import java.net.URI

object PipelineFunctions {
  private val logger: Logger = Logger.getLogger(this.getClass)

  private val uriSchemeRegex = "^([a-zA-Z][-.+a-zA-Z0-9]*):/.*".r

+  /**
+   * Parses the connection string from the scope/key defined when the value matches the pattern
+   * {{secrets/scope/key}}; otherwise returns the original string value.
+   * https://docs.databricks.com/security/secrets/secrets.html#store-the-path-to-a-secret-in-a-spark-configuration-property
+   * @param connectionString raw connection string or secret reference
+   * @return resolved connection string
+   */
+  def parseEHConnectionString(connectionString: String): String = {
+    val secretsRE = "\\{\\{secrets/([^/]+)/([^}]+)\\}\\}".r
+
+    secretsRE.findFirstMatchIn(connectionString) match {
+      case Some(i) =>
+        dbutils.secrets.get(i.group(1), i.group(2))
+      case None =>
+        connectionString
+    }
+  }
+
  /**
   * Ensure no duplicate slashes in path and default to dbfs:/ URI prefix where no uri specified to result in
   * fully qualified URI for db location
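A brief usage sketch of the helper above; the scope, key, and endpoint values are hypothetical, and the secret branch only resolves on a Databricks cluster where dbutils is available:

PipelineFunctions.parseEHConnectionString("{{secrets/overwatch/eh-conn}}")
// -> dbutils.secrets.get("overwatch", "eh-conn"): the actual connection string, resolved at runtime

PipelineFunctions.parseEHConnectionString("Endpoint=sb://example.servicebus.windows.net/;EntityPath=logs")
// -> returned unchanged, since it does not match {{secrets/scope/key}}

Because the config retains only the secret reference, the JSON-serialized parameters never contain the resolved value; resolving outside the JSON deserialization path (the "externalized" note in the commit message) keeps the reference intact through parsing and redaction.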
@@ -5,10 +5,10 @@ import com.databricks.labs.overwatch.utils.Frequency.Frequency
import com.databricks.labs.overwatch.utils._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable

// TODO -- Add rules: Array[Rule] to enable Rules engine calculations in the append
// also add ruleStrategy: Enum(Kill, Quarantine, Ignore) to determine when to require them
src/main/scala/com/databricks/labs/overwatch/pipeline/Silver.scala

@@ -1,7 +1,7 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
-import com.databricks.labs.overwatch.utils.{Config, Layer, OverwatchScope}
+import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
import org.apache.log4j.Logger

class Silver(_workspace: Workspace, _database: Database, _config: Config)
@@ -6,7 +6,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset}

import java.time.LocalDate
@@ -3,16 +3,16 @@ package com.databricks.labs.overwatch.utils

import com.databricks.labs.overwatch.pipeline.{Module, PipelineTable}
import com.databricks.labs.overwatch.utils.Frequency.Frequency
import com.databricks.labs.overwatch.utils.OverwatchScope.OverwatchScope
+import com.databricks.labs.overwatch.validation.SnapReport
import org.apache.log4j.Level
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.{StructField, StructType}

+import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.{LocalDateTime, ZonedDateTime}
import java.util.Date
-import java.sql.Timestamp
-
-import com.databricks.labs.overwatch.validation.SnapReport

case class DBDetail()
@@ -69,6 +69,7 @@ case class AuditLogConfig(
  auditLogFormat: String = "json",
  azureAuditLogEventhubConfig: Option[AzureAuditLogEventhubConfig] = None
)
+
case class IntelligentScaling(enabled: Boolean = false, minimumCores: Int = 4, maximumCores: Int = 512, coeff: Double = 1.0)

case class OverwatchParams(auditLogConfig: AuditLogConfig,
@@ -144,7 +145,9 @@ case class SimplifiedModuleStatusReport(
)

case class IncrementalFilter(cronField: StructField, low: Column, high: Column)
+
case class UpgradeReport(db: String, tbl: String, errorMsg: Option[String])
+
object OverwatchScope extends Enumeration {
  type OverwatchScope = Value
  val jobs, clusters, clusterEvents, sparkEvents, audit, notebooks, accounts, pools = Value
10 changes: 3 additions & 7 deletions src/main/scala/com/databricks/labs/overwatch/utils/Upgrade.scala
@@ -1,17 +1,13 @@
package com.databricks.labs.overwatch.utils

import com.databricks.labs.overwatch.env.Workspace
-import com.databricks.labs.overwatch.pipeline.{Bronze, Gold, Pipeline, PipelineTable}
+import com.databricks.labs.overwatch.pipeline.{Bronze, Gold}
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.{Column, DataFrame, Dataset}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Column, Dataset}

-import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object Upgrade extends SparkSessionWrapper {
@@ -1,21 +1,13 @@
package com.databricks.labs.overwatch.validation

-import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
-import com.databricks.labs.overwatch.env.{Database, Workspace}
-import com.databricks.labs.overwatch.pipeline.{Bronze, Gold, Initializer, Module, Pipeline, PipelineTable, Silver}
+import com.databricks.labs.overwatch.env.Workspace
+import com.databricks.labs.overwatch.pipeline.TransformFunctions._
+import com.databricks.labs.overwatch.pipeline.{Initializer, Module, Pipeline, PipelineTable}
import com.databricks.labs.overwatch.utils.JsonUtils.objToJson
import com.databricks.labs.overwatch.utils._
-import com.databricks.labs.overwatch.pipeline.TransformFunctions._
-import org.apache.spark.sql.{Column, DataFrame, Dataset}
-import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window

-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDate}
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, Dataset}

case class SnapReport(tableFullName: String,
  from: java.sql.Timestamp,
src/main/scala/com/databricks/labs/overwatch/validation/Scenarios.scala

@@ -1,12 +1,12 @@
package com.databricks.labs.overwatch.validation

+import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.utils.SparkSessionWrapper
+import com.databricks.labs.validation.utils.Structures._
import com.databricks.labs.validation._
-import org.apache.spark.sql.{Column, DataFrame}
-import org.apache.spark.sql.functions._
-import com.databricks.labs.validation.utils.Structures._
import org.apache.spark.sql.expressions.Window
-import com.databricks.labs.overwatch.pipeline.TransformFunctions._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Column, DataFrame}

object Scenarios extends SparkSessionWrapper {
src/main/scala/com/databricks/labs/overwatch/validation/ValidationUtils.scala

@@ -1,24 +1,21 @@
package com.databricks.labs.overwatch.validation

-import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.env.Workspace
-import com.databricks.labs.overwatch.pipeline._
import com.databricks.labs.overwatch.pipeline.TransformFunctions._
+import com.databricks.labs.overwatch.pipeline._
import com.databricks.labs.overwatch.utils._
-import io.delta.tables.{DeltaMergeBuilder, DeltaTable}
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types._
+import io.delta.tables.DeltaTable
import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._

import java.sql.Timestamp
import java.text.SimpleDateFormat
-import java.time.{Duration, LocalDate}
+import java.time.Duration
import java.util.concurrent.ForkJoinPool
-import scala.collection.parallel.{ForkJoinTaskSupport, ParSeq}
import scala.collection.parallel.mutable.ParArray
+import scala.collection.parallel.{ForkJoinTaskSupport, ParSeq}


class ValidationUtils(sourceDBName: String, snapWorkspace: Workspace, _paralellism: Option[Int]) extends SparkSessionWrapper {
@@ -1,6 +1,6 @@
package com.databricks.labs.overwatch

-import com.databricks.labs.overwatch.utils.{AuditLogConfig, AzureAuditLogEventhubConfig, DatabricksContractPrices, IntelligentScaling, OverwatchParams}
+import com.databricks.labs.overwatch.utils._
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
src/test/scala/com/databricks/labs/overwatch/pipeline/PipelineFunctionsTest.scala

@@ -1,16 +1,13 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.SparkSessionTestWrapper
-import com.databricks.labs.overwatch.utils.Frequency.Frequency
import com.databricks.labs.overwatch.utils.{Frequency, IncrementalFilter}
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
-import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.{Column, SQLContext}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types._
import org.scalatest.funspec.AnyFunSpec
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.Column

class PipelineFunctionsTest extends AnyFunSpec with DataFrameComparer with SparkSessionTestWrapper {
src/test/scala/com/databricks/labs/overwatch/pipeline/TransformFunctionsTest.scala

@@ -1,15 +1,15 @@
package com.databricks.labs.overwatch.pipeline

-import java.sql.{Date, Timestamp}
-import java.time.Instant
-
import com.databricks.labs.overwatch.SparkSessionTestWrapper
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.Row
-import org.apache.spark.sql.functions.{col, lit, struct}
+import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types._
import org.scalatest.funspec.AnyFunSpec

+import java.sql.{Date, Timestamp}
+import java.time.Instant
+
class TransformFunctionsTest extends AnyFunSpec with DataFrameComparer with SparkSessionTestWrapper {
  import spark.implicits._
  spark.conf.set("spark.sql.session.timeZone", "UTC")
src/test/scala/com/databricks/labs/overwatch/utils/ConfigTest.scala

@@ -1,7 +1,6 @@
package com.databricks.labs.overwatch.utils

import org.scalatest.funspec.AnyFunSpec
-import org.scalatest.PrivateMethodTester._

class ConfigTest extends AnyFunSpec {
src/test/scala/com/databricks/labs/overwatch/utils/SchemaToolsTest.scala

@@ -1,7 +1,6 @@
package com.databricks.labs.overwatch.utils

import com.databricks.labs.overwatch.SparkSessionTestWrapper
-import org.apache.spark.sql.functions.col
import org.scalatest.funspec.AnyFunSpec

class SchemaToolsTest extends AnyFunSpec with SparkSessionTestWrapper {
