feat: collect Spark telemetry from job clusters
sdewitt-newrelic committed Sep 14, 2024
1 parent 2676da6 commit f37aa1e
Showing 6 changed files with 312 additions and 183 deletions.
56 changes: 55 additions & 1 deletion README.md
@@ -34,7 +34,8 @@ Databricks) and/or Spark telemetry from any Spark deployment. See the
* [General configuration](#general-configuration)
* [Pipeline configuration](#pipeline-configuration)
* [Log configuration](#log-configuration)
* [Query configuration](#query-configuration)
* [Databricks configuration](#databricks-configuration)
* [Spark configuration](#spark-configuration)
* [Authentication](#authentication)
* [Building](#building)
* [Coding Conventions](#coding-conventions)
@@ -498,6 +499,59 @@ collector. This includes the case when the integration is
using the provided [init script](./init/cluster_init_integration.sh)
since Spark telemetry is collected by the Spark collector in this scenario.

###### `sparkClusterSources`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| The root node for the [Databricks cluster source configuration](#databricks-cluster-source-configuration) | YAML Mapping | N | N/a |

The mechanism used to create a cluster is referred to as a cluster "source". The
Databricks collector supports collecting Spark telemetry from all-purpose
clusters created via the UI or API and from job clusters created via the
Databricks Jobs Scheduler. This element groups together the flags used to
individually [enable or disable](#databricks-cluster-source-configuration) the
cluster sources from which the Databricks collector will collect Spark
telemetry.
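
For example, the following snippet enables all three cluster sources
explicitly. It mirrors the defaults shipped in
[config.template.yml](./configs/config.template.yml) (updated in this commit),
so it is equivalent to omitting the mapping entirely:

```yaml
databricks:
  sparkMetrics: true
  sparkClusterSources:
    ui: true
    job: true
    api: true
```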

##### Databricks cluster source configuration

###### `ui`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| Flag to enable automatic collection of Spark telemetry from all-purpose clusters created via the UI | `true` / `false` | N | `true` |

By default, when the Databricks collector is enabled, it will automatically
collect Spark telemetry from all all-purpose clusters created via the UI.

This flag can be used to disable the collection of Spark telemetry from
all-purpose clusters created via the UI.

###### `job`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| Flag to enable automatic collection of Spark telemetry from job clusters created via the Databricks Jobs Scheduler | `true` / `false` | N | `true` |

By default, when the Databricks collector is enabled, it will automatically
collect Spark telemetry from job clusters created by the Databricks Jobs
Scheduler.

This flag can be used to disable the collection of Spark telemetry from job
clusters created via the Databricks Jobs Scheduler.
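
Since every flag in the mapping defaults to `true`, disabling a single source
only requires setting that flag to `false`. A minimal sketch, assuming omitted
flags keep their documented defaults:

```yaml
databricks:
  sparkClusterSources:
    # ui and api are omitted and therefore default to true
    job: false
```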

###### `api`

| Description | Valid Values | Required | Default |
| --- | --- | --- | --- |
| Flag to enable automatic collection of Spark telemetry from all-purpose clusters created via the [Databricks ReST API](https://docs.databricks.com/api/workspace/introduction) | `true` / `false` | N | `true` |

By default, when the Databricks collector is enabled, it will automatically
collect Spark telemetry from all-purpose clusters created via the [Databricks ReST API](https://docs.databricks.com/api/workspace/introduction).

This flag can be used to disable the collection of Spark telemetry from
all-purpose clusters created via the [Databricks ReST API](https://docs.databricks.com/api/workspace/introduction).

##### Spark configuration

The Spark configuration parameters are used to configure the Spark collector.
4 changes: 4 additions & 0 deletions configs/config.template.yml
@@ -18,6 +18,10 @@ databricks:
oauthClientSecret: [YOUR_DATABRICKS_SERVICE_PRINCIPAL_OAUTH_CLIENT_SECRET]
sparkMetrics: true
sparkMetricPrefix: spark.
sparkClusterSources:
ui: true
job: true
api: true
# RESERVED FOR FUTURE USE
#accountHost: https://accounts.cloud.databricks.com
spark:
160 changes: 19 additions & 141 deletions internal/databricks/databricks.go
@@ -2,17 +2,14 @@ package databricks

import (
"context"
"fmt"
"maps"
"strings"

databricksSdk "github.com/databricks/databricks-sdk-go"
databricksSdkConfig "github.com/databricks/databricks-sdk-go/config"
databricksSdkCompute "github.com/databricks/databricks-sdk-go/service/compute"
"github.com/newrelic-experimental/newrelic-databricks-integration/internal/spark"

"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration"
"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/exporters"
"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/log"
"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/pipeline"
"github.com/spf13/viper"
)

@@ -50,71 +47,28 @@ func InitPipelines(
}

if sparkMetrics {
all, err := w.Clusters.ListAll(
ctx,
databricksSdkCompute.ListClustersRequest{},
// Create the newrelic exporter
newRelicExporter := exporters.NewNewRelicExporter(
"newrelic-api",
i.Name,
i.Id,
i.NrClient,
i.GetLicenseKey(),
i.GetRegion(),
i.DryRun,
)
if err != nil {
return fmt.Errorf("failed to list clusters: %w", err)
}

for _, c := range all {
if c.State != databricksSdkCompute.StateRunning {
log.Debugf(
"skipping cluster %s because it is not running",
c.ClusterName,
)
continue
}

if c.ClusterSource == databricksSdkCompute.ClusterSourceUi ||
c.ClusterSource == databricksSdkCompute.ClusterSourceApi {

// Resolve the Spark context UI URL for the cluster
log.Debugf(
"resolving Spark context UI URL for cluster %s",
c.ClusterName,
)
sparkContextUiPath, err := getSparkContextUiPathForCluster(
ctx,
w,
&c,
)
if err != nil {
return err
}

databricksSparkApiClient, err := NewDatabricksSparkApiClient(
sparkContextUiPath,
w,
)
if err != nil {
return err
}
// Create a metrics pipeline
mp := pipeline.NewMetricsPipeline()
mp.AddExporter(newRelicExporter)

// Initialize spark pipelines
log.Debugf(
"initializing Spark pipeline for cluster %s with spark context UI URL %s",
c.ClusterName,
sparkContextUiPath,
)
// Create the receiver
databricksSparkReceiver := NewDatabricksSparkReceiver(w, tags)
mp.AddReceiver(databricksSparkReceiver)

newTags := maps.Clone(tags)
newTags["clusterProvider"] = "databricks"
newTags["databricksClusterId"] = c.ClusterId
newTags["databricksClusterName"] = c.ClusterName
log.Debugf("initializing Spark pipeline")

err = spark.InitPipelinesWithClient(
i,
databricksSparkApiClient,
viper.GetString("databricks.sparkMetricPrefix"),
newTags,
)
if err != nil {
return err
}
}
}
i.AddPipeline(mp)
}

// @TODO: initialize databricks pipelines here
@@ -182,79 +136,3 @@ func configureAuth(config *databricksSdk.Config) error {

return nil
}

func getSparkContextUiPathForCluster(
ctx context.Context,
w *databricksSdk.WorkspaceClient,
c *databricksSdkCompute.ClusterDetails,
) (string, error) {
// @see https://databrickslabs.github.io/overwatch/assets/_index/realtime_helpers.html

clusterId := c.ClusterId

waitContextStatus, err := w.CommandExecution.Create(
ctx,
databricksSdkCompute.CreateContext{
ClusterId: clusterId,
Language: databricksSdkCompute.LanguagePython,
},
)
if err != nil {
return "", err
}

execContext, err := waitContextStatus.Get()
if err != nil {
return "", err
}

cmd := databricksSdkCompute.Command{
ClusterId: clusterId,
Command: `
print(f'{spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")}')
print(f'{spark.conf.get("spark.ui.port")}')
`,
ContextId: execContext.Id,
Language: databricksSdkCompute.LanguagePython,
}

waitCommandStatus, err := w.CommandExecution.Execute(
ctx,
cmd,
)
if err != nil {
return "", err
}

resp, err := waitCommandStatus.Get()
if err != nil {
return "", err
}

data, ok := resp.Results.Data.(string)
if !ok {
return "", fmt.Errorf("command result is not a string value")
}

vals := strings.Split(data, "\n")
if len(vals) != 2 {
return "", fmt.Errorf("invalid command result")
}

if vals[0] == "" || vals[1] == "" {
return "", fmt.Errorf("empty command results")
}

// @TODO: I think this URL pattern only works for multi-tenant accounts.
// We may need a flag for single tenant accounts and use the o/0 form
// shown on the overwatch site.

url := fmt.Sprintf(
"/driver-proxy-api/o/%s/%s/%s",
vals[0],
clusterId,
vals[1],
)

return url, nil
}