Skip to content

Commit

Permalink
Terminally exit on unrecoverable exceptions for RCI.
Browse files Browse the repository at this point in the history
  • Loading branch information
BinBin He committed Jan 2, 2025
1 parent 5119e54 commit 6fe572f
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 26 deletions.
43 changes: 33 additions & 10 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
// Register the container instance
err = agent.registerContainerInstance(client, vpcSubnetAttributes)
if err != nil {
if isTransient(err) {
return exitcodes.ExitError
if isTerminal(err) {
// On unrecoverable error codes, agent should terminally exit.
logger.Critical("Agent will terminally exit, unable to register container instance:", logger.Fields{
field.Error: err,
})
return exitcodes.ExitTerminal
}
return exitcodes.ExitTerminal
// Other errors are considered recoverable and will be retried.
return exitcodes.ExitError
}

// Load Managed Daemon images asynchronously
Expand Down Expand Up @@ -855,13 +860,19 @@ func (agent *ecsAgent) registerContainerInstance(
field.Error: err,
})
if retriable, ok := err.(apierrors.Retriable); ok && !retriable.Retry() {
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -871,9 +882,9 @@ func (agent *ecsAgent) registerContainerInstance(
logger.Critical("Instance registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}
logger.Info("Instance registration completed successfully", logger.Fields{
"instanceArn": containerInstanceArn,
Expand Down Expand Up @@ -903,7 +914,19 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
})
if apierrors.IsInstanceTypeChangedError(err) {
seelog.Criticalf(instanceTypeMismatchErrorFormat, err)
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance re-registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance re-registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -913,9 +936,9 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
logger.Critical("Instance re-registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}

// startAsyncRoutines starts all background methods
Expand Down
187 changes: 177 additions & 10 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {

ec2MetadataClient := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)
newError := errors.New("error")
gomock.InOrder(
saveableOptionFactory.EXPECT().AddSaveable("TaskEngine", gomock.Any()).Return(nil),
saveableOptionFactory.EXPECT().AddSaveable("ContainerInstanceArn", gomock.Any()).Return(nil),
Expand All @@ -770,7 +771,7 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {

stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(
nil, errors.New("error")),
nil, newError),
)

dataClient := newTestDataClient(t)
Expand All @@ -795,7 +796,6 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
}

func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
Expand All @@ -808,6 +808,7 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
cfg.Checkpoint = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
ec2MetadataClient := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)
newError := errors.New("error")

gomock.InOrder(
saveableOptionFactory.EXPECT().AddSaveable("TaskEngine", gomock.Any()).Return(nil),
Expand All @@ -819,7 +820,7 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Return(stateManager, nil),
stateManager.EXPECT().Load().Return(errors.New("error")),
stateManager.EXPECT().Load().Return(newError),
)

dataClient := newTestDataClient(t)
Expand All @@ -844,7 +845,6 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
}

func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func TestReregisterContainerInstanceInstanceTypeChanged(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1145,7 +1145,7 @@ func TestReregisterContainerInstanceAttributeError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1204,7 +1204,66 @@ func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

// TestReregisterContainerInstanceTerminalError checks that registerContainerInstance
// returns an error that isTerminal classifies as terminal when the mocked ECS client's
// RegisterContainerInstance call fails with an AWS error whose code is "ClientException".
// NOTE(review): the agent's containerInstanceARN is preset below, so this presumably
// exercises the re-registration path — confirm against registerContainerInstance.
func TestReregisterContainerInstanceTerminalError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Mocks for the collaborators that are wired into the agent under test.
mockDockerClient := mock_dockerapi.NewMockDockerClient(ctrl)
client := mock_ecs.NewMockECSClient(ctrl)
mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)

// Loader, service connect and daemon manager calls are allowed any number of
// times; they are not what this test asserts on.
mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}
mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

// Ordered expectations ending in the registration call, which is made with the
// preset containerInstanceARN as its first argument and fails with ClientException.
gomock.InOrder(
mockCredentialsProvider.EXPECT().Retrieve(gomock.Any()).Return(awsv2.Credentials{}, nil),
mockDockerClient.EXPECT().SupportedVersions().Return(apiVersions),
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
mockDockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).AnyTimes().Return([]string{}, nil),
client.EXPECT().RegisterContainerInstance(containerInstanceARN, gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).Return("", "", awserr.New("ClientException", "", nil)),
)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)

