diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go deleted file mode 100644 index 2fa01646b..000000000 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go +++ /dev/null @@ -1,410 +0,0 @@ -package miqtools - -import ( - "context" - - miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" - miqutilsv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/miqutils" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - resource "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - intstr "k8s.io/apimachinery/pkg/util/intstr" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -func ManageKafkaSecret(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*corev1.Secret, controllerutil.MutateFn) { - secretKey := types.NamespacedName{Namespace: cr.ObjectMeta.Namespace, Name: cr.Spec.KafkaSecret} - secret := &corev1.Secret{} - secretErr := client.Get(context.TODO(), secretKey, secret) - if secretErr != nil { - secret = defaultKafkaSecret(cr) - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, secret, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &secret.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &secret.ObjectMeta) - - return nil - } - - return secret, f -} - -func defaultKafkaSecret(cr *miqv1alpha1.ManageIQ) *corev1.Secret { - secretData := map[string]string{ - "username": "root", - "password": generatePassword(), - "hostname": "kafka", - } - - secret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: kafkaSecretName(cr), - Namespace: cr.ObjectMeta.Namespace, - }, - StringData: secretData, - } - - addAppLabel(cr.Spec.AppName, &secret.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &secret.ObjectMeta) - - return secret -} - -func kafkaSecretName(cr *miqv1alpha1.ManageIQ) string { - secretName := "kafka-secrets" - if cr.Spec.KafkaSecret != "" { - secretName = cr.Spec.KafkaSecret - } - - return secretName -} - -func KafkaPVC(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.PersistentVolumeClaim, controllerutil.MutateFn) { - storageReq, _ := resource.ParseQuantity(cr.Spec.KafkaVolumeCapacity) - - resources := corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - "storage": storageReq, - }, - } - - accessModes := []corev1.PersistentVolumeAccessMode{ - "ReadWriteOnce", - } - - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka-data", - Namespace: cr.ObjectMeta.Namespace, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, pvc, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &pvc.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &pvc.ObjectMeta) - pvc.Spec.AccessModes = accessModes - pvc.Spec.Resources = resources - - if cr.Spec.StorageClassName != "" { - pvc.Spec.StorageClassName = &cr.Spec.StorageClassName - } - return nil - } - - return pvc, f -} - -func ZookeeperPVC(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.PersistentVolumeClaim, controllerutil.MutateFn) { - storageReq, _ := resource.ParseQuantity(cr.Spec.DatabaseVolumeCapacity) - - resources := corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - "storage": storageReq, - }, - } - - accessModes := []corev1.PersistentVolumeAccessMode{ - "ReadWriteOnce", - } - - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper-data", - Namespace: cr.ObjectMeta.Namespace, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, pvc, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &pvc.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &pvc.ObjectMeta) - pvc.Spec.AccessModes = accessModes - pvc.Spec.Resources = resources - - if cr.Spec.StorageClassName != "" { - pvc.Spec.StorageClassName = &cr.Spec.StorageClassName - } - return nil - } - - return pvc, f -} - -func KafkaService(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.Service, controllerutil.MutateFn) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka", - Namespace: cr.ObjectMeta.Namespace, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, service, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &service.ObjectMeta) - if len(service.Spec.Ports) == 0 { - service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) - } - service.Spec.Ports[0].Name = "kafka" - service.Spec.Ports[0].Port = 9092 - service.Spec.Selector = map[string]string{"name": "kafka"} - return nil - } - - return service, f -} - -func ZookeeperService(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.Service, controllerutil.MutateFn) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper", - Namespace: cr.ObjectMeta.Namespace, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, service, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &service.ObjectMeta) - if len(service.Spec.Ports) == 0 { - service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) - } - service.Spec.Ports[0].Name = "zookeeper" - service.Spec.Ports[0].Port = 2181 - service.Spec.Selector = map[string]string{"name": "zookeeper"} - return nil - } - - return service, f -} - -func KafkaDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*appsv1.Deployment, controllerutil.MutateFn, error) { - deploymentLabels := map[string]string{ - "name": "kafka", - "app": cr.Spec.AppName, - } - - container := corev1.Container{ - Name: "kafka", - Image: cr.Spec.KafkaImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Ports: []corev1.ContainerPort{ - corev1.ContainerPort{ - ContainerPort: 9092, - }, - }, - LivenessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(9092), - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(9092), - }, - }, - }, - Env: []corev1.EnvVar{ - corev1.EnvVar{ - Name: "KAFKA_BROKER_USER", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "username", - }, - }, - }, - corev1.EnvVar{ - Name: "KAFKA_BROKER_PASSWORD", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "password", - }, - }, - }, - corev1.EnvVar{ - Name: "KAFKA_ZOOKEEPER_CONNECT", - Value: "zookeeper:2181", - }, - corev1.EnvVar{ - Name: "ALLOW_PLAINTEXT_LISTENER", - Value: "yes", - }, - corev1.EnvVar{ - Name: "KAFKA_CFG_ADVERTISED_LISTENERS", - Value: "PLAINTEXT://kafka:9092", - }, - }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "kafka-data", MountPath: "/bitnami/kafka"}, - }, - } - - err := addResourceReqs(cr.Spec.KafkaMemoryLimit, cr.Spec.KafkaMemoryRequest, cr.Spec.KafkaCpuLimit, cr.Spec.KafkaCpuRequest, &container) - if err != nil { - return nil, nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka", - Namespace: cr.ObjectMeta.Namespace, - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "kafka", - }, - Spec: corev1.PodSpec{}, - }, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { - return err - } - addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) - addBackupAnnotation("kafka-data", &deployment.Spec.Template.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.Spec.Template.ObjectMeta) - var repNum int32 = 1 - deployment.Spec.Replicas = &repNum - deployment.Spec.Strategy = appsv1.DeploymentStrategy{ - Type: "Recreate", - } - deployment.Spec.Template.Spec.Containers = []corev1.Container{container} - deployment.Spec.Template.Spec.Containers[0].SecurityContext = DefaultSecurityContext() - deployment.Spec.Template.Spec.ServiceAccountName = defaultServiceAccountName(cr.Spec.AppName) - var termSecs int64 = 10 - deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = &termSecs - deployment.Spec.Template.Spec.Volumes = []corev1.Volume{ - corev1.Volume{ - Name: "kafka-data", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "kafka-data", - }, - }, - }, - } - miqutilsv1alpha1.SetDeploymentNodeAffinity(deployment, client) - - return nil - } - - return deployment, f, nil -} - -func ZookeeperDeployment(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*appsv1.Deployment, controllerutil.MutateFn, error) { - deploymentLabels := map[string]string{ - "name": "zookeeper", - "app": cr.Spec.AppName, - } - - container := corev1.Container{ - Name: "zookeeper", - Image: cr.Spec.ZookeeperImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Ports: []corev1.ContainerPort{ - corev1.ContainerPort{ - ContainerPort: 2181, - }, - }, - Env: []corev1.EnvVar{ - corev1.EnvVar{ - Name: "ALLOW_ANONYMOUS_LOGIN", - Value: "yes", - }, - }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "zookeeper-data", MountPath: "/bitnami/zookeeper"}, - }, - } - - err := addResourceReqs(cr.Spec.ZookeeperMemoryLimit, cr.Spec.ZookeeperMemoryRequest, cr.Spec.ZookeeperCpuLimit, cr.Spec.ZookeeperCpuRequest, &container) - if err != nil { - return nil, nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper", - Namespace: cr.ObjectMeta.Namespace, - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "zookeeper", - }, - Spec: corev1.PodSpec{}, - }, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { - return err - } - addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) - addBackupAnnotation("zookeeper-data", &deployment.Spec.Template.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.Spec.Template.ObjectMeta) - var repNum int32 = 1 - deployment.Spec.Replicas = &repNum - deployment.Spec.Strategy = appsv1.DeploymentStrategy{ - Type: "Recreate", - } - addAnnotations(cr.Spec.AppAnnotations, &deployment.Spec.Template.ObjectMeta) - deployment.Spec.Template.Spec.Containers = []corev1.Container{container} - deployment.Spec.Template.Spec.Containers[0].SecurityContext = DefaultSecurityContext() - deployment.Spec.Template.Spec.ServiceAccountName = defaultServiceAccountName(cr.Spec.AppName) - deployment.Spec.Template.Spec.Volumes = []corev1.Volume{ - corev1.Volume{ - Name: "zookeeper-data", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "zookeeper-data", - }, - }, - }, - } - miqutilsv1alpha1.SetDeploymentNodeAffinity(deployment, client) - - return nil - } - - return deployment, f, nil -} diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka/kafka.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka/kafka.go new file mode 100644 index 000000000..0af0af8d5 --- /dev/null +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka/kafka.go @@ -0,0 +1,558 @@ +package miqkafka + +import ( + "bytes" + "context" + miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" + miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components" + miqutilsv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/miqutils" + olmv1 "github.com/operator-framework/api/pkg/operators/v1" + olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "strconv" +) + +func KafkaCASecret(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme, secretType string) (*corev1.Secret, controllerutil.MutateFn) { + caSecret := miqtool.InternalCertificatesSecret(cr, client) + secret := &corev1.Secret{} + + if secretType == "cert" { + secret.ObjectMeta = metav1.ObjectMeta{Name: cr.Spec.AppName + "-cluster-ca-cert", Namespace: cr.Namespace} + } else { + secret.ObjectMeta = metav1.ObjectMeta{Name: cr.Spec.AppName + "-cluster-ca", Namespace: cr.Namespace} + } + + if secret.ObjectMeta.Annotations == nil { + secret.ObjectMeta.Annotations = make(map[string]string) + } + + if secret.Data == nil { + secret.Data = make(map[string][]byte) + } + + secretLabels := map[string]string{ + "app.kubernetes.io/instance": "manageiq", + "app.kubernetes.io/managed-by": "strimzi-cluster-operator", + "strimzi.io/cluster": "manageiq", + "strimzi.io/component-type": "certificate-authority", + "strimzi.io/kind": "Kafka", + "strimzi.io/name": "strimzi", + } + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, secret, scheme); err != nil { + return err + } + + miqtool.AddLabels(secretLabels, &secret.ObjectMeta) + + caGen := 0 + lastKnownRevisionStr := secret.ObjectMeta.Annotations["last-known-revision"] + if lastKnownRevisionStr == "null" || lastKnownRevisionStr == "" { + lastKnownRevisionStr = "0" + secret.ObjectMeta.Annotations["last-known-revision"] = lastKnownRevisionStr + } + caGen, err := strconv.Atoi(lastKnownRevisionStr) + if err != nil { + return err + } + + if secretType == "cert" { + secret.ObjectMeta.Annotations["strimzi.io/ca-cert-generation"] = strconv.Itoa(caGen) + secret.Data["ca.crt"] = caSecret.Data["root_crt"] + } else { + secret.ObjectMeta.Annotations["strimzi.io/ca-key-generation"] = strconv.Itoa(caGen) + secret.Data["ca.key"] = caSecret.Data["root_key"] + } + + return nil + } + + return secret, mutateFunc +} + +func renewKafkaCASecretCheck(cr *miqv1alpha1.ManageIQ, client client.Client) bool { + certSecret := miqtool.InternalCertificatesSecret(cr, client) + kafkaCASecret := miqutilsv1alpha1.FindSecretByName(client, cr.Namespace, cr.Spec.AppName+"-cluster-ca-cert") + kafkaCAKeySecret := miqutilsv1alpha1.FindSecretByName(client, cr.Namespace, cr.Spec.AppName+"-cluster-ca") + if certSecret.Data["root_crt"] == nil || certSecret.Data["root_key"] == nil || kafkaCASecret.Data["ca.crt"] == nil || kafkaCAKeySecret.Data["ca.key"] == nil { + return false + } + + if bytes.Equal(certSecret.Data["root_crt"], kafkaCASecret.Data["ca.crt"]) || bytes.Equal(certSecret.Data["root_key"], kafkaCAKeySecret.Data["ca.key"]) { + return false + } + + return true +} + +func updateKafka(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme, update func(*unstructured.Unstructured) *unstructured.Unstructured) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + kafka := miqutilsv1alpha1.FindKafka(client, scheme, cr.Namespace, cr.Spec.AppName) + kafka = update(kafka) + + err := client.Update(context.TODO(), kafka) + return err + }) + if err != nil { + return err + } + + return nil +} + +// If manual certificates are introduced after Strimzi has generated its own certificates, +// then the certificates must be "renewed" to replace the old. This process is outlined here: +// https://strimzi.io/docs/operators/in-development/deploying#proc-replacing-your-own-private-keys-str +func renewKafkaCASecret(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) error { + if renewKafkaCASecretCheck(cr, client) { + kafkaCASecret := miqutilsv1alpha1.FindSecretByName(client, cr.Namespace, cr.Spec.AppName+"-cluster-ca-cert") + kafkaCAKeySecret := miqutilsv1alpha1.FindSecretByName(client, cr.Namespace, cr.Spec.AppName+"-cluster-ca") + certSecret := miqtool.InternalCertificatesSecret(cr, client) + + pauseReconcile := func(kafka *unstructured.Unstructured) *unstructured.Unstructured { + kafka.SetAnnotations(map[string]string{"strimzi.io/pause-reconciliation": "true"}) + return kafka + } + updateKafka(cr, client, scheme, pauseReconcile) + + useMiqCA := func(kafka *unstructured.Unstructured) *unstructured.Unstructured { + kafkaCR := kafka.UnstructuredContent()["spec"].(map[string]interface{}) + kafkaCR["clusterCa"] = map[string]interface{}{ + "generateCertificateAuthority": false, + } + + return kafka + } + updateKafka(cr, client, scheme, useMiqCA) + + lastKnownRevisionStr := kafkaCASecret.ObjectMeta.Annotations["last-known-revision"] + if lastKnownRevisionStr == "null" || lastKnownRevisionStr == "" { + lastKnownRevisionStr = "0" + kafkaCASecret.ObjectMeta.Annotations["last-known-revision"] = lastKnownRevisionStr + kafkaCAKeySecret.ObjectMeta.Annotations["last-known-revision"] = lastKnownRevisionStr + } + + caGen, err := strconv.Atoi(lastKnownRevisionStr) + if err != nil { + return err + } + caGen++ + + caGenStr := strconv.Itoa(caGen) + kafkaCASecret.ObjectMeta.Annotations["last-known-revision"] = caGenStr + kafkaCAKeySecret.ObjectMeta.Annotations["last-known-revision"] = caGenStr + + kafkaCASecret.ObjectMeta.Annotations["strimzi.io/ca-cert-generation"] = caGenStr + oldCA := kafkaCASecret.Data["ca.crt"] + kafkaCASecret.Data = map[string][]byte{"ca.crt": certSecret.Data["root_crt"], "ca-old.crt": oldCA} + if err := client.Update(context.TODO(), kafkaCASecret); err != nil { + return err + } + + kafkaCAKeySecret.ObjectMeta.Annotations["strimzi.io/ca-key-generation"] = caGenStr + kafkaCAKeySecret.Data = map[string][]byte{"ca.key": certSecret.Data["root_key"]} + if err := client.Update(context.TODO(), kafkaCAKeySecret); err != nil { + return err + } + + pauseReconcile = func(kafka *unstructured.Unstructured) *unstructured.Unstructured { + kafka.SetAnnotations(map[string]string{"strimzi.io/pause-reconciliation": "false"}) + return kafka + } + updateKafka(cr, client, scheme, pauseReconcile) + } + + return nil +} + +func KafkaClusterSpec() map[string]interface{} { + return map[string]interface{}{ + "kafka": map[string]interface{}{ + "replicas": 1, + "listeners": []map[string]interface{}{ + map[string]interface{}{ + "name": "kafka", + "port": 9093, + "type": "internal", + "tls": true, + "authentication": map[string]interface{}{ + "type": "scram-sha-512", + }, + }, + }, + "config": map[string]interface{}{ + "offsets.topic.replication.factor": 1, + "transaction.state.log.replication.factor": 1, + "transaction.state.log.min.isr": 1, + "default.replication.factor": 1, + "min.insync.replicas": 1, + }, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "kafkaContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, + }, + "authorization": map[string]interface{}{ + "type": "simple", + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "200m", + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": "400m", + "memory": "2Gi", + }, + }, + }, + "zookeeper": map[string]interface{}{ + "replicas": 1, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "zookeeperContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "150m", + "memory": "256Mi", + }, + "limits": map[string]interface{}{ + "cpu": "250m", + "memory": "512Mi", + }, + }, + }, + "entityOperator": map[string]interface{}{ + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "topicOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "userOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "tlsSidecarContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "tlsSidecar": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + "limits": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + }, + }, + "userOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + }, + }, + "topicOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + }, + }, + }, + } +} + +func KafkaCluster(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaClusterCR := &unstructured.Unstructured{} + kafkaClusterCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "Kafka", + Version: "v1beta2", + }) + kafkaClusterCR.SetName(cr.Spec.AppName) + kafkaClusterCR.SetNamespace(cr.Namespace) + + kafkaCRSpec := KafkaClusterSpec() + + if cr.Spec.StorageClassName != "" { + kafkaStorage := kafkaCRSpec["kafka"].(map[string]interface{})["storage"].(map[string]interface{}) + kafkaStorage["class"] = cr.Spec.StorageClassName + zookeeperStorage := kafkaCRSpec["zookeeper"].(map[string]interface{})["storage"].(map[string]interface{}) + zookeeperStorage["class"] = cr.Spec.StorageClassName + } + + kafkaCRSpec = miqutilsv1alpha1.SetKafkaNodeAffinity(kafkaCRSpec, []string{"amd64"}) + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaClusterCR, scheme); err != nil { + return err + } + + if *cr.Spec.EnforceWorkerResourceConstraints == true { + kafkaResourceRequests := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + kafkaResourceRequests["memory"] = cr.Spec.KafkaMemoryRequest + kafkaResourceRequests["cpu"] = cr.Spec.KafkaCpuRequest + kafkaResourceLimits := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + kafkaResourceLimits["memory"] = cr.Spec.KafkaMemoryLimit + kafkaResourceLimits["cpu"] = cr.Spec.KafkaCpuLimit + + zookeeperResourceRequests := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + zookeeperResourceRequests["memory"] = cr.Spec.ZookeeperMemoryRequest + zookeeperResourceRequests["cpu"] = cr.Spec.ZookeeperCpuRequest + zookeeperResourceLimits := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + zookeeperResourceLimits["memory"] = cr.Spec.ZookeeperMemoryLimit + zookeeperResourceLimits["cpu"] = cr.Spec.ZookeeperCpuLimit + } + + if certSecret := miqtool.InternalCertificatesSecret(cr, client); certSecret.Data["root_crt"] != nil && certSecret.Data["root_key"] != nil { + if err := renewKafkaCASecret(cr, client, scheme); err != nil { + return err + } else { + kafkaCRSpec["clusterCa"] = map[string]interface{}{ + "generateCertificateAuthority": false, + } + } + } + + kafkaStorage := kafkaCRSpec["kafka"].(map[string]interface{})["storage"].(map[string]interface{}) + kafkaStorage["size"] = cr.Spec.KafkaVolumeCapacity + + zookeeperStorage := kafkaCRSpec["zookeeper"].(map[string]interface{})["storage"].(map[string]interface{}) + zookeeperStorage["size"] = cr.Spec.ZookeeperVolumeCapacity + + kafkaClusterCR.UnstructuredContent()["spec"] = kafkaCRSpec + + return nil + } + + return kafkaClusterCR, mutateFunc +} + +func KafkaUserSpec() map[string]interface{} { + return map[string]interface{}{ + "authentication": map[string]interface{}{ + "type": "scram-sha-512", + }, + "authorization": map[string]interface{}{ + "type": "simple", + "acls": []map[string]interface{}{ + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "topic", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "group", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, + }, + }, + } +} + +func KafkaUser(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaUserCR := &unstructured.Unstructured{} + + kafkaUserCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaUser", + Version: "v1beta2", + }) + kafkaUserCR.SetName(cr.Spec.AppName + "-user") + kafkaUserCR.SetNamespace(cr.Namespace) + kafkaUserCR.SetLabels(map[string]string{"strimzi.io/cluster": cr.Spec.AppName}) + + kafkaUserSpec := KafkaUserSpec() + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaUserCR, scheme); err != nil { + return err + } + + kafkaUserCR.UnstructuredContent()["spec"] = kafkaUserSpec + + return nil + } + + return kafkaUserCR, mutateFunc +} + +func KafkaTopicSpec() map[string]interface{} { + return map[string]interface{}{ + "partitions": 1, + "config": map[string]interface{}{ + "retention.ms": 7200000, + "segment.bytes": 1073741824, + }, + } +} + +func KafkaTopic(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, topicName string) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaTopicCR := &unstructured.Unstructured{} + + kafkaTopicCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaTopic", + Version: "v1beta2", + }) + kafkaTopicCR.SetName(topicName) + kafkaTopicCR.SetNamespace(cr.Namespace) + kafkaTopicCR.SetLabels(map[string]string{"strimzi.io/cluster": cr.Spec.AppName}) + + kafkaTopicSpec := KafkaTopicSpec() + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaTopicCR, scheme); err != nil { + return err + } + + kafkaTopicCR.UnstructuredContent()["spec"] = kafkaTopicSpec + + return nil + } + + return kafkaTopicCR, mutateFunc +} + +func KafkaInstall(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*olmv1alpha1.Subscription, controllerutil.MutateFn) { + kafkaSubscription := &olmv1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: "strimzi-kafka-operator", + Namespace: cr.Namespace, + }, + } + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaSubscription, scheme); err != nil { + return err + } + + kafkaSubscription.Spec = &olmv1alpha1.SubscriptionSpec{ + CatalogSource: "community-operators", + CatalogSourceNamespace: "openshift-marketplace", + Package: "strimzi-kafka-operator", + Channel: "strimzi-0.35.x", + } + + return nil + } + + return kafkaSubscription, mutateFunc +} + +func KafkaOperatorGroup(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*olmv1.OperatorGroup, controllerutil.MutateFn) { + kafkaOperatorGroup := &olmv1.OperatorGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: cr.Spec.AppName + "-group", + Namespace: cr.Namespace, + }, + } + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaOperatorGroup, scheme); err != nil { + return err + } + + kafkaOperatorGroup.Spec = olmv1.OperatorGroupSpec{ + TargetNamespaces: []string{cr.Namespace}, + } + + return nil + } + + return kafkaOperatorGroup, mutateFunc +} diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go index 44aba3488..48e555cd1 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go @@ -227,7 +227,7 @@ func NetworkPolicyAllowKafka(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, c addAppLabel(cr.Spec.AppName, &networkPolicy.ObjectMeta) setIngressPolicyType(networkPolicy) - networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"name": "kafka"} + networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"strimzi.io/pod-name": "manageiq-kafka-0"} pod := orchestratorPod(*c) if pod == nil { diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go index 7fc124acb..518c7111a 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go @@ -111,46 +111,40 @@ func orchestratorObjectName(cr *miqv1alpha1.ManageIQ) string { return cr.Spec.AppName + "-orchestrator" } -func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container) { +func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container, client client.Client) { if !*cr.Spec.DeployMessagingService { return } messagingEnv := []corev1.EnvVar{ corev1.EnvVar{ - Name: "MESSAGING_HOSTNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "hostname", - }, - }, + Name: "MESSAGING_HOSTNAME", + Value: cr.Spec.AppName + "-kafka-bootstrap", }, corev1.EnvVar{ Name: "MESSAGING_PASSWORD", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, + LocalObjectReference: corev1.LocalObjectReference{Name: cr.Spec.AppName + "-user"}, Key: "password", }, }, }, corev1.EnvVar{ Name: "MESSAGING_PORT", - Value: "9092", + Value: "9093", }, corev1.EnvVar{ Name: "MESSAGING_TYPE", Value: "kafka", }, corev1.EnvVar{ - Name: "MESSAGING_USERNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "username", - }, - }, + Name: "MESSAGING_USERNAME", + Value: cr.Spec.AppName + "-user", + }, + corev1.EnvVar{ + Name: "MESSAGING_SASL_MECHANISM", + Value: "SCRAM-SHA-512", }, } @@ -245,7 +239,7 @@ func OrchestratorDeployment(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, cl }, } - addMessagingEnv(cr, &container) + addMessagingEnv(cr, &container, client) err = addResourceReqs(cr.Spec.OrchestratorMemoryLimit, cr.Spec.OrchestratorMemoryRequest, cr.Spec.OrchestratorCpuLimit, cr.Spec.OrchestratorCpuRequest, &container) if err != nil { return nil, nil, err @@ -305,6 +299,15 @@ func OrchestratorDeployment(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, cl deployment.Spec.Template.Spec.Containers[0].Env = addOrUpdateEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "UI_SSL_SECRET_NAME", Value: cr.Spec.InternalCertificatesSecret}) } + messagingCAPath := "" + if certSecret := InternalCertificatesSecret(cr, client); certSecret.Data["root_crt"] != nil && certSecret.Data["root_key"] != nil { + messagingCAPath = "/etc/pki/ca-trust/source/anchors/root.crt" + } else { + messagingCAPath = "/etc/pki/ca-trust/source/anchors/ca.crt" + } + + deployment.Spec.Template.Spec.Containers[0].Env = addOrUpdateEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "MESSAGING_SSL_CA", Value: messagingCAPath}) + volumeMount := corev1.VolumeMount{Name: "encryption-key", MountPath: "/run/secrets/manageiq/application", ReadOnly: true} deployment.Spec.Template.Spec.Containers[0].VolumeMounts = addOrUpdateVolumeMount(deployment.Spec.Template.Spec.Containers[0].VolumeMounts, volumeMount) @@ -359,5 +362,10 @@ func addInternalRootCertificate(cr *miqv1alpha1.ManageIQ, d *appsv1.Deployment, d.Spec.Template.Spec.Containers[0].Env = addOrUpdateEnvVar(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "MEMCACHED_ENABLE_SSL", Value: "true"}) d.Spec.Template.Spec.Containers[0].Env = addOrUpdateEnvVar(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{Name: "MEMCACHED_SSL_CA", Value: "/etc/pki/ca-trust/source/anchors/root.crt"}) } + } else { + volumeMount := corev1.VolumeMount{Name: "messaging-certificate", MountPath: "/etc/pki/ca-trust/source/anchors", ReadOnly: true} + d.Spec.Template.Spec.Containers[0].VolumeMounts = addOrUpdateVolumeMount(d.Spec.Template.Spec.Containers[0].VolumeMounts, volumeMount) + secretVolumeSource := corev1.SecretVolumeSource{SecretName: "manageiq-cluster-ca-cert", Items: []corev1.KeyToPath{corev1.KeyToPath{Key: "ca.crt", Path: "ca.crt"}}} + d.Spec.Template.Spec.Volumes = addOrUpdateVolume(d.Spec.Template.Spec.Volumes, corev1.Volume{Name: "messaging-certificate", VolumeSource: corev1.VolumeSource{Secret: &secretVolumeSource}}) } } diff --git a/manageiq-operator/api/v1alpha1/miqutils/find.go b/manageiq-operator/api/v1alpha1/miqutils/find.go index e76a5926c..f420d9e6b 100644 --- a/manageiq-operator/api/v1alpha1/miqutils/find.go +++ b/manageiq-operator/api/v1alpha1/miqutils/find.go @@ -2,8 +2,12 @@ package miqutils import ( "context" + olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -31,3 +35,32 @@ func FindDeploymentByName(client client.Client, namespace string, name string) * return deployment } + +func FindSecretByName(client client.Client, namespace string, name string) *corev1.Secret { + secretKey := types.NamespacedName{Namespace: namespace, Name: name} + secret := &corev1.Secret{} + client.Get(context.TODO(), secretKey, secret) + + return secret +} + +func FindKafka(client client.Client, scheme *runtime.Scheme, namespace string, name string) *unstructured.Unstructured { + kafkaKey := types.NamespacedName{Namespace: namespace, Name: name} + kafka := &unstructured.Unstructured{} + kafka.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "Kafka", + Version: "v1beta2", + }) + client.Get(context.TODO(), kafkaKey, kafka) + + return kafka +} + +func FindCatalogSourceByName(client client.Client, namespace string, name string) *olmv1alpha1.CatalogSource { + catalogSourceKey := types.NamespacedName{Namespace: namespace, Name: name} + catalogSource := &olmv1alpha1.CatalogSource{} + client.Get(context.TODO(), catalogSourceKey, catalogSource) + + return catalogSource +} diff --git a/manageiq-operator/cmd/main.go b/manageiq-operator/cmd/main.go index b8c84f732..a8ab72dfc 100644 --- a/manageiq-operator/cmd/main.go +++ b/manageiq-operator/cmd/main.go @@ -38,6 +38,8 @@ import ( manageiqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" "github.com/ManageIQ/manageiq-pods/manageiq-operator/internal/controller" + olmv1 "github.com/operator-framework/api/pkg/operators/v1" + olmv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" //+kubebuilder:scaffold:imports ) @@ -53,6 +55,9 @@ func init() { utilruntime.Must(manageiqv1alpha1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme + utilruntime.Must(olmv1alpha1.SchemeBuilder.AddToScheme(scheme)) + utilruntime.Must(olmv1.SchemeBuilder.AddToScheme(scheme)) + utilruntime.Must(routev1.AddToScheme(scheme)) } diff --git a/manageiq-operator/config/rbac/role.yaml b/manageiq-operator/config/rbac/role.yaml index 67c9af75d..0f5d2511d 100644 --- a/manageiq-operator/config/rbac/role.yaml +++ b/manageiq-operator/config/rbac/role.yaml @@ -2,7 +2,9 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: + creationTimestamp: null name: manageiq-operator + namespace: miq rules: - apiGroups: - "" @@ -76,6 +78,20 @@ rules: - patch - update - watch +- apiGroups: + - kafka.strimzi.io + resources: + - kafkas + - kafkatopics + - kafkausers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - manageiq.org resources: @@ -122,6 +138,19 @@ rules: - patch - update - watch +- apiGroups: + - operators.coreos.com + resources: + - operatorgroups + - subscriptions + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - rbac.authorization.k8s.io resources: diff --git a/manageiq-operator/go.mod b/manageiq-operator/go.mod index e0ee7d60c..4dcd072a0 100644 --- a/manageiq-operator/go.mod +++ b/manageiq-operator/go.mod @@ -4,8 +4,9 @@ go 1.21 require ( github.com/onsi/ginkgo v1.16.5 - github.com/onsi/gomega v1.30.0 - github.com/openshift/api v0.0.0-20231123212421-7955d3da79e8 + github.com/onsi/gomega v1.29.0 + github.com/openshift/api v0.0.0-20231025170628-b8a18fdc040d + github.com/operator-framework/api v0.20.0 k8s.io/api v0.28.4 k8s.io/apimachinery v0.28.4 k8s.io/client-go v0.28.4 @@ -14,6 +15,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -45,6 +47,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/sirupsen/logrus v1.9.2 // indirect github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect diff --git a/manageiq-operator/go.sum b/manageiq-operator/go.sum index f02c0e607..62af34b24 100644 --- a/manageiq-operator/go.sum +++ b/manageiq-operator/go.sum @@ -1,5 +1,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= +github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -102,10 +104,12 @@ github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4 github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= -github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= -github.com/openshift/api v0.0.0-20231123212421-7955d3da79e8 h1:JfXWa9HQc3GCMQeRxL3WLOW0eTYLoNtOFRajLjua/S0= -github.com/openshift/api v0.0.0-20231123212421-7955d3da79e8/go.mod h1:qNtV0315F+f8ld52TLtPvrfivZpdimOzTi3kn9IVbtU= +github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= +github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/openshift/api v0.0.0-20231025170628-b8a18fdc040d h1:076BQ9iaz/giM0wRT9grdbkYsdy6WHQ2vg/asQ3lv6c= +github.com/openshift/api v0.0.0-20231025170628-b8a18fdc040d/go.mod h1:qNtV0315F+f8ld52TLtPvrfivZpdimOzTi3kn9IVbtU= +github.com/operator-framework/api v0.20.0 h1:A2YCRhr+6s0k3pRJacnwjh1Ue8BqjIGuQ2jvPg9XCB4= +github.com/operator-framework/api v0.20.0/go.mod h1:rXPOhrQ6mMeXqCmpDgt1ALoar9ZlHL+Iy5qut9R99a4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -120,6 +124,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= +github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -127,6 +133,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -179,6 +186,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= diff --git a/manageiq-operator/internal/controller/manageiq_controller.go b/manageiq-operator/internal/controller/manageiq_controller.go index 9bf358328..122912938 100644 --- a/manageiq-operator/internal/controller/manageiq_controller.go +++ b/manageiq-operator/internal/controller/manageiq_controller.go @@ -36,6 +36,8 @@ import ( miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" cr_migration "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/cr_migration" miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components" + miqkafka "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka" + miqutilsv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/miqutils" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -52,11 +54,13 @@ type ManageIQReconciler struct { //+kubebuilder:rbac:namespace=changeme,groups=apps,resources=deployments/finalizers,resourceNames=manageiq-operator,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update;delete //+kubebuilder:rbac:namespace=changeme,groups=extensions,resources=deployments;deployments/scale;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=kafka.strimzi.io,resources=kafkas;kafkausers;kafkatopics,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/finalizers,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/status,verbs=get;update;patch //+kubebuilder:rbac:namespace=changeme,groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;create //+kubebuilder:rbac:namespace=changeme,groups=networking.k8s.io,resources=ingresses;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=operators.coreos.com,resources=operatorgroups;subscriptions,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=rbac.authorization.k8s.io,resources=rolebindings;roles,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;watch;create;update;patch;delete @@ -522,67 +526,60 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI } func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error { - secret, mutateFunc := miqtool.ManageKafkaSecret(cr, r.Client, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, secret, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Secret has been reconciled", "component", "kafka", "result", result) - } - - hostName := getSecretKeyValue(r.Client, cr.Namespace, cr.Spec.KafkaSecret, "hostname") - if hostName != "" { - logger.Info("External Kafka Messaging Service selected, skipping kafka and zookeeper service reconciliation", "hostname", hostName) - return nil - } - - kafkaPVC, mutateFunc := miqtool.KafkaPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaPVC, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "kafka", "result", result) - } + if miqutilsv1alpha1.FindCatalogSourceByName(r.Client, "openshift-marketplace", "community-operators") != nil { + kafkaOperatorGroup, mutateFunc := miqkafka.KafkaOperatorGroup(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaOperatorGroup, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info("Kafka Operator group has been reconciled", "result", result) + } - zookeeperPVC, mutateFunc := miqtool.ZookeeperPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperPVC, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "zookeeper", "result", result) + kafkaSubscription, mutateFunc := miqkafka.KafkaInstall(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info("Kafka Subscription has been reconciled", "result", result) + } } - kafkaService, mutateFunc := miqtool.KafkaService(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaService, mutateFunc); err != nil { + kafkaClusterCR, mutateFunc := miqkafka.KafkaCluster(cr, r.Client, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaClusterCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("Service has been reconciled", "component", "kafka", "result", result) + logger.Info("Kafka Cluster has been reconciled", "result", result) } - zookeeperService, mutateFunc := miqtool.ZookeeperService(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperService, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Service has been reconciled", "component", "zookeeper", "result", result) - } + if certSecret := miqtool.InternalCertificatesSecret(cr, r.Client); certSecret.Data["root_crt"] != nil && certSecret.Data["root_key"] != nil { + kafkaCACert, mutateFunc := miqkafka.KafkaCASecret(cr, r.Client, r.Scheme, "cert") + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaCACert, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info("Kafka CA Certificate has been reconciled", "result", result) + } - kafkaDeployment, mutateFunc, err := miqtool.KafkaDeployment(cr, r.Client, r.Scheme) - if err != nil { - return err + kafkaCAKey, mutateFunc := miqkafka.KafkaCASecret(cr, r.Client, r.Scheme, "key") + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaCAKey, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info("Kafka CA Key has been reconciled", "result", result) + } } - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaDeployment, mutateFunc); err != nil { + kafkaUserCR, mutateFunc := miqkafka.KafkaUser(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaUserCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("Deployment has been reconciled", "component", "kafka", "result", result) + logger.Info("Kafka User has been reconciled", "result", result) } - zookeeperDeployment, mutateFunc, err := miqtool.ZookeeperDeployment(cr, r.Client, r.Scheme) - if err != nil { - return err - } - - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperDeployment, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Deployment has been reconciled", "component", "zookeeper", "result", result) + topics := []string{"manageiq.liveness-check", "manageiq.ems", "manageiq.ems-events", "manageiq.ems-inventory", "manageiq.metrics"} + for i := 0; i < len(topics); i++ { + kafkaTopicCR, mutateFunc := miqkafka.KafkaTopic(cr, r.Scheme, topics[i]) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaTopicCR, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info(fmt.Sprintf("Kafka topic %s has been reconciled", topics[i])) + } } return nil