diff --git a/agent/app/agent.go b/agent/app/agent.go index a232fc7088d..11ef156952f 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -1041,7 +1041,7 @@ func (agent *ecsAgent) startACSSession( taskComparer, sequenceNumberAccessor, taskStopper, - nil, + agent.ebsWatcher, updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers, ) logger.Info("Beginning Polling for updates") diff --git a/agent/ebs/watcher.go b/agent/ebs/watcher.go index 8fd2a0d9caf..cfa4e154536 100644 --- a/agent/ebs/watcher.go +++ b/agent/ebs/watcher.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "path/filepath" "strconv" "time" @@ -36,6 +37,7 @@ import ( const ( nodeStageTimeout = 2 * time.Second + hostMountDir = "/mnt/ecs/ebs" ) type EBSWatcher struct { @@ -82,11 +84,7 @@ func (w *EBSWatcher) Start() { if len(pendingEBS) > 0 { foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient) w.overrideDeviceName(foundVolumes) - if err := w.StageAll(foundVolumes); err != nil { - log.Errorf("stage error: %s", err) - continue - } - // TODO only notify attached for volumes that are successfully staged + w.StageAll(foundVolumes) w.NotifyAttached(foundVolumes) } case <-w.ctx.Done(): @@ -174,95 +172,87 @@ func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) { } // assumes CSI Driver Managed Daemon is running else call will timeout -func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error { - for volID, deviceName := range foundVolumes { - // get volume details from attachment - ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID) - if !ok { - continue - } - if ebsAttachment.IsSent() { - log.Debugf("State change event has already been emitted for EBS volume: %v.", ebsAttachment.EBSToString()) - continue - } - if ebsAttachment.HasExpired() { - log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebsAttachment.EBSToString()) - continue - } - if ebsAttachment.IsAttached() { - log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString()) - continue +func (w *EBSWatcher) StageAll(foundVolumes map[string]string) []error { + errors := make([]error, 0) + for volumeId, deviceName := range foundVolumes { + if err := w.stageVolumeEBS(volumeId, deviceName); err != nil { + log.Error(err) + errors = append(errors, err) } - hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey) - filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName) - - // CSI NodeStage stub required fields - stubSecrets := make(map[string]string) - stubVolumeContext := make(map[string]string) - stubMountOptions := []string{} - // note that the numbers '123456', '10' and '8' here are dummy data - // we don't use the fsGroup for now - stubFsGroup, _ := strconv.ParseInt("123456", 10, 8) - publishContext := map[string]string{"devicePath": deviceName} - // call CSI NodeStage - timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout) - defer cancelFunc() - err := w.csiClient.NodeStageVolume(timeoutCtx, - volID, - publishContext, - hostPath, - filesystemType, - v1.ReadWriteMany, - stubSecrets, - stubVolumeContext, - stubMountOptions, - &stubFsGroup) + } + return errors +} - if err != nil { - log.Errorf("Failed to initialize EBS volume: error: %s", err) - continue - } - // set attached status - log.Infof("We've set attached status for %v", ebsAttachment.EBSToString()) - ebsAttachment.SetAttachedStatus() +func (w *EBSWatcher) stageVolumeEBS(volID, deviceName string) error { + // get volume details from attachment + ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID) + if !ok { + return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volID) } + if !ebsAttachment.ShouldAttach() { + return nil + } + attachmentMountPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey) + hostPath := filepath.Join(hostMountDir, attachmentMountPath) + filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName) + // CSI NodeStage stub required fields + stubSecrets := make(map[string]string) + stubVolumeContext := make(map[string]string) + stubMountOptions := []string{} + // note that the numbers '123456', '10' and '8' here are dummy data + // we don't use the fsGroup for now + stubFsGroup, _ := strconv.ParseInt("123456", 10, 8) + publishContext := map[string]string{"devicePath": deviceName} + // call CSI NodeStage + timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout) + defer cancelFunc() + err := w.csiClient.NodeStageVolume(timeoutCtx, + volID, + publishContext, + hostPath, + filesystemType, + v1.ReadWriteMany, + stubSecrets, + stubVolumeContext, + stubMountOptions, + &stubFsGroup) + if err != nil { + return fmt.Errorf("Failed to initialize EBS volume ID: %v: error: %w", ebsAttachment.EBSToString(), err) + } + ebsAttachment.SetAttachedStatus() + log.Debugf("We've set attached status for %v", ebsAttachment.EBSToString()) return nil } // NotifyAttached will go through the list of found EBS volumes from the scanning process and mark them as found. -func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) { +func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) []error { + errors := make([]error, 0) for volID := range foundVolumes { - w.notifyAttachedEBS(volID) + if err := w.notifyAttachedEBS(volID); err != nil { + log.Error(err) + errors = append(errors, err) + } } + return errors } // notifyAttachedEBS will mark it as found within the agent state -func (w *EBSWatcher) notifyAttachedEBS(volumeId string) { +func (w *EBSWatcher) notifyAttachedEBS(volumeId string) error { // TODO: Add the EBS volume to data client ebs, ok := w.agentState.GetEBSByVolumeId(volumeId) if !ok { - log.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId) - return - } - - if ebs.HasExpired() { - log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString()) - return + return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId) } - - if ebs.IsSent() { - log.Debugf("State change event has already been emitted for EBS volume: %v.", ebs.EBSToString()) - return + if !ebs.ShouldNotify() { + return nil } - // We found an EBS volume which has the expiration time set in future and - // needs to be acknowledged as having been 'attached' to the Instance if err := w.sendEBSStateChange(ebs); err != nil { - log.Warnf("Unable to send state EBS change, %s", err) - return + return fmt.Errorf("Unable to send state EBS change, %s", err) } ebs.SetSentStatus() - log.Infof("We've set sent status for %v", ebs.EBSToString()) ebs.StopAckTimer() + log.Infof("We've set sent status for %v", ebs.EBSToString()) + return nil } // removeEBSAttachment removes a EBS volume with a specific volume ID @@ -314,10 +304,15 @@ func (w *EBSWatcher) emitEBSAttachedEvent(ebsvol *apiebs.ResourceAttachment) { ClusterARN: ebsvol.GetClusterARN(), ContainerInstanceARN: ebsvol.GetContainerInstanceARN(), } + eniWrapper := apieni.ENIAttachment{AttachmentInfo: attachmentInfo} + // TODO update separate out ENI and EBS attachment types in attachment + // handler. For now we use fake task ENI with dummy fields + eniWrapper.AttachmentType = apieni.ENIAttachmentTypeTaskENI + eniWrapper.MACAddress = "ebs1" + eniWrapper.StartTimer(func() {}) attachmentChange := ecsapi.AttachmentStateChange{ - Attachment: &apieni.ENIAttachment{AttachmentInfo: attachmentInfo}, + Attachment: &eniWrapper, } - log.Debugf("Emitting EBS volume attached event for: %v", ebsvol) w.taskEngine.StateChangeEvents() <- attachmentChange } diff --git a/agent/engine/dockerstate/docker_task_engine_state.go b/agent/engine/dockerstate/docker_task_engine_state.go index c64f090707b..bdbaa5f99a2 100644 --- a/agent/engine/dockerstate/docker_task_engine_state.go +++ b/agent/engine/dockerstate/docker_task_engine_state.go @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment { var pendingEBSAttachments []*apiresource.ResourceAttachment for _, v := range state.ebsAttachments { - if !v.IsAttached() && !v.IsSent() { + if !v.IsAttached() || !v.IsSent() { pendingEBSAttachments = append(pendingEBSAttachments, v) } } @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment { pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment) for k, v := range state.ebsAttachments { - if !v.IsAttached() && !v.IsSent() { + if !v.IsAttached() || !v.IsSent() { pendingEBSAttachments[k] = v } } diff --git a/agent/engine/dockerstate/dockerstate_test.go b/agent/engine/dockerstate/dockerstate_test.go index c70c67312b3..46f1ce6b61d 100644 --- a/agent/engine/dockerstate/dockerstate_test.go +++ b/agent/engine/dockerstate/dockerstate_test.go @@ -203,6 +203,52 @@ func TestAddPendingEBSAttachment(t *testing.T) { } +func TestAddPendingEBSAttachmentExclusion(t *testing.T) { + state := NewTaskEngineState() + + testSentAttachmentProperties := map[string]string{ + apiresource.VolumeNameKey: "myCoolVolume", + apiresource.SourceVolumeHostPathKey: "/testpath2", + apiresource.VolumeSizeGibKey: "7", + apiresource.DeviceNameKey: "/dev/nvme1n0", + apiresource.VolumeIdKey: "vol-456", + apiresource.FileSystemKey: "testXFS", + } + + // not attached but sent should be included (||) + sentAttachment := &apiresource.ResourceAttachment{ + AttachmentInfo: attachmentinfo.AttachmentInfo{ + TaskARN: "taskarn1", + AttachmentARN: "ebs1", + AttachStatusSent: true, + Status: status.AttachmentNone, + }, + AttachmentProperties: testAttachmentProperties, + AttachmentType: apiresource.EBSTaskAttach, + } + + // attached and sent attachment should be excluded (&&) + foundAttachment := &apiresource.ResourceAttachment{ + AttachmentInfo: attachmentinfo.AttachmentInfo{ + TaskARN: "taskarn2", + AttachmentARN: "ebs2", + AttachStatusSent: true, + Status: status.AttachmentAttached, + }, + AttachmentProperties: testSentAttachmentProperties, + AttachmentType: apiresource.EBSTaskAttach, + } + + state.AddEBSAttachment(foundAttachment) + state.AddEBSAttachment(sentAttachment) + assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachments(), 1) + assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey(), 1) + assert.Len(t, state.(*DockerTaskEngineState).GetAllEBSAttachments(), 2) + + _, ok := state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey()["vol-123"] + assert.True(t, ok) +} + func TestTwophaseAddContainer(t *testing.T) { state := NewTaskEngineState() testTask := &apitask.Task{Arn: "test", Containers: []*apicontainer.Container{{ diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/api/resource/resource_attachment.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/api/resource/resource_attachment.go index bd8c27cd7c8..0091298d522 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/api/resource/resource_attachment.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/api/resource/resource_attachment.go @@ -325,3 +325,19 @@ func (ra *ResourceAttachment) GetContainerInstanceARN() string { return ra.ContainerInstanceARN } + +// should attach when not attached, and not sent/not expired +func (ra *ResourceAttachment) ShouldAttach() bool { + ra.guard.RLock() + defer ra.guard.RUnlock() + + return !(ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt)) +} + +// should notify when attached, and not sent/not expired +func (ra *ResourceAttachment) ShouldNotify() bool { + ra.guard.RLock() + defer ra.guard.RUnlock() + + return (ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt)) +} diff --git a/ecs-agent/api/resource/resource_attachment.go b/ecs-agent/api/resource/resource_attachment.go index bd8c27cd7c8..0091298d522 100644 --- a/ecs-agent/api/resource/resource_attachment.go +++ b/ecs-agent/api/resource/resource_attachment.go @@ -325,3 +325,19 @@ func (ra *ResourceAttachment) GetContainerInstanceARN() string { return ra.ContainerInstanceARN } + +// should attach when not attached, and not sent/not expired +func (ra *ResourceAttachment) ShouldAttach() bool { + ra.guard.RLock() + defer ra.guard.RUnlock() + + return !(ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt)) +} + +// should notify when attached, and not sent/not expired +func (ra *ResourceAttachment) ShouldNotify() bool { + ra.guard.RLock() + defer ra.guard.RUnlock() + + return (ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt)) +}