Skip to content

Commit

Permalink
Add start network packet loss implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianze Shan authored and mye956 committed Oct 3, 2024
1 parent 03d8c56 commit f5a0eb7
Show file tree
Hide file tree
Showing 3 changed files with 482 additions and 70 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

171 changes: 163 additions & 8 deletions ecs-agent/tmds/handlers/fault/v1/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ const (
)

var (
iptablesChainExistCmd = "iptables -C %s -p %s --dport %s -j DROP"
nsenterCommandString = "nsenter --net=%s"
iptablesChainExistCmd = "iptables -C %s -p %s --dport %s -j DROP"
nsenterCommandString = "nsenter --net=%s "
tcCheckInjectionCommandString = "tc -j q show dev %s parent 1:1"
tcAddQdiscRootCommandString = "tc qdisc add dev %s root handle 1: prio priomap 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2"
tcAddQdiscLossCommandString = "tc qdisc add dev %s parent 1:1 handle 10: netem loss %d%%"
tcAddFilterForIPCommandString = "tc filter add dev %s protocol ip parent 1:0 prio 1 u32 match ip dst %s flowid 1:1"
)

type FaultHandler struct {
Expand Down Expand Up @@ -475,18 +479,47 @@ func (h *FaultHandler) StartNetworkPacketLoss() func(http.ResponseWriter, *http.
rwMu.Lock()
defer rwMu.Unlock()

// TODO: Check status of current fault injection
// TODO: Invoke the start fault injection functionality if not running

responseBody := types.NewNetworkFaultInjectionSuccessResponse("running")
logger.Info("Successfully started fault", logger.Fields{
var responseBody types.NetworkFaultInjectionResponse
var httpStatusCode int
stringToBeLogged := "Failed to start fault"
// All command executions for the start network packet loss workflow all together should finish within 5 seconds.
// Thus, create the context here so that it can be shared by all os/exec calls.
ctx, cancel := context.WithTimeout(context.Background(), requestTimeoutDuration)
defer cancel()
// Check the status of current fault injection.
latencyFaultExists, packetLossFaultExists, err := h.checkTCFault(ctx, taskMetadata)
if err != nil {
responseBody = types.NewNetworkFaultInjectionErrorResponse(err.Error())
httpStatusCode = http.StatusInternalServerError
} else {
// If there already exists a fault in the task network namespace.
if latencyFaultExists {
responseBody = types.NewNetworkFaultInjectionErrorResponse("There is already one network latency fault running")
httpStatusCode = http.StatusConflict
} else if packetLossFaultExists {
responseBody = types.NewNetworkFaultInjectionErrorResponse("There is already one network packet loss fault running")
httpStatusCode = http.StatusConflict
} else {
// Invoke the start fault injection functionality if not running.
err := h.startNetworkPacketLossFault(ctx, taskMetadata, request)
if err != nil {
responseBody = types.NewNetworkFaultInjectionErrorResponse("Failed to inject network-packet-loss fault")
httpStatusCode = http.StatusInternalServerError
} else {
stringToBeLogged = "Successfully started fault"
responseBody = types.NewNetworkFaultInjectionSuccessResponse("running")
httpStatusCode = http.StatusOK
}
}
}
logger.Info(stringToBeLogged, logger.Fields{
field.RequestType: requestType,
field.Request: request.ToString(),
field.Response: responseBody.ToString(),
})
utils.WriteJSONResponse(
w,
http.StatusOK,
httpStatusCode,
responseBody,
requestType,
)
Expand Down Expand Up @@ -794,6 +827,128 @@ func validateTaskNetworkConfig(taskNetworkConfig *state.TaskNetworkConfig) error
return nil
}

// startNetworkPacketLossFault invokes the linux TC utility tool to start the network-packet-loss fault.
func (h *FaultHandler) startNetworkPacketLossFault(ctx context.Context, taskMetadata *state.TaskResponse, request types.NetworkPacketLossRequest) error {
interfaceName := taskMetadata.TaskNetworkConfig.NetworkNamespaces[0].NetworkInterfaces[0].DeviceName
networkMode := taskMetadata.TaskNetworkConfig.NetworkMode
// If task's network mode is awsvpc, we need to run nsenter to access the task's network namespace.
nsenterPrefix := ""
if networkMode == ecs.NetworkModeAwsvpc {
nsenterPrefix = fmt.Sprintf(nsenterCommandString, taskMetadata.TaskNetworkConfig.NetworkNamespaces[0].Path)
}
lossPercent := aws.Uint64Value(request.LossPercent)

// Command to be executed:
// <nsenterPrefix> tc qdisc add dev <interfaceName> root handle 1: prio priomap 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
// <nsenterPrefix> "tc qdisc add dev <interfaceName> parent 1:1 handle 10: netem loss <lossPercentage>%"
tcAddQdiscRootCommandComposed := nsenterPrefix + fmt.Sprintf(tcAddQdiscRootCommandString, interfaceName)
cmdList := strings.Split(tcAddQdiscRootCommandComposed, " ")
cmdOutput, err := h.runExecCommand(ctx, cmdList)
if err != nil {
logger.Error(fmt.Sprintf("'%s' command failed with the following error: '%s'. std output: '%s'",
tcAddQdiscRootCommandComposed, err, string(cmdOutput[:])))
return err
}
logger.Info(fmt.Sprintf("'%s' command result: '%s'", tcAddQdiscRootCommandComposed, string(cmdOutput[:])))
tcAddQdiscLossCommandComposed := nsenterPrefix + fmt.Sprintf(tcAddQdiscLossCommandString, interfaceName, lossPercent)
cmdList = strings.Split(tcAddQdiscLossCommandComposed, " ")
cmdOutput, err = h.runExecCommand(ctx, cmdList)
if err != nil {
logger.Error(fmt.Sprintf("'%s' command failed with the following error: '%s'. std output: '%s'",
tcAddQdiscLossCommandComposed, err, string(cmdOutput[:])))
return err
}
logger.Info(fmt.Sprintf("'%s' command result: '%s'", tcAddQdiscLossCommandComposed, string(cmdOutput[:])))
// After creating the queueing discipline, create filters to associate the IPs in the request with the handle.
for _, ip := range request.Sources {
tcAddFilterForIPCommandComposed := nsenterPrefix + fmt.Sprintf(tcAddFilterForIPCommandString, interfaceName, *ip)
cmdList = strings.Split(tcAddFilterForIPCommandComposed, " ")
_, err = h.runExecCommand(ctx, cmdList)
if err != nil {
logger.Error(fmt.Sprintf("'%s' command failed with the following error: '%s'. std output: '%s'",
tcAddFilterForIPCommandComposed, err, string(cmdOutput[:])))
return err
}
}

return nil
}

// checkTCFault check if there's existing network-latency fault or network-packet-loss fault.
func (h *FaultHandler) checkTCFault(ctx context.Context, taskMetadata *state.TaskResponse) (bool, bool, error) {
interfaceName := taskMetadata.TaskNetworkConfig.NetworkNamespaces[0].NetworkInterfaces[0].DeviceName
networkMode := taskMetadata.TaskNetworkConfig.NetworkMode
// If task's network mode is awsvpc, we need to run nsenter to access the task's network namespace.
nsenterPrefix := ""
if networkMode == ecs.NetworkModeAwsvpc {
nsenterPrefix = fmt.Sprintf(nsenterCommandString, taskMetadata.TaskNetworkConfig.NetworkNamespaces[0].Path)
}

// We will run the following Linux command to assess if there existing fault.
// "tc -j q show dev {INTERFACE} parent 1:1"
// The command above gives the output of "tc q show dev {INTERFACE} parent 1:1" in json format.
// We will then unmarshall the json string and evaluate the fields of it.
tcCheckInjectionCommandComposed := nsenterPrefix + fmt.Sprintf(tcCheckInjectionCommandString, interfaceName)
cmdList := strings.Split(tcCheckInjectionCommandComposed, " ")
cmdOutput, err := h.runExecCommand(ctx, cmdList)
if err != nil {
logger.Error(fmt.Sprintf("'%s' command failed with the following error: '%s'. std output: '%s'",
tcCheckInjectionCommandComposed, err, string(cmdOutput[:])))
return false, false, fmt.Errorf("failed to check existing network fault: '%s' command failed with the following error: '%s'. std output: '%s'",
tcCheckInjectionCommandComposed, err, string(cmdOutput[:]))
}
// Log the command output to better help us debug.
logger.Info(fmt.Sprintf("'%s' command result: '%s'", tcCheckInjectionCommandComposed, string(cmdOutput[:])))

// Check whether latency fault exists and whether packet loss fault exists separately.
var outputUnmarshalled []map[string]interface{}
err = json.Unmarshal(cmdOutput, &outputUnmarshalled)
if err != nil {
return false, false, errors.New("failed to check existing network fault: failed to unmarshal tc command output: " + err.Error())
}
latencyFaultExists, err := checkLatencyFault(outputUnmarshalled)
if err != nil {
return false, false, errors.New("failed to check existing network fault: " + err.Error())
}
packetLossFaultExists, err := checkPacketLossFault(outputUnmarshalled)
if err != nil {
return false, false, errors.New("failed to check existing network fault: " + err.Error())
}
return latencyFaultExists, packetLossFaultExists, nil
}

// checkLatencyFault parses the tc command output and checks if there's existing network-latency fault running.
func checkLatencyFault(outputUnmarshalled []map[string]interface{}) (bool, error) {
for _, line := range outputUnmarshalled {
// Check if field "kind":"netem" exists.
if line["kind"] == "netem" {
// Now check if network packet loss fault exists.
if options := line["options"]; options != nil {
if delay := options.(map[string]interface{})["delay"]; delay != nil {
return true, nil
}
}
}
}
return false, nil
}

// checkPacketLossFault parses the tc command output and checks if there's existing network-packet-loss fault running.
func checkPacketLossFault(outputUnmarshalled []map[string]interface{}) (bool, error) {
for _, line := range outputUnmarshalled {
// First check field "kind":"netem" exists.
if line["kind"] == "netem" {
// Now check if field "loss":"<loss percent>" exists, and if the percentage matches with the value in the request.
if options := line["options"]; options != nil {
if lossRandom := options.(map[string]interface{})["loss-random"]; lossRandom != nil {
return true, nil
}
}
}
}
return false, nil
}

// runExecCommand wraps around the execwrapper, providing a convenient way of running any Linux command
// and getting the result in both stdout and stderr.
func (h *FaultHandler) runExecCommand(ctx context.Context, cmdList []string) ([]byte, error) {
Expand Down
Loading

0 comments on commit f5a0eb7

Please sign in to comment.