Skip to content

Commit

Permalink
Collect volume metrics for EBS-backed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Oct 7, 2023
1 parent 35e8100 commit db47259
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 24 deletions.
69 changes: 69 additions & 0 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
Expand Down Expand Up @@ -331,7 +332,37 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
return task, nil
}

// RemoveVolume removes the volume at the given index from task.Volumes.
// It acquires the task's write lock and delegates to removeVolumeUnsafe,
// which silently ignores an index that is out of range.
// TODO: Add unit test
func (task *Task) RemoveVolume(index int) {
task.lock.Lock()
defer task.lock.Unlock()
task.removeVolumeUnsafe(index)
}

// removeVolumeUnsafe drops the volume at position index from task.Volumes.
// An index outside [0, len(task.Volumes)) leaves the task untouched. The
// surviving volumes are copied into a newly allocated slice, so the previous
// backing array is not modified. It does no locking of its own; RemoveVolume
// is the variant that takes task.lock before calling it.
func (task *Task) removeVolumeUnsafe(index int) {
	volumes := task.Volumes
	if index < 0 || index >= len(volumes) {
		return
	}
	remaining := make([]TaskVolume, 0, len(volumes)-1)
	remaining = append(remaining, volumes[:index]...)
	task.Volumes = append(remaining, volumes[index+1:]...)
}

func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error {
// TODO: Have EBS volumes use the DockerVolumeConfig to create the mountpoint
if task.IsEBSTaskAttachEnabled() {
ebsVolumes := task.GetEBSVolumeNames()
for index, tv := range task.Volumes {
volumeName := tv.Name
volumeType := tv.Type
if ebsVolumes[volumeName] && volumeType != apiresource.EBSTaskAttach {
task.RemoveVolume(index)
}
}
}

err := task.initializeDockerLocalVolumes(dockerClient, ctx)
if err != nil {
return apierrors.NewResourceInitError(task.Arn, err)
Expand Down Expand Up @@ -3435,9 +3466,47 @@ func (task *Task) IsServiceConnectEnabled() bool {
// IsEBSTaskAttachEnabled returns true if this task has EBS volume configuration in its ACS payload,
// i.e. at least one entry in task.Volumes holds an EBSTaskVolumeConfig.
// The task's read lock is held for the duration of the check.
// TODO as more daemons come online, we'll want a generic handler for these bool checks and payload handling
func (task *Task) IsEBSTaskAttachEnabled() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.isEBSTaskAttachEnabledUnsafe()
}

// isEBSTaskAttachEnabledUnsafe reports whether any entry in task.Volumes
// carries an *EBSTaskVolumeConfig. It stops at the first match. It does no
// locking of its own; IsEBSTaskAttachEnabled is the variant that takes the
// task's read lock before calling it.
func (task *Task) isEBSTaskAttachEnabledUnsafe() bool {
	logger.Debug("Checking if there are any ebs volume configs")
	for i := range task.Volumes {
		if _, isEBS := task.Volumes[i].Volume.(*taskresourcevolume.EBSTaskVolumeConfig); isEBS {
			logger.Debug("found ebs volume config")
			return true
		}
	}
	return false
}

// GetEBSVolumeNames returns the set of EBS volume names configured on this task,
// as a map keyed by volume name whose values are always true. The names are the
// VolumeName fields of the EBSTaskVolumeConfig entries found in task.Volumes.
// The task's read lock is held while the map is built.
// TODO: Add unit tests
func (task *Task) GetEBSVolumeNames() map[string]bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.getEBSVolumeNamesUnsafe()
}

// getEBSVolumeNamesUnsafe builds the set of EBS volume names for this task.
// Every entry in task.Volumes whose Volume is an *EBSTaskVolumeConfig
// contributes its VolumeName as a key mapped to true; all other volume kinds
// are ignored. It does no locking of its own; GetEBSVolumeNames is the variant
// that takes the task's read lock before calling it.
func (task *Task) getEBSVolumeNamesUnsafe() map[string]bool {
	names := make(map[string]bool)
	for _, vol := range task.Volumes {
		ebsCfg, isEBS := vol.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
		if !isEBS {
			continue
		}
		logger.Debug("found ebs volume config")
		names[ebsCfg.VolumeName] = true
	}
	return names
}

// IsServiceConnectBridgeModeApplicationContainer reports whether the given
// container's host-config network mode is "container" and the task has
// Service Connect enabled. The Service Connect check is only performed when
// the network mode matches.
func (task *Task) IsServiceConnectBridgeModeApplicationContainer(container *apicontainer.Container) bool {
	if container.GetNetworkModeFromHostConfig() != "container" {
		return false
	}
	return task.IsServiceConnectEnabled()
}
Expand Down
7 changes: 6 additions & 1 deletion agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment)
return nil
}

