Skip to content

Commit

Permalink
add missing changes from feature branch
Browse files Browse the repository at this point in the history
  • Loading branch information
fierlion committed Oct 13, 2023
1 parent 352ec50 commit 8c26ec1
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
2 changes: 1 addition & 1 deletion agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ func (agent *ecsAgent) startACSSession(
taskComparer,
sequenceNumberAccessor,
taskStopper,
nil,
agent.ebsWatcher,
updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers,
)
logger.Info("Beginning Polling for updates")
Expand Down
18 changes: 15 additions & 3 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

ecsapi "github.com/aws/amazon-ecs-agent/agent/api"
Expand Down Expand Up @@ -84,7 +86,6 @@ func (w *EBSWatcher) Start() {
w.overrideDeviceName(foundVolumes)
if err := w.StageAll(foundVolumes); err != nil {
log.Errorf("stage error: %s", err)
continue
}
// TODO only notify attached for volumes that are successfully staged
w.NotifyAttached(foundVolumes)
Expand Down Expand Up @@ -193,7 +194,10 @@ func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString())
continue
}
hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)

preHostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
hostPath := filepath.Join("/mnt/ecs/ebs", preHostPath[strings.LastIndex(preHostPath, "/")+1:])

filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)

// CSI NodeStage stub required fields
Expand Down Expand Up @@ -244,6 +248,10 @@ func (w *EBSWatcher) notifyAttachedEBS(volumeId string) {
log.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
return
}
if !ebs.IsAttached() {
log.Debugf("EBS status is not attached, skipping: %v.", ebs.EBSToString())
return
}

if ebs.HasExpired() {
log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString())
Expand Down Expand Up @@ -314,8 +322,12 @@ func (w *EBSWatcher) emitEBSAttachedEvent(ebsvol *apiebs.ResourceAttachment) {
ClusterARN: ebsvol.GetClusterARN(),
ContainerInstanceARN: ebsvol.GetContainerInstanceARN(),
}
eniWrapper := apieni.ENIAttachment{AttachmentInfo: attachmentInfo}
eniWrapper.AttachmentType = apieni.ENIAttachmentTypeTaskENI
eniWrapper.MACAddress = "mac1"
eniWrapper.StartTimer(func() {})
attachmentChange := ecsapi.AttachmentStateChange{
Attachment: &apieni.ENIAttachment{AttachmentInfo: attachmentInfo},
Attachment: &eniWrapper,
}

log.Debugf("Emitting EBS volume attached event for: %v", ebsvol)
Expand Down
11 changes: 11 additions & 0 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,11 @@ func (engine *DockerTaskEngine) GetTaskByArn(arn string) (*apitask.Task, bool) {
func (engine *DockerTaskEngine) GetDaemonTask(daemonName string) *apitask.Task {
engine.daemonTasksLock.RLock()
defer engine.daemonTasksLock.RUnlock()

logger.Info("getting daemon task", logger.Fields{
field.Container: daemonName,
})

if daemon, ok := engine.daemonTasks[daemonName]; ok {
return daemon
}
Expand All @@ -1223,6 +1228,12 @@ func (engine *DockerTaskEngine) GetDaemonTask(daemonName string) *apitask.Task {

func (engine *DockerTaskEngine) SetDaemonTask(daemonName string, task *apitask.Task) {
engine.daemonTasksLock.Lock()

logger.Info("setting daemon task", logger.Fields{
field.Container: daemonName,
field.TaskID: task.GetID(),
})

defer engine.daemonTasksLock.Unlock()
engine.daemonTasks[daemonName] = task
}
Expand Down
4 changes: 2 additions & 2 deletions agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource
func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment {
var pendingEBSAttachments []*apiresource.ResourceAttachment
for _, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments = append(pendingEBSAttachments, v)
}
}
Expand All @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str
func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment {
pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment)
for k, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments[k] = v
}
}
Expand Down

0 comments on commit 8c26ec1

Please sign in to comment.