Skip to content

Commit

Permalink
Add ecs-agent external library
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Oct 3, 2023
1 parent ca33f56 commit a692764
Show file tree
Hide file tree
Showing 389 changed files with 177,303 additions and 237 deletions.
2 changes: 1 addition & 1 deletion agent/acs/session/payload_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func TestHandlePayloadMessageAddedEBSToTask(t *testing.T) {
Value: aws.String(taskresourcevolume.TestFileSystem),
},
},
AttachmentType: aws.String(apiresource.AmazonElasticBlockStorage),
AttachmentType: aws.String(apiresource.EBSTaskAttach),
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions agent/api/task/task_attachment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
switch aws.StringValue(attachment.AttachmentType) {
case serviceConnectAttachmentType:
serviceConnectAttachment = attachment
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
ebsVolumeAttachments = append(ebsVolumeAttachments, attachment)
default:
logger.Debug("Received an attachment type", logger.Fields{
Expand Down Expand Up @@ -117,7 +117,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
}
taskVolume := TaskVolume{
Name: ebs.VolumeName,
Type: apiresource.AmazonElasticBlockStorage,
Type: apiresource.EBSTaskAttach,
Volume: ebs,
}
task.Volumes = append(task.Volumes, taskVolume)
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_attachment_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestHandleTaskAttachmentWithEBSVolumeAttachment(t *testing.T) {
Value: stringToPointer(tc.testFileSystem),
},
},
AttachmentType: stringToPointer(apiresource.AmazonElasticBlockStorage),
AttachmentType: stringToPointer(apiresource.EBSTaskAttach),
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion agent/api/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4688,7 +4688,7 @@ func TestTaskWithEBSVolumeAttachment(t *testing.T) {
Value: strptr(taskresourcevolume.TestFileSystem),
},
},
AttachmentType: strptr(apiresource.AmazonElasticBlockStorage),
AttachmentType: strptr(apiresource.EBSTaskAttach),
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions agent/api/task/taskvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (tv *TaskVolume) UnmarshalJSON(b []byte) error {
return tv.unmarshalEFSVolume(intermediate["efsVolumeConfiguration"])
case FSxWindowsFileServerVolumeType:
return tv.unmarshalFSxWindowsFileServerVolume(intermediate["fsxWindowsFileServerVolumeConfiguration"])
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
return tv.unmarshalEBSVolume(intermediate["ebsVolumeConfiguration"])
default:
return errors.Errorf("unrecognized volume type: %q", tv.Type)
Expand All @@ -103,7 +103,7 @@ func (tv *TaskVolume) MarshalJSON() ([]byte, error) {
result["efsVolumeConfiguration"] = tv.Volume
case FSxWindowsFileServerVolumeType:
result["fsxWindowsFileServerVolumeConfiguration"] = tv.Volume
case apiresource.AmazonElasticBlockStorage:
case apiresource.EBSTaskAttach:
result["ebsVolumeConfiguration"] = tv.Volume
default:
return nil, errors.Errorf("unrecognized volume type: %q", tv.Type)
Expand Down
8 changes: 4 additions & 4 deletions agent/api/task/taskvolume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestMarshalEBSVolumes(t *testing.T) {
Volumes: []TaskVolume{
{
Name: "1",
Type: apiresource.AmazonElasticBlockStorage,
Type: apiresource.EBSTaskAttach,
Volume: &taskresourcevolume.EBSTaskVolumeConfig{
VolumeId: "vol-12345",
VolumeName: "test-volume",
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestMarshalEBSVolumes(t *testing.T) {
"dockerVolumeName": ""
},
"name": "1",
"type": "AmazonElasticBlockStorage"
"type": "amazonebs"
}
],
"DesiredStatus": "NONE",
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestUnmarshalEBSVolumes(t *testing.T) {
"dockerVolumeName": ""
},
"name": "1",
"type": "AmazonElasticBlockStorage"
"type": "amazonebs"
}
],
"DesiredStatus": "NONE",
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestUnmarshalEBSVolumes(t *testing.T) {
require.NoError(t, err, "Could not unmarshal task")

require.Len(t, task.Volumes, 1)
assert.Equal(t, apiresource.AmazonElasticBlockStorage, task.Volumes[0].Type)
assert.Equal(t, apiresource.EBSTaskAttach, task.Volumes[0].Type)
assert.Equal(t, "1", task.Volumes[0].Name)
ebsConfig, ok := task.Volumes[0].Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
require.True(t, ok)
Expand Down
73 changes: 48 additions & 25 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,41 @@ import (
"fmt"
"time"

ecsengine "github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/statechange"
apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
csi "github.com/aws/amazon-ecs-agent/ecs-agent/csiclient"
log "github.com/cihub/seelog"
)

type EBSWatcher struct {
ctx context.Context
cancel context.CancelFunc
agentState dockerstate.TaskEngineState
// TODO: The ebsChangeEvent will be used to send over the state change event for EBS attachments once it's been found and mounted/resize/format.
ebsChangeEvent chan<- statechange.Event
// TODO: The dataClient will be used to save to agent's data client as well as start the ACK timer. This will be added once the data client functionality have been added
// dataClient data.Client
discoveryClient apiebs.EBSDiscovery
csiClient csi.CSIClient
scanTicker *time.Ticker
// TODO: The dockerTaskEngine.stateChangeEvent will be used to send over the state change event for EBS attachments once it's been found and mounted/resize/format.
taskEngine ecsengine.TaskEngine
}

// NewWatcher is used to return a new instance of the EBSWatcher struct
func NewWatcher(ctx context.Context,
state dockerstate.TaskEngineState,
stateChangeEvents chan<- statechange.Event) *EBSWatcher {
taskEngine ecsengine.TaskEngine) *EBSWatcher {
derivedContext, cancel := context.WithCancel(ctx)
discoveryClient := apiebs.NewDiscoveryClient(derivedContext)
// TODO pull this socket out into config
csiClient := csi.NewCSIClient("/var/run/ecs/ebs-csi-driver/csi-driver.sock")
return &EBSWatcher{
ctx: derivedContext,
cancel: cancel,
agentState: state,
ebsChangeEvent: stateChangeEvents,
discoveryClient: discoveryClient,
csiClient: &csiClient,
taskEngine: taskEngine,
}
}

Expand All @@ -63,7 +68,8 @@ func (w *EBSWatcher) Start() {
pendingEBS := w.agentState.GetAllPendingEBSAttachmentsWithKey()
if len(pendingEBS) > 0 {
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.NotifyFound(foundVolumes)
w.overrideDeviceName(foundVolumes)
w.NotifyAttached(foundVolumes)
}
case <-w.ctx.Done():
w.scanTicker.Stop()
Expand All @@ -79,17 +85,25 @@ func (w *EBSWatcher) Stop() {
w.cancel()
}

func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) {
err := w.HandleEBSResourceAttachment(ebs)
if err != nil {
log.Errorf("Unable to handle resource attachment payload %s", err)
}
}

// HandleResourceAttachment processes the resource attachment message. It will:
// 1. Check whether we already have this attachment in state and if so it's a noop.
// 2. Otherwise add the attachment to state, start its ack timer, and save to the agent state.
func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) error {
attachmentType := ebs.GetAttachmentProperties(apiebs.ResourceTypeName)
if attachmentType != apiebs.ElasticBlockStorage {
// 2. Start the EBS CSI driver if it's not already running
// 3. Otherwise add the attachment to state, start its ack timer, and save to the agent state.
func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment) error {
attachmentType := ebs.GetAttachmentType()
if attachmentType != apiebs.EBSTaskAttach {
log.Warnf("Resource type not Elastic Block Storage. Skip handling resource attachment with type: %v.", attachmentType)
return nil
}

volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdName)
volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdKey)
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volumeId)
if ok {
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.EBSToString())
Expand All @@ -106,15 +120,26 @@ func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) er
return nil
}

