0820 release (#1299)
* Initial commit

* recon enhancement done to deal with different columns in source and target (#1216)

* Initial commit

* recon enhancement done to deal with different columns in source and target

---------

Co-authored-by: Guenia <[email protected]>

* adjust Silver Job Runs module configuration (#1256)

enable auto-optimized shuffle for module 2011

originally implemented in commit d751d5f

* append null columns from cluster snapshot for cluster_spec_silver (#1239)

* Initial commit

* append null columns from cluster snapshot for cluster_spec_silver

* append null columns from cluster snapshot for cluster_spec_silver

---------

Co-authored-by: Guenia <[email protected]>

* 1201 collect all event logs on first run (#1255)

* Initial commit

* cluster event bronze will take all the data from API for first run

* Update BronzeTransforms.scala

adjust whitespace around `landClusterEvents()`

---------

Co-authored-by: Guenia <[email protected]>
Co-authored-by: Neil Best <[email protected]>

* Redefine views so that they are created from tables not locations (#1241)

* Initial commit

* Change publish() function to incorporate views from ETL Tables instead of paths

* Handle view creation in case the table does not exist

---------

Co-authored-by: Guenia <[email protected]>
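
For illustration, the change described in #1241 amounts to something like the sketch below (database, table, and view names are assumed for the example; the actual logic lives in the `publish()` function):

```scala
// Sketch only, assuming a SparkSession `spark` in scope and illustrative names.
val etlDb  = "overwatch_etl"   // assumed ETL database
val consDb = "overwatch"       // assumed consumer database that holds the views
val table  = "jobrun_silver"   // example ETL table backing a view

if (spark.catalog.tableExists(etlDb, table)) {
  // new behaviour: define the view against the ETL table ...
  spark.sql(s"CREATE OR REPLACE VIEW $consDb.${table}_v AS SELECT * FROM $etlDb.$table")
} else {
  // ... and handle the case where the backing table does not exist yet
  println(s"$etlDb.$table not found; skipping view creation")
}

// previous behaviour defined the view over a storage location instead, e.g.
// CREATE VIEW ... AS SELECT * FROM delta.`<storage_prefix>/jobrun_silver`
```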

* 1030 pipeline validation framework (#1071)

* Initial commit

* 19-Oct-23 : Added Validation Framework

* 19-Oct-23: Customize the message for customer

* 19-Oct-23: Customize the message for customer

* 26-Oct-23: Added OverwatchID filter in the table

* 26-Oct-23: Change for Coding Best Practices

* Added Function Description for validateColumnBetweenMultipleTable

* Added Pattern Matching in Validation

* Convert if-else in validateRuleAndUpdateStatus to case statement as per comment

* Initial commit

* traceability implemented (#1102)

* traceability implemented

* code review implemented

* missed code implemented (#1105)

* Initial commit

* traceability implemented (#1102)

* traceability implemented

* code review implemented

* missed code implemented

* missed code implemented

---------

Co-authored-by: Guenia Izquierdo <[email protected]>

* Added proper exception for Spark Stream Gold if progress c… (#1085)

* Initial commit

* 09-Nov-23: Added proper exception for Spark Stream Gold if progress column contains only null in SparkEvents_Bronze

---------

Co-authored-by: Guenia Izquierdo <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>

* Gracefully Handle Exception for NotebookCommands_Gold (#1095)

* Initial commit

* Gracefully Handle Exception for NotebookCommands_Gold

* Convert the check in buildNotebookCommandsFact to single or clause

---------

Co-authored-by: Guenia Izquierdo <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>

* code missed in merge (#1120)

* Fix Helper Method to Instantiate Remote Workspaces (#1110)

* Initial commit

* Change getRemoteWorkspaceByPath and getWorkspaceByDatabase to take in RemoteWorkspace

* Remove Unnecessary println Statements

---------

Co-authored-by: Guenia Izquierdo <[email protected]>

* Ensure we test the write into a partitioned storage_prefix (#1088)

* Initial commit

* Ensure we test the write into a partitioned storage_prefix

* silver warehouse spec fix (#1121)

* added missed copy-pasta (#1129)

* Exclude cluster logs in S3 root bucket (#1118)

* Exclude cluster logs in S3 root bucket

* Omit cluster log paths pointing to s3a as well

* implemented recon (#1116)

* implemented recon

* docs added

* file path change

* review comments implemented

* Added ShuffleFactor to NotebookCommands (#1124)

Co-authored-by: Sourav Banerjee <[email protected]>

* disabled traceability (#1130)

* Added JobRun_Silver in buildClusterStateFact for Cluster E… (#1083)

* Initial commit

* 08-Nov-23: Added JobRun_Silver in buildClusterStateFact for Cluster End Time Imputation

* Impute Terminating Events in CLSF from JR_Silver

* Impute Terminating Events in CLSD

* Impute Terminating Events in CLSD

* Change CLSF to original 0730 version

* Change CLSF to original 0730 version

* Added cluster_spec in CLSD to get job Cluster only

* Make the variable names in buildClusterStateDetail more descriptive

* Make the variable names in buildClusterStateDetail more descriptive

---------

Co-authored-by: Guenia Izquierdo <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>

* Sys table audit log integration (#1122)

* system table integration with audit log

* adding code to resolve issues with response col

* fixed timestamp issue

* adding print statement for from and until time

* adding fix for azure

* removed comments

* removed comments and print statements

* removed comments

* implemented code review comments

* implemented code review comments

* adding review comment

* Sys table integration multi account (#1131)

* added code changes for multi account deployment

* code for multi account system table integration

* Sys table integration multi account (#1132)

* added code changes for multi account deployment

* code for multi account system table integration

* adding code for system table migration check

* changing exception for empty audit log from system table

* adding code to handle sql_endpoint in configs and fix in migration validation (#1133)

* corner case commit (#1134)

* Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty (#1135)

* Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty

* Exclude last_state from clsd as it is not needed in the logic.

---------

Co-authored-by: Sourav Banerjee <[email protected]>

* Exclude 2011 and 2014 as dependency module for 2019 (#1136)

* Exclude 2011 and 2014 as dependency module for 2019

* Added comment in CLSD for understandability

---------

Co-authored-by: Sourav Banerjee <[email protected]>

* corner case commit (#1137)

* Update version

* adding fix for empty EH config for system tables (#1140)

* corner case commit (#1142)

* adding fix for empty audit log for warehouse_spec_silver (#1141)

* recon columns removed (#1143)

* recon columns removed

* recon columns removed

* Initial Commit

* Added Changes in Validation Framework as per comments added during sprint meeting

* added hotfix for warehouse_spec_silver (#1154)

* Added Multiple RunID check in Validation Framework

* Added Other tables in Validation Framework

* Added Multiple WS ID option in Cross Table Validation

* Added change for Pipeline_report

* Change for Pipeline Report

* Added msg for single table validation

* Added negative msg in HealthCheck Report

* Added Negative Msg for Cross Table Validation

* Added extra filter for total cost validation for CLSF

* Changed as per Comments

* Changed as per the comments

* Added some filter condition for cost validation in clsf

* Added Config for all pipeline run

* 19-Oct-23 : Added Validation Framework

* 19-Oct-23: Customize the message for customer

* 19-Oct-23: Customize the message for customer

* 26-Oct-23: Added OverwatchID filter in the table

* 26-Oct-23: Change for Coding Best Practices

* Added Function Description for validateColumnBetweenMultipleTable

* Added Pattern Matching in Validation

* Convert if-else in validateRuleAndUpdateStatus to case statement as per comment

* traceability implemented (#1102)

* traceability implemented

* code review implemented

* Added JobRun_Silver in buildClusterStateFact for Cluster E… (#1083)

* Initial commit

* 08-Nov-23: Added JobRun_Silver in buildClusterStateFact for Cluster End Time Imputation

* Impute Terminating Events in CLSF from JR_Silver

* Impute Terminating Events in CLSD

* Impute Terminating Events in CLSD

* Change CLSF to original 0730 version

* Change CLSF to original 0730 version

* Added cluster_spec in CLSD to get job Cluster only

* Make the variable names in buildClusterStateDetail more descriptive

* Make the variable names in buildClusterStateDetail more descriptive

---------

Co-authored-by: Guenia Izquierdo <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>

* corner case commit (#1134)

* Exclude 2011 and 2014 as dependency module for 2019 (#1136)

* Exclude 2011 and 2014 as dependency module for 2019

* Added comment in CLSD for understandability

---------

Co-authored-by: Sourav Banerjee <[email protected]>

* Added Changes in Validation Framework as per comments added during sprint meeting

* Added Multiple RunID check in Validation Framework

* Added Other tables in Validation Framework

* Added Multiple WS ID option in Cross Table Validation

* Added change for Pipeline_report

* Change for Pipeline Report

* Added msg for single table validation

* Added negative msg in HealthCheck Report

* Added Negative Msg for Cross Table Validation

* Added extra filter for total cost validation for CLSF

* Changed as per Comments

* Changed as per the comments

* Added some filter condition for cost validation in clsf

* Added Config for all pipeline run

---------

Co-authored-by: Guenia Izquierdo <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Sriram Mohanty <[email protected]>
Co-authored-by: Aman <[email protected]>

* adding fix for duplicate accountId in module 2010 and 3019 (#1270)

* 1218 warehouse state details (#1254)

* test

* code for warehouse_state_detail_silver

* removed comments

* adding warehouseEvents scope

* added exception for table not found

* added exception to check if system tables are getting used or not

* enhance function getWarehousesEventDF

* added code to fix max number of clusters

* change in column names

* refactored code

* Add descriptive `NamedTransformation`s to Spark UI (#1223)

* Initial commit

* Add descriptive job group IDs and named transformations

This makes the Spark UI more developer-friendly when analyzing
Overwatch runs.

Job group IDs have the form <workspace name>:<OW module name>

Any use of `.transform( df => df)` may be replaced with
`.transformWithDescription( nt)` after instantiating a `val nt =
NamedTransformation( df => df)` as its argument.

This commit contains one such application of the new extension method.
(See `val jobRunsAppendClusterName` in `WorkflowsTransforms.scala`.)
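
A rough sketch of how the pieces named above might fit together is shown below. The `NamedTransformation` and `transformWithDescription` names come from this commit; the signatures, and the use of the `sourcecode` library added to `build.sbt`, are assumptions rather than the actual `TransformationDescriber` implementation:

```scala
import org.apache.spark.sql.DataFrame

object TransformationDescriberSketch {

  // A transformation whose name is captured implicitly from the enclosing `val`
  // (the `sourcecode` dependency added to build.sbt supports this pattern).
  case class NamedTransformation(t: DataFrame => DataFrame)(implicit name: sourcecode.Name) {
    override def toString: String = name.value
  }

  implicit class TransformWithDescription(df: DataFrame) {
    def transformWithDescription(nt: NamedTransformation): DataFrame = {
      // surface the transformation name in the Spark UI next to the
      // <workspace name>:<OW module name> job group set by the pipeline
      df.sparkSession.sparkContext.setJobDescription(nt.toString)
      df.transform(nt.t)
    }
  }

  // usage: replaces a bare .transform( df => df)
  val jobRunsAppendClusterName: NamedTransformation =
    NamedTransformation((df: DataFrame) => df /* ...append cluster name columns... */)
  // someDF.transformWithDescription(jobRunsAppendClusterName)
}
```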

Some logic in `GoldTransforms` falls through to elements of the
special job-run-action form of Job Group IDs emitted by the platform
but the impact is minimal relative to the benefit to Overwatch
development and troubleshooting.  Even so this form of Job Group ID is
still present in initial Spark events before OW ETL modules begin to
execute.

* improve TransformationDescriberTest

* flip transformation names to beginning of label

for greater visibility in Spark UI. `NamedTransformation` type name
now appears in labels' second position.

(cherry picked from commit 2ead752)

* revert modified Spark UI Job Group labels

TODO: enumerate the regressions this would introduce when the labels set by the platform are replaced this way.

---------

Co-authored-by: Guenia <[email protected]>

* adding code for warehouseStateFact gold (#1265)

* adding code for warehouseStateFact gold

* removed hard coded data and fix logic

* removed commented code

* Show `DataFrame` records in logs (#1224)

* Initial commit

* Add extension method to show `DataFrame` records in the log

* catch up with 0820_release

Squashed commit of the following:

commit bbdb61f
Author: Neil Best <[email protected]>
Date:   Tue Aug 20 10:11:03 2024 -0500

    Add descriptive `NamedTransformation`s to Spark UI (#1223)

    * Initial commit

    * Add descriptive job group IDs and named transformations

    This makes the Spark UI more developer-friendly when analyzing
    Overwatch runs.

    Job group IDs have the form <workspace name>:<OW module name>

    Any use of `.transform( df => df)` may be replaced with
    `.transformWithDescription( nt)` after instantiating a `val nt =
    NamedTransformation( df => df)` as its argument.

    This commit contains one such application of the new extension method.
    (See `val jobRunsAppendClusterName` in `WorkflowsTransforms.scala`.)

    Some logic in `GoldTransforms` falls through to elements of the
    special job-run-action form of Job Group IDs emitted by the platform
    but the impact is minimal relative to the benefit to Overwatch
    development and troubleshooting.  Even so this form of Job Group ID is
    still present in initial Spark events before OW ETL modules begin to
    execute.

    * improve TransformationDescriberTest

    * flip transformation names to beginning of label

    for greater visibility in Spark UI. `NamedTransformation` type name
    now appears in labels' second position.

    (cherry picked from commit 2ead752)

    * revert modified Spark UI Job Group labels

    TODO: enumerate the regressions this would introduce when the labels set by the platform are replaced this way.

    ---------

    Co-authored-by: Guenia <[email protected]>

commit 3055a22
Author: Aman <[email protected]>
Date:   Mon Aug 12 22:59:13 2024 +0530

    1218 warehouse state details (#1254)

    * test

    * code for warehouse_state_detail_silver

    * removed comments

    * adding warehouseEvents scope

    * added exception for table not found

    * added exception to check if system tables are getting used or not

    * enhance function getWarehousesEventDF

    * added code to fix max number of clusters

    * change in column names

    * refactored code

commit 59daae5
Author: Aman <[email protected]>
Date:   Thu Aug 8 20:20:17 2024 +0530

    adding fix for duplicate accountId in module 2010 and 3019 (#1270)

commit d6fa441
Author: Sourav Banerjee <[email protected]>
Date:   Wed Aug 7 23:24:00 2024 +0530

    1030 pipeline validation framework (#1071)

    * Initial commit

    * 19-Oct-23 : Added Validation Framework

    * 19-Oct-23: Customize the message for customer

    * 19-Oct-23: Customize the message for customer

    * 26-Oct-23: Added OverwatchID filter in the table

    * 26-Oct-23: Change for Coding Best Practices

    * Added Function Description for validateColumnBetweenMultipleTable

    * Added Pattern Matching in Validation

    * Convert if-else in validateRuleAndUpdateStatus to case statement as per comment

    * Initial commit

    * traceability implemented (#1102)

    * traceability implemented

    * code review implemented

    * missed code implemented (#1105)

    * Initial commit

    * traceability implemented (#1102)

    * traceability implemented

    * code review implemented

    * missed code implemented

    * missed code implemented

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>

    * Added proper exception for Spark Stream Gold if progress c… (#1085)

    * Initial commit

    * 09-Nov-23: Added proper exception for Spark Stream Gold if progress column contains only null in SparkEvents_Bronze

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>

    * Gracefully Handle Exception for NotebookCommands_Gold (#1095)

    * Initial commit

    * Gracefully Handle Exception for NotebookCommands_Gold

    * Convert the check in buildNotebookCommandsFact to single or clause

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>

    * code missed in merge (#1120)

    * Fix Helper Method to Instantiate Remote Workspaces (#1110)

    * Initial commit

    * Change getRemoteWorkspaceByPath and getWorkspaceByDatabase to take in RemoteWorkspace

    * Remove Unnecessary println Statements

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>

    * Ensure we test the write into a partitioned storage_prefix (#1088)

    * Initial commit

    * Ensure we test the write into a partitioned storage_prefix

    * silver warehouse spec fix (#1121)

    * added missed copy-pasta (#1129)

    * Exclude cluster logs in S3 root bucket (#1118)

    * Exclude cluster logs in S3 root bucket

    * Omit cluster log paths pointing to s3a as well

    * implemented recon (#1116)

    * implemented recon

    * docs added

    * file path change

    * review comments implemented

    * Added ShuffleFactor to NotebookCommands (#1124)

    Co-authored-by: Sourav Banerjee <[email protected]>

    * disabled traceability (#1130)

    * Added JobRun_Silver in buildClusterStateFact for Cluster E… (#1083)

    * Initial commit

    * 08-Nov-23: Added JobRun_Silver in buildClusterStateFact for Cluster End Time Imputation

    * Impute Terminating Events in CLSF from JR_Silver

    * Impute Terminating Events in CLSD

    * Impute Terminating Events in CLSD

    * Change CLSF to original 0730 version

    * Change CLSF to original 0730 version

    * Added cluster_spec in CLSD to get job Cluster only

    * Make the variable names in buildClusterStateDetail more descriptive

    * Make the variable names in buildClusterStateDetail more descriptive

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>

    * Sys table audit log integration (#1122)

    * system table integration with audit log

    * adding code to resolve issues with response col

    * fixed timestamp issue

    * adding print statement for from and until time

    * adding fix for azure

    * removed comments

    * removed comments and print statements

    * removed comments

    * implemented code review comments

    * implemented code review comments

    * adding review comment

    * Sys table integration multi account (#1131)

    * added code changes for multi account deployment

    * code for multi account system table integration

    * Sys table integration multi account (#1132)

    * added code changes for multi account deployment

    * code for multi account system table integration

    * adding code for system table migration check

    * changing exception for empty audit log from system table

    * adding code to handle sql_endpoint in configs and fix in migration validation (#1133)

    * corner case commit (#1134)

    * Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty (#1135)

    * Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty

    * Exclude last_state from clsd as it is not needed in the logic.

    ---------

    Co-authored-by: Sourav Banerjee <[email protected]>

    * Exclude 2011 and 2014 as dependency module for 2019 (#1136)

    * Exclude 2011 and 2014 as dependency module for 2019

    * Added comment in CLSD for understandability

    ---------

    Co-authored-by: Sourav Banerjee <[email protected]>

    * corner case commit (#1137)

    * Update version

    * adding fix for empty EH config for system tables (#1140)

    * corner case commit (#1142)

    * adding fix for empty audit log for warehouse_spec_silver (#1141)

    * recon columns removed (#1143)

    * recon columns removed

    * recon columns removed

    * Initial Commit

    * Added Changes in Validation Framework as per comments added during sprint meeting

    * added hotfix for warehouse_spec_silver (#1154)

    * Added Multiple RunID check in Validation Framework

    * Added Other tables in Validation Framework

    * Added Multiple WS ID option in Cross Table Validation

    * Added change for Pipeline_report

    * Change for Pipeline Report

    * Added msg for single table validation

    * Added negative msg in HealthCheck Report

    * Added Negative Msg for Cross Table Validation

    * Added extra filter for total cost validation for CLSF

    * Changed as per Comments

    * Changed as per the comments

    * Added some filter condition for cost validation in clsf

    * Added Config for all pipeline run

    * 19-Oct-23 : Added Validation Framework

    * 19-Oct-23: Customize the message for customer

    * 19-Oct-23: Customize the message for customer

    * 26-Oct-23: Added OverwatchID filter in the table

    * 26-Oct-23: Change for Coding Best Practices

    * Added Function Description for validateColumnBetweenMultipleTable

    * Added Pattern Matching in Validation

    * Convert if-else in validateRuleAndUpdateStatus to case statement as per comment

    * traceability implemented (#1102)

    * traceability implemented

    * code review implemented

    * Added JobRun_Silver in buildClusterStateFact for Cluster E… (#1083)

    * Initial commit

    * 08-Nov-23: Added JobRun_Silver in buildClusterStateFact for Cluster End Time Imputation

    * Impute Terminating Events in CLSF from JR_Silver

    * Impute Terminating Events in CLSD

    * Impute Terminating Events in CLSD

    * Change CLSF to original 0730 version

    * Change CLSF to original 0730 version

    * Added cluster_spec in CLSD to get job Cluster only

    * Make the variable names in buildClusterStateDetail more descriptive

    * Make the variable names in buildClusterStateDetail more descriptive

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>

    * corner case commit (#1134)

    * Exclude 2011 and 2014 as dependency module for 2019 (#1136)

    * Exclude 2011 and 2014 as dependency module for 2019

    * Added comment in CLSD for understandability

    ---------

    Co-authored-by: Sourav Banerjee <[email protected]>

    * Added Changes in Validation Framework as per comments added during sprint meeting

    * Added Multiple RunID check in Validation Framework

    * Added Other tables in Validation Framework

    * Added Multiple WS ID option in Cross Table Validation

    * Added change for Pipeline_report

    * Change for Pipeline Report

    * Added msg for single table validation

    * Added negative msg in HealthCheck Report

    * Added Negative Msg for Cross Table Validation

    * Added extra filter for total cost validation for CLSF

    * Changed as per Comments

    * Changed as per the comments

    * Added some filter condition for cost validation in clsf

    * Added Config for all pipeline run

    ---------

    Co-authored-by: Guenia Izquierdo <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>
    Co-authored-by: Sriram Mohanty <[email protected]>
    Co-authored-by: Aman <[email protected]>

commit 3c16b5f
Author: Sourav Banerjee <[email protected]>
Date:   Wed Aug 7 23:23:17 2024 +0530

    Redefine views so that they are created from tables not locations (#1241)

    * Initial commit

    * Change publish() function to incorporate views from ETL Tables instead of paths

    * Handle view creation in case the table does not exist

    ---------

    Co-authored-by: Guenia <[email protected]>

commit f3ffd7c
Author: Sourav Banerjee <[email protected]>
Date:   Wed Aug 7 23:21:37 2024 +0530

    1201 collect all event logs on first run (#1255)

    * Initial commit

    * cluster event bronze will take all the data from API for first run

    * Update BronzeTransforms.scala

    adjust whitespace around `landClusterEvents()`

    ---------

    Co-authored-by: Guenia <[email protected]>
    Co-authored-by: Neil Best <[email protected]>

commit caa3282
Author: Sriram Mohanty <[email protected]>
Date:   Wed Aug 7 23:20:25 2024 +0530

    append null columns from cluster snapshot for cluster_spec_silver (#1239)

    * Initial commit

    * append null columns from cluster snapshot for cluster_spec_silver

    * append null columns from cluster snapshot for cluster_spec_silver

    ---------

    Co-authored-by: Guenia <[email protected]>

commit f7460bd
Author: Neil Best <[email protected]>
Date:   Tue Jul 30 14:52:38 2024 -0500

    adjust Silver Job Runs module configuration (#1256)

    enable auto-optimized shuffle for module 2011

    originally implemented in commit d751d5f

commit 25671b7
Author: Sriram Mohanty <[email protected]>
Date:   Tue Jul 9 02:02:04 2024 +0530

    recon enhancement done to deal with different columns in source and target (#1216)

    * Initial commit

    * recon enhancement done to deal with different columns in source and target

    ---------

    Co-authored-by: Guenia <[email protected]>

commit 97236ae
Author: Guenia <[email protected]>
Date:   Wed May 8 19:43:29 2024 -0400

    Initial commit

commit f9c8dd0
Author: Guenia Izquierdo Delgado <[email protected]>
Date:   Mon Jun 24 11:28:15 2024 -0400

    0812 release (#1249)

    * Initial commit

    * adding fix for schemaScrubber and StructToMap (#1232)

    * fix for null driver_type_id and node_type_id in jrcp (#1236)

    * Modify Cluster_snapshot_bronze column (#1234)

    * Convert all the struct fields inside the 'spec' column for cluster_snapshot_bronze to mapType

    * Dropped Spec column from snapshot

    * Removed Redundant VerifyMinSchema

    * Update_AWS_instance_types (#1248)

    * Update_gcp_instance_types (#1244)

    Update_gcp_instance_types

    * Update_AWS_instance_types

    Update_AWS_instance_types

    ---------

    Co-authored-by: Aman <[email protected]>
    Co-authored-by: Sourav Banerjee <[email protected]>
    Co-authored-by: Mohan Baabu <[email protected]>

commit 7390d4a
Author: Mohan Baabu <[email protected]>
Date:   Fri Jun 21 20:01:46 2024 +0530

    Update_Azure_Instance_details (#1246)

    * Update_Azure_Instance_details

    Update_Azure_Instance_details

    * Update Azure_Instance_Details.csv

    Updated Standard_NV72ads_A10_v5 types, missed a comma

commit 6cbb9d7
Author: Mohan Baabu <[email protected]>
Date:   Fri Jun 21 19:37:57 2024 +0530

    Update_gcp_instance_types (#1244)

    Update_gcp_instance_types

* add Spark conf option for `DataFrame` logging extension methods

This feature respects the logging level set for the logger in scope.

```scala
spark.conf.set( "overwatch.dataframelogger.level", "DEBUG")

logger.setLevel( "WARN")

df.log()

// no data shown in logs

logger.setLevel( "DEBUG")

df.log()

// :)
```

also:

- implement `DataFrameSyntaxTest` suite to test `Dataset`/`DataFrame`
  extension methods `.showLines()` and `.log()` as implemented within
  the `DataFrameSyntax` trait.

- move `SparkSessionTestWrapper` into `src/main` and made it extend
  `SparkSessionWrapper` in order to make `DataFrameSyntax` testable
  through the use of type parameter `SPARK` and self-typing.

---------

Co-authored-by: Guenia <[email protected]>
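
A rough sketch of what the `.showLines()` and `.log()` extension methods described above might look like follows; the conf key comes from the commit message, while the capture and gating logic here are assumptions rather than the exact `DataFrameSyntax` implementation:

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Dataset

trait DataFrameSyntaxSketch {

  implicit class DatasetLogOps[A](ds: Dataset[A]) {

    /** Capture the output of show() as individual lines instead of printing it. */
    def showLines(numRows: Int = 20, truncate: Boolean = false): Seq[String] = {
      val buffer = new java.io.ByteArrayOutputStream()
      Console.withOut(buffer) { ds.show(numRows, truncate) }
      buffer.toString("UTF-8").linesIterator.toSeq
    }

    /** Emit the records to the logger in scope, gated by the Spark conf option. */
    def log(logger: Logger = Logger.getLogger(getClass), numRows: Int = 20): Unit = {
      val level = ds.sparkSession.conf
        .getOption("overwatch.dataframelogger.level") // conf key from the commit message
        .map(Level.toLevel)
        .getOrElse(Level.DEBUG)
      // nothing is written unless the in-scope logger is enabled for that level
      if (logger.isEnabledFor(level))
        showLines(numRows).foreach(line => logger.log(level, line))
    }
  }
}
```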

* Added deriveRawApiResponseDF fix (#1283)

Co-authored-by: Sourav Banerjee <[email protected]>

* add new module to gold metadata (#1296)

---------

Co-authored-by: Sriram Mohanty <[email protected]>
Co-authored-by: Neil Best <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
Co-authored-by: Aman <[email protected]>
Co-authored-by: Sourav Banerjee <[email protected]>
7 people authored Sep 26, 2024
1 parent 7c433b1 commit 6a66e78
Showing 37 changed files with 2,352 additions and 142 deletions.
8 changes: 6 additions & 2 deletions build.sbt
@@ -2,14 +2,17 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.1.2"
version := "0.8.2.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")

Test / fork := true
Test / envVars := Map("OVERWATCH_ENV" -> " ","OVERWATCH_TOKEN" -> " ","OVERWATCH" -> " ")

logBuffered in Test := false
// parallelExecution in Test := false

val sparkVersion = "3.1.2"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
@@ -18,6 +21,7 @@ libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provide
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"

//libraryDependencies += "org.apache.hive" % "hive-metastore" % "2.3.9"

@@ -51,4 +55,4 @@ assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
19 changes: 19 additions & 0 deletions src/main/resources/Warehouse_DBU_Details.csv
@@ -0,0 +1,19 @@
cloud,cluster_size,driver_size,worker_count,total_dbus
AWS,2X-Small,i3.2xlarge,1,4
AWS,X-Small,i3.2xlarge,2,6
AWS,Small,i3.4xlarge,4,12
AWS,Medium,i3.8xlarge,8,24
AWS,Large,i3.8xlarge,16,40
AWS,X-Large,i3.16xlarge,32,80
AWS,2X-Large,i3.16xlarge,64,144
AWS,3X-Large,i3.16xlarge,128,272
AWS,4X-Large,i3.16xlarge,256,528
AZURE,2X-Small,E8ds v4,1,4
AZURE,X-Small,E8ds v4,2,6
AZURE,Small,E16ds v4,4,12
AZURE,Medium,E32ds v4,8,24
AZURE,Large,E32ds v4,16,40
AZURE,X-Large,E64ds v4,32,80
AZURE,2X-Large,E64ds v4,64,144
AZURE,3X-Large,E64ds v4,128,272
AZURE,4X-Large,E64ds v4,256,528
@@ -122,7 +122,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
val sqlComputerDBUPrice: Double = config.sql_compute_dbu_price
val jobsLightDBUPrice: Double = config.jobs_light_dbu_price
val customWorkspaceName: String = config.workspace_name
val standardScopes = "audit,sparkEvents,jobs,clusters,clusterEvents,notebooks,pools,accounts,dbsql,notebookCommands".split(",")
val standardScopes = OverwatchScope.toArray
val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
config.excluded_scopes.getOrElse("").split(":").map(_.toLowerCase).toSet).toArray

26 changes: 26 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/api/ApiMeta.scala
@@ -200,6 +200,32 @@ trait ApiMeta {
jsonObject.toString
}

/**
* Function will add the meta info to the api response.
*
* @param response
* @param jsonQuery
* @param queryMap
* @return a string containing the api response and the meta for the api call.
*/
private[overwatch] def enrichAPIResponse(response: HttpResponse[String], jsonQuery: String, queryMap: Map[String, String]): String = {
val filter: String = if (apiCallType.equals("POST")) jsonQuery else {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.writeValueAsString(queryMap)
}
val jsonObject = new JSONObject();
val apiTraceabilityMeta = new JSONObject();
apiTraceabilityMeta.put("endPoint", apiName)
apiTraceabilityMeta.put("type", apiCallType)
apiTraceabilityMeta.put("apiVersion", apiV)
apiTraceabilityMeta.put("responseCode", response.code)
apiTraceabilityMeta.put("batchKeyFilter", filter)
jsonObject.put("rawResponse", response.body.trim)
jsonObject.put("apiTraceabilityMeta", apiTraceabilityMeta)
jsonObject.toString
}

}

/**
36 changes: 36 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
@@ -17,6 +17,8 @@ import scala.concurrent.Future
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter


/**
@@ -422,6 +424,40 @@ class Workspace(config: Config) extends SparkSessionWrapper {
addReport
}

/**
* Fetch the warehouse event data from system.compute.warehouse_events
* @param fromTime : from time to fetch the data
* @param untilTime: until time to fetch the data
* @param maxHistoryDays: maximum history days to fetch the data
* @return
*/
def getWarehousesEventDF(fromTime: TimeTypes,
untilTime: TimeTypes,
config: Config,
maxHistoryDays: Int = 30
): DataFrame = {
val sysTableFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val moduleFromTime = fromTime.asLocalDateTime.format(sysTableFormat)
val moduleUntilTime = untilTime.asLocalDateTime.format(sysTableFormat)
val useSystemTableMessage = "Use system tables as a source to audit logs"
val tableDoesNotExistsMessage = "Table system.compute.warehouse_events does not exists"

if(config.auditLogConfig.systemTableName.isEmpty)
throw new NoNewDataException(useSystemTableMessage, Level.WARN, allowModuleProgression = false)

if(!spark.catalog.tableExists("system.compute.warehouse_events"))
throw new NoNewDataException(tableDoesNotExistsMessage, Level.WARN, allowModuleProgression = false)

spark.sql(s"""
select * from system.compute.warehouse_events
WHERE workspace_id = '${config.organizationId}'
and event_time >= DATE_SUB('${moduleFromTime}', ${maxHistoryDays})
and event_time <= '${moduleUntilTime}'
""")
.withColumnRenamed("event_type","state")
.withColumnRenamed("workspace_id","organization_id")
.withColumnRenamed("event_time","timestamp")
}
}


@@ -104,8 +104,6 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)

}



lazy private[overwatch] val jobsSnapshotModule = Module(1001, "Bronze_Jobs_Snapshot", this)
lazy private val appendJobsProcess: () => ETLDefinition = {
() =>
@@ -171,6 +169,7 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
BronzeTargets.clustersSnapshotTarget.asDF,
Seq(
prepClusterEventLogs(
clusterEventLogsModule.isFirstRun,
BronzeTargets.auditLogsTarget.asIncrementalDF(clusterEventLogsModule, BronzeTargets.auditLogsTarget.incrementalColumns, additionalLagDays = 1), // 1 lag day to get laggard records
clusterEventLogsModule.fromTime,
clusterEventLogsModule.untilTime,
@@ -410,12 +410,21 @@ trait BronzeTransforms extends SparkSessionWrapper {
pipelineSnapTime: Long,
tmpClusterEventsSuccessPath: String,
tmpClusterEventsErrorPath: String,
config: Config) = {
config: Config,
isFirstRun: Boolean) = {
val finalResponseCount = clusterIDs.length
val clusterEventsEndpoint = "clusters/events"

val lagTime = 86400000 //1 day
val lagStartTime = startTime.asUnixTimeMilli - lagTime

val lagStartTime = if (isFirstRun) {
logger.log(Level.INFO, "First run, acquiring all cluster events")
0.toLong
} else {
logger.log(Level.INFO, "Subsequent run, acquiring new cluster events")
startTime.asUnixTimeMilli - lagTime
}

// creating Json input for parallel API calls
val jsonInput = Map(
"start_value" -> "0",
@@ -601,6 +610,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
}

protected def prepClusterEventLogs(
isFirstRun : Boolean,
filteredAuditLogDF: DataFrame,
startTime: TimeTypes,
endTime: TimeTypes,
@@ -626,18 +636,23 @@ trait BronzeTransforms extends SparkSessionWrapper {

val tmpClusterEventsSuccessPath = s"${config.tempWorkingDir}/${apiEndpointTempDir}/success_" + pipelineSnapTS.asUnixTimeMilli
val tmpClusterEventsErrorPath = s"${config.tempWorkingDir}/${apiEndpointTempDir}/error_" + pipelineSnapTS.asUnixTimeMilli
try{
landClusterEvents(clusterIDs, startTime, endTime, pipelineSnapTS.asUnixTimeMilli, tmpClusterEventsSuccessPath,
tmpClusterEventsErrorPath, config)
}catch {
try {
landClusterEvents(
clusterIDs, startTime, endTime,
pipelineSnapTS.asUnixTimeMilli,
tmpClusterEventsSuccessPath,
tmpClusterEventsErrorPath,
config,
isFirstRun)
} catch {
case e: Throwable =>
val errMsg = s"Error in landing cluster events: ${e.getMessage}"
logger.log(Level.ERROR, errMsg)
throw e
}
if (Helpers.pathExists(tmpClusterEventsErrorPath)) {
persistErrors(
deriveRawApiResponseDF(spark.read.json(tmpClusterEventsErrorPath))
deriveRawApiResponseDF(spark.read.json(tmpClusterEventsErrorPath))
.withColumn("from_ts", toTS(col("from_epoch")))
.withColumn("until_ts", toTS(col("until_epoch"))),
database,
@@ -1375,4 +1390,4 @@ trait BronzeTransforms extends SparkSessionWrapper {
}


}
}
@@ -27,7 +27,18 @@ class ETLDefinition(

val transformedDF = transforms.foldLeft(verifiedSourceDF) {
case (df, transform) =>
df.transform(transform)
/*
* reverting Spark UI Job Group labels for now
*
* TODO: enumerate the regressions this would introduce
* when the labels set by then platform are replaced
* this way.
* df.sparkSession.sparkContext.setJobGroup(
* s"${module.pipeline.config.workspaceName}:${module.moduleName}",
* transform.toString)
*/

df.transform( transform)
}
write(transformedDF, module)
}
30 changes: 29 additions & 1 deletion src/main/scala/com/databricks/labs/overwatch/pipeline/Gold.scala
@@ -33,7 +33,8 @@ class Gold(_workspace: Workspace, _database: Database, _config: Config)
GoldTargets.sparkExecutorTarget,
GoldTargets.sqlQueryHistoryTarget,
GoldTargets.warehouseTarget,
GoldTargets.notebookCommandsTarget
GoldTargets.notebookCommandsTarget,
GoldTargets.warehouseStateFactTarget
)
}

@@ -78,6 +79,11 @@ class Gold(_workspace: Workspace, _database: Database, _config: Config)
notebookCommandsFactModule
)
}
case OverwatchScope.warehouseEvents => {
Array(
warehouseStateFactModule
)
}
case _ => Array[Module]()
}
}
@@ -320,6 +326,24 @@ class Gold(_workspace: Workspace, _database: Database, _config: Config)
)
}

lazy private[overwatch] val warehouseStateFactModule = Module(3020, "Gold_WarehouseStateFact", this, Array(2022, 2021), 3.0)
lazy private val appendWarehouseStateFactProcess: () => ETLDefinition = {
() =>
ETLDefinition(
SilverTargets.warehousesStateDetailTarget.asIncrementalDF(
warehouseStateFactModule,
SilverTargets.warehousesStateDetailTarget.incrementalColumns,
GoldTargets.warehouseStateFactTarget.maxMergeScanDates
),
Seq(buildWarehouseStateFact(
BronzeTargets.cloudMachineDetail,
BronzeTargets.warehouseDbuDetail,
SilverTargets.warehousesSpecTarget
)),
append(GoldTargets.warehouseStateFactTarget)
)
}

private def processSparkEvents(): Unit = {

sparkExecutorModule.execute(appendSparkExecutorProcess)
@@ -400,6 +424,10 @@ class Gold(_workspace: Workspace, _database: Database, _config: Config)
notebookCommandsFactModule.execute(appendNotebookCommandsFactProcess)
GoldTargets.notebookCommandsFactViewTarget.publish(notebookCommandsFactViewColumnMapping)
}
case OverwatchScope.warehouseEvents => {
warehouseStateFactModule.execute(appendWarehouseStateFactProcess)
GoldTargets.warehouseStateFactViewTarget.publish(warehouseStateFactViewColumnMappings)
}
case _ =>
}
}