Skip to content

Commit

Permalink
add function to make k8stcpdump invoke containetcpdump to capture the…
Browse files Browse the repository at this point in the history
… traces
  • Loading branch information
Shuanglu committed Dec 14, 2020
1 parent ada445c commit 4fd7af4
Showing 1 changed file with 118 additions and 59 deletions.
177 changes: 118 additions & 59 deletions cmd/k8stcpdump/k8stcpdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ import (
var cfgFile string
var parFile string

type container struct {
Name string `json:"Name"`
}

type containers struct {
Containers []container `json:"Containers"`
Duration string `json:"Duration"`
}

type target struct {
Name string `json:"Name"`
Namespace string `json:"Namespace"`
Expand All @@ -59,10 +68,15 @@ type targetPods struct {
Pods []target `json:"Pods"`
}

func dropErr(e error) {
if e != nil {
panic(e)
}
/*
func getNode(client *kubernetes.Clientset) *nodeSet {
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
return nodes
}*/

func createNodeSet(nodeSet map[string][]string, targetPod *target) error {
nodeSet[targetPod.Node] = append(nodeSet[targetPod.Node], "k8s_POD_"+targetPod.Name+"_"+targetPod.Namespace+"_"+targetPod.Uid)
return nil
}

func getPodStatus(client *kubernetes.Clientset, data *targets) *[]target {
Expand Down Expand Up @@ -126,64 +140,89 @@ func parse(p string) (*rest.Config, *kubernetes.Clientset, *targets) {
return restConfig, client, &data
}

func getPodDef(pod *target, duration string) *apicore.Pod {
func createManifests(node string, targetContainers []string, duration string) (*apicore.Pod, *apicore.ConfigMap) {
n := 2
temp := make([]byte, n)
rand.Read(temp)
suffix := hex.EncodeToString(temp)
var privileged bool
privileged = true
var command string
//var command string
var probeCommand []string
command = "rm -rf /tmp/" + pod.Name + "_" + pod.Namespace + ".cap; rm -rf /tmp/complete-" + pod.Name + "_" + pod.Namespace + "; nsenter -t $(docker inspect $(docker ps |grep '" + pod.Uid + "'|grep -v pause|awk '{print $1}')| grep '\"Pid\":' | grep -Eo '[0-9]*') -n timeout " + duration + " tcpdump -i any -w /tmp/" + pod.Name + "_" + pod.Namespace + ".cap; sleep 2;touch /tmp/complete-" + pod.Name + "_" + pod.Namespace + "; tail -f /dev/null"
//command = "rm -rf /tmp/" + pod.Name + "_" + pod.Namespace + ".cap; rm -rf /tmp/complete-" + pod.Name + "_" + pod.Namespace + "; nsenter -t $(docker inspect $(docker ps |grep '" + pod.Uid + "'|grep -v pause|awk '{print $1}')| grep '\"Pid\":' | grep -Eo '[0-9]*') -n timeout " + duration + " tcpdump -i any -w /tmp/" + pod.Name + "_" + pod.Namespace + ".cap; sleep 2;touch /tmp/complete-" + pod.Name + "_" + pod.Namespace + "; tail -f /dev/null"
//log.Info(command)
probeCommand = []string{"ls", "/tmp/complete-" + pod.Name + "_" + pod.Namespace}
probeCommand = []string{"ls", "/tmp/containerTcpdumpComplete"}
cmContainers := containers{}
cmContainer := container{}
for _, targetContainer := range targetContainers {
cmContainer.Name = targetContainer
cmContainers.Containers = append(cmContainers.Containers, cmContainer)
}
cmContainers.Duration = duration
cmContainersJson, _ := json.Marshal(cmContainers)
//log.Info(probeCommand)
return &apicore.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name + "-" + suffix,
Namespace: pod.Namespace,
Labels: map[string]string{
"tdn": pod.Name,
"tdns": pod.Namespace,
ObjectMeta: metav1.ObjectMeta{
Name: node + "-" + suffix,
Namespace: "default",
Labels: map[string]string{
"k8stcpdump": "true",
},
},
},
Spec: apicore.PodSpec{
Containers: []apicore.Container{
{
Name: pod.Name,
Image: "docker.io/library/alpine",
ImagePullPolicy: apicore.PullIfNotPresent,
Command: []string{
"nsenter",
"-t",
"1",
"-m",
"-u",
"-i",
"-n",
"-p",
"--",
"bash",
"-c",
command,
},
SecurityContext: &apicore.SecurityContext{
Privileged: &privileged,
Spec: apicore.PodSpec{
Containers: []apicore.Container{
{
Name: "k8stcpdump",
Image: "shawnlu/containertcpdump:20201130",
ImagePullPolicy: apicore.PullIfNotPresent,
SecurityContext: &apicore.SecurityContext{
Privileged: &privileged,
},
ReadinessProbe: &apicore.Probe{
Handler: apicore.Handler{
Exec: &apicore.ExecAction{
Command: probeCommand,
},
},
},
VolumeMounts: []apicore.VolumeMount{
{
Name: "containers",
MountPath: "/mnt/containerTcpdump/containers.json",
SubPath: "containers.json",
},
},
},
ReadinessProbe: &apicore.Probe{
Handler: apicore.Handler{
Exec: &apicore.ExecAction{
Command: probeCommand,
},
NodeName: node,
HostPID: true,
Volumes: []apicore.Volume{
{
Name: "containers",
VolumeSource: apicore.VolumeSource{
ConfigMap: &apicore.ConfigMapVolumeSource{
LocalObjectReference: apicore.LocalObjectReference{
Name: node + "-" + suffix,
},
},
},
},
},
},
NodeName: pod.Node,
HostPID: true,
},
}
&apicore.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: node + "-" + suffix,
Namespace: "default",
Labels: map[string]string{
"k8stcpdump": "true",
},
},
BinaryData: map[string][]byte{
"containers.json": cmContainersJson,
},
}

}

func watchPodStatus(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) wait.ConditionFunc {
Expand All @@ -206,19 +245,24 @@ func watchPodStatus(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) wait.
}
}

func createPod(client *kubernetes.Clientset, targetPod *target, duration string) (*apicore.Pod, error) {
podDefinition := getPodDef(targetPod, duration)
tcpdumpPod, err := client.CoreV1().Pods(podDefinition.ObjectMeta.Namespace).Create(context.TODO(), podDefinition, metav1.CreateOptions{})
func createPod(client *kubernetes.Clientset, node string, containers []string, duration string) (*apicore.Pod, error) {
podManifest, cmManifest := createManifests(node, containers, duration)
_, err := client.CoreV1().ConfigMaps(cmManifest.ObjectMeta.Namespace).Create(context.TODO(), cmManifest, metav1.CreateOptions{})
if err != nil {
log.Warn(fmt.Sprintf("Failed to create the configmap for node %q due to %q", node, err))
return nil, err
}
tcpdumpPod, err := client.CoreV1().Pods(podManifest.ObjectMeta.Namespace).Create(context.TODO(), podManifest, metav1.CreateOptions{})
if err == nil {
log.Info(fmt.Sprintf("Pod '%s' in the namespace '%s' has been created.", tcpdumpPod.Name, tcpdumpPod.Namespace))
} else {
log.Warn(fmt.Sprintf("Pod '%s' in the namespace '%s' failed to be created due to '%s'.", podDefinition.ObjectMeta.Name, podDefinition.ObjectMeta.Namespace, err.Error()))
log.Warn(fmt.Sprintf("Pod '%s' in the namespace '%s' failed to be created due to '%s'.", podManifest.ObjectMeta.Name, podManifest.ObjectMeta.Namespace, err.Error()))
}
return tcpdumpPod, err
}

func downloadFromPod(restConfig *rest.Config, client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
path := "/tmp/" + tcpdumpPod.Spec.Containers[0].Name + "_" + tcpdumpPod.ObjectMeta.Namespace + ".cap"
func downloadFromPod(restConfig *rest.Config, client *kubernetes.Clientset, tcpdumpPod *apicore.Pod, targetContainer string) error {
path := "/tmp/" + targetContainer + ".cap"
command := []string{"tar", "cf", "-", path}
req := client.CoreV1().RESTClient().Post().Namespace(tcpdumpPod.ObjectMeta.Namespace).Resource("pods").Name(tcpdumpPod.ObjectMeta.Name).SubResource("exec").VersionedParams(&apicore.PodExecOptions{
Container: tcpdumpPod.Spec.Containers[0].Name,
Expand Down Expand Up @@ -255,7 +299,8 @@ func downloadFromPod(restConfig *rest.Config, client *kubernetes.Clientset, tcpd
}
break
}
destFileName := "./" + tcpdumpPod.Spec.Containers[0].Name + "-" + tcpdumpPod.ObjectMeta.Namespace + ".cap"
destFileName := "./" + targetContainer + ".cap"
//log.Info(fmt.Sprintf("Create" + destFileName))
outFile, err := os.Create(destFileName)
if err != nil {
log.Warn(fmt.Sprintf("Error while creating the local dump file for pod '%s'", tcpdumpPod.ObjectMeta.Name))
Expand All @@ -278,6 +323,10 @@ func cleanUp(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
//var GracePeriodSeconds int64
//GracePeriodSeconds = 0
err := client.CoreV1().Pods(tcpdumpPod.ObjectMeta.Namespace).Delete(context.TODO(), tcpdumpPod.ObjectMeta.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
err = client.CoreV1().ConfigMaps(tcpdumpPod.ObjectMeta.Namespace).Delete(context.TODO(), tcpdumpPod.ObjectMeta.Name, metav1.DeleteOptions{})
return err
}

Expand All @@ -297,9 +346,9 @@ func cleanUp(client *kubernetes.Clientset, tcpdumpPod *apicore.Pod) error {
// return false
//}

func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *kubernetes.Clientset, targetPod *target, duration string, sleepTime time.Duration) error {
func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *kubernetes.Clientset, node string, containers []string, duration string, sleepTime time.Duration) error {
defer workerGroup.Done()
tcpdumpPod, err := createPod(client, targetPod, duration)
tcpdumpPod, err := createPod(client, node, containers, duration)
if err == nil {

err = wait.PollImmediate(time.Second*1, sleepTime, watchPodStatus(client, tcpdumpPod))
Expand All @@ -326,10 +375,15 @@ func podOperation(workerGroup *sync.WaitGroup, restConfig *rest.Config, client *
log.Warn(fmt.Sprintf("Timeout while waiting tcpdump for pod '%s' in the namespace '%s' to complete", tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace))
//log.Fatal(err)
} else {
err = downloadFromPod(restConfig, client, tcpdumpPod)
if err != nil {
log.Warn(fmt.Sprintf("Failed to download dump file from pod '%s' in the namespace '%s'", tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace))
for _, container := range containers {
err = downloadFromPod(restConfig, client, tcpdumpPod, container)
if err != nil {
log.Warn(fmt.Sprintf("Failed to download dump file for the container %q from pod '%s' in the namespace '%s'", container, tcpdumpPod.ObjectMeta.Name, tcpdumpPod.ObjectMeta.Namespace))
} else {
log.Info(fmt.Sprintf("Download the dump file for the container %q successfully", container))
}
}

}
err = cleanUp(client, tcpdumpPod)
if err != nil {
Expand Down Expand Up @@ -361,13 +415,18 @@ func Run(parFile string) {
podList := *targetPods
count := len(podList)

var workerGroup sync.WaitGroup
nodeSet := make(map[string][]string)
for i := 0; i < count; i++ {
err = createNodeSet(nodeSet, &podList[i])
}

var workerGroup sync.WaitGroup
for node, targetContainers := range nodeSet {
workerGroup.Add(1)
go podOperation(&workerGroup, restConfig, client, &podList[i], duration, sleepTime)
go podOperation(&workerGroup, restConfig, client, node, targetContainers, duration, sleepTime)
}
//fmt.Println("Wait for workers")
workerGroup.Wait()

//fmt.Println("All workers have completed")
log.Info("All operations have been completed. EXIT now.")
}

0 comments on commit 4fd7af4

Please sign in to comment.