
Commit

Merge pull request #42 from newrelic-experimental/feat/canned-queries
feat: collect job cost data
sdewitt-newrelic authored Oct 25, 2024
2 parents 0164da2 + 8cf4db5 commit fbdd84e
Showing 14 changed files with 1,998 additions and 850 deletions.
349 changes: 268 additions & 81 deletions README.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions configs/config.template.yml
@@ -31,6 +31,12 @@ databricks:
warehouseId: [YOUR_DATABRICKS_WAREHOUSE_ID]
includeIdentityMetadata: false
runTime: 02:00:00
optionalQueries:
jobs_cost_list_cost_per_job_run: true
jobs_cost_list_cost_per_job: true
jobs_cost_frequent_failures: true
jobs_cost_most_retries: true

spark:
webUiUrl: http://localhost:4040
metricPrefix: spark.
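The new optionalQueries block toggles the canned job cost queries added in this PR. As a hypothetical illustration of how these flags could be read, the sketch below assumes the integration loads this YAML with viper (a direct dependency in go.mod); the key paths and the loop are illustrative only, not the integration's actual code.

package main

import (
    "fmt"

    "github.com/spf13/viper"
)

func main() {
    viper.SetConfigFile("configs/config.yml") // hypothetical path
    if err := viper.ReadInConfig(); err != nil {
        panic(err)
    }

    // With viper, each optional query reads as false unless the flag is
    // present and set to true (key names taken from the template above).
    queries := []string{
        "jobs_cost_list_cost_per_job_run",
        "jobs_cost_list_cost_per_job",
        "jobs_cost_frequent_failures",
        "jobs_cost_most_retries",
    }
    for _, q := range queries {
        fmt.Printf("%s enabled: %v\n", q, viper.GetBool("databricks.optionalQueries."+q))
    }
}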
Binary file added examples/consumption-cost-dashboard-job-cost.png


Binary file removed examples/cost-usage-dashboard-cost-consumption.png
Binary file not shown.
Binary file removed examples/cost-usage-dashboard-usage.png
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
@@ -30,7 +30,7 @@ require (
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
-github.com/spf13/cast v1.6.0 // indirect
+github.com/spf13/cast v1.6.0
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0 // indirect
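One functional change in go.mod is that github.com/spf13/cast moves from an indirect to a direct dependency, which suggests the new collection code calls it directly, for example to coerce loosely typed SQL result values. The snippet below is purely a hypothetical illustration of that kind of use; the column names and values are invented and do not come from this PR.

package main

import (
    "fmt"

    "github.com/spf13/cast"
)

func main() {
    // Result cells from a SQL statement API are often returned as strings
    // or interface{} values; cast coerces them into concrete types.
    row := map[string]any{
        "job_id":    "123456789", // hypothetical column
        "list_cost": "42.5",      // hypothetical column
    }

    jobId := cast.ToInt64(row["job_id"])
    listCost := cast.ToFloat64(row["list_cost"])

    fmt.Println(jobId, listCost) // 123456789 42.5
}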
353 changes: 353 additions & 0 deletions internal/databricks/cache.go
@@ -0,0 +1,353 @@
package databricks

import (
"context"
"fmt"
"strconv"
"sync"
"time"

databricksSdk "github.com/databricks/databricks-sdk-go"
databricksSdkCompute "github.com/databricks/databricks-sdk-go/service/compute"
databricksSql "github.com/databricks/databricks-sdk-go/service/sql"
"github.com/newrelic/newrelic-labs-sdk/v2/pkg/integration/log"
)

type cacheLoaderFunc[T interface{}] func(ctx context.Context) (*T, error)

// memoryCache is a simple thread-safe cache that lazily loads its value via
// its loader function and refreshes it once the expiry window has elapsed.
type memoryCache[T interface{}] struct {
mu sync.Mutex
expiry time.Duration
value *T
expiration time.Time
loader cacheLoaderFunc[T]
}

type workspaceInfo struct {
name string
}

type clusterInfo struct {
name string
source string
creator string
instancePoolId string
singleUserName string
}

type warehouseInfo struct {
name string
creator string
}

var (
workspaceInfoCache *memoryCache[map[int64]*workspaceInfo]
clusterInfoCache *memoryCache[map[string]*clusterInfo]
warehouseInfoCache *memoryCache[map[string]*warehouseInfo]
)

func newMemoryCache[T interface{}](
expiry time.Duration,
loader cacheLoaderFunc[T],
) *memoryCache[T] {
return &memoryCache[T] {
expiry: expiry,
loader: loader,
}
}

func (m *memoryCache[T]) get(ctx context.Context) (*T, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.value == nil || time.Now().After(m.expiration) {
val, err := m.loader(ctx)
if err != nil {
return nil, err
}

m.value = val
m.expiration = time.Now().Add(m.expiry)
}

return m.value, nil
}

/*
func (m *memoryCache[T]) invalidate() {
m.mu.Lock()
defer m.mu.Unlock()
m.expiration = time.Time{}
m.value = nil
}
*/

// @todo: allow cache expiry values to be configured

// initInfoByIdCaches creates the workspace, cluster, and warehouse info
// caches; each one rebuilds its lookup map from the Databricks account API
// at most once per five-minute expiry window.
func initInfoByIdCaches(
a *databricksSdk.AccountClient,
) {
workspaceInfoCache = newMemoryCache(
5 * time.Minute,
func(ctx context.Context) (*map[int64]*workspaceInfo, error) {
m, err := buildWorkspaceInfoByIdMap(ctx, a)
if err != nil {
return nil, err
}

return &m, nil
},
)

clusterInfoCache = newMemoryCache(
5 * time.Minute,
func(ctx context.Context) (*map[string]*clusterInfo, error) {
m, err := buildClusterInfoByIdMap(ctx, a)
if err != nil {
return nil, err
}

return &m, nil
},
)

warehouseInfoCache = newMemoryCache(
5 * time.Minute,
func(ctx context.Context) (*map[string]*warehouseInfo, error) {
m, err := buildWarehouseInfoByIdMap(ctx, a)
if err != nil {
return nil, err
}

return &m, nil
},
)
}

func buildWorkspaceInfoByIdMap(
ctx context.Context,
a *databricksSdk.AccountClient,
) (map[int64]*workspaceInfo, error) {
log.Debugf("building workspace info by ID map...")

workspaces, err := a.Workspaces.List(ctx)
if err != nil {
return nil, err
}

m := make(map[int64]*workspaceInfo)

for _, workspace := range workspaces {
workspaceInfo := &workspaceInfo{}
workspaceInfo.name = workspace.WorkspaceName

m[workspace.WorkspaceId] = workspaceInfo
}

return m, nil
}

func getWorkspaceInfoById(
ctx context.Context,
workspaceIdStr string,
) (*workspaceInfo, error) {
workspaceId, err := strconv.ParseInt(workspaceIdStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("workspace ID is not an integer")
}

workspaceInfoMap, err := workspaceInfoCache.get(ctx)
if err != nil {
return nil, err
}

workspaceInfo, ok := (*workspaceInfoMap)[workspaceId]
if ok {
return workspaceInfo, nil
}

return nil, nil
}

func buildClusterInfoByIdMap(
ctx context.Context,
a *databricksSdk.AccountClient,
) (map[string]*clusterInfo, error) {
log.Debugf("building cluster info by ID map...")

workspaces, err := a.Workspaces.List(ctx)
if err != nil {
return nil, err
}

m := map[string]*clusterInfo{}

for _, workspace := range workspaces {
w, err := a.GetWorkspaceClient(workspace)
if err != nil {
return nil, err
}

log.Debugf("listing clusters for workspace %s", workspace.WorkspaceName)

all := w.Clusters.List(
ctx,
databricksSdkCompute.ListClustersRequest{ PageSize: 100 },
)

for all.HasNext(ctx) {
c, err := all.Next(ctx)
if err != nil {
return nil, err
}

log.Debugf(
"cluster ID: %s ; cluster name: %s",
c.ClusterId,
c.ClusterName,
)

// namespace cluster ids with workspace id, just in case cluster ids
// can be the same in different workspaces
id := fmt.Sprintf(
"%d.%s",
workspace.WorkspaceId,
c.ClusterId,
)

clusterInfo := &clusterInfo{}
clusterInfo.name = c.ClusterName
clusterInfo.source = string(c.ClusterSource)
clusterInfo.creator = c.CreatorUserName
clusterInfo.singleUserName = c.SingleUserName
clusterInfo.instancePoolId = c.InstancePoolId

m[id] = clusterInfo
}
}

return m, nil
}

func getClusterInfoById(
ctx context.Context,
workspaceIdStr string,
clusterIdStr string,
) (*clusterInfo, error) {
workspaceId, err := strconv.ParseInt(workspaceIdStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("workspace ID is not an integer")
}

// namespace cluster ids with workspace id, just in case cluster ids can be
// the same in different workspaces
id := fmt.Sprintf(
"%d.%s",
workspaceId,
clusterIdStr,
)

clusterInfoMap, err := clusterInfoCache.get(ctx)
if err != nil {
return nil, err
}

clusterInfo, ok := (*clusterInfoMap)[id]
if ok {
return clusterInfo, nil
}

return nil, nil
}

func buildWarehouseInfoByIdMap(
ctx context.Context,
a *databricksSdk.AccountClient,
) (map[string]*warehouseInfo, error) {
log.Debugf("building warehouse info by ID map...")

workspaces, err := a.Workspaces.List(ctx)
if err != nil {
return nil, err
}

m := map[string]*warehouseInfo{}

for _, workspace := range workspaces {
w, err := a.GetWorkspaceClient(workspace)
if err != nil {
return nil, err
}

log.Debugf(
"listing warehouses for workspace %s",
workspace.WorkspaceName,
)

all := w.Warehouses.List(
ctx,
databricksSql.ListWarehousesRequest{},
)

for all.HasNext(ctx) {
warehouse, err := all.Next(ctx)
if err != nil {
return nil, err
}

log.Debugf(
"warehouse ID: %s ; warehouse name: %s",
warehouse.Id,
warehouse.Name,
)

// namespace warehouse ids with workspace id, just in case warehouse
// ids can be the same in different workspaces
id := fmt.Sprintf(
"%d.%s",
workspace.WorkspaceId,
warehouse.Id,
)

warehouseInfo := &warehouseInfo{}
warehouseInfo.name = warehouse.Name
warehouseInfo.creator = warehouse.CreatorName

m[id] = warehouseInfo
}
}

return m, nil
}

func getWarehouseInfoById(
ctx context.Context,
workspaceIdStr string,
warehouseIdStr string,
) (*warehouseInfo, error) {
workspaceId, err := strconv.ParseInt(workspaceIdStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("workspace ID is not an integer")
}

// namespace warehouse ids with workspace id, just in case warehouse ids can
// be the same in different workspaces
id := fmt.Sprintf(
"%d.%s",
workspaceId,
warehouseIdStr,
)

warehouseInfoMap, err := warehouseInfoCache.get(ctx)
if err != nil {
return nil, err
}

warehouseInfo, ok := (*warehouseInfoMap)[id]
if ok {
return warehouseInfo, nil
}

return nil, nil
}
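
To make the caching pattern in cache.go easier to follow, here is a minimal, self-contained sketch of the same idea. It is not part of the commit: lazyCache and the toy loader below are illustrative stand-ins for memoryCache and the buildXxxInfoByIdMap functions, which in the real integration call the Databricks account API.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// loaderFunc plays the role of cacheLoaderFunc: it knows how to (re)build
// the cached value on demand.
type loaderFunc[T any] func(ctx context.Context) (*T, error)

// lazyCache mirrors memoryCache: a mutex-guarded value plus an expiration
// timestamp, refreshed by the loader whenever the value is missing or stale.
type lazyCache[T any] struct {
    mu         sync.Mutex
    expiry     time.Duration
    value      *T
    expiration time.Time
    loader     loaderFunc[T]
}

func (c *lazyCache[T]) get(ctx context.Context) (*T, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.value == nil || time.Now().After(c.expiration) {
        v, err := c.loader(ctx)
        if err != nil {
            return nil, err
        }
        c.value = v
        c.expiration = time.Now().Add(c.expiry)
    }
    return c.value, nil
}

func main() {
    loads := 0
    cache := &lazyCache[map[int64]string]{
        expiry: 5 * time.Minute,
        loader: func(ctx context.Context) (*map[int64]string, error) {
            loads++                                       // counts how many times the map is rebuilt
            m := map[int64]string{12345: "my-workspace"}  // stand-in for a workspace lookup
            return &m, nil
        },
    }

    ctx := context.Background()
    for i := 0; i < 3; i++ {
        m, _ := cache.get(ctx)
        fmt.Println((*m)[12345], "loads:", loads) // loader runs only once within the expiry window
    }
}

Because the refresh happens under the mutex and each refresh replaces the whole map rather than mutating it in place, callers can safely read a returned map concurrently with later refreshes.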

0 comments on commit fbdd84e
