Skip to content

Commit

Permalink
Terminally exit on unrecoverable exceptions for RCI.
Browse files Browse the repository at this point in the history
  • Loading branch information
BinBin He committed Dec 31, 2024
1 parent 41d593c commit 86512a8
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 80 deletions.
31 changes: 21 additions & 10 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
// Register the container instance
err = agent.registerContainerInstance(client, vpcSubnetAttributes)
if err != nil {
if isTransient(err) {
return exitcodes.ExitError
if isTerminal(err) {
// On unrecoverable error codes, agent should terminally exit.
logger.Critical("Agent will terminally exit, unable to register container instance:", logger.Fields{
field.Error: err,
})
return exitcodes.ExitTerminal
}
return exitcodes.ExitTerminal
// Other errors are considered recoverable and will be retried.
return exitcodes.ExitError
}

// Load Managed Daemon images asynchronously
Expand Down Expand Up @@ -834,13 +839,19 @@ func (agent *ecsAgent) registerContainerInstance(
field.Error: err,
})
if retriable, ok := err.(apierrors.Retriable); ok && !retriable.Retry() {
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -850,9 +861,9 @@ func (agent *ecsAgent) registerContainerInstance(
logger.Critical("Instance registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}
logger.Info("Instance registration completed successfully", logger.Fields{
"instanceArn": containerInstanceArn,
Expand Down Expand Up @@ -882,7 +893,7 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
})
if apierrors.IsInstanceTypeChangedError(err) {
seelog.Criticalf(instanceTypeMismatchErrorFormat, err)
return err
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -892,9 +903,9 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
logger.Critical("Instance re-registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}

// startAsyncRoutines starts all background methods
Expand Down
149 changes: 85 additions & 64 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.false(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.false(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func TestReregisterContainerInstanceInstanceTypeChanged(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1145,7 +1145,7 @@ func TestReregisterContainerInstanceAttributeError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1204,7 +1204,7 @@ func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetHappyPath(t *testing.T) {
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCanRetryError(

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryError(t *testing.T) {
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryErr

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1435,70 +1435,91 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
ctrl, credentialsManager, state, imageManager, client,
dockerClient, _, _, execCmdMgr, _ := setup(t)
defer ctrl.Finish()

mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)

mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockEC2Metadata.EXPECT().PrimaryENIMAC().Return("mac", nil)
mockEC2Metadata.EXPECT().VPCID(gomock.Eq("mac")).Return("vpc-id", nil)
mockEC2Metadata.EXPECT().SubnetID(gomock.Eq("mac")).Return("subnet-id", nil)
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}
mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
func TestRegisterContainerInstanceExceptionsErrorHandling(t *testing.T) {
testCases := []struct {
name string
regError error
expectedExit int
}{
{
name: "ThrottlingException",
regError: awserr.New("ThrottlingException", "", nil),
expectedExit: exitcodes.ExitError,
},
{
name: "ClientException",
regError: awserr.New("ClientException", "", nil),
expectedExit: exitcodes.ExitTerminal,
},
{
name: "InvalidParameterException",
regError: awserr.New("InvalidParameterException", "", nil),
expectedExit: exitcodes.ExitTerminal,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl, credentialsManager, state, imageManager, client,
dockerClient, _, _, execCmdMgr, _ := setup(t)
defer ctrl.Finish()

dockerClient.EXPECT().SupportedVersions().Return(apiVersions).AnyTimes()
mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)

mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}
mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

dockerClient.EXPECT().SupportedVersions().Return(apiVersions).AnyTimes()

gomock.InOrder(
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes().Return([]string{}, nil),
client.EXPECT().RegisterContainerInstance(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return("", "", tc.regError),
)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)

gomock.InOrder(
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve(gomock.Any()).Return(awsv2.Credentials{}, nil),
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes().Return([]string{}, nil),
client.EXPECT().RegisterContainerInstance(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return("", "", awserr.New("InvalidParameterException", "", nil)),
)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)
cfg := getTestConfig()
ctx, cancel := context.WithCancel(context.TODO())
// Cancel the context to cancel async routines
defer cancel()
agent := &ecsAgent{
ctx: ctx,
ec2MetadataClient: mockEC2Metadata,
cfg: &cfg,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
dockerClient: dockerClient,
mobyPlugins: mockMobyPlugins,
terminationHandler: func(taskEngineState dockerstate.TaskEngineState, dataClient data.Client, taskEngine engine.TaskEngine, cancel context.CancelFunc) {
},
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
}

cfg := getTestConfig()
ctx, cancel := context.WithCancel(context.TODO())
// Cancel the context to cancel async routines
defer cancel()
agent := &ecsAgent{
ctx: ctx,
ec2MetadataClient: mockEC2Metadata,
cfg: &cfg,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
dockerClient: dockerClient,
mobyPlugins: mockMobyPlugins,
terminationHandler: func(taskEngineState dockerstate.TaskEngineState, dataClient data.Client, taskEngine engine.TaskEngine, cancel context.CancelFunc) {
},
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
exitCode := agent.doStart(eventstream.NewEventStream("events", ctx),
credentialsManager, state, imageManager, client, execCmdMgr)
assert.Equal(t, tc.expectedExit, exitCode)
})
}

exitCode := agent.doStart(eventstream.NewEventStream("events", ctx),
credentialsManager, state, imageManager, client, execCmdMgr)
assert.Equal(t, exitcodes.ExitTerminal, exitCode)
}

func TestMergeTags(t *testing.T) {
ec2Key := "ec2Key"
ec2Value := "ec2Value"
Expand Down
13 changes: 7 additions & 6 deletions agent/app/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@

package app

// transientError represents a transient error when executing the ECS Agent
type transientError struct {
type terminalError struct {
error
}

// isTransient returns true if the error is transient
func isTransient(err error) bool {
_, ok := err.(transientError)
return ok
// isTerminal returns true if the error is already wrapped as an unrecoverable condition
// which will allow agent to exit terminally.
func isTerminal(err error) bool {
// Check if the error is already wrapped as a terminalError
_, terminal := err.(terminalError)
return terminal
}

// clusterMismatchError represents a mismatch in cluster name between the
Expand Down

0 comments on commit 86512a8

Please sign in to comment.