// NotifyFound will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyFound(foundVolumes []string) {
for _, volumeId := range foundVolumes {
w.notifyFoundEBS(volumeId)
func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
for volumeId, deviceName := range foundVolumes {
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Unable to find EBS volume with volume ID: %s", volumeId)
continue
}
ebs.SetDeviceName(deviceName)
}
}

// NotifyAttached will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) {
for volID := range foundVolumes {
w.notifyAttachedEBS(volID)
}
}

// notifyFoundEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyFoundEBS(volumeId string) {
// notifyAttachedEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyAttachedEBS(volumeId string) {
// TODO: Add the EBS volume to data client
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
Expand All @@ -132,15 +157,13 @@ func (w *EBSWatcher) notifyFoundEBS(volumeId string) {
return
}

if ebs.IsAttached() {
log.Infof("EBS volume: %v, has been found already.", ebs.EBSToString())
return
}

ebs.SetSentStatus()
log.Infof("We've set sent status for %v", ebs.EBSToString())
ebs.StopAckTimer()
ebs.SetAttachedStatus()

log.Infof("Successfully found attached EBS volume: %v", ebs.EBSToString())

// TODO: Remove this in a future PR with the submit state change
ebs.SetAttachedStatus()
}

// removeEBSAttachment removes a EBS volume with a specific volume ID
Expand All @@ -151,7 +174,7 @@ func (w *EBSWatcher) removeEBSAttachment(volumeID string) {

// addEBSAttachmentToState adds an EBS attachment to state, and start its ack timer
func (w *EBSWatcher) addEBSAttachmentToState(ebs *apiebs.ResourceAttachment) error {
volumeId := ebs.AttachmentProperties[apiebs.VolumeIdName]
volumeId := ebs.AttachmentProperties[apiebs.VolumeIdKey]
err := ebs.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
Expand Down
Loading

0 comments on commit a692764

Please sign in to comment.