From 9adaac73275a4485b43178e94403838ef7c4ecee Mon Sep 17 00:00:00 2001 From: Anitha Natarajan Date: Wed, 18 Dec 2024 23:23:58 +0530 Subject: [PATCH] concurrent namespace reconciliation to ensure openshift rbac requisites --- pkg/reconciler/common/utils.go | 9 + pkg/reconciler/common/utils_test.go | 62 ++++ .../openshift/tektonconfig/extension.go | 28 +- pkg/reconciler/openshift/tektonconfig/rbac.go | 327 ++++++++++++------ 4 files changed, 327 insertions(+), 99 deletions(-) diff --git a/pkg/reconciler/common/utils.go b/pkg/reconciler/common/utils.go index 5092e45097..3b77907994 100644 --- a/pkg/reconciler/common/utils.go +++ b/pkg/reconciler/common/utils.go @@ -63,3 +63,12 @@ func StructToMap(in, out interface{}) error { } return json.Unmarshal(data, out) } + +// Helper function to serialize labels map to JSON string +func SerializeLabelsToJSON(labels map[string]string) (string, error) { + bytes, err := json.Marshal(labels) + if err != nil { + return "", fmt.Errorf("failed to serialize labels to JSON: %v", err) + } + return string(bytes), nil +} diff --git a/pkg/reconciler/common/utils_test.go b/pkg/reconciler/common/utils_test.go index c401d0944c..8aadd3694c 100644 --- a/pkg/reconciler/common/utils_test.go +++ b/pkg/reconciler/common/utils_test.go @@ -113,3 +113,65 @@ func TestStructMapError(t *testing.T) { err := StructToMap(&in, actualOut) assert.Error(t, err, "json: Unmarshal(non-pointer map[string]interface {})") } + +func TestSerializeLabelsToJSON(t *testing.T) { + // Test cases with different inputs + tests := []struct { + name string + labels map[string]string + expectedOutput string + expectError bool + }{ + { + name: "Valid input with multiple labels", + labels: map[string]string{ + "app": "my-app", + "env": "production", + "owner": "dev-team", + }, + expectedOutput: `{"app":"my-app","env":"production","owner":"dev-team"}`, + expectError: false, + }, + { + name: "Empty input", + labels: map[string]string{}, + expectedOutput: `{}`, + expectError: false, + }, + { + name: "Single label", + labels: map[string]string{ + "key": "value", + }, + expectedOutput: `{"key":"value"}`, + expectError: false, + }, + { + name: "Special characters in keys and values", + labels: map[string]string{ + "foo@bar": "bazqux", + "key#1": "value$%", + "space key": "with space", + }, + expectedOutput: `{"foo@bar":"bazqux","key#1":"value$%","space key":"with space"}`, + expectError: false, + }, + } + + // Loop over each test case + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Call the function with the test labels + actual, err := SerializeLabelsToJSON(tt.labels) + + // Check if we expect an error + if tt.expectError { + // Assert that an error was returned + assert.Error(t, err, "Expected error, but got none") + } else { + // Check if the output matches the expected result + assert.Equal(t, tt.expectedOutput, actual) + } + }) + } +} diff --git a/pkg/reconciler/openshift/tektonconfig/extension.go b/pkg/reconciler/openshift/tektonconfig/extension.go index 42cb8b0b18..e84eaae7ec 100644 --- a/pkg/reconciler/openshift/tektonconfig/extension.go +++ b/pkg/reconciler/openshift/tektonconfig/extension.go @@ -31,6 +31,7 @@ import ( "github.com/tektoncd/operator/pkg/reconciler/common" "github.com/tektoncd/operator/pkg/reconciler/openshift/tektonconfig/extension" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" nsV1 "k8s.io/client-go/informers/core/v1" rbacV1 "k8s.io/client-go/informers/rbac/v1" "k8s.io/client-go/kubernetes" @@ -109,15 +110,38 @@ func (oe openshiftExtension) PreReconcile(ctx context.Context, tc v1alpha1.Tekto // below code helps to retain state of pre-existing SA at the time of upgrade if existingSAWithOwnerRef(r.tektonConfig) { + logger := logging.FromContext(ctx) + logger.Infof("Found pre-existing ServiceAccount. Changing owner reference during upgrade.") + if err := changeOwnerRefOfPreExistingSA(ctx, r.kubeClientSet, *config); err != nil { + logger.Errorf("Failed to change owner reference for pre-existing SA: %v", err) return err } + + // Get current labels to retain any existing labels tcLabels := config.GetLabels() + if tcLabels == nil { + tcLabels = map[string]string{} + } + + // Add or update the serviceAccountCreationLabel without removing other labels tcLabels[serviceAccountCreationLabel] = "true" - config.SetLabels(tcLabels) - if _, err := oe.operatorClientSet.OperatorV1alpha1().TektonConfigs().Update(ctx, config, metav1.UpdateOptions{}); err != nil { + + // Prepare the patch to update only the labels, keeping the existing ones + jsonLabels, err := common.SerializeLabelsToJSON(tcLabels) + if err != nil { + logger.Error(err) return err } + patchData := []byte(fmt.Sprintf(`{"metadata":{"labels":%s}}`, jsonLabels)) + + // Apply the patch to the TektonConfig + if _, err := oe.operatorClientSet.OperatorV1alpha1().TektonConfigs().Patch(ctx, config.Name, types.MergePatchType, patchData, metav1.PatchOptions{}); err != nil { + logger.Errorf("Failed to patch TektonConfig with new label: %v", err) + return err + } + + logger.Infof("Successfully patched TektonConfig with serviceAccountCreationLabel set to true") } createRBACResource := true diff --git a/pkg/reconciler/openshift/tektonconfig/rbac.go b/pkg/reconciler/openshift/tektonconfig/rbac.go index a71da7adee..de29b19639 100644 --- a/pkg/reconciler/openshift/tektonconfig/rbac.go +++ b/pkg/reconciler/openshift/tektonconfig/rbac.go @@ -36,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" nsV1 "k8s.io/client-go/informers/core/v1" rbacV1 "k8s.io/client-go/informers/rbac/v1" "k8s.io/client-go/kubernetes" @@ -90,6 +91,11 @@ type rbac struct { tektonConfig *v1alpha1.TektonConfig } +type NamespaceServiceAccount struct { + ServiceAccount *corev1.ServiceAccount + Namespace corev1.Namespace +} + func (r *rbac) cleanUp(ctx context.Context) error { // fetch the list of all namespaces which have label @@ -144,9 +150,9 @@ func (r *rbac) EnsureRBACInstallerSet(ctx context.Context) (*v1alpha1.TektonInst } func (r *rbac) setDefault() { - var ( - found = false - ) + + //set default value for rbac param + var found = false for i, v := range r.tektonConfig.Spec.Params { if v.Name == rbacParamName { @@ -372,14 +378,84 @@ func (r *rbac) handleSCCInNamespace(ctx context.Context, ns *corev1.Namespace) e return nil } +// this function encapsulates the logic for processing a single namespace. +func (r *rbac) processNamespace(ctx context.Context, sa *corev1.ServiceAccount, ns corev1.Namespace) error { + logger := logging.FromContext(ctx) + + // Inject CA bundle configmap + logger.Infow("Inject CA bundle configmap in ", "Namespace", ns.GetName()) + if err := r.ensureCABundles(ctx, &ns); err != nil { + logger.Errorf("failed to ensure ca bundles presence in namespace %s, %v", ns.Name, err) + return err + } + + // If "operator.tekton.dev/scc" exists in the namespace, bind that SCC to the SA + if err := r.handleSCCInNamespace(ctx, &ns); err != nil { + logger.Errorf("failed to bind scc to namespace %s, %v", ns.Name, err) + return err + } + + roleRef := r.getSCCRoleInNamespace(&ns) + if roleRef != nil { + if err := r.ensurePipelinesSCCRoleBinding(ctx, sa, roleRef); err != nil { + logger.Errorf("failed to create Pipeline Scc Role Binding in namespace %s, %v", ns.Name, err) + return err + } + } + + if err := r.ensureRoleBindings(ctx, sa); err != nil { + logger.Errorf("failed to create rolebinding in namespace %s, %v", ns.Name, err) + return err + } + + return nil +} + +// patch namespace with reconciled label +func (r *rbac) patchNamespaceLabel(ctx context.Context, ns corev1.Namespace) error { + logger := logging.FromContext(ctx) + + logger.Infof("add label namespace-reconcile-version to mark namespace '%s' as reconciled", ns.Name) + + // Get the current labels, or initialize if none exist + nsLabels := ns.GetLabels() + if nsLabels == nil { + nsLabels = map[string]string{} + } + + // Add `openshift-pipelines.tekton.dev/namespace-reconcile-version` label to namespace + nsLabels[namespaceVersionLabel] = r.version + ns.SetLabels(nsLabels) + + // Prepare the patch to update only the labels, keeping the existing ones + jsonLabels, err := reconcilerCommon.SerializeLabelsToJSON(nsLabels) + if err != nil { + logger.Error(err) + return err + } + patchPayload := []byte(fmt.Sprintf(`{"metadata":{"labels":%s}}`, jsonLabels)) + + // Use PATCH instead of UPDATE to only modify the labels + if _, err := r.kubeClientSet.CoreV1().Namespaces().Patch(ctx, ns.Name, types.StrategicMergePatchType, patchPayload, metav1.PatchOptions{}); err != nil { + logger.Errorf("failed to patch the namespace %s: %v", ns.Name, err) + return err + } + + logger.Infof("namespace '%s' successfully reconciled", ns.Name) + return nil +} + +// createresources function refactored for moduularity func (r *rbac) createResources(ctx context.Context) error { logger := logging.FromContext(ctx) + // Step 1: Ensure prerequisites if err := r.ensurePreRequisites(ctx); err != nil { - logger.Errorf("Error validating resources: %v", err) + logger.Errorf("error validating resources: %v", err) return err } + // Step 2: Get namespaces to be reconciled namespacesToBeReconciled, err := r.getNamespacesToBeReconciled(ctx) if err != nil { logger.Error(err) @@ -387,81 +463,55 @@ func (r *rbac) createResources(ctx context.Context) error { } logger.Debugf("RBAC: found %d namespaces to be reconciled", len(namespacesToBeReconciled)) - // remove and update namespaces from Cluster Interceptors + // Step 3: Remove and update namespaces from Cluster Interceptors if err := r.removeAndUpdateNSFromCI(ctx); err != nil { logger.Error(err) return err } - for _, ns := range namespacesToBeReconciled { - var withError bool + // Step 4: Ensure ServiceAccount across all namespaces + var namespacesToUpdate []NamespaceServiceAccount - logger.Infow("Inject CA bundle configmap in ", "Namespace", ns.GetName()) - if err := r.ensureCABundles(ctx, &ns); err != nil { - withError = true - logger.Errorf("failed to ensure ca bundles presence in namespace %s, %v", ns.Name, err) - } - - logger.Infow("Ensures Default SA in ", "Namespace", ns.GetName()) + // Ensure ServiceAccount for each namespace + for _, ns := range namespacesToBeReconciled { + // Ensure the ServiceAccount for the namespace sa, err := r.ensureSA(ctx, &ns) if err != nil { - withError = true - logger.Errorf("failed to ensure default SA in namespace %s, %v", ns.Name, err) - } - - // If "operator.tekton.dev/scc" exists in the namespace, then bind - // that SCC to the SA - err = r.handleSCCInNamespace(ctx, &ns) - if err != nil { - withError = true - logger.Errorf("failed to bind scc to namespace %s, %v", ns.Name, err) + logger.Errorf("Skipping namespace %s as it does not have a valid ServiceAccount: %v", ns.Name, err) + // Skip namespaces where SA is not ensured or invalid } - if sa != nil { - // We use a namespace scoped Role when SCC annotation is present, and - // a cluster scoped ClusterRole when the default SCC is used - roleRef := r.getSCCRoleInNamespace(&ns) - if roleRef != nil { - if err := r.ensurePipelinesSCCRoleBinding(ctx, sa, roleRef); err != nil { - withError = true - logger.Errorf("failed to create Pipeline Scc Role Binding in namespace %s, %v", ns.Name, err) - } - } - if err := r.ensureRoleBindings(ctx, sa); err != nil { - withError = true - logger.Errorf("failed to create rolebinding in namespace %s, %v", ns.Name, err) - } - - if err := r.ensureClusterRoleBindings(ctx, sa); err != nil { - withError = true - logger.Errorf("failed to create clusterrolebinding in namespace %s, %v", ns.Name, err) - } - } - if !withError { - logger.Infof("add label namespace-reconcile-version to mark namespace '%s' as reconciled", ns.Name) - - // Re-fetch the namespace to ensure we have the latest version - updatedNS, err := r.kubeClientSet.CoreV1().Namespaces().Get(ctx, ns.Name, metav1.GetOptions{}) + err := r.processNamespace(ctx, sa, ns) if err != nil { - return fmt.Errorf("failed to re-fetch namespace %s: %v", ns.Name, err) + logger.Errorf("failed processing namespace %s, %v", ns.Name, err) + } else { + namespacesToUpdate = append(namespacesToUpdate, NamespaceServiceAccount{ + ServiceAccount: sa, + Namespace: ns, + }) } + } + } - // Get the current labels, or initialize if none exist - nsLabels := updatedNS.GetLabels() - if nsLabels == nil { - nsLabels = map[string]string{} - } + // Step 6: Bulk update ClusterRoleBinding and update labels + // Step 6: Bulk update ClusterRoleBinding and update labels + if len(namespacesToUpdate) > 0 { + if err := r.handleClusterRoleBinding(ctx, namespacesToUpdate); err != nil { + logger.Errorf("failed to ensure clusterrolebinding update: %v", err) + return err + } else { + // No namespaces to update, log that no action was needed + logger.Infof("No namespaces were processed for ClusterRoleBinding update") + } + } - // Add `openshift-pipelines.tekton.dev/namespace-reconcile-version` label to namespace - // so that rbac won't loop on it again - nsLabels[namespaceVersionLabel] = r.version - updatedNS.SetLabels(nsLabels) + // Step 7: Reconcile namespaces with labels - // Update the namespace with set labels - if _, err = r.kubeClientSet.CoreV1().Namespaces().Update(ctx, updatedNS, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("failed to update namespace '%s' with label %s, %v", ns.Name, namespaceVersionLabel, err) - } - logger.Infof("namespace '%s' sucessfully reconciled", ns.Name) + for _, eachNS := range namespacesToUpdate { + logger.Infof("Reconciling namespace %s", eachNS.Namespace.Name) + err := r.patchNamespaceLabel(ctx, eachNS.Namespace) + if err != nil { + logger.Errorf("failed reconciling namespace %s, %v", eachNS.Namespace.Name, err) } } @@ -852,6 +902,68 @@ func hasSubject(subjects []rbacv1.Subject, x rbacv1.Subject) bool { return false } +// CompareLists compares two slices of rbacv1.Subject, ignoring order +func CompareSubjects(list1, list2 []rbacv1.Subject) bool { + // Check if lengths are different + if len(list1) != len(list2) { + return false + } + // Create sets (maps) for both lists + set1 := make(map[string]struct{}) + set2 := make(map[string]struct{}) + + // Populate set1 with subjects from list1 + for _, subject := range list1 { + key := fmt.Sprintf("%s/%s/%s", subject.Kind, subject.Name, subject.Namespace) + set1[key] = struct{}{} + } + // Populate set2 with subjects from list2 + for _, subject := range list2 { + key := fmt.Sprintf("%s/%s/%s", subject.Kind, subject.Name, subject.Namespace) + set2[key] = struct{}{} + } + + // Compare the sets + if len(set1) != len(set2) { + return false + } + + // Check if all elements in set1 are in set2 + for key := range set1 { + if _, exists := set2[key]; !exists { + return false + } + } + return true +} + +func mergeSubjects(subjects []rbacv1.Subject, x []rbacv1.Subject) []rbacv1.Subject { + // Map to track subjects in the existing list + existingSubjects := make(map[string]struct{}) + + // Iterate over `subjects` and track each unique combination of Kind, Name, and Namespace + for _, subject := range subjects { + key := fmt.Sprintf("%s/%s/%s", subject.Kind, subject.Name, subject.Namespace) + existingSubjects[key] = struct{}{} + } + + // Final list to store the merged subjects + var finalSubjects []rbacv1.Subject + + // Add all subjects from the original list (list1) + finalSubjects = append(finalSubjects, subjects...) + + // Append subjects from `x` (list2) that are not in `existingSubjects` + for _, subject := range x { + key := fmt.Sprintf("%s/%s/%s", subject.Kind, subject.Name, subject.Namespace) + if _, found := existingSubjects[key]; !found { + finalSubjects = append(finalSubjects, subject) + } + } + + return finalSubjects +} + func hasOwnerRefernce(old []metav1.OwnerReference, new metav1.OwnerReference) bool { for _, v := range old { if v.APIVersion == new.APIVersion && v.Kind == new.Kind && v.Name == new.Name { @@ -914,33 +1026,6 @@ func (r *rbac) createRoleBinding(ctx context.Context, sa *corev1.ServiceAccount) return nil } -func (r *rbac) ensureClusterRoleBindings(ctx context.Context, sa *corev1.ServiceAccount) error { - logger := logging.FromContext(ctx) - - rbacClient := r.kubeClientSet.RbacV1() - logger.Info("finding cluster-role ", clusterInterceptors) - if _, err := rbacClient.ClusterRoles().Get(ctx, clusterInterceptors, metav1.GetOptions{}); errors.IsNotFound(err) { - if e := r.createClusterRole(ctx); e != nil { - return e - } - } - - logger.Info("finding cluster-role-binding ", clusterInterceptors) - - viewCRB, err := rbacClient.ClusterRoleBindings().Get(ctx, clusterInterceptors, metav1.GetOptions{}) - - if err == nil { - logger.Infof("found clusterrolebinding %s", viewCRB.Name) - return r.updateClusterRoleBinding(ctx, viewCRB, sa) - } - - if errors.IsNotFound(err) { - return r.createClusterRoleBinding(ctx, sa) - } - - return err -} - func (r *rbac) removeAndUpdateNSFromCI(ctx context.Context) error { logger := logging.FromContext(ctx) @@ -994,14 +1079,61 @@ func removeIndex(s []rbacv1.Subject, index int) []rbacv1.Subject { return append(s[:index], s[index+1:]...) } -func (r *rbac) updateClusterRoleBinding(ctx context.Context, rb *rbacv1.ClusterRoleBinding, sa *corev1.ServiceAccount) error { +func (r *rbac) handleClusterRoleBinding(ctx context.Context, namespacesToUpdate []NamespaceServiceAccount) error { logger := logging.FromContext(ctx) - subject := rbacv1.Subject{Kind: rbacv1.ServiceAccountKind, Name: sa.Name, Namespace: sa.Namespace} + rbacClient := r.kubeClientSet.RbacV1() + logger.Info("finding cluster-role ", clusterInterceptors) + if _, err := rbacClient.ClusterRoles().Get(ctx, clusterInterceptors, metav1.GetOptions{}); errors.IsNotFound(err) { + if e := r.createClusterRole(ctx); e != nil { + return e + } + } - hasSubject := hasSubject(rb.Subjects, subject) + // Prepare a list of Subjects from the namespacesToUpdate + var subjects []rbacv1.Subject + + for _, nsSA := range namespacesToUpdate { + sa := nsSA.ServiceAccount + ns := nsSA.Namespace + + logger.Infof("Processing Subject for ServiceAccount %s in Namespace %s", sa.Name, ns.Name) + + // Create the Subject for the ClusterRoleBinding + subject := rbacv1.Subject{ + Kind: rbacv1.ServiceAccountKind, + Name: sa.Name, + Namespace: sa.Namespace, + } + + // Append the subject to the list + subjects = append(subjects, subject) + } + + logger.Info("finding cluster-role-binding ", clusterInterceptors) + + viewCRB, err := rbacClient.ClusterRoleBindings().Get(ctx, clusterInterceptors, metav1.GetOptions{}) + + if err == nil { + logger.Infof("found clusterrolebinding %s", viewCRB.Name) + return r.bulkUpdateClusterRoleBinding(ctx, viewCRB, subjects) + } + + if errors.IsNotFound(err) { + logger.Infof("could not find clusterrolebinding %s proceeding to create", viewCRB.Name) + return r.bulkCreateClusterRoleBinding(ctx, subjects) + } + + return err +} + +// bulk update Cluster rolebinding with all reconciled namespaces and service accounts +func (r *rbac) bulkUpdateClusterRoleBinding(ctx context.Context, rb *rbacv1.ClusterRoleBinding, subjectlist []rbacv1.Subject) error { + logger := logging.FromContext(ctx) + + hasSubject := CompareSubjects(rb.Subjects, subjectlist) if !hasSubject { - rb.Subjects = append(rb.Subjects, subject) + rb.Subjects = mergeSubjects(rb.Subjects, subjectlist) } rbacClient := r.kubeClientSet.RbacV1() @@ -1033,7 +1165,8 @@ func (r *rbac) updateClusterRoleBinding(ctx context.Context, rb *rbacv1.ClusterR return nil } -func (r *rbac) createClusterRoleBinding(ctx context.Context, sa *corev1.ServiceAccount) error { +// create Cluster rolebinding with all reconciled namespaces and service accounts +func (r *rbac) bulkCreateClusterRoleBinding(ctx context.Context, subjectlist []rbacv1.Subject) error { logger := logging.FromContext(ctx) logger.Info("create new clusterrolebinding ", clusterInterceptors) @@ -1051,7 +1184,7 @@ func (r *rbac) createClusterRoleBinding(ctx context.Context, sa *corev1.ServiceA OwnerReferences: []metav1.OwnerReference{r.ownerRef}, }, RoleRef: rbacv1.RoleRef{APIGroup: rbacv1.GroupName, Kind: "ClusterRole", Name: clusterInterceptors}, - Subjects: []rbacv1.Subject{{Kind: rbacv1.ServiceAccountKind, Name: sa.Name, Namespace: sa.Namespace}}, + Subjects: subjectlist, } if _, err := rbacClient.ClusterRoleBindings().Create(ctx, crb, metav1.CreateOptions{}); err != nil {