Skip to content

Commit

Permalink
Collect volume metrics for EBS-backed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Sep 28, 2023
1 parent 70d53be commit 362a600
Show file tree
Hide file tree
Showing 393 changed files with 177,417 additions and 204 deletions.
2 changes: 1 addition & 1 deletion agent/acs/session/payload_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func TestHandlePayloadMessageAddedEBSToTask(t *testing.T) {
Value: aws.String(taskresourcevolume.TestFileSystem),
},
},
AttachmentType: aws.String(apiresource.AmazonElasticBlockStorage),
AttachmentType: aws.String(apiresource.EBSTaskAttach),
},
},
},
Expand Down
16 changes: 16 additions & 0 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3435,6 +3435,22 @@ func (task *Task) IsServiceConnectEnabled() bool {
// Is EBS Task Attach enabled returns true if this task has EBS volume configuration in its ACS payload.
// TODO as more daemons come online, we'll want a generic handler these bool checks and payload handling
func (task *Task) IsEBSTaskAttachEnabled() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.isEBSTaskAttachEnabledUnsafe()
}

func (task *Task) isEBSTaskAttachEnabledUnsafe() bool {
logger.Debug("Checking if there are any ebs volume configs")
for _, tv := range task.Volumes {
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
logger.Debug("found ebs volume config")
return true
default:
continue
}
}
return false
}

Expand Down
4 changes: 2 additions & 2 deletions agent/api/task/task_attachment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
switch aws.StringValue(attachment.AttachmentType) {
case serviceConnectAttachmentType:
serviceConnectAttachment = attachment
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
ebsVolumeAttachments = append(ebsVolumeAttachments, attachment)
default:
logger.Debug("Received an attachment type", logger.Fields{
Expand Down Expand Up @@ -117,7 +117,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
}
taskVolume := TaskVolume{
Name: ebs.VolumeName,
Type: apiresource.AmazonElasticBlockStorage,
Type: apiresource.EBSTaskAttach,
Volume: ebs,
}
task.Volumes = append(task.Volumes, taskVolume)
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_attachment_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestHandleTaskAttachmentWithEBSVolumeAttachment(t *testing.T) {
Value: stringToPointer(tc.testFileSystem),
},
},
AttachmentType: stringToPointer(apiresource.AmazonElasticBlockStorage),
AttachmentType: stringToPointer(apiresource.EBSTaskAttach),
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4688,7 +4688,7 @@ func TestTaskWithEBSVolumeAttachment(t *testing.T) {
Value: strptr(taskresourcevolume.TestFileSystem),
},
},
AttachmentType: strptr(apiresource.AmazonElasticBlockStorage),
AttachmentType: strptr(apiresource.EBSTaskAttach),
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions agent/api/task/taskvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (tv *TaskVolume) UnmarshalJSON(b []byte) error {
return tv.unmarshalEFSVolume(intermediate["efsVolumeConfiguration"])
case FSxWindowsFileServerVolumeType:
return tv.unmarshalFSxWindowsFileServerVolume(intermediate["fsxWindowsFileServerVolumeConfiguration"])
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
return tv.unmarshalEBSVolume(intermediate["ebsVolumeConfiguration"])
default:
return errors.Errorf("unrecognized volume type: %q", tv.Type)
Expand All @@ -103,7 +103,7 @@ func (tv *TaskVolume) MarshalJSON() ([]byte, error) {
result["efsVolumeConfiguration"] = tv.Volume
case FSxWindowsFileServerVolumeType:
result["fsxWindowsFileServerVolumeConfiguration"] = tv.Volume
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
result["ebsVolumeConfiguration"] = tv.Volume
default:
return nil, errors.Errorf("unrecognized volume type: %q", tv.Type)
Expand Down
8 changes: 4 additions & 4 deletions agent/api/task/taskvolume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestMarshalEBSVolumes(t *testing.T) {
Volumes: []TaskVolume{
{
Name: "1",
Type: apiresource.AmazonElasticBlockStorage,
Type: apiresource.EBSTaskAttach,
Volume: &taskresourcevolume.EBSTaskVolumeConfig{
VolumeId: "vol-12345",
VolumeName: "test-volume",
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestMarshalEBSVolumes(t *testing.T) {
"dockerVolumeName": ""
},
"name": "1",
"type": "AmazonElasticBlockStorage"
"type": "amazonebs"
}
],
"DesiredStatus": "NONE",
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestUnmarshalEBSVolumes(t *testing.T) {
"dockerVolumeName": ""
},
"name": "1",
"type": "AmazonElasticBlockStorage"
"type": "amazonebs"
}
],
"DesiredStatus": "NONE",
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestUnmarshalEBSVolumes(t *testing.T) {
require.NoError(t, err, "Could not unmarshal task")

require.Len(t, task.Volumes, 1)
assert.Equal(t, apiresource.AmazonElasticBlockStorage, task.Volumes[0].Type)
assert.Equal(t, apiresource.EBSTaskAttach, task.Volumes[0].Type)
assert.Equal(t, "1", task.Volumes[0].Name)
ebsConfig, ok := task.Volumes[0].Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
require.True(t, ok)
Expand Down
24 changes: 18 additions & 6 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (w *EBSWatcher) Start() {
pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey()
if len(pendingEBS) > 0 {
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.overrideDeviceName(foundVolumes)
w.NotifyFound(foundVolumes)
}
case <-w.ctx.Done():
Expand All @@ -83,13 +84,13 @@ func (w *EBSWatcher) Stop() {
// 1. Check whether we already have this attachment in state and if so it's a noop.
// 2. Otherwise add the attachment to state, start its ack timer, and save to the agent state.
func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) error {
attachmentType := ebs.GetAttachmentProperties(apiebs.ResourceTypeName)
if attachmentType != apiebs.ElasticBlockStorage {
attachmentType := ebs.GetAttachmentType()
if attachmentType != apiebs.EBSTaskAttach {
log.Warnf("Resource type not Elastic Block Storage. Skip handling resource attachment with type: %v.", attachmentType)
return nil
}

volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdName)
volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdKey)
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volumeId)
if ok {
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.EBSToString())
Expand All @@ -106,9 +107,20 @@ func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) er
return nil
}

func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
for volumeId, deviceName := range foundVolumes {
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Unable to find EBS volume with volume ID: %s", volumeId)
continue
}
ebs.SetDeviceName(deviceName)
}
}

// NotifyFound will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyFound(foundVolumes []string) {
for _, volumeId := range foundVolumes {
func (w *EBSWatcher) NotifyFound(foundVolumes map[string]string) {
for volumeId := range foundVolumes {
w.notifyFoundEBS(volumeId)
}
}
Expand Down Expand Up @@ -151,7 +163,7 @@ func (w *EBSWatcher) removeEBSAttachment(volumeID string) {

// addEBSAttachmentToState adds an EBS attachment to state, and start its ack timer
func (w *EBSWatcher) addEBSAttachmentToState(ebs *apiebs.ResourceAttachment) error {
volumeId := ebs.AttachmentProperties[apiebs.VolumeIdName]
volumeId := ebs.AttachmentProperties[apiebs.VolumeIdKey]
err := ebs.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
Expand Down
Loading

0 comments on commit 362a600

Please sign in to comment.