Skip to content

Commit

Permalink
Collect volume metrics for EBS-backed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Oct 4, 2023
1 parent 8d39bcb commit 21af0e0
Show file tree
Hide file tree
Showing 17 changed files with 386 additions and 16 deletions.
16 changes: 16 additions & 0 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3435,6 +3435,22 @@ func (task *Task) IsServiceConnectEnabled() bool {
// Is EBS Task Attach enabled returns true if this task has EBS volume configuration in its ACS payload.
// TODO as more daemons come online, we'll want a generic handler these bool checks and payload handling
func (task *Task) IsEBSTaskAttachEnabled() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.isEBSTaskAttachEnabledUnsafe()
}

func (task *Task) isEBSTaskAttachEnabledUnsafe() bool {
logger.Debug("Checking if there are any ebs volume configs")
for _, tv := range task.Volumes {
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
logger.Debug("found ebs volume config")
return true
default:
continue
}
}
return false
}

Expand Down
2 changes: 2 additions & 0 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment)
return nil
}

// overrideDeviceName() will replace the device name that we've received from ACS with the actual device name found on the host.
// This is needed for NodeStageVolume and what we received from ACS won't be what we see on the host instance.
func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
for volumeId, deviceName := range foundVolumes {
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
Expand Down
3 changes: 2 additions & 1 deletion agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,8 @@ func (engine *DockerTaskEngine) AddTask(task *apitask.Task) {
engine.emitTaskEvent(task, err.Error())
return
}
if task.IsEBSTaskAttachEnabled() {
// TODO: This will be fixed in a future PR. For now it will always be false.
if task.IsEBSTaskAttachEnabled() && false {
if csiTask, ok := engine.loadedDaemonTasks["ebs-csi-driver"]; ok {
logger.Info("engine ebs CSI driver is running", logger.Fields{
field.TaskID: csiTask.GetID(),
Expand Down
2 changes: 1 addition & 1 deletion agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (state *DockerTaskEngineState) AddEBSAttachment(ebsAttachment *apiresource.
}
state.lock.Lock()
defer state.lock.Unlock()
volumeId := ebsAttachment.AttachmentProperties[apiresource.VolumeIdName]
volumeId := ebsAttachment.AttachmentProperties[apiresource.VolumeIdKey]
if _, ok := state.ebsAttachments[volumeId]; !ok {
state.ebsAttachments[volumeId] = ebsAttachment
seelog.Debugf("Successfully added EBS attachment: %v", ebsAttachment.EBSToString())
Expand Down
29 changes: 16 additions & 13 deletions agent/engine/dockerstate/dockerstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (

var (
testAttachmentProperties = map[string]string{
apiresource.ResourceTypeName: apiresource.ElasticBlockStorage,
apiresource.RequestedSizeName: "5",
apiresource.VolumeSizeInGiBName: "7",
apiresource.DeviceName: "/dev/nvme0n0",
apiresource.VolumeIdName: "vol-123",
apiresource.FileSystemTypeName: "testXFS",
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme0n0",
apiresource.VolumeIdKey: "vol-123",
apiresource.FileSystemKey: "testXFS",
}
)

Expand Down Expand Up @@ -138,6 +138,7 @@ func TestAddRemoveEBSAttachment(t *testing.T) {
AttachmentARN: "ebs1",
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(attachment)
Expand All @@ -150,7 +151,7 @@ func TestAddRemoveEBSAttachment(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, ebs)

state.RemoveEBSAttachment(attachment.AttachmentProperties[apiresource.VolumeIdName])
state.RemoveEBSAttachment(attachment.AttachmentProperties[apiresource.VolumeIdKey])
assert.Len(t, state.(*DockerTaskEngineState).GetAllEBSAttachments(), 0)
ebs, ok = state.GetEBSByVolumeId("vol-123")
assert.False(t, ok)
Expand All @@ -168,15 +169,16 @@ func TestAddPendingEBSAttachment(t *testing.T) {
Status: status.AttachmentNone,
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

testSentAttachmentProperties := map[string]string{
apiresource.ResourceTypeName: apiresource.ElasticBlockStorage,
apiresource.RequestedSizeName: "3",
apiresource.VolumeSizeInGiBName: "9",
apiresource.DeviceName: "/dev/nvme1n0",
apiresource.VolumeIdName: "vol-456",
apiresource.FileSystemTypeName: "testXFS2",
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath2",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme1n0",
apiresource.VolumeIdKey: "vol-456",
apiresource.FileSystemKey: "testXFS",
}

foundAttachment := &apiresource.ResourceAttachment{
Expand All @@ -187,6 +189,7 @@ func TestAddPendingEBSAttachment(t *testing.T) {
Status: status.AttachmentAttached,
},
AttachmentProperties: testSentAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(pendingAttachment)
Expand Down
44 changes: 44 additions & 0 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -34,10 +35,12 @@ import (
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
Expand Down Expand Up @@ -252,6 +255,12 @@ func (mtask *managedTask) overseeTask() {
mtask.engine.wakeUpTaskQueueMonitor()
// TODO: make this idempotent on agent restart
go mtask.releaseIPInIPAM()

err := mtask.UnstageVolumes()
if err != nil {
logger.Error(fmt.Sprintf("Unable to unstage volumes: %v", err))
}

mtask.cleanupTask(retry.AddJitter(mtask.cfg.TaskCleanupWaitDuration, mtask.cfg.TaskCleanupWaitDurationJitter))
}

Expand Down Expand Up @@ -1559,3 +1568,38 @@ func (mtask *managedTask) waitForStopReported() bool {
}
return taskStopped
}

func (mtask *managedTask) UnstageVolumes() error {
task := mtask.Task
if task == nil {
return fmt.Errorf("task not is not managed")
}
if !task.IsEBSTaskAttachEnabled() {
logger.Debug("Task is not EBS-backed. Skip NodeUnstageVolume.")
return nil
}
csiClient := csiclient.NewCSIClient(filepath.Join(csiclient.SocketHostPath, csiclient.ImageName, csiclient.SocketName))
for _, tv := range task.Volumes {
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
ebsCfg := tv.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
volumeId := ebsCfg.VolumeId
hostPath := ebsCfg.Source()
err := mtask.unstageVolumeWithTimeout(&csiClient, volumeId, hostPath)
if err != nil {
logger.Error("Unable to unstage volume", logger.Fields{
"Task": task.String(),
"Error": err,
})
return err
}
}
}
return nil
}

func (mtask *managedTask) unstageVolumeWithTimeout(csiClient csiclient.CSIClient, volumeId, hostPath string) error {
derivedCtx, cancel := context.WithTimeout(mtask.ctx, time.Second*3)
defer cancel()
return csiClient.NodeUnstageVolume(derivedCtx, volumeId, hostPath)
}
6 changes: 6 additions & 0 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
ecsengine "github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/stats/resolver"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/stats"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
Expand Down Expand Up @@ -112,6 +113,8 @@ type DockerStatsEngine struct {
// channels to send metrics to TACS Client
metricsChannel chan<- ecstcs.TelemetryMessage
healthChannel chan<- ecstcs.HealthMessage

csiClient csiclient.CSIClient
}

// ResolveTask resolves the api task object, given container id.
Expand Down Expand Up @@ -572,12 +575,15 @@ func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats b
continue
}

volMetrics := engine.getEBSVolumeMetrics(taskArn)

metricTaskArn := taskArn
taskMetric := &ecstcs.TaskMetric{
TaskArn: &metricTaskArn,
TaskDefinitionFamily: &taskDef.family,
TaskDefinitionVersion: &taskDef.version,
ContainerMetrics: containerMetrics,
VolumeMetrics: volMetrics,
}

if includeServiceConnectStats {
Expand Down
5 changes: 5 additions & 0 deletions agent/stats/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
mock_resolver "github.com/aws/amazon-ecs-agent/agent/stats/resolver/mock"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
ni "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -62,6 +63,9 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) {
resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTask("c3").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTaskByARN("t3").AnyTimes().Return(t3, nil)
resolver.EXPECT().ResolveTask("c4").AnyTimes().Return(nil, fmt.Errorf("unmapped container"))
resolver.EXPECT().ResolveTask("c5").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTask("c6").AnyTimes().Return(t3, nil)
Expand All @@ -82,6 +86,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) {
engine.client = mockDockerClient
engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance
engine.csiClient = csiclient.NewDummyCSIClient()
defer engine.removeAll()

engine.addAndStartStatsContainer("c1")
Expand Down
104 changes: 104 additions & 0 deletions agent/stats/engine_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//go:build linux
// +build linux

// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package stats

import (
"context"
"fmt"
"path/filepath"
"time"

apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"

"github.com/aws/aws-sdk-go/aws"
)

func (engine *DockerStatsEngine) getEBSVolumeMetrics(taskArn string) []*ecstcs.VolumeMetric {
task, err := engine.resolver.ResolveTaskByARN(taskArn)
if err != nil {
logger.Error(fmt.Sprintf("Unable to get corresponding task from dd with task arn: %s", taskArn))
return nil
}

if !task.IsEBSTaskAttachEnabled() {
logger.Debug("Task not EBS-backed, skip gathering EBS volume metrics.", logger.Fields{
"taskArn": taskArn,
})
return nil
}

if engine.csiClient == nil {
client := csiclient.NewCSIClient(filepath.Join(csiclient.SocketHostPath, csiclient.ImageName, csiclient.SocketName))
engine.csiClient = &client
}
return engine.fetchEBSVolumeMetrics(task, taskArn)
}

func (engine *DockerStatsEngine) fetchEBSVolumeMetrics(task *apitask.Task, taskArn string) []*ecstcs.VolumeMetric {
var metrics []*ecstcs.VolumeMetric
for _, tv := range task.Volumes {
// TODO: Include Getters within the TaskVolume interface so that we don't need to have these type casts.
// (i.e. getVolumeId())
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
ebsCfg := tv.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
volumeId := ebsCfg.VolumeId
hostPath := ebsCfg.Source()
metric, err := engine.getVolumeMetricsWithTimeout(volumeId, hostPath)
if err != nil {
logger.Error("Failed to gather metrics for EBS volume", logger.Fields{
"VolumeId": volumeId,
"SourceVolumeHostPath": hostPath,
"Error": err,
})
continue
}
usedBytes := aws.Float64((float64)(metric.Used))
totalBytes := aws.Float64((float64)(metric.Capacity))
metrics = append(metrics, &ecstcs.VolumeMetric{
VolumeId: aws.String(volumeId),
VolumeName: aws.String(ebsCfg.VolumeName),
Utilized: &ecstcs.UDoubleCWStatsSet{
Max: usedBytes,
Min: usedBytes,
SampleCount: aws.Int64(1),
Sum: usedBytes,
},
Size: &ecstcs.UDoubleCWStatsSet{
Max: totalBytes,
Min: totalBytes,
SampleCount: aws.Int64(1),
Sum: totalBytes,
},
})
default:
continue
}
}
return metrics
}

func (engine *DockerStatsEngine) getVolumeMetricsWithTimeout(volumeId, hostPath string) (*csiclient.Metrics, error) {
derivedCtx, cancel := context.WithTimeout(engine.ctx, time.Second*1)
// releases resources if GetVolumeMetrics finishes before timeout
defer cancel()
return engine.csiClient.GetVolumeMetrics(derivedCtx, volumeId, hostPath)
}
Loading

0 comments on commit 21af0e0

Please sign in to comment.