Skip to content

Commit

Permalink
Add changes from feature branch
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 authored and fierlion committed Oct 10, 2023
1 parent 96633a3 commit b02f981
Show file tree
Hide file tree
Showing 26 changed files with 212 additions and 25 deletions.
12 changes: 12 additions & 0 deletions agent/acs/session/payload_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package session

import (
"encoding/json"
"fmt"

"github.com/aws/amazon-ecs-agent/agent/api"
Expand Down Expand Up @@ -98,6 +99,16 @@ func (pmHandler *payloadMessageHandler) addPayloadTasks(payload *ecsacs.PayloadM
allTasksOK := true

validTasks := make([]*apitask.Task, 0, len(payload.Tasks))

data, err := json.Marshal(payload)
if err != nil {
logger.Debug("Unable to marshal")
} else {
logger.Debug("Here's the task payload", logger.Fields{
"taskPayload": string(data),
})
}

for _, task := range payload.Tasks {
if task == nil {
logger.Critical("Received nil task for message", logger.Fields{
Expand All @@ -106,6 +117,7 @@ func (pmHandler *payloadMessageHandler) addPayloadTasks(payload *ecsacs.PayloadM
allTasksOK = false
continue
}

apiTask, err := apitask.TaskFromACS(task, payload)
if err != nil {
pmHandler.handleInvalidTask(task, err, payload)
Expand Down
6 changes: 6 additions & 0 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ func (client *APIECSClient) SubmitContainerStateChange(change api.ContainerState
func (client *APIECSClient) SubmitAttachmentStateChange(change api.AttachmentStateChange) error {
attachmentStatus := change.Attachment.Status.String()

seelog.Infof("attachment arn in ecs client %s", change.Attachment.AttachmentARN)
seelog.Infof("attachment in ecs client %v", change.Attachment)
seelog.Infof("stringified attachment in ecs client %v", aws.String(change.Attachment.AttachmentARN))

req := ecs.SubmitAttachmentStateChangesInput{
Cluster: &client.config.Cluster,
Attachments: []*ecs.AttachmentStateChange{
Expand All @@ -668,6 +672,8 @@ func (client *APIECSClient) SubmitAttachmentStateChange(change api.AttachmentSta
},
}

seelog.Infof("attachment req in ecs client %v", req)

_, err := client.submitStateChangeClient.SubmitAttachmentStateChanges(&req)
if err != nil {
seelog.Warnf("Could not submit attachment state change [%s]: %v", change.String(), err)
Expand Down
5 changes: 4 additions & 1 deletion agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2602,7 +2602,10 @@ func (task *Task) dockerHostBinds(container *apicontainer.Container) ([]string,
return []string{}, errors.Errorf("Unable to resolve volume mounts; invalid path: %s %s; %s -> %s",
container.Name, mountPoint.SourceVolume, hv.Source(), mountPoint.ContainerPath)
}

logger.Debug("Volume Binds are the following", logger.Fields{
"hostPath": hv.Source(),
"containerPath": mountPoint.ContainerPath,
})
bind := hv.Source() + ":" + mountPoint.ContainerPath
if mountPoint.ReadOnly {
bind += ":ro"
Expand Down
11 changes: 11 additions & 0 deletions agent/api/task/task_attachment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package task

import (
"encoding/json"
"fmt"

"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
Expand Down Expand Up @@ -115,6 +116,16 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
if err != nil {
return fmt.Errorf("unable to parse and validate EBS volume: %w", err)
}

data, err := json.Marshal(ebs)
if err != nil {
logger.Debug("Unable to marshal")
} else {
logger.Debug("Here's the ebs volume config", logger.Fields{
"ebs volume config": string(data),
})
}

taskVolume := TaskVolume{
Name: ebs.VolumeName,
Type: apiresource.EBSTaskAttach,
Expand Down
4 changes: 3 additions & 1 deletion agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
if loaded, _ := csiDM.IsLoaded(agent.dockerClient); loaded {
imageManager.AddImageToCleanUpExclusionList(csiDM.GetManagedDaemon().GetLoadedDaemonImageRef())
}
} else {
seelog.Debug("CSI daemon manager is empty...")
}

// Add container instance ARN to metadata manager
Expand Down Expand Up @@ -1041,7 +1043,7 @@ func (agent *ecsAgent) startACSSession(
taskComparer,
sequenceNumberAccessor,
taskStopper,
nil,
agent.ebsWatcher,
updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers,
)
logger.Info("Beginning Polling for updates")
Expand Down
31 changes: 29 additions & 2 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

ecsapi "github.com/aws/amazon-ecs-agent/agent/api"
Expand Down Expand Up @@ -156,7 +158,7 @@ func (w *EBSWatcher) HandleEBSResourceAttachment(ebs *apiebs.ResourceAttachment)
return fmt.Errorf("%w; attach %v message handler: unable to add ebs attachment to engine state: %v",
err, attachmentType, ebs.EBSToString())
}

log.Debug("EBS Attachment has been added to state successfully")
return nil
}

Expand Down Expand Up @@ -193,7 +195,11 @@ func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString())
continue
}
hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)

// hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)

preHostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
hostPath := filepath.Join("/mnt/ecs/ebs", preHostPath[strings.LastIndex(preHostPath, "/")+1:])
filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)

// CSI NodeStage stub required fields
Expand Down Expand Up @@ -229,6 +235,26 @@ func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
return nil
}

// func (w *EBSWatcher) stageAllHelper(ctx context.Context, volID, hostPath, deviceName, filesystemType string) error {
// stubSecrets := make(map[string]string)
// stubVolumeContext := make(map[string]string)
// stubMountOptions := []string{}
// stubFsGroup, _ := strconv.ParseInt("123456", 10, 8)
// publishContext := map[string]string{"devicePath": deviceName}
// err := w.csiClient.NodeStageVolume(ctx,
// volID,
// publishContext,
// hostPath,
// filesystemType,
// v1.ReadWriteMany,
// stubSecrets,
// stubVolumeContext,
// stubMountOptions,
// &stubFsGroup)

// return err
// }

// NotifyAttached will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) {
for volID := range foundVolumes {
Expand Down Expand Up @@ -278,6 +304,7 @@ func (w *EBSWatcher) addEBSAttachmentToState(ebs *apiebs.ResourceAttachment) err
w.handleEBSAckTimeout(volumeId)
})
if err != nil {
log.Warnf("Unable to handl ack timeout")
return err
}

Expand Down
8 changes: 8 additions & 0 deletions agent/engine/daemonmanager/daemon_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,15 @@ func (dm *daemonManager) initDaemonDirectoryMounts(imageName string) error {
}
// create socket path
socketPathHost := filepath.Join(socketPathHostRoot, imageName)
logger.Debug("Attempting to create socket file", logger.Fields{
"SocketPathHost": socketPathHost,
"SocketPathHostRoot": socketPathHostRoot,
"ImageName": imageName,
})
if err := mkdirAllAndChown(socketPathHost, daemonMountPermission, daemonUID, os.Getegid()); err != nil {
logger.Debug("Unable to create socket file", logger.Fields{
"error": err,
})
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource
func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment {
var pendingEBSAttachments []*apiresource.ResourceAttachment
for _, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments = append(pendingEBSAttachments, v)
}
}
Expand All @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str
func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment {
pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment)
for k, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments[k] = v
}
}
Expand Down
12 changes: 12 additions & 0 deletions agent/engine/mocks/engine_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (mtask *managedTask) overseeTask() {
// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
// - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources
// (resources are later 'release'd on Stopped task emitTaskEvent call)
// (resources are later 'release'd on Stopped task EmitTaskEvent call)
mtask.waitForHostResources()

// If this was a 'state restore', send all unsent statuses
Expand Down Expand Up @@ -284,7 +284,7 @@ func (mtask *managedTask) emitCurrentStatus() {
for _, container := range mtask.Containers {
mtask.emitContainerEvent(mtask.Task, container, "")
}
mtask.emitTaskEvent(mtask.Task, "")
mtask.EmitTaskEvent(mtask.Task, "")
}

// waitForHostResources waits for host resources to become available to start
Expand Down Expand Up @@ -509,7 +509,7 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
if mtask.GetKnownStatus().Terminal() {
taskStateChangeReason = mtask.Task.GetTerminalReason()
}
mtask.emitTaskEvent(mtask.Task, taskStateChangeReason)
mtask.EmitTaskEvent(mtask.Task, taskStateChangeReason)
// Save the new task status to database.
mtask.engine.saveTaskData(mtask.Task)
}
Expand Down Expand Up @@ -600,7 +600,7 @@ func getContainerEventLogFields(c api.ContainerStateChange) logger.Fields {
return f
}

func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) {
func (mtask *managedTask) EmitTaskEvent(task *apitask.Task, reason string) {
taskKnownStatus := task.GetKnownStatus()
// Always do (idempotent) release host resources whenever state change with
// known status == STOPPED is done to ensure sync between tasks and host resource manager
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (mtask *managedTask) progressTask() {
if mtask.GetKnownStatus().Terminal() {
taskStateChangeReason = mtask.Task.GetTerminalReason()
}
mtask.emitTaskEvent(mtask.Task, taskStateChangeReason)
mtask.EmitTaskEvent(mtask.Task, taskStateChangeReason)
}
}

Expand Down Expand Up @@ -1420,7 +1420,7 @@ func (mtask *managedTask) handleContainersUnableToTransitionState() {
field.TaskID: mtask.GetID(),
})
mtask.SetKnownStatus(apitaskstatus.TaskStopped)
mtask.emitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason)
mtask.EmitTaskEvent(mtask.Task, taskUnableToTransitionToStoppedReason)
// TODO we should probably panic here
} else {
// If we end up here, it means containers are not able to transition anymore; maybe because of dependencies that
Expand Down
14 changes: 7 additions & 7 deletions agent/eventhandler/attachment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (handler *attachmentHandler) submitAttachmentEvent(attachmentChange *api.At
// we need to lock the attachment handler to avoid sending an attachment state change for an attachment
// multiple times (this can happen when udev watcher sends multiple attached events for a certain attachment,
// for example one from udev event and one from reconciliation loop)
seelog.Debugf("AttachmentHandler: acquiring attachment lock before sending attachment state change for attachment %s", handler.attachmentARN)
seelog.Infof("AttachmentHandler: acquiring attachment lock before sending attachment state change for attachment %s", handler.attachmentARN)
handler.lock.Lock()
seelog.Debugf("AttachmentHandler: acquired attachment lock for attachment %s", handler.attachmentARN)
defer handler.lock.Unlock()
Expand All @@ -140,13 +140,13 @@ func (handler *attachmentHandler) submitAttachmentEventOnce(attachmentChange *ap
}
seelog.Debugf("AttachmentHandler: submitted attachment state change: %s", attachmentChange.String())

attachmentChange.Attachment.SetSentStatus()
attachmentChange.Attachment.StopAckTimer()
// attachmentChange.Attachment.SetSentStatus()
// attachmentChange.Attachment.StopAckTimer()

err := handler.dataClient.SaveENIAttachment(attachmentChange.Attachment)
if err != nil {
seelog.Errorf("AttachmentHandler: error saving state after submitted attachment state change [%s]: %v", attachmentChange.String(), err)
}
// err := handler.dataClient.SaveENIAttachment(attachmentChange.Attachment)
// if err != nil {
// seelog.Errorf("AttachmentHandler: error saving state after submitted attachment state change [%s]: %v", attachmentChange.String(), err)
// }
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions agent/taskresource/volume/dockervolume_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func ParseEBSTaskVolumeAttachment(ebsAttachment *ecsacs.Attachment) (*EBSTaskVol
ebsTaskVolumeConfig.DeviceName = aws.StringValue(property.Value)
case apiresource.SourceVolumeHostPathKey:
ebsTaskVolumeConfig.SourceVolumeHostPath = aws.StringValue(property.Value)
// preHostPath := aws.StringValue(property.Value)
// ebsTaskVolumeConfig.SourceVolumeHostPath = preHostPath[strings.LastIndex(preHostPath, "/")+1:]
case apiresource.VolumeNameKey:
ebsTaskVolumeConfig.VolumeName = aws.StringValue(property.Value)
case apiresource.FileSystemKey:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b02f981

Please sign in to comment.