Skip to content

Commit

Permalink
Run smoke tests concurrently for faster jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rquitales committed Jan 30, 2024
1 parent 306794b commit a7375ee
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 105 deletions.
3 changes: 1 addition & 2 deletions examples/examples_nodejs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package example
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -558,7 +557,7 @@ func TestAccMigrateNodeGroups(t *testing.T) {

// Write kubeconfig to temp file & export it for use
// with kubectl.
kubeconfigFile, err := ioutil.TempFile(os.TempDir(), "kubeconfig-*.json")
kubeconfigFile, err := os.CreateTemp(os.TempDir(), "kubeconfig-*.json")
assert.NoError(t, err, "expected tempfile to be created: %v", err)
defer os.Remove(kubeconfigFile.Name())
_, err = kubeconfigFile.Write(kubeconfig)
Expand Down
209 changes: 106 additions & 103 deletions examples/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"

"sync/atomic"

"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// Each resource request will be fetched from the Kubernetes API Server at a
// max of 40 retries, with 15 seconds in between each attempt.
// This creates a max wait time of up to 10 minutes that a resource request
// max of 20 retries, with 15 seconds in between each attempt.
// This creates a max wait time of up to 5 minutes that a resource request
// must successfully return within, before moving on.

// MaxRetries is the maximum number of attempts made to fetch a resource
// from the Kubernetes API Server.
const MaxRetries = 40
const MaxRetries = 20

// RetryInterval is the number of seconds to sleep in between requests
// to the Kubernetes API Server.
Expand All @@ -49,10 +54,18 @@ func RunEKSSmokeTest(t *testing.T, resources []apitype.ResourceV3, kubeconfigs .

// Run the smoke test against each cluster, expecting the total desired
// Node count.
var wg sync.WaitGroup
wg.Add(len(kubeAccess))
defer wg.Wait()

for clusterName := range kubeAccess {
clusterName := clusterName
PrintAndLog(fmt.Sprintf("Testing Cluster: %s\n", clusterName), t)
clientset := kubeAccess[clusterName].Clientset
eksSmokeTest(t, clientset, clusterNodeCount[clusterName])

go wgWrapper(&wg, func() {
clientset := kubeAccess[clusterName].Clientset
eksSmokeTest(t, clientset, clusterNodeCount[clusterName])
})
}
}

Expand All @@ -62,9 +75,19 @@ func eksSmokeTest(t *testing.T, clientset *kubernetes.Clientset, desiredNodeCoun
APIServerVersionInfo(t, clientset)

// Run all tests.
assertEKSConfigMapReady(t, clientset)
AssertAllNodesReady(t, clientset, desiredNodeCount)
AssertKindInAllNamespacesReady(t, clientset, "pods")
var wg sync.WaitGroup
wg.Add(3)
defer wg.Wait()

go wgWrapper(&wg, func() { assertEKSConfigMapReady(t, clientset) })
go wgWrapper(&wg, func() { AssertAllNodesReady(t, clientset, desiredNodeCount) })
go wgWrapper(&wg, func() { AssertKindInAllNamespacesReady(t, clientset, "pods") })
}

// wgWrapper invokes f and then signals completion on wg. The Done call is
// deferred, so it runs once f returns and also while a panic raised by f
// unwinds. The caller is expected to have already accounted for this call
// via wg.Add.
func wgWrapper(wg *sync.WaitGroup, f func()) {
	defer func() {
		wg.Done()
	}()
	f()
}

// APIServerVersionInfo prints out the API Server versions.
Expand Down Expand Up @@ -164,36 +187,36 @@ func AssertAllNodesReady(t *testing.T, clientset *kubernetes.Clientset, desiredN

// AssertKindInAllNamespacesReady ensures all resources of the given kind
// (Deployments, ReplicaSets, or Pods), listed across all namespaces, have
// valid & ready status conditions.
func AssertKindInAllNamespacesReady(t *testing.T, clientset *kubernetes.Clientset, name string) {
var list interface{}
func AssertKindInAllNamespacesReady(t *testing.T, clientset *kubernetes.Clientset, objType string) {
var list runtime.Object
var err error

// Assume first non-error return of list is correct & complete,
// as we currently do not have a way of knowing ahead of time how many
// total to expect in each cluster before it is up and running.
RetryLoop:
for i := 0; i < MaxRetries; i++ {
switch n := strings.ToLower(name); {
switch n := strings.ToLower(objType); {
case n == "deployments" || n == "deployment" || n == "deploy":
list, err = clientset.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
waitFor(t, "Deployments list", fmt.Sprintf("returned: %s", err))
} else {
break
if err == nil {
break RetryLoop
}
waitFor(t, "Deployments list", fmt.Sprintf("returned: %s", err))
case n == "replicasets" || n == "replicaset" || n == "rs":
list, err = clientset.AppsV1().ReplicaSets("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
waitFor(t, "ReplicaSets list", fmt.Sprintf("returned: %s", err))
} else {
break
if err == nil {
break RetryLoop
}
waitFor(t, "ReplicaSets list", fmt.Sprintf("returned: %s", err))
case n == "pods" || n == "pod" || n == "po":
list, err = clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
waitFor(t, "Pods list", fmt.Sprintf("returned: %s", err))
} else {
break
if err == nil {
break RetryLoop
}
waitFor(t, "Pods list", fmt.Sprintf("returned: %s", err))
default:
t.Fatalf("Unknown kind type: %s", objType)
}
}

Expand All @@ -204,88 +227,68 @@ func AssertKindInAllNamespacesReady(t *testing.T, clientset *kubernetes.Clientse

// AssertKindListIsReady verifies that each item in a given resource list is
// marked as ready.
func AssertKindListIsReady(t *testing.T, clientset *kubernetes.Clientset, list interface{}) {
var readyCount int
var listLength int

// Attempt to validate each list item has a "Ready" status.
switch list.(type) {
case *appsv1.DeploymentList:
items := list.(*appsv1.DeploymentList).Items
listLength = len(items)
for _, item := range items {
ready := false
// Attempt to check if ready, and output the resulting status.
for i := 0; i < MaxRetries; i++ {
if IsDeploymentReady(t, clientset, &item) {
ready = true
readyCount++
break
} else {
waitFor(t, fmt.Sprintf("Deployment %q", item.Name), "ready")
}
}
PrintAndLog(fmt.Sprintf("Deployment: %s | Ready Status: %t\n", item.Name, ready), t)
func AssertKindListIsReady(t *testing.T, clientset *kubernetes.Clientset, list runtime.Object) {
if !meta.IsListType(list) {
t.Errorf("Expected a list, but got %T", list)
}

var readyCount atomic.Int64
var totalItems int
var wg sync.WaitGroup
var objType string

err := meta.EachListItem(list, func(obj runtime.Object) error {
// Note: we cannot use item.GetObjectKind().GroupVersionKind().Kind to get the object type
// as List objects return an empty GVK. We set the objType in the switch statement below as
// a workaround instead of using runtime.Scheme.ObjectKinds() which requires registering
// a Kubernetes scheme.
// See: https://github.com/kubernetes/kubernetes/issues/3030
objName := obj.(metav1.Object).GetName()
if objName == "" {
return fmt.Errorf("object name is empty for object: %+v", obj)
}

// Validate that the readyCount is not 0, and matches the total Deployments
// returned.
require.NotEqual(t, 0, readyCount, "No Deployments are ready")
require.Equal(t, listLength, readyCount,
"%d out of %d Deployments are ready", readyCount, listLength)

PrintAndLog(fmt.Sprintf("%d out of %d Deployments are ready\n", readyCount, listLength), t)
case *appsv1.ReplicaSetList:
items := list.(*appsv1.ReplicaSetList).Items
listLength = len(items)
for _, item := range items {
ready := false
// Attempt to check if ready, and output the resulting status.
wg.Add(1)
totalItems++
go func() { // Fan-out goroutine to check each item in the list concurrently.
defer wg.Done()

var ready bool
for i := 0; i < MaxRetries; i++ {
if IsReplicaSetReady(t, clientset, &item) {
ready = true
readyCount++
break
} else {
waitFor(t, fmt.Sprintf("ReplicaSet %q", item.Name), "ready")
switch item := obj.(type) {
case *appsv1.Deployment:
ready = IsDeploymentReady(t, clientset, item)
objType = "Deployment"
case *appsv1.ReplicaSet:
ready = IsReplicaSetReady(t, clientset, item)
objType = "ReplicaSet"
case *corev1.Pod:
ready = IsPodReady(t, clientset, item)
objType = "Pod"
}
}
PrintAndLog(fmt.Sprintf("ReplicaSet: %s | Ready Status: %t\n", item.Name, ready), t)
}

// Validate that the readyCount is not 0, and matches the total ReplicaSets
// returned.
require.NotEqual(t, 0, readyCount, "No ReplicaSets are ready")
require.Equal(t, listLength, readyCount,
"%d out of %d ReplicaSets are ready", readyCount, listLength)

PrintAndLog(fmt.Sprintf("%d out of %d ReplicaSets are ready\n", readyCount, listLength), t)
case *corev1.PodList:
items := list.(*corev1.PodList).Items
listLength = len(items)
for _, item := range items {
ready := false
// Attempt to check if ready, and output the resulting status.
for i := 0; i < MaxRetries; i++ {
if IsPodReady(t, clientset, &item) {
ready = true
readyCount++
if ready {
readyCount.Add(1)
break
} else {
waitFor(t, fmt.Sprintf("Pod %q", item.Name), "ready")
}

waitFor(t, fmt.Sprintf("%s %q", objType, objName), "ready")
}
PrintAndLog(fmt.Sprintf("Pod: %s | Ready Status: %t\n", item.Name, ready), t)
}

// Validate that the readyCount is not 0, and matches the total Deployments
// returned.
require.NotEqual(t, 0, readyCount, "No Pods are ready")
require.Equal(t, listLength, readyCount,
"%d out of %d Pods are ready", readyCount, listLength)
PrintAndLog(fmt.Sprintf("%s: %s | Ready Status: %t\n", objType, objName, ready), t)
}()

PrintAndLog(fmt.Sprintf("%d out of %d Pods are ready\n", readyCount, listLength), t)
}
return nil
})

require.NoError(t, err)

wg.Wait() // Wait until all fan-out goroutine checks are done.

require.NotEqual(t, 0, totalItems, fmt.Sprintf("List of %s contains no items", objType))
require.NotEqual(t, 0, readyCount.Load(), fmt.Sprintf("No %ss are ready", objType))
require.Equal(t, totalItems, int(readyCount.Load()), fmt.Sprintf("%d out of %d %ss are ready", readyCount.Load(), totalItems, objType))
PrintAndLog(fmt.Sprintf("%d out of %d %ss are ready\n", readyCount.Load(), totalItems, objType), t)
}

// waitFor is a helper func that prints to stdout & sleeps to indicate a
Expand Down Expand Up @@ -321,9 +324,9 @@ func IsNodeReady(t *testing.T, clientset *kubernetes.Clientset, node *corev1.Nod
}

// IsPodReady attempts to check if the Pod's status & condition is ready.
func IsPodReady(t *testing.T, clientset *kubernetes.Clientset, pod *corev1.Pod) bool {
func IsPodReady(t *testing.T, clientset *kubernetes.Clientset, pod metav1.Object) bool {
// Attempt to retrieve Pod.
o, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
o, err := clientset.CoreV1().Pods(pod.GetNamespace()).Get(context.TODO(), pod.GetName(), metav1.GetOptions{})
if err != nil {
return false
}
Expand All @@ -332,7 +335,7 @@ func IsPodReady(t *testing.T, clientset *kubernetes.Clientset, pod *corev1.Pod)
if o.Status.Phase == corev1.PodRunning || o.Status.Phase == corev1.PodSucceeded {
for _, condition := range o.Status.Conditions {
if condition.Type == corev1.PodReady {
t.Logf("Checking if Pod %q is Ready | Condition.Status: %q | Condition.Reason: %q | Condition: %v\n", pod.Name, condition.Status, condition.Reason, condition)
t.Logf("Checking if Pod %q is Ready | Condition.Status: %q | Condition.Reason: %q | Condition: %v\n", pod.GetName(), condition.Status, condition.Reason, condition)
if o.Status.Phase == corev1.PodSucceeded {
return true
}
Expand Down Expand Up @@ -381,17 +384,17 @@ func IsDaemonSetReady(t *testing.T, clientset *kubernetes.Clientset, namespace,

// IsDeploymentReady attempts to check if the Deployment's status conditions
// are ready.
func IsDeploymentReady(t *testing.T, clientset *kubernetes.Clientset, deployment *appsv1.Deployment) bool {
func IsDeploymentReady(t *testing.T, clientset *kubernetes.Clientset, deployment metav1.Object) bool {
// Attempt to retrieve Deployment.
o, err := clientset.AppsV1().Deployments(deployment.Namespace).Get(context.TODO(), deployment.Name, metav1.GetOptions{})
o, err := clientset.AppsV1().Deployments(deployment.GetNamespace()).Get(context.TODO(), deployment.GetName(), metav1.GetOptions{})
if err != nil {
return false
}

// Check the returned Deployment's status & conditions for readiness.
for _, condition := range o.Status.Conditions {
if condition.Type == appsv1.DeploymentAvailable {
t.Logf("Checking if Deployment %q is Available | Condition.Status: %q | Condition: %v\n", deployment.Name, condition.Status, condition)
t.Logf("Checking if Deployment %q is Available | Condition.Status: %q | Condition: %v\n", deployment.GetName(), condition.Status, condition)
return condition.Status == corev1.ConditionTrue
}
}
Expand All @@ -401,9 +404,9 @@ func IsDeploymentReady(t *testing.T, clientset *kubernetes.Clientset, deployment

// IsReplicaSetReady attempts to check if the ReplicaSet's status conditions
// are ready.
func IsReplicaSetReady(t *testing.T, clientset *kubernetes.Clientset, replicaSet *appsv1.ReplicaSet) bool {
func IsReplicaSetReady(t *testing.T, clientset *kubernetes.Clientset, replicaSet metav1.Object) bool {
// Attempt to retrieve ReplicaSet.
o, err := clientset.AppsV1().ReplicaSets(replicaSet.Namespace).Get(context.TODO(), replicaSet.Name, metav1.GetOptions{})
o, err := clientset.AppsV1().ReplicaSets(replicaSet.GetNamespace()).Get(context.TODO(), replicaSet.GetName(), metav1.GetOptions{})
if err != nil {
return false
}
Expand Down

0 comments on commit a7375ee

Please sign in to comment.