Skip to content

Commit

Permalink
Terminally exit on unrecoverable exceptions for RCI.
Browse files Browse the repository at this point in the history
  • Loading branch information
BinBin He committed Jan 1, 2025
1 parent 41d593c commit 80cba96
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 24 deletions.
43 changes: 33 additions & 10 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,15 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
// Register the container instance
err = agent.registerContainerInstance(client, vpcSubnetAttributes)
if err != nil {
if isTransient(err) {
return exitcodes.ExitError
if isTerminal(err) {
// On unrecoverable error codes, agent should terminally exit.
logger.Critical("Agent will terminally exit, unable to register container instance:", logger.Fields{
field.Error: err,
})
return exitcodes.ExitTerminal
}
return exitcodes.ExitTerminal
// Other errors are considered recoverable and will be retried.
return exitcodes.ExitError
}

// Load Managed Daemon images asynchronously
Expand Down Expand Up @@ -834,13 +839,19 @@ func (agent *ecsAgent) registerContainerInstance(
field.Error: err,
})
if retriable, ok := err.(apierrors.Retriable); ok && !retriable.Retry() {
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -850,9 +861,9 @@ func (agent *ecsAgent) registerContainerInstance(
logger.Critical("Instance registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}
logger.Info("Instance registration completed successfully", logger.Fields{
"instanceArn": containerInstanceArn,
Expand Down Expand Up @@ -882,7 +893,19 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
})
if apierrors.IsInstanceTypeChangedError(err) {
seelog.Criticalf(instanceTypeMismatchErrorFormat, err)
return err
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
logger.Critical("Instance re-registration attempt with an invalid parameter", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
logger.Critical("Instance re-registration attempt with client performing invalid action", logger.Fields{
field.Error: err,
})
return terminalError{err}
}
if _, ok := err.(apierrors.AttributeError); ok {
attributeErrorMsg := ""
Expand All @@ -892,9 +915,9 @@ func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabil
logger.Critical("Instance re-registration attempt with invalid attribute(s)", logger.Fields{
field.Error: attributeErrorMsg,
})
return err
return terminalError{err}
}
return transientError{err}
return err
}

// startAsyncRoutines starts all background methods
Expand Down
249 changes: 241 additions & 8 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager, daemonManagers)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ func TestReregisterContainerInstanceInstanceTypeChanged(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1145,7 +1145,7 @@ func TestReregisterContainerInstanceAttributeError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1204,7 +1204,7 @@ func TestReregisterContainerInstanceNonTerminalError(t *testing.T) {

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetHappyPath(t *testing.T) {
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCanRetryError(

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.True(t, isTransient(err))
assert.False(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryError(t *testing.T) {
Expand Down Expand Up @@ -1378,7 +1378,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetCannotRetryErr

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError(t *testing.T) {
Expand Down Expand Up @@ -1435,7 +1435,7 @@ func TestRegisterContainerInstanceWhenContainerInstanceARNIsNotSetAttributeError

err := agent.registerContainerInstance(client, nil)
assert.Error(t, err)
assert.False(t, isTransient(err))
assert.True(t, isTerminal(err))
}

func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
Expand Down Expand Up @@ -1499,6 +1499,239 @@ func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
credentialsManager, state, imageManager, client, execCmdMgr)
assert.Equal(t, exitcodes.ExitTerminal, exitCode)
}

// TestRegisterContainerInstanceExceptionErrors is a table-driven test that runs
// agent.doStart with a mocked ECS client whose RegisterContainerInstance call
// fails with a given AWS error code, and asserts the exit code doStart returns:
// exitcodes.ExitTerminal for InvalidParameterException and ClientException,
// exitcodes.ExitError for ThrottlingException.
func TestRegisterContainerInstanceExceptionErrors(t *testing.T) {
// Each case pairs the error returned by the mocked registration call with the
// exit code doStart is expected to return for it.
testCases := []struct {
name string
regError error
exitCode int
}{
{
name: "InvalidParameterException",
regError: awserr.New("InvalidParameterException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ClientException",
regError: awserr.New("ClientException", "", nil),
exitCode: exitcodes.ExitTerminal,
},
{
name: "ThrottlingException",
regError: awserr.New("ThrottlingException", "", nil),
exitCode: exitcodes.ExitError,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Fresh controller and mocks per subtest so expectations do not leak
// between cases.
ctrl, credentialsManager, state, imageManager, client,
dockerClient, _, _, execCmdMgr, _ := setup(t)
defer ctrl.Finish()

mockCredentialsProvider := app_mocks.NewMockCredentialsProvider(ctrl)
mockMobyPlugins := mock_mobypkgwrapper.NewMockPlugins(ctrl)
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
mockPauseLoader := mock_loader.NewMockLoader(ctrl)
mockServiceConnectManager := mock_serviceconnect.NewMockManager(ctrl)
mockDaemonManager := mock_daemonmanager.NewMockDaemonManager(ctrl)
mockDaemonManagers := map[string]dm.DaemonManager{md.EbsCsiDriver: mockDaemonManager}

// Pause image loader: reports "not loaded" and accepts any number of load calls.
mockPauseLoader.EXPECT().IsLoaded(gomock.Any()).Return(false, nil).AnyTimes()
mockPauseLoader.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

// EC2 metadata lookups return fixed values keyed off the stubbed primary ENI MAC.
mockEC2Metadata.EXPECT().PrimaryENIMAC().Return("mac", nil)
mockEC2Metadata.EXPECT().VPCID("mac").Return("vpc-id", nil)
mockEC2Metadata.EXPECT().SubnetID("mac").Return("subnet-id", nil)
mockEC2Metadata.EXPECT().OutpostARN().Return("", nil)

// Service connect manager: permissive stubs, callable any number of times.
mockServiceConnectManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()

// Daemon manager: reports the daemon image as already loaded.
mockDaemonManager.EXPECT().IsLoaded(gomock.Any()).Return(true, nil).AnyTimes()
mockDaemonManager.EXPECT().LoadImage(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

dockerClient.EXPECT().SupportedVersions().Return(apiVersions).AnyTimes()

// Ordered expectations ending in the registration call, which fails with the
// error under test.
gomock.InOrder(
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve(gomock.Any()).Return(awsv2.Credentials{}, nil),
mockMobyPlugins.EXPECT().Scan().AnyTimes().Return([]string{}, nil),
dockerClient.EXPECT().ListPluginsWithFilters(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes().Return([]string{}, nil),

client.EXPECT().
RegisterContainerInstance(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(),
).
Return("", "", tc.regError),
)

cfg := getTestConfig()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

agent := &ecsAgent{
ctx: ctx,
ec2MetadataClient: mockEC2Metadata,
cfg: &cfg,
pauseLoader: mockPauseLoader,
credentialsCache: awsv2.NewCredentialsCache(mockCredentialsProvider),
dockerClient: dockerClient,
mobyPlugins: mockMobyPlugins,
// No-op termination handler: this test only inspects the exit code.
terminationHandler: func(
taskEngineState dockerstate.TaskEngineState,
dataClient data.Client,
taskEngine engine.TaskEngine,
cancel context.CancelFunc,
) {
},
serviceconnectManager: mockServiceConnectManager,
daemonManagers: mockDaemonManagers,
}

exitCode := agent.doStart(
eventstream.NewEventStream("events", ctx),
credentialsManager,
state,
imageManager,
client,
execCmdMgr,
)

// The exit code must match the classification expected for this error code.
assert.Equal(t, tc.exitCode, exitCode)
})
}
}

// reregisterContainerInstance re-registers an already known container instance
// (identified by agent.containerInstanceARN) with ECS via
// client.RegisterContainerInstance and records the returned availability zone
// on the agent.
//
// It returns nil on success. On failure the error is classified:
//   - instance type changed, InvalidParameterException, ClientException and
//     attribute errors are wrapped in terminalError, because retrying with the
//     same input cannot succeed;
//   - any other error is returned unwrapped, so the caller can treat it as
//     recoverable and retry.
func (agent *ecsAgent) reregisterContainerInstance(client ecs.ECSClient, capabilities []*ecsmodel.Attribute,
	tags []*ecsmodel.Tag, registrationToken string, platformDevices []*ecsmodel.PlatformDevice, outpostARN string) error {
	_, availabilityZone, err := client.RegisterContainerInstance(agent.containerInstanceARN, capabilities, tags,
		registrationToken, platformDevices, outpostARN)
	// Record the availability zone on the agent; this is assigned whether or not
	// the call succeeded.
	agent.availabilityZone = availabilityZone
	if err == nil {
		return nil
	}
	logger.Error("Error re-registering container instance", logger.Fields{
		field.Error: err,
	})
	if apierrors.IsInstanceTypeChangedError(err) {
		seelog.Criticalf(instanceTypeMismatchErrorFormat, err)
		return terminalError{err}
	}
	// InvalidParameterException and ClientException are classified as terminal
	// here, matching the handling in registerContainerInstance.
	if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeInvalidParameterException) {
		logger.Critical("Instance re-registration attempt with an invalid parameter", logger.Fields{
			field.Error: err,
		})
		return terminalError{err}
	}
	if utils.IsAWSErrorCodeEqual(err, ecsmodel.ErrCodeClientException) {
		logger.Critical("Instance re-registration attempt with client performing invalid action", logger.Fields{
			field.Error: err,
		})
		return terminalError{err}
	}
	if _, ok := err.(apierrors.AttributeError); ok {
		attributeErrorMsg := ""
		// Only attach the custom-attribute message when the operator configured
		// instance attributes.
		if len(agent.cfg.InstanceAttributes) > 0 {
			attributeErrorMsg = customAttributeErrorMessage
		}
		logger.Critical("Instance re-registration attempt with invalid attribute(s)", logger.Fields{
			field.Error: attributeErrorMsg,
		})
		return terminalError{err}
	}
	// Anything else is left unwrapped so the caller treats it as recoverable.
	return err
}

unc TestReregisterContainerInstanceErrorHandling(t *testing.T) {
// Table of test scenarios, including your new edge cases
testCases := []struct {
name string
regError error
expectTerminal bool
}{
{
name: "NoError",
regError: nil,
expectTerminal: false,
},
{
name: "InstanceTypeChangedError",
regError: apierrors.NewInstanceTypeChangedError("instance-type-changed"),
expectTerminal: true,
},
{
name: "InvalidParameterException",
regError: awserr.New(ecsmodel.ErrCodeInvalidParameterException, "invalid param", nil),
expectTerminal: true,
},
{
name: "ClientException",
regError: awserr.New(ecsmodel.ErrCodeClientException, "client problem", nil),
expectTerminal: true,
},
{
name: "AttributeError",
regError: apierrors.NewAttributeError("invalid attribute"),
expectTerminal: true,
},
{
name: "OtherError",
regError: errors.New("some-other-error"),
expectTerminal: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockECSClient := mock_ecs.NewMockECSClient(ctrl)

var capabilities []*ecsmodel.Attribute
var tags []*ecsmodel.Tag
var registrationToken string
var platformDevices []*ecsmodel.PlatformDevice
var outpostARN string

agent := &ecsAgent{
// If your agent expects certain fields set, do so here:
containerInstanceARN: "container-instance-arn",
cfg: &config.Config{},
}

mockECSClient.EXPECT().
RegisterContainerInstance(
agent.containerInstanceARN,
capabilities,
tags,
registrationToken,
platformDevices,
outpostARN,
).
Return("", "", tc.regError)

err := agent.reregisterContainerInstance(
mockECSClient,
capabilities,
tags,
registrationToken,
platformDevices,
outpostARN,
)

if tc.regError == nil {
require.NoError(t, err, "Expected no error but got one")
return
}

if tc.expectTerminal {
require.Error(t, err, "Expected a terminalError but got nil")
_, isTerminal := err.(terminalError)
require.True(t, isTerminal, "Expected terminalError but got %T", err)
} else {
require.Error(t, err, "Expected an error but got nil")
_, isTerminal := err.(terminalError)
require.False(t, isTerminal, "Expected non-terminal error but got terminalError")
}
})
}
}

func TestMergeTags(t *testing.T) {
ec2Key := "ec2Key"
ec2Value := "ec2Value"
Expand Down
Loading

0 comments on commit 80cba96

Please sign in to comment.