Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(SliceGwReconciler): Add PodDisruptionBudget logic to SliceGwReconciler (#308) #334

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- delete
- list
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
61 changes: 61 additions & 0 deletions controllers/slicegateway/pod_disruption_budget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package slicegateway

import (
"context"
"fmt"

"github.com/kubeslice/worker-operator/controllers"
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// DefaultMinAvailablePodsInPDB is the default minAvailable value (1 pod) used for the
// PodDisruptionBudget objects built for slice gateway pods.
var DefaultMinAvailablePodsInPDB = intstr.FromInt(1)

// constructPodDisruptionBudget builds the in-memory PodDisruptionBudget manifest for a slice gateway.
// Nothing is sent to the cluster here; the caller is responsible for creating the object.
//
// The PDB is named "<sliceGwName>-pdb", placed in controllers.ControlPlaneNamespace and labelled with
// the slice and slice gateway names. Its pod selector requires the same two labels plus the pod
// inject label set to "slicegateway". minAvailable is stored as the PDB's MinAvailable value.
func constructPodDisruptionBudget(sliceName, sliceGwName string, minAvailable intstr.IntOrString) *policyv1.PodDisruptionBudget {
	// Labels stamped on the PDB object itself.
	pdbLabels := map[string]string{
		controllers.ApplicationNamespaceSelectorLabelKey: sliceName,
		controllers.SliceGatewaySelectorLabelKey:         sliceGwName,
	}

	// Selector that picks the pods this budget applies to.
	podSelector := &metav1.LabelSelector{
		MatchLabels: map[string]string{
			controllers.ApplicationNamespaceSelectorLabelKey: sliceName,
			webhook.PodInjectLabelKey:                        "slicegateway",
			controllers.SliceGatewaySelectorLabelKey:         sliceGwName,
		},
	}

	budget := &policyv1.PodDisruptionBudget{}
	budget.Name = fmt.Sprintf("%s-pdb", sliceGwName)
	budget.Namespace = controllers.ControlPlaneNamespace
	budget.Labels = pdbLabels
	// minAvailable is a by-value parameter, so taking its address does not alias caller state.
	budget.Spec.MinAvailable = &minAvailable
	budget.Spec.Selector = podSelector

	return budget
}

// listPodDisruptionBudgetForSliceGateway lists the PodDisruptionBudget objects that match the slice gateway pods.
func listPodDisruptionBudgetForSliceGateway(ctx context.Context, kubeClient client.Client,
sliceName, sliceGwName string) ([]policyv1.PodDisruptionBudget, error) {
// Options for listing the PDBs that match the slice and slice gateway
listOpts := []client.ListOption{
client.MatchingLabels(map[string]string{
controllers.ApplicationNamespaceSelectorLabelKey: sliceName,
controllers.SliceGatewaySelectorLabelKey: sliceGwName,
}),
client.InNamespace(controllers.ControlPlaneNamespace),
}

// List PDBs from cluster that match the slice and slice gateway
pdbList := policyv1.PodDisruptionBudgetList{}
if err := kubeClient.List(ctx, &pdbList, listOpts...); err != nil {
Bhargav-InfraCloud marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

return pdbList.Items, nil
}
3 changes: 3 additions & 0 deletions controllers/slicegateway/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -78,6 +79,7 @@ type SliceGwReconciler struct {
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;
//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=list;create;delete

func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var sliceGwNodePorts []int
Expand Down Expand Up @@ -490,6 +492,7 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&kubeslicev1beta1.SliceGateway{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&policyv1.PodDisruptionBudget{}).
Watches(
&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(r.findSliceGwObjectsToReconcile),
Expand Down
65 changes: 65 additions & 0 deletions controllers/slicegateway/slicegateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,18 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
}
}

// Create PodDisruptionBudget for slice gateway's pod to at least have 1 instance of pods on each worker
// when disruption has occurred.
//
// Note: This should run an attempt to create PDB regardless of whether current reconciliation creating deployments
// as the request could've been requeued due to failure at the creation of PDB.
if err = r.createPodDisruptionBudgetForSliceGatewayPods(ctx, sliceName, sliceGw); err != nil {
log.Error(err, "Failed to create PodDisruptionBudget for SliceGW deployments",
"SliceName", sliceName, "SliceGwName", sliceGwName)

return ctrl.Result{}, err, true
}

// Reconcile deployment to node port mapping for gw client deployments
if isClient(sliceGw) {
for _, deployment := range deployments.Items {
Expand Down Expand Up @@ -1534,3 +1546,56 @@ func (r *SliceGwReconciler) ReconcileIntermediateGatewayDeployments(ctx context.

return ctrl.Result{}, nil, false
}

// createPodDisruptionBudgetForSliceGatewayPods ensures a PodDisruptionBudget exists for the pods of
// the given slice gateway.
//
// It first lists the PDBs labelled for this slice and slice gateway. If at least one is found, it
// does nothing. Otherwise it builds a PDB with DefaultMinAvailablePodsInPDB, sets sliceGateway as
// its controller owner and creates it. An AlreadyExists response from the create call is treated as
// success.
//
// It returns nil when a matching PDB is present or was created. Any other failure is returned with
// the underlying error wrapped, so callers can still inspect it with errors.Is / errors.As.
func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx context.Context,
	sliceName string, sliceGateway *kubeslicev1beta1.SliceGateway) error {
	log := r.Log.WithValues("sliceName", sliceName, "sliceGwName", sliceGateway.Name)

	// List PDBs in cluster that match the slice gateway pods
	pdbs, err := listPodDisruptionBudgetForSliceGateway(ctx, r.Client, sliceName, sliceGateway.Name)
	if err != nil && !apierrors.IsNotFound(err) {
		log.Error(err, "failed to list PodDisruptionBudgets that match the slice gateway")

		// When some unexpected error occurred, return the error for requeuing the request
		return err
	}

	// A PodDisruptionBudget matching the slice gateway already exists; skip creation.
	if len(pdbs) > 0 {
		return nil
	}

	// Create PDB manifest with minimum availability of 1 pod
	pdb := constructPodDisruptionBudget(sliceName, sliceGateway.Name, DefaultMinAvailablePodsInPDB)

	// Set SliceGateway instance as the owner and controller for PDB
	if err = ctrl.SetControllerReference(sliceGateway, pdb, r.Scheme); err != nil {
		log.Error(err, "Failed to set slice gateway as owner to PodDisruptionBudget",
			"pdb", pdb.Name)

		// %w keeps the original error in the chain; the rendered message is the same as with %v.
		return fmt.Errorf("failed to set slice gateway %q as owner to PodDisruptionBudget %q: %w",
			sliceGateway.Name, pdb.Name, err)
	}

	// Create PDB for slice gateway's pod to have at least 1 pod on each worker when disruption occurs
	if err = r.Create(ctx, pdb); err != nil {
		if apierrors.IsAlreadyExists(err) {
			// The PDB already exists (e.g. it was created between the list and the create call),
			// so there is nothing left to do.
			return nil
		}

		log.Error(err, "PodDisruptionBudget creation failed", "pdb", pdb.Name)

		// Any other error while creating the PDB fails the request; wrap it so the API status
		// error stays reachable for callers.
		return fmt.Errorf("failed to create PodDisruptionBudget for SliceGW pods: %w", err)
	}

	// PDB created successfully
	log.Info("PodDisruptionBudget for slice gateway pods created successfully")

	return nil
}
Loading
Loading