feat: collect pipeline event logs
sdewitt-newrelic committed Dec 9, 2024
1 parent 49dde0d commit 286c81d
Showing 11 changed files with 416 additions and 36 deletions.
146 changes: 146 additions & 0 deletions README.md
@@ -39,6 +39,7 @@ Databricks) and/or Spark telemetry from any Spark deployment. See the
* [Authentication](#authentication)
* [Consumption & Cost Data](#consumption--cost-data)
* [Job Run Data](#job-run-data)
* [Pipeline Event Logs](#pipeline-event-logs)
* [Building](#building)
* [Coding Conventions](#coding-conventions)
* [Local Development](#local-development)
@@ -152,6 +153,17 @@ to add the following environment variables.
the OAuth client ID for the service principal
* `NEW_RELIC_DATABRICKS_OAUTH_CLIENT_SECRET` - To [use a service principal to authenticate with Databricks (OAuth M2M)](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html),
an OAuth client secret associated with the service principal
* `NEW_RELIC_DATABRICKS_USAGE_ENABLED` - Set to `true` to enable collection of
[consumption and cost data](#consumption--cost-data) from this cluster node or
`false` to disable collection. Defaults to `true`.
* `NEW_RELIC_DATABRICKS_SQL_WAREHOUSE` - The ID of a [SQL warehouse](https://docs.databricks.com/en/compute/sql-warehouse/index.html)
where usage queries should be run
* `NEW_RELIC_DATABRICKS_JOB_RUNS_ENABLED` - Set to `true` to enable collection
of [job run data](#job-run-data) from this cluster node or `false` to disable
collection. Defaults to `true`.
* `NEW_RELIC_DATABRICKS_PIPELINE_EVENT_LOGS_ENABLED` - Set to `true` to enable
collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs)
from this cluster node or `false` to disable collection. Defaults to `true`.
See the sketch below for how these variables map into the generated collector
configuration.
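
For reference, the cluster init script (`init/cluster_init_integration.sh`,
updated in this commit) substitutes these environment variables into the
collector configuration that it generates. An abridged sketch of the relevant
portion:

```yaml
databricks:
  usage:
    enabled: $NEW_RELIC_DATABRICKS_USAGE_ENABLED
    warehouseId: "$NEW_RELIC_DATABRICKS_SQL_WAREHOUSE"
  jobs:
    runs:
      enabled: $NEW_RELIC_DATABRICKS_JOB_RUNS_ENABLED
  pipelines:
    logs:
      enabled: $NEW_RELIC_DATABRICKS_PIPELINE_EVENT_LOGS_ENABLED
```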

Note that the `NEW_RELIC_API_KEY` and `NEW_RELIC_ACCOUNT_ID` are currently
unused but are required by the [new-relic-client-go](https://github.com/newrelic/newrelic-client-go)
@@ -181,6 +193,23 @@ The New Relic Databricks integration supports the following capabilities.
This data can be used to show Databricks DBU consumption metrics and estimated
Databricks costs directly within New Relic.

* Collect Databricks job run telemetry

The New Relic Databricks integration can collect telemetry about
[Databricks Job](https://docs.databricks.com/en/jobs/index.html#what-are-databricks-jobs)
runs, such as job run durations, task run durations, the current state of job
and task runs, whether a job or a task is a retry, and the number of times a task
was retried.

* Collect Databricks Delta Live Tables Pipeline event logs

The New Relic Databricks integration can collect [Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
for all [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html)
defined in a [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces).
[Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
entries for every [pipeline update](https://docs.databricks.com/en/delta-live-tables/updates.html)
are collected and sent to [New Relic Logs](https://docs.newrelic.com/docs/logs/get-started/get-started-log-management/).

## Usage

### Command Line Options
@@ -551,6 +580,16 @@ the Databricks collector settings related to the collection of
This element groups together the configuration parameters to [configure](#databricks-job-configuration)
the Databricks collector settings related to the collection of job data.

###### `pipelines`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| The root node for the set of [Databricks Pipeline configuration](#databricks-pipeline-configuration) parameters | YAML Mapping | N | N/a |

This element groups together the configuration parameters to [configure](#databricks-pipeline-configuration)
the Databricks collector settings related to the collection of [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html)
telemetry.
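
As a minimal sketch, mirroring `configs/config.template.yml`, the `pipelines`
element nests under the top-level `databricks` element:

```yaml
databricks:
  # ... other Databricks collector settings ...
  pipelines:
    logs:
      enabled: true
```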

##### Databricks `spark` configuration

###### Databricks Spark `enabled`
@@ -853,6 +892,44 @@ particular time in the past calculated as an offset from the current time.
See the section [`startOffset` Configuration](#startoffset-configuration) for
more details.

##### Databricks Pipeline configuration

###### `logs`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| The root node for the set of [Databricks Pipeline Event Logs configuration](#databricks-pipeline-event-logs-configuration) parameters | YAML Mapping | N | N/a |

This element groups together the configuration parameters to [configure](#databricks-pipeline-event-logs-configuration)
the Databricks collector settings related to the collection of Databricks Delta
Live Tables [pipeline event logs](#pipeline-event-logs).

##### Databricks pipeline event logs configuration

The Databricks pipeline event logs configuration parameters are used to
configure Databricks collector settings related to the collection of Databricks
Delta Live Tables [pipeline event logs](#pipeline-event-logs).

###### Databricks pipeline event logs `enabled`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| Flag to enable automatic collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs) | `true` / `false` | N | `true` |

By default, when the Databricks collector is enabled, it will automatically
collect Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs).

This flag can be used to disable the collection of Databricks Delta Live Tables
[pipeline event logs](#pipeline-event-logs) by the Databricks collector. This
may be useful when running multiple instances of the New Relic Databricks
integration against the same Databricks [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces).
In this scenario, the collection of Databricks Delta Live Tables [pipeline event logs](#pipeline-event-logs)
should _only_ be enabled on a single instance of the integration. Otherwise,
duplicate New Relic log entries will be created for each Databricks Delta Live
Tables [pipeline event log](#pipeline-event-logs) entry, making troubleshooting
challenging and affecting product features that rely on signal integrity, such as
[anomaly detection](https://docs.newrelic.com/docs/alerts/create-alert/set-thresholds/anomaly-detection/).
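
For example, a secondary instance of the integration might disable pipeline
event log collection with a configuration along these lines (a minimal sketch
showing only the relevant keys):

```yaml
databricks:
  pipelines:
    logs:
      # Leave event log collection enabled on exactly one instance per
      # workspace; disable it everywhere else to avoid duplicate log entries.
      enabled: false
```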

##### Spark configuration

The Spark configuration parameters are used to configure the Spark collector.
@@ -1472,6 +1549,75 @@ statements to use to visualize the data.

![Sample job runs dashboard image](./examples/job-runs-dashboard.png)

### Pipeline Event Logs

The New Relic Databricks integration can collect [Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
for all [Databricks Delta Live Tables Pipelines](https://docs.databricks.com/en/delta-live-tables/develop-pipelines.html)
defined in a [workspace](https://docs.databricks.com/en/getting-started/concepts.html#accounts-and-workspaces).
[Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
entries for every [pipeline update](https://docs.databricks.com/en/delta-live-tables/updates.html)
are collected and sent to [New Relic Logs](https://docs.newrelic.com/docs/logs/get-started/get-started-log-management/).

**NOTE:** Some of the text below is sourced from the
[Databricks Delta Live Tables pipeline event log schema documentation](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log-schema)
and the [Databricks SDK Go module documentation](https://pkg.go.dev/github.com/databricks/databricks-sdk-go).

#### Pipeline Event Log Data

[Databricks Delta Live Tables Pipeline event logs](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
are sent to New Relic as [New Relic Log data](https://docs.newrelic.com/docs/data-apis/understand-data/new-relic-data-types/#log-data).
For each [Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
entry, the [fields from the event log entry](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log-schema)
are mapped to attributes on the corresponding New Relic log entry as follows.

**NOTE:** Due to the way in which the [Databricks REST API](https://docs.databricks.com/api/workspace/introduction)
returns [Databricks Delta Live Tables Pipeline event log](https://docs.databricks.com/en/delta-live-tables/observability.html#event-log)
entries (in descending order by timestamp), later event log entries may
sometimes be visible in the [New Relic Logs UI](https://docs.newrelic.com/docs/logs/ui-data/use-logs-ui/)
before older event log entries are visible. This does not affect the temporal
ordering of the log entries. Once the older log entries become visible (usually
within 30-60 seconds beyond the value of the [`interval` configuration parameter](#interval)),
they are properly ordered by timestamp in the [New Relic Logs UI](https://docs.newrelic.com/docs/logs/ui-data/use-logs-ui/).

##### Pipeline event log attributes

| Pipeline Event Log Field Name | New Relic Log Entry Attribute Name | Data Type in New Relic Log Entry | Description |
| --- | --- | --- | --- |
| `message` | `message` | string | A human-readable message describing the event |
| `timestamp` | `timestamp` | integer | The time (in milliseconds since the epoch) the event was recorded |
| `id` | `databricksPipelineEventId` | string | A unique identifier for the event log record |
| `event_type` | `databricksPipelineEventType` | string | The event type |
| `level` | `level`, `databricksPipelineEventLevel` | string | The severity level of the event, for example, `INFO`, `WARN`, `ERROR`, or `METRICS` |
| `maturity_level` | `databricksPipelineEventMaturityLevel` | string | The stability of the event schema, one of `STABLE`, `NULL`, `EVOLVING`, or `DEPRECATED` |
| n/a | `databricksPipelineEventError` | boolean | `true` if an error was captured by the event (see below for additional details), otherwise `false` |
| `origin.batch_id` | `databricksPipelineEventBatchId` | integer | The id of a batch. Unique within a flow. |
| `origin.cloud` | `databricksPipelineEventCloud` | string | The cloud provider, e.g., AWS or Azure |
| `origin.cluster_id` | `databricksPipelineEventClusterId` | string | The id of the cluster where an execution happens. Unique within a region. |
| `origin.dataset_name` | `databricksPipelineEventDatasetName` | string | The name of a dataset. Unique within a pipeline. |
| `origin.flow_id` | `databricksPipelineEventFlowId` | string | The id of the flow. Globally unique. |
| `origin.flow_name` | `databricksPipelineEventFlowName` | string | The name of the flow. Not unique. |
| `origin.host` | `databricksPipelineEventHost` | string | The optional host name where the event was triggered |
| `origin.maintenance_id` | `databricksPipelineEventMaintenanceId` | string | The id of a maintenance run. Globally unique. |
| `origin.materialization_name` | `databricksPipelineEventMaterializationName` | string | Materialization name |
| `origin.org_id` | `databricksPipelineEventOrgId` | integer | The org id of the user. Unique within a cloud. |
| `origin.pipeline_id` | `databricksPipelineEventPipelineId` | string | The id of the pipeline. Globally unique. |
| `origin.pipeline_name` | `databricksPipelineEventPipelineName` | string | The name of the pipeline. Not unique. |
| `origin.region` | `databricksPipelineEventRegion` | string | The cloud region |
| `origin.request_id` | `databricksPipelineEventRequestId` | string | The id of the request that caused an update |
| `origin.table_id` | `databricksPipelineEventTableId` | string | The id of a (delta) table. Globally unique. |
| `origin.uc_resource_id` | `databricksPipelineEventUcResourceId` | string | The Unity Catalog id of the materialized view (MV) or streaming table (ST) being updated |
| `origin.update_id` | `databricksPipelineEventUpdateId` | string | The id of an execution. Globally unique. |

Additionally, if the `error` field is set on the pipeline event log entry,
indicating an error was captured by the event, the following attributes are
added to the New Relic log entry.

| Pipeline Event Log Field Name | New Relic Log Entry Attribute Name | Data Type in New Relic Log Entry | Description |
| --- | --- | --- | --- |
| `error.fatal` | `databricksPipelineEventErrorFatal` | boolean | Whether the error is considered fatal, that is, unrecoverable |
| `error.exceptions[N].class_name` | `databricksPipelineEventErrorExceptionNClassName` | string | Runtime class of the `N`th exception |
| `error.exceptions[N].message` | `databricksPipelineEventErrorExceptionNMessage` | string | Exception message of the `N`th exception |
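
As an illustration, a single pipeline event log entry might surface in New
Relic with attributes along the following lines. All values are hypothetical
and are shown in YAML form purely for readability; actual entries are New
Relic log records:

```yaml
message: "Flow 'sales_orders_cleaned' has COMPLETED."  # hypothetical message
timestamp: 1733788800000
level: INFO
databricksPipelineEventLevel: INFO
databricksPipelineEventId: "b9c2f708-1a64-4c8e-9f3d-0c2d8a7e5b11"  # hypothetical
databricksPipelineEventType: flow_progress
databricksPipelineEventMaturityLevel: STABLE
databricksPipelineEventError: false
databricksPipelineEventFlowName: "sales_orders_cleaned"
databricksPipelineEventPipelineName: "sales_orders_pipeline"
databricksPipelineEventUpdateId: "c3b7d911-5e2f-4a0b-8d6c-4f1e9a2b3c44"  # hypothetical
```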

## Building

### Coding Conventions
1 change: 1 addition & 0 deletions cmd/databricks/databricks.go
@@ -29,6 +29,7 @@ func main() {
integration.WithApiKey(),
integration.WithEvents(ctx),
integration.WithLastUpdate(),
integration.WithLogs(ctx),
)
fatalIfErr(err)

3 changes: 3 additions & 0 deletions configs/config.template.yml
@@ -41,6 +41,9 @@ databricks:
includeIdentityMetadata: false
includeRunId: false
startOffset: 86400
pipelines:
logs:
enabled: true

spark:
webUiUrl: http://localhost:4040
Binary file added examples/dlt_pipeline_events_logs_ui.png
2 changes: 1 addition & 1 deletion go.mod
@@ -5,7 +5,7 @@ go 1.21
toolchain go1.22.4

require (
github.com/databricks/databricks-sdk-go v0.46.0
github.com/databricks/databricks-sdk-go v0.52.0
github.com/newrelic/newrelic-client-go/v2 v2.45.0
github.com/newrelic/newrelic-labs-sdk/v2 v2.1.0
)
4 changes: 2 additions & 2 deletions go.sum
@@ -17,8 +17,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/containerd/containerd v1.3.7/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/databricks/databricks-sdk-go v0.46.0 h1:D0TxmtSVAOsdnfzH4OGtAmcq+8TyA7Z6fA6JEYhupeY=
github.com/databricks/databricks-sdk-go v0.46.0/go.mod h1:ds+zbv5mlQG7nFEU5ojLtgN/u0/9YzZmKQES/CfedzU=
github.com/databricks/databricks-sdk-go v0.52.0 h1:WKcj0F+pdx0gjI5xMicjYC4O43S2q5nyTpaGGMFmgHw=
github.com/databricks/databricks-sdk-go v0.52.0/go.mod h1:ds+zbv5mlQG7nFEU5ojLtgN/u0/9YzZmKQES/CfedzU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
18 changes: 17 additions & 1 deletion init/cluster_init_integration.sh
@@ -33,7 +33,23 @@ databricks:
accessToken: "$NEW_RELIC_DATABRICKS_ACCESS_TOKEN"
oauthClientId: "$NEW_RELIC_DATABRICKS_OAUTH_CLIENT_ID"
oauthClientSecret: "$NEW_RELIC_DATABRICKS_OAUTH_CLIENT_SECRET"
sparkMetrics: false
spark:
enabled: false
usage:
enabled: $NEW_RELIC_DATABRICKS_USAGE_ENABLED
warehouseId: "$NEW_RELIC_DATABRICKS_SQL_WAREHOUSE"
includeIdentityMetadata: false
runTime: 02:00:00
jobs:
runs:
enabled: $NEW_RELIC_DATABRICKS_JOB_RUNS_ENABLED
metricPrefix: databricks.
includeIdentityMetadata: false
includeRunId: false
startOffset: 86400
pipelines:
logs:
enabled: $NEW_RELIC_DATABRICKS_PIPELINE_EVENT_LOGS_ENABLED
spark:
webUiUrl: http://{UI_HOST}:{UI_PORT}
metricPrefix: spark.
23 changes: 23 additions & 0 deletions internal/databricks/databricks.go
@@ -179,6 +179,29 @@ func InitPipelines(
i.AddComponent(mp)
}

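// Collection of pipeline event logs defaults to enabled unless it is
// explicitly disabled via the databricks.pipelines.logs.enabled setting.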
collectPipelineEventLogs := true
if viper.IsSet("databricks.pipelines.logs.enabled") {
collectPipelineEventLogs = viper.GetBool(
"databricks.pipelines.logs.enabled",
)
}

if collectPipelineEventLogs {
databricksPipelineEventsReceiver :=
NewDatabricksPipelineEventsReceiver(i, w, tags)

// Create a logs pipeline for the event logs
lp := pipeline.NewLogsPipeline(
"databricks-pipeline-event-logs-pipeline",
)
lp.AddReceiver(databricksPipelineEventsReceiver)
lp.AddExporter(newRelicExporter)

log.Debugf("initializing Databricks pipeline event logs pipeline")

i.AddComponent(lp)
}

return nil
}

32 changes: 0 additions & 32 deletions internal/databricks/jobs.go
@@ -516,35 +516,3 @@ func writeCounters(
writer,
)
}

func writeGauge(
prefix string,
metricName string,
metricValue any,
attrs map[string]interface{},
writer chan <- model.Metric,
) {
metric := model.NewGaugeMetric(
prefix + metricName,
model.MakeNumeric(metricValue),
time.Now(),
)

for k, v := range attrs {
metric.Attributes[k] = v
}

writer <- metric
}

func makeAttributesMap(
tags map[string]string,
) map[string]interface{} {
attrs := make(map[string]interface{})

for k, v := range tags {
attrs[k] = v
}

return attrs
}
