[EBPF] gpu: use the tagger to add GPU-related tags #32906

Merged · 5 commits · Jan 15, 2025
Changes from 3 commits
29 changes: 19 additions & 10 deletions pkg/collector/corechecks/gpu/gpu.go
@@ -85,7 +85,7 @@ func (c *Check) Configure(senderManager sender.SenderManager, _ uint64, config,
     }
 
     var err error
-    c.collectors, err = nvidia.BuildCollectors(c.nvmlLib)
+    c.collectors, err = nvidia.BuildCollectors(&nvidia.CollectorDependencies{NVML: c.nvmlLib, Tagger: c.tagger})
     if err != nil {
         return fmt.Errorf("failed to build NVML collectors: %w", err)
     }
@@ -162,21 +162,30 @@ func (c *Check) emitSysprobeMetrics(snd sender.Sender) error {
 }
 
 func (c *Check) getTagsForKey(key model.StatsKey) []string {
-    entityID := taggertypes.NewEntityID(taggertypes.ContainerID, key.ContainerID)
-    tags, err := c.tagger.Tag(entityID, c.tagger.ChecksCardinality())
+    // Container ID tag will be added or not depending on the tagger configuration
+    // PID is always added
+    tags := []string{
+        // Per-PID metrics are subject to change due to high cardinality
+        fmt.Sprintf("pid:%d", key.PID),
+    }
+
+    containerEntityID := taggertypes.NewEntityID(taggertypes.ContainerID, key.ContainerID)
+    containerTags, err := c.tagger.Tag(containerEntityID, c.tagger.ChecksCardinality())
     if err != nil {
         log.Errorf("Error collecting container tags for process %d: %s", key.PID, err)
+    } else {
+        tags = append(tags, containerTags...)
     }
 
-    // Container ID tag will be added or not depending on the tagger configuration
-    // PID and GPU UUID are always added as they're not relying on the tagger yet
-    keyTags := []string{
-        // Per-PID metrics are subject to change due to high cardinality
-        fmt.Sprintf("pid:%d", key.PID),
-        fmt.Sprintf("gpu_uuid:%s", key.DeviceUUID),
+    gpuEntityID := taggertypes.NewEntityID(taggertypes.GPU, key.DeviceUUID)
+    gpuTags, err := c.tagger.Tag(gpuEntityID, c.tagger.ChecksCardinality())
+    if err != nil {
+        log.Errorf("Error collecting GPU tags for process %d: %s", key.PID, err)
+    } else {
+        tags = append(tags, gpuTags...)
     }
 
-    return append(tags, keyTags...)
+    return tags
 }
 
 func (c *Check) emitNvmlMetrics(snd sender.Sender) error {
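The reshaped getTagsForKey builds its tag list incrementally: the pid tag is always emitted, while container and GPU tags are appended only when the corresponding tagger lookup succeeds. A minimal, self-contained Go sketch of that error-tolerant accumulation pattern (the Tagger interface and the entity-ID strings below are illustrative stand-ins, not the real comp/core/tagger API):

package main

import (
    "fmt"
    "log"
)

// Tagger is an illustrative stand-in for the agent's tagger component.
type Tagger interface {
    Tag(entityID string) ([]string, error)
}

// staticTagger serves canned tags and fails for unknown entities.
type staticTagger map[string][]string

func (s staticTagger) Tag(entityID string) ([]string, error) {
    tags, ok := s[entityID]
    if !ok {
        return nil, fmt.Errorf("no tags for entity %s", entityID)
    }
    return tags, nil
}

// tagsForKey mirrors the shape of the new getTagsForKey: the pid tag is
// always present; container and GPU tags are appended only when the lookup
// succeeds, and a failed lookup is logged rather than aborting the metric.
func tagsForKey(t Tagger, pid int, containerID, deviceUUID string) []string {
    tags := []string{fmt.Sprintf("pid:%d", pid)}

    // The entity-ID formats here are invented for this sketch.
    for _, entity := range []string{"container://" + containerID, "gpu://" + deviceUUID} {
        entityTags, err := t.Tag(entity)
        if err != nil {
            log.Printf("error collecting tags for %s: %s", entity, err)
            continue
        }
        tags = append(tags, entityTags...)
    }

    return tags
}

func main() {
    tagger := staticTagger{
        "gpu://GPU-123": {"gpu_vendor:nvidia", "gpu_uuid:GPU-123"},
    }
    // The container lookup fails here, so only pid and GPU tags survive.
    fmt.Println(tagsForKey(tagger, 4242, "abc123", "GPU-123"))
    // Prints: [pid:4242 gpu_vendor:nvidia gpu_uuid:GPU-123]
}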
56 changes: 30 additions & 26 deletions pkg/collector/corechecks/gpu/nvidia/collector.go
@@ -18,15 +18,11 @@ import (
 
     "github.com/NVIDIA/go-nvml/pkg/nvml"
 
+    tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
+    taggertypes "github.com/DataDog/datadog-agent/comp/core/tagger/types"
     "github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
-const (
-    tagVendor    = "gpu_vendor:nvidia"
-    tagNameModel = "gpu_model"
-    tagNameUUID  = "gpu_uuid"
-)
-
 // Collector defines a collector that gets metric from a specific NVML subsystem and device
 type Collector interface {
     // Collect collects metrics from the given NVML device. This method should not fill the tags
@@ -63,29 +59,42 @@ var allSubsystems = map[string]subsystemBuilder{
     clocksCollectorName: newClocksCollector,
 }
 
+// CollectorDependencies holds the dependencies needed to create a set of collectors.
+type CollectorDependencies struct {
+    // Tagger is the tagger component used to tag the metrics.
+    Tagger tagger.Component
+
+    // NVML is the NVML library interface used to interact with the NVIDIA devices.
+    NVML nvml.Interface
+}
+
 // BuildCollectors returns a set of collectors that can be used to collect metrics from NVML.
-func BuildCollectors(lib nvml.Interface) ([]Collector, error) {
-    return buildCollectors(lib, allSubsystems)
+func BuildCollectors(deps *CollectorDependencies) ([]Collector, error) {
+    return buildCollectors(deps, allSubsystems)
 }
 
-func buildCollectors(lib nvml.Interface, subsystems map[string]subsystemBuilder) ([]Collector, error) {
+func buildCollectors(deps *CollectorDependencies, subsystems map[string]subsystemBuilder) ([]Collector, error) {
     var collectors []Collector
 
-    devCount, ret := lib.DeviceGetCount()
+    devCount, ret := deps.NVML.DeviceGetCount()
     if ret != nvml.SUCCESS {
         return nil, fmt.Errorf("failed to get device count: %s", nvml.ErrorString(ret))
     }
 
     for i := 0; i < devCount; i++ {
-        dev, ret := lib.DeviceGetHandleByIndex(i)
+        dev, ret := deps.NVML.DeviceGetHandleByIndex(i)
         if ret != nvml.SUCCESS {
             return nil, fmt.Errorf("failed to get device handle for index %d: %s", i, nvml.ErrorString(ret))
        }
 
-        tags := getTagsFromDevice(dev)
+        tags, err := getTagsFromDevice(dev, deps.Tagger)
+        if err != nil {
+            log.Warnf("failed to get tags for device %s: %s", dev, err)
+            continue
+        }
 
        for name, builder := range subsystems {
-            subsystem, err := builder(lib, dev, tags)
+            subsystem, err := builder(deps.NVML, dev, tags)
             if errors.Is(err, errUnsupportedDevice) {
                 log.Warnf("device %s does not support collector %s", dev, name)
                 continue
@@ -102,22 +111,17 @@ func buildCollectors(lib nvml.Interface, subsystems map[string]subsystemBuilder)
 }
 
 // getTagsFromDevice returns the tags associated with the given NVML device.
-func getTagsFromDevice(dev nvml.Device) []string {
-    tags := []string{tagVendor}
-
+func getTagsFromDevice(dev nvml.Device, tagger tagger.Component) ([]string, error) {
     uuid, ret := dev.GetUUID()
-    if ret == nvml.SUCCESS {
-        tags = append(tags, fmt.Sprintf("%s:%s", tagNameUUID, uuid))
-    } else {
-        log.Warnf("failed to get device UUID: %s", nvml.ErrorString(ret))
+    if ret != nvml.SUCCESS {
+        return nil, fmt.Errorf("failed to get device UUID: %s", nvml.ErrorString(ret))
     }
 
-    name, ret := dev.GetName()
-    if ret == nvml.SUCCESS {
-        tags = append(tags, fmt.Sprintf("%s:%s", tagNameModel, name))
-    } else {
-        log.Warnf("failed to get device name: %s", nvml.ErrorString(ret))
+    entityID := taggertypes.NewEntityID(taggertypes.GPU, uuid)
+    tags, err := tagger.Tag(entityID, tagger.ChecksCardinality())
+    if err != nil {
+        log.Errorf("Error collecting GPU tags for GPU UUID %s: %s", uuid, err)
     }
 
-    return tags
+    return tags, nil
 }
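Two behavioral notes on this file: all device-level tags now come from the tagger rather than the locally built gpu_vendor/gpu_model/gpu_uuid constants, and a device whose UUID lookup fails is now skipped entirely (with a warning) instead of producing partially tagged metrics. At the call site, the new CollectorDependencies struct keeps the BuildCollectors signature stable as dependencies grow; a rough sketch mirroring the Configure() change in gpu.go (not compilable on its own — nvmlLib and taggerComp stand for components wired in elsewhere by the agent):

// Sketch: bundle the collaborators once, pass a single pointer.
deps := &nvidia.CollectorDependencies{
    NVML:   nvmlLib,    // nvml.Interface from go-nvml
    Tagger: taggerComp, // tagger.Component from comp/core/tagger/def
}

collectors, err := nvidia.BuildCollectors(deps)
if err != nil {
    return fmt.Errorf("failed to build NVML collectors: %w", err)
}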
22 changes: 6 additions & 16 deletions pkg/collector/corechecks/gpu/nvidia/collector_test.go
@@ -14,6 +14,8 @@ import (
     "github.com/NVIDIA/go-nvml/pkg/nvml"
     nvmlmock "github.com/NVIDIA/go-nvml/pkg/nvml/mock"
     "github.com/stretchr/testify/require"
+
+    taggerMock "github.com/DataDog/datadog-agent/comp/core/tagger/mock"
 )
 
 func getBasicNvmlDeviceMock() nvml.Device {
@@ -54,22 +56,10 @@ func TestCollectorsStillInitIfOneFails(t *testing.T) {
         return nil, errors.New("failure")
     }
 
-    collectors, err := buildCollectors(getBasicNvmlMock(), map[string]subsystemBuilder{"ok": factory, "fail": factory})
+    nvmlMock := getBasicNvmlMock()
+    fakeTagger := taggerMock.SetupFakeTagger(t)
+    deps := &CollectorDependencies{NVML: nvmlMock, Tagger: fakeTagger}
+    collectors, err := buildCollectors(deps, map[string]subsystemBuilder{"ok": factory, "fail": factory})
     require.NotNil(t, collectors)
     require.NoError(t, err)
 }
-
-func TestGetTagsFromDeviceGetsTagsEvenIfOneFails(t *testing.T) {
-    device := &nvmlmock.Device{
-        GetUUIDFunc: func() (string, nvml.Return) {
-            return "GPU-123", nvml.SUCCESS
-        },
-        GetNameFunc: func() (string, nvml.Return) {
-            return "", nvml.ERROR_GPU_IS_LOST
-        },
-    }
-
-    result := getTagsFromDevice(device)
-    expected := []string{tagVendor, tagNameUUID + ":GPU-123"}
-    require.ElementsMatch(t, expected, result)
-}
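With the tagger injected through CollectorDependencies, the test swaps in a fake rather than touching global state; the deleted TestGetTagsFromDeviceGetsTagsEvenIfOneFails also goes away because getTagsFromDevice no longer produces partial tags on failure. Reusing the illustrative Tagger interface and tagsForKey sketch from the gpu.go section above, the new all-or-nothing error path could be pinned down like this (a fragment, not the agent's actual test helpers; imports of errors and testing elided):

// failingTagger always errors, standing in for a tagger that cannot
// resolve any entity; only the unconditional pid tag should survive.
type failingTagger struct{}

func (failingTagger) Tag(entityID string) ([]string, error) {
    return nil, errors.New("tagger unavailable")
}

func TestTagsFallBackToPIDOnly(t *testing.T) {
    tags := tagsForKey(failingTagger{}, 7, "abc123", "GPU-123")
    if len(tags) != 1 || tags[0] != "pid:7" {
        t.Fatalf("expected only the pid tag, got %v", tags)
    }
}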
30 changes: 26 additions & 4 deletions test/new-e2e/tests/gpu/gpu_test.go
@@ -10,14 +10,14 @@ import (
     "encoding/json"
     "flag"
     "fmt"
-    "slices"
     "strings"
     "testing"
     "time"
 
     "github.com/stretchr/testify/assert"
 
     "github.com/DataDog/datadog-agent/pkg/util/testutil/flake"
+    "github.com/DataDog/datadog-agent/test/fakeintake/aggregator"
     "github.com/DataDog/datadog-agent/test/fakeintake/client"
 
     "github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e"
@@ -121,6 +121,26 @@ func (v *gpuSuite) TestGPUSysprobeEndpointIsResponding() {
     }, 2*time.Minute, 10*time.Second)
 }
 
+func (v *gpuSuite) requireGPUTags(metric *aggregator.MetricSeries) {
+    foundRequiredTags := map[string]bool{
+        "gpu_uuid":   false,
+        "gpu_device": false,
+        "gpu_vendor": false,
+    }
+
+    for _, tag := range metric.Tags {
+        for requiredTag := range foundRequiredTags {
+            if strings.HasPrefix(tag, requiredTag+":") {
+                foundRequiredTags[requiredTag] = true
+            }
+        }
+    }
+
+    for requiredTag, found := range foundRequiredTags {
+        v.Require().True(found, "required tag %s not found in %v", requiredTag, metric)
+    }
+}
+
 func (v *gpuSuite) TestVectorAddProgramDetected() {
     flake.Mark(v.T())
 
@@ -136,9 +156,7 @@ func (v *gpuSuite) TestVectorAddProgramDetected() {
         assert.Greater(c, len(metrics), 0, "no '%s' with value higher than 0 yet", metricName)
 
         for _, metric := range metrics {
-            assert.True(c, slices.ContainsFunc(metric.Tags, func(tag string) bool {
-                return strings.HasPrefix(tag, "gpu_uuid:")
-            }), "no gpu_uuid tag found in %v", metric)
+            v.requireGPUTags(metric)
         }
     }
 }, 5*time.Minute, 10*time.Second)
@@ -155,6 +173,10 @@ func (v *gpuSuite) TestNvmlMetricsPresent() {
         metrics, err := v.Env().FakeIntake.Client().FilterMetrics(metricName)
         assert.NoError(c, err)
         assert.Greater(c, len(metrics), 0, "no metric '%s' found", metricName)
+
+        for _, metric := range metrics {
+            v.requireGPUTags(metric)
+        }
     }
 }, 5*time.Minute, 10*time.Second)
 }
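For reference, requireGPUTags matches on key prefixes only, so a passing metric series needs tags shaped like the following (values invented for illustration; gpu_vendor:nvidia matches the constant the collector previously hardcoded):

// Illustrative tag set satisfying requireGPUTags; only the
// gpu_uuid/gpu_device/gpu_vendor key prefixes matter to the check.
tags := []string{
    "pid:4242",
    "gpu_uuid:GPU-123",
    "gpu_device:Tesla T4", // hypothetical model value
    "gpu_vendor:nvidia",
}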