// overrideDeviceName replaces the device name that we've received from ACS with the actual device name found on the host.
// This is needed for NodeStageVolume because what we received from ACS won't be what we see on the host instance.
func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
for volumeId, deviceName := range foundVolumes {
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
Expand All @@ -175,7 +177,10 @@ func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
for volID, deviceName := range foundVolumes {
// get volume details from attachment
ebsAttachment, _ := w.agentState.GetEBSByVolumeId(volID)
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID)
if !ok {
continue
}
if ebsAttachment.IsSent() {
log.Debugf("State change event has already been emitted for EBS volume: %v.", ebsAttachment.EBSToString())
continue
Expand Down
2 changes: 1 addition & 1 deletion agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (state *DockerTaskEngineState) AddEBSAttachment(ebsAttachment *apiresource.
}
state.lock.Lock()
defer state.lock.Unlock()
volumeId := ebsAttachment.AttachmentProperties[apiresource.VolumeIdName]
volumeId := ebsAttachment.AttachmentProperties[apiresource.VolumeIdKey]
if _, ok := state.ebsAttachments[volumeId]; !ok {
state.ebsAttachments[volumeId] = ebsAttachment
seelog.Debugf("Successfully added EBS attachment: %v", ebsAttachment.EBSToString())
Expand Down
29 changes: 16 additions & 13 deletions agent/engine/dockerstate/dockerstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (

var (
testAttachmentProperties = map[string]string{
apiresource.ResourceTypeName: apiresource.ElasticBlockStorage,
apiresource.RequestedSizeName: "5",
apiresource.VolumeSizeInGiBName: "7",
apiresource.DeviceName: "/dev/nvme0n0",
apiresource.VolumeIdName: "vol-123",
apiresource.FileSystemTypeName: "testXFS",
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme0n0",
apiresource.VolumeIdKey: "vol-123",
apiresource.FileSystemKey: "testXFS",
}
)

Expand Down Expand Up @@ -138,6 +138,7 @@ func TestAddRemoveEBSAttachment(t *testing.T) {
AttachmentARN: "ebs1",
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(attachment)
Expand All @@ -150,7 +151,7 @@ func TestAddRemoveEBSAttachment(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, ebs)

state.RemoveEBSAttachment(attachment.AttachmentProperties[apiresource.VolumeIdName])
state.RemoveEBSAttachment(attachment.AttachmentProperties[apiresource.VolumeIdKey])
assert.Len(t, state.(*DockerTaskEngineState).GetAllEBSAttachments(), 0)
ebs, ok = state.GetEBSByVolumeId("vol-123")
assert.False(t, ok)
Expand All @@ -168,15 +169,16 @@ func TestAddPendingEBSAttachment(t *testing.T) {
Status: status.AttachmentNone,
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

testSentAttachmentProperties := map[string]string{
apiresource.ResourceTypeName: apiresource.ElasticBlockStorage,
apiresource.RequestedSizeName: "3",
apiresource.VolumeSizeInGiBName: "9",
apiresource.DeviceName: "/dev/nvme1n0",
apiresource.VolumeIdName: "vol-456",
apiresource.FileSystemTypeName: "testXFS2",
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath2",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme1n0",
apiresource.VolumeIdKey: "vol-456",
apiresource.FileSystemKey: "testXFS",
}

foundAttachment := &apiresource.ResourceAttachment{
Expand All @@ -187,6 +189,7 @@ func TestAddPendingEBSAttachment(t *testing.T) {
Status: status.AttachmentAttached,
},
AttachmentProperties: testSentAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(pendingAttachment)
Expand Down
48 changes: 48 additions & 0 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -34,10 +35,12 @@ import (
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status"
taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
Expand All @@ -57,6 +60,7 @@ const (
stoppedSentWaitInterval = 30 * time.Second
maxStoppedWaitTimes = 72 * time.Hour / stoppedSentWaitInterval
taskUnableToTransitionToStoppedReason = "TaskStateError: Agent could not progress task's state to stopped"
unstageVolumeTimeout = 3 * time.Second
)

var (
Expand Down Expand Up @@ -252,6 +256,15 @@ func (mtask *managedTask) overseeTask() {
mtask.engine.wakeUpTaskQueueMonitor()
// TODO: make this idempotent on agent restart
go mtask.releaseIPInIPAM()

if mtask.Task.IsEBSTaskAttachEnabled() {
csiClient := csiclient.NewCSIClient(filepath.Join(csiclient.DefaultSocketHostPath, csiclient.DefaultImageName, csiclient.DefaultSocketName))
err := mtask.UnstageVolumes(&csiClient)
if err != nil {
logger.Error(fmt.Sprintf("Unable to unstage volumes: %v", err))
}
}

mtask.cleanupTask(retry.AddJitter(mtask.cfg.TaskCleanupWaitDuration, mtask.cfg.TaskCleanupWaitDurationJitter))
}

Expand Down Expand Up @@ -1559,3 +1572,38 @@ func (mtask *managedTask) waitForStopReported() bool {
}
return taskStopped
}

// UnstageVolumes calls NodeUnstageVolume (through unstageVolumeWithTimeout)
// for every EBS volume config found in the managed task's volumes, passing the
// config's VolumeId and Source() as the volume ID and host path.
//
// It returns an error only when the managed task has no underlying task. A
// failure to unstage an individual volume is logged and the remaining volumes
// are still processed. Tasks without any EBS volume configuration are skipped
// and nil is returned.
//
// TODO: Add unit test for UnstageVolumes in the near future
func (mtask *managedTask) UnstageVolumes(csiClient csiclient.CSIClient) error {
	t := mtask.Task
	if t == nil {
		return fmt.Errorf("managed task is nil")
	}
	if !t.IsEBSTaskAttachEnabled() {
		logger.Debug("Task is not EBS-backed. Skip NodeUnstageVolume.")
		return nil
	}
	for _, vol := range t.Volumes {
		ebsCfg, isEBS := vol.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
		if !isEBS {
			continue
		}
		if err := mtask.unstageVolumeWithTimeout(csiClient, ebsCfg.VolumeId, ebsCfg.Source()); err != nil {
			// Best effort: record the failure and keep going with the other volumes.
			logger.Error("Unable to unstage volume", logger.Fields{
				"Task":  t.String(),
				"Error": err,
			})
		}
	}
	return nil
}

// unstageVolumeWithTimeout issues a single NodeUnstageVolume call for the
// given volume ID and host path. The call runs under a context derived from
// mtask.ctx that is bounded by unstageVolumeTimeout; the derived context is
// cancelled when this function returns. The error from NodeUnstageVolume is
// returned unchanged.
func (mtask *managedTask) unstageVolumeWithTimeout(csiClient csiclient.CSIClient, volumeId, hostPath string) error {
	timeoutCtx, cancelTimeout := context.WithTimeout(mtask.ctx, unstageVolumeTimeout)
	defer cancelTimeout()
	return csiClient.NodeUnstageVolume(timeoutCtx, volumeId, hostPath)
}
6 changes: 6 additions & 0 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
ecsengine "github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/stats/resolver"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
"github.com/aws/amazon-ecs-agent/ecs-agent/eventstream"
"github.com/aws/amazon-ecs-agent/ecs-agent/stats"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
Expand Down Expand Up @@ -112,6 +113,8 @@ type DockerStatsEngine struct {
// channels to send metrics to TACS Client
metricsChannel chan<- ecstcs.TelemetryMessage
healthChannel chan<- ecstcs.HealthMessage

csiClient csiclient.CSIClient
}

// ResolveTask resolves the api task object, given container id.
Expand Down Expand Up @@ -572,12 +575,15 @@ func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats b
continue
}

volMetrics := engine.getEBSVolumeMetrics(taskArn)

metricTaskArn := taskArn
taskMetric := &ecstcs.TaskMetric{
TaskArn: &metricTaskArn,
TaskDefinitionFamily: &taskDef.family,
TaskDefinitionVersion: &taskDef.version,
ContainerMetrics: containerMetrics,
VolumeMetrics: volMetrics,
}

if includeServiceConnectStats {
Expand Down
5 changes: 5 additions & 0 deletions agent/stats/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
mock_resolver "github.com/aws/amazon-ecs-agent/agent/stats/resolver/mock"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
ni "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface"
"github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs"
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -62,6 +63,9 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) {
resolver.EXPECT().ResolveTask("c1").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTask("c2").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTask("c3").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTaskByARN("t1").AnyTimes().Return(t1, nil)
resolver.EXPECT().ResolveTaskByARN("t2").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTaskByARN("t3").AnyTimes().Return(t3, nil)
resolver.EXPECT().ResolveTask("c4").AnyTimes().Return(nil, fmt.Errorf("unmapped container"))
resolver.EXPECT().ResolveTask("c5").AnyTimes().Return(t2, nil)
resolver.EXPECT().ResolveTask("c6").AnyTimes().Return(t3, nil)
Expand All @@ -82,6 +86,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) {
engine.client = mockDockerClient
engine.cluster = defaultCluster
engine.containerInstanceArn = defaultContainerInstance
engine.csiClient = csiclient.NewDummyCSIClient()
defer engine.removeAll()

engine.addAndStartStatsContainer("c1")
Expand Down
Loading

0 comments on commit db47259

Please sign in to comment.