Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing changes from EBS Task Attach feature branch #3965

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ func (agent *ecsAgent) startACSSession(
taskComparer,
sequenceNumberAccessor,
taskStopper,
nil,
agent.ebsWatcher,
updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers,
)
logger.Info("Beginning Polling for updates")
Expand Down
145 changes: 70 additions & 75 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"time"

Expand All @@ -36,6 +37,7 @@ import (

const (
nodeStageTimeout = 2 * time.Second
hostMountDir = "/mnt/ecs/ebs"
)

type EBSWatcher struct {
Expand Down Expand Up @@ -82,11 +84,7 @@ func (w *EBSWatcher) Start() {
if len(pendingEBS) > 0 {
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.overrideDeviceName(foundVolumes)
if err := w.StageAll(foundVolumes); err != nil {
log.Errorf("stage error: %s", err)
continue
}
// TODO only notify attached for volumes that are successfully staged
w.StageAll(foundVolumes)
w.NotifyAttached(foundVolumes)
}
case <-w.ctx.Done():
Expand Down Expand Up @@ -174,95 +172,87 @@ func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
}

// assumes CSI Driver Managed Daemon is running else call will timeout
func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
for volID, deviceName := range foundVolumes {
// get volume details from attachment
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID)
if !ok {
continue
}
if ebsAttachment.IsSent() {
log.Debugf("State change event has already been emitted for EBS volume: %v.", ebsAttachment.EBSToString())
continue
}
if ebsAttachment.HasExpired() {
log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebsAttachment.EBSToString())
continue
}
if ebsAttachment.IsAttached() {
log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString())
continue
func (w *EBSWatcher) StageAll(foundVolumes map[string]string) []error {
errors := make([]error, 0)
for volumeId, deviceName := range foundVolumes {
if err := w.stageVolumeEBS(volumeId, deviceName); err != nil {
log.Error(err)
errors = append(errors, err)
}
hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)

// CSI NodeStage stub required fields
stubSecrets := make(map[string]string)
stubVolumeContext := make(map[string]string)
stubMountOptions := []string{}
// note that the numbers '123456', '10' and '8' here are dummy data
// we don't use the fsGroup for now
stubFsGroup, _ := strconv.ParseInt("123456", 10, 8)
publishContext := map[string]string{"devicePath": deviceName}
// call CSI NodeStage
timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout)
defer cancelFunc()
err := w.csiClient.NodeStageVolume(timeoutCtx,
volID,
publishContext,
hostPath,
filesystemType,
v1.ReadWriteMany,
stubSecrets,
stubVolumeContext,
stubMountOptions,
&stubFsGroup)
}
return errors
}

if err != nil {
log.Errorf("Failed to initialize EBS volume: error: %s", err)
continue
fierlion marked this conversation as resolved.
Show resolved Hide resolved
}
// set attached status
log.Infof("We've set attached status for %v", ebsAttachment.EBSToString())
ebsAttachment.SetAttachedStatus()
func (w *EBSWatcher) stageVolumeEBS(volID, deviceName string) error {
// get volume details from attachment
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID)
if !ok {
return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volID)
}
if !ebsAttachment.ShouldAttach() {
return nil
}
attachmentMountPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
hostPath := filepath.Join(hostMountDir, attachmentMountPath)
filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)
// CSI NodeStage stub required fields
stubSecrets := make(map[string]string)
stubVolumeContext := make(map[string]string)
stubMountOptions := []string{}
// note that the numbers '123456', '10' and '8' here are dummy data
// we don't use the fsGroup for now
stubFsGroup, _ := strconv.ParseInt("123456", 10, 8)
publishContext := map[string]string{"devicePath": deviceName}
// call CSI NodeStage
timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout)
defer cancelFunc()
err := w.csiClient.NodeStageVolume(timeoutCtx,
volID,
publishContext,
hostPath,
filesystemType,
v1.ReadWriteMany,
stubSecrets,
stubVolumeContext,
stubMountOptions,
&stubFsGroup)
if err != nil {
return fmt.Errorf("Failed to initialize EBS volume ID: %v: error: %w", ebsAttachment.EBSToString(), err)
}
ebsAttachment.SetAttachedStatus()
log.Debugf("We've set attached status for %v", ebsAttachment.EBSToString())
return nil
}

// NotifyAttached will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) {
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) []error {
errors := make([]error, 0)
for volID := range foundVolumes {
w.notifyAttachedEBS(volID)
if err := w.notifyAttachedEBS(volID); err != nil {
log.Error(err)
errors = append(errors, err)
}
}
return errors
}

// notifyAttachedEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyAttachedEBS(volumeId string) {
func (w *EBSWatcher) notifyAttachedEBS(volumeId string) error {
// TODO: Add the EBS volume to data client
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
return
}

if ebs.HasExpired() {
log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString())
return
return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
}

if ebs.IsSent() {
log.Debugf("State change event has already been emitted for EBS volume: %v.", ebs.EBSToString())
return
if !ebs.ShouldNotify() {
return nil
}
// We found an EBS volume which has the expiration time set in future and
// needs to be acknowledged as having been 'attached' to the Instance
if err := w.sendEBSStateChange(ebs); err != nil {
log.Warnf("Unable to send state EBS change, %s", err)
return
return fmt.Errorf("Unable to send state EBS change, %s", err)
}
ebs.SetSentStatus()
log.Infof("We've set sent status for %v", ebs.EBSToString())
ebs.StopAckTimer()
log.Infof("We've set sent status for %v", ebs.EBSToString())
return nil
}

// removeEBSAttachment removes a EBS volume with a specific volume ID
Expand Down Expand Up @@ -314,10 +304,15 @@ func (w *EBSWatcher) emitEBSAttachedEvent(ebsvol *apiebs.ResourceAttachment) {
ClusterARN: ebsvol.GetClusterARN(),
ContainerInstanceARN: ebsvol.GetContainerInstanceARN(),
}
eniWrapper := apieni.ENIAttachment{AttachmentInfo: attachmentInfo}
// TODO update separate out ENI and EBS attachment types in attachment
// handler. For now we use fake task ENI with dummy fields
eniWrapper.AttachmentType = apieni.ENIAttachmentTypeTaskENI
eniWrapper.MACAddress = "ebs1"
eniWrapper.StartTimer(func() {})
attachmentChange := ecsapi.AttachmentStateChange{
Attachment: &apieni.ENIAttachment{AttachmentInfo: attachmentInfo},
Attachment: &eniWrapper,
}

log.Debugf("Emitting EBS volume attached event for: %v", ebsvol)
w.taskEngine.StateChangeEvents() <- attachmentChange
}
4 changes: 2 additions & 2 deletions agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource
func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment {
var pendingEBSAttachments []*apiresource.ResourceAttachment
for _, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments = append(pendingEBSAttachments, v)
}
}
Expand All @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str
func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment {
pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment)
for k, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments[k] = v
}
}
Expand Down
46 changes: 46 additions & 0 deletions agent/engine/dockerstate/dockerstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,52 @@ func TestAddPendingEBSAttachment(t *testing.T) {

}

func TestAddPendingEBSAttachmentExclusion(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: perhaps we can add a comment on what this will be testing for. a bit unclear on what we mean by exclusion here

state := NewTaskEngineState()

testSentAttachmentProperties := map[string]string{
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath2",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme1n0",
apiresource.VolumeIdKey: "vol-456",
apiresource.FileSystemKey: "testXFS",
}

// not attached but sent should be included (||)
sentAttachment := &apiresource.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
TaskARN: "taskarn1",
AttachmentARN: "ebs1",
AttachStatusSent: true,
Status: status.AttachmentNone,
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

// attached and sent attachment should be excluded (&&)
foundAttachment := &apiresource.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
TaskARN: "taskarn2",
AttachmentARN: "ebs2",
AttachStatusSent: true,
Status: status.AttachmentAttached,
},
AttachmentProperties: testSentAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(foundAttachment)
state.AddEBSAttachment(sentAttachment)
assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachments(), 1)
assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey(), 1)
assert.Len(t, state.(*DockerTaskEngineState).GetAllEBSAttachments(), 2)

_, ok := state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey()["vol-123"]
assert.True(t, ok)
}

func TestTwophaseAddContainer(t *testing.T) {
state := NewTaskEngineState()
testTask := &apitask.Task{Arn: "test", Containers: []*apicontainer.Container{{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions ecs-agent/api/resource/resource_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,19 @@ func (ra *ResourceAttachment) GetContainerInstanceARN() string {

return ra.ContainerInstanceARN
}

// should attach when not attached, and not sent/not expired
func (ra *ResourceAttachment) ShouldAttach() bool {
ra.guard.RLock()
defer ra.guard.RUnlock()

return !(ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt))
}

// should notify when attached, and not sent/not expired
func (ra *ResourceAttachment) ShouldNotify() bool {
ra.guard.RLock()
defer ra.guard.RUnlock()

return (ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt))
}