cfg := getTestConfig()
cfg.Cluster = clusterName
ctx, cancel := context.WithCancel(context.TODO())
// Cancel the context to cancel async routines
defer cancel()
agent := &ecsAgent{
ctx: ctx,
cfg: &cfg,
dockerClient: mockDockerClient,
ec2MetadataClient: mockEC2Metadata,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
mobyPlugins: mockMobyPlugins,
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
}
// Preset the instance ARN and availability zone before registering.
agent.containerInstanceARN = containerInstanceARN
agent.availabilityZone = availabilityZone

// The ClientException must come back as an error that isTerminal recognises.
err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetHappyPath(t *testing.T) {
Expand Down Expand Up @@ -1320,7 +1379,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCanRetryError(

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryError(t *testing.T) {
Expand Down Expand Up @@ -1378,7 +1437,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryErr

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1435,7 +1494,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1499,6 +1558,114 @@ func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
credentialsManager, state, imageManager, client, execCmdMgr)
assert.Equal(t, exitcodes.ExitTerminal, exitCode)
}

// TestRegisterContainerInstanceExceptionErrors is a table-driven test of the exit
// code returned by doStart when the mocked RegisterContainerInstance call fails.
// Each case pairs the AWS error returned by the mock with the expected exit code:
// InvalidParameterException and ClientException expect exitcodes.ExitTerminal,
// while ThrottlingException expects exitcodes.ExitError.
func TestRegisterContainerInstanceExceptionErrors(t *testing.T) {
testCases := []struct {
name string
regError error
exitCode int
}{
{
name: "InvalidParameterException",
regError: awserr.New("InvalidParameterException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ClientException",
regError: awserr.New("ClientException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ThrottlingException",
regError: awserr.New("ThrottlingException", "", nil),
exitCode: exitcodes.ExitError,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// setup is a helper defined elsewhere in this test file; only the values
// bound here are used, the remaining return values are discarded.
ctrl, credentialsManager, state, imageManager, client,
dockerClient, _, _, execCmdMgr, _ := setup(t)
defer ctrl.Finish()

// Additional mocks created on the same controller for this subtest.
mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}

mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

// EC2 metadata lookups: the MAC returned by PrimaryENIMAC is the argument
// expected by the VPCID and SubnetID calls.
mockEC2Metadata.EXPECT().PrimaryENIMAC().Return("mac", nil)
mockEC2Metadata.EXPECT().VPCID("mac").Return("vpc-id", nil)
mockEC2Metadata.EXPECT().SubnetID("mac").Return("subnet-id", nil)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)

mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

dockerClient.EXPECT().SupportedVersions().Return(apiVersions).AnyTimes()

// Ordered expectations ending in the registration call, which returns the
// error supplied by the current test case.
gomock.InOrder(
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve(gomock.Any()).Return(awsv2.Credentials{}, nil),
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes().Return([]string{}, nil),

client.EXPECT().
RegisterContainerInstance(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).
Return("", "", tc.regError),
)

cfg := getTestConfig()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

agent := &ecsAgent{
ctx: ctx,
ec2MetadataClient: mockEC2Metadata,
cfg: &cfg,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
dockerClient: dockerClient,
mobyPlugins: mockMobyPlugins,
// No-op termination handler: the function body is intentionally empty.
terminationHandler: func(
taskEngineState dockerstate.TaskEngineState,
dataClient data.Client,
taskEngine engine.TaskEngine,
cancel context.CancelFunc,
) {
},
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
}

// Run the agent start-up and capture the exit code it returns.
exitCode := agent.doStart(
eventstream.NewEventStream("events", ctx),
credentialsManager,
state,
imageManager,
client,
execCmdMgr,
)

// The exit code must match the one declared for this registration error.
assert.Equal(t, tc.exitCode, exitCode)
})
}
}

func TestMergeTags(t *testing.T) {
ec2Key := "ec2Key"
ec2Value := "ec2Value"
Expand Down
13 changes: 7 additions & 6 deletions agent/app/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@

package app

// transientError represents a transient error when executing the ECS Agent
type transientError struct {
type terminalError struct {
error
}

// isTransient returns true if the error is transient
func isTransient(err error) bool {
_, ok := err.(transientError)
return ok
// isTerminal reports whether err is a terminalError value, i.e. an error that
// has been marked as unrecoverable so the agent can exit terminally.
// Only the dynamic type of err itself is inspected; a nil error or any other
// error type yields false.
func isTerminal(err error) bool {
	switch err.(type) {
	case terminalError:
		return true
	default:
		return false
	}
}

// clusterMismatchError represents a mismatch in cluster name between the
Expand Down

0 comments on commit 6fe572f

Please sign in to comment.