diff --git a/config/manager/operator-deployment.yaml b/config/manager/operator-deployment.yaml index ae47c6e297..f44e150d53 100644 --- a/config/manager/operator-deployment.yaml +++ b/config/manager/operator-deployment.yaml @@ -72,6 +72,9 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + # Change to true to be able to create synthetic Integrations + - name: CAMEL_K_SYNTHETIC_INTEGRATIONS + value: "false" livenessProbe: httpGet: path: /healthz diff --git a/config/rbac/namespaced/operator-role.yaml b/config/rbac/namespaced/operator-role.yaml index 4ddc2d4c17..0f364463e4 100644 --- a/config/rbac/namespaced/operator-role.yaml +++ b/config/rbac/namespaced/operator-role.yaml @@ -45,6 +45,7 @@ rules: - camel.apache.org resources: - builds + - integrations verbs: - delete - apiGroups: diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 017b36dfc2..20fb04f30a 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -20,8 +20,9 @@ ** xref:running/dev-mode.adoc[Developer mode] ** xref:running/dry-run.adoc[Dry run] ** xref:running/runtime-version.adoc[Camel version] -** xref:running/camel-runtimes.adoc[Camel runtimes] ** xref:running/quarkus-native.adoc[Quarkus Native] +** xref:running/camel-runtimes.adoc[Camel runtimes] +** xref:running/import.adoc[Import existing Camel apps] ** xref:running/run-from-github.adoc[Run from GitHub] ** xref:running/promoting.adoc[Promote an Integration] ** xref:running/knative-sink.adoc[Knative Sinks] diff --git a/docs/modules/ROOT/pages/running/import.adoc b/docs/modules/ROOT/pages/running/import.adoc new file mode 100644 index 0000000000..9fa8099b8f --- /dev/null +++ b/docs/modules/ROOT/pages/running/import.adoc @@ -0,0 +1,44 @@ += Importing existing Camel applications + +You may have already a Camel application running on your cluster. You may have created it via a manual deployment, a CICD or any other deployment mechanism you have in place. 
Since the Camel K operator is meant to operate any Camel application out there, you will be able to import it and monitor it in a similar fashion to any other Camel K **managed Integration**. + +This feature is disabled by default. In order to enable it, you need to run the operator deployment with an environment variable, `CAMEL_K_SYNTHETIC_INTEGRATIONS`, set to `true`. + +NOTE: you will only be able to monitor the synthetic Integrations. Camel K won't be able to alter the lifecycle of non managed Integrations (ie, rebuild the original application). + +It's important to notice that the operator won't be altering any field of the original application in order to avoid breaking any deployment procedure which is already in place. As it cannot make any assumption on the way the application is built and deployed, it will only be able to **watch** for any changes happening around it. + +[[deploy-and-monitor]] +== Deploy externally, monitor via Camel K Operator + +An imported Integration is known as a **synthetic Integration**. You can import any Camel application deployed as a **Deployment**, **CronJob** or **Knative Service**. We control this behavior via a label (`camel.apache.org/integration`) that the user needs to apply on the Camel application (either manually or by introducing it in the deployment process, ie, via CICD). + +NOTE: the example here will work in a similar way using CronJob and Knative Service. + +As an example, we show how to import a Camel application which was deployed with the Deployment kind. Let's assume it is called `my-camel-sb-svc`. +``` +$ kubectl label deploy my-camel-sb-svc camel.apache.org/integration=my-it +``` +The operator immediately creates a synthetic Integration: +``` +$ kubectl get it +NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS +test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Running +``` +You can see it will be in `Running` status phase. 
However, checking the conditions you will be able to see that the Integration is not yet able to be fully monitored. This is expected because of the way the Camel K operator monitors Pods. It requires that the same label applied to the Deployment is inherited by the generated Pods. For this reason, besides labelling the Deployment, we need to add a label in the Deployment template. +``` +$ kubectl patch deployment my-camel-sb-svc --patch '{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}' +``` +Also this operation can be performed manually or automated in the deployment procedure. We can see now that the operator will be able to monitor the status of the Pods accordingly: +``` +$ kubectl get it +NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS +test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Running 1 +``` +From now on, you will be able to monitor the status of the synthetic Integration in a similar fashion to what you do with managed Integrations. 
If, for example, your Deployment will scale up or down, then, you will see this information reflecting accordingly: +``` +$ kubectl scale deployment my-camel-sb-svc --replicas 2 +$ kubectl get it +NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS +test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Running 2 +``` diff --git a/e2e/commonwithcustominstall/files/deploy.yaml b/e2e/commonwithcustominstall/files/deploy.yaml new file mode 100644 index 0000000000..89921a04ac --- /dev/null +++ b/e2e/commonwithcustominstall/files/deploy.yaml @@ -0,0 +1,50 @@ +--- +apiVersion: v1 +data: + my-file.txt: hello +kind: ConfigMap +metadata: + name: my-cm +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: my-camel-sb-svc + name: my-camel-sb-svc +spec: + progressDeadlineSeconds: 600 + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: my-camel-sb-svc + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + labels: + app: my-camel-sb-svc + spec: + containers: + - image: docker.io/squakez/my-camel-sb-svc:1.0.0 + imagePullPolicy: IfNotPresent + name: my-camel-sb-svc + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - name: my-cm + mountPath: /tmp/app/data + volumes: + - name: my-cm + configMap: + name: my-cm + dnsPolicy: ClusterFirst + restartPolicy: Always + schedulerName: default-scheduler + securityContext: {} + terminationGracePeriodSeconds: 30 diff --git a/e2e/commonwithcustominstall/synthetic_test.go b/e2e/commonwithcustominstall/synthetic_test.go new file mode 100644 index 0000000000..2979d0b398 --- /dev/null +++ b/e2e/commonwithcustominstall/synthetic_test.go @@ -0,0 +1,119 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software 
Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package commonwithcustominstall + +import ( + "testing" + + . "github.com/onsi/gomega" + + . "github.com/apache/camel-k/v2/e2e/support" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + . "github.com/onsi/gomega/gstruct" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" +) + +func TestSyntheticIntegrationOff(t *testing.T) { + RegisterTestingT(t) + WithNewTestNamespace(t, func(ns string) { + // Install Camel K without synthetic Integration feature variable (default) + operatorID := "camel-k-synthetic-env-off" + Expect(KamelInstallWithID(operatorID, ns).Execute()).To(Succeed()) + + // Run the external deployment + ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns)) + Eventually(DeploymentCondition(ns, "my-camel-sb-svc", appsv1.DeploymentProgressing), TestTimeoutShort). 
+ Should(MatchFields(IgnoreExtras, Fields{ + "Status": Equal(corev1.ConditionTrue), + "Reason": Equal("NewReplicaSetAvailable"), + })) + + // Label the deployment --> Verify the Integration is not created + ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns)) + Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil()) + }) +} +func TestSyntheticIntegrationFromDeployment(t *testing.T) { + RegisterTestingT(t) + WithNewTestNamespace(t, func(ns string) { + // Install Camel K with the synthetic Integration feature variable + operatorID := "camel-k-synthetic-env" + Expect(KamelInstallWithID(operatorID, ns, + "--operator-env-vars", "CAMEL_K_SYNTHETIC_INTEGRATIONS=true", + ).Execute()).To(Succeed()) + + // Run the external deployment + ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns)) + Eventually(DeploymentCondition(ns, "my-camel-sb-svc", appsv1.DeploymentProgressing), TestTimeoutShort). + Should(MatchFields(IgnoreExtras, Fields{ + "Status": Equal(corev1.ConditionTrue), + "Reason": Equal("NewReplicaSetAvailable"), + })) + + // Label the deployment --> Verify the Integration is created (cannot still monitor) + ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns)) + Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionFalse)) + Eventually(IntegrationCondition(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should( + WithTransform(IntegrationConditionReason, Equal(v1.IntegrationConditionMonitoringPodsAvailableReason))) + + // Label the deployment template --> Verify the Integration is monitored + ExpectExecSucceed(t, Kubectl("patch", "deployment", "my-camel-sb-svc", "--patch", `{"spec": {"template": {"metadata": {"labels": 
{"camel.apache.org/integration": "my-it"}}}}}`, "-n", ns)) + Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + one := int32(1) + Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&one)) + + // Delete the deployment --> Verify the Integration is eventually garbage collected + ExpectExecSucceed(t, Kubectl("delete", "deploy", "my-camel-sb-svc", "-n", ns)) + Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil()) + + // Recreate the deployment and label --> Verify the Integration is monitored + ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns)) + ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns)) + ExpectExecSucceed(t, Kubectl("patch", "deployment", "my-camel-sb-svc", "--patch", `{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}`, "-n", ns)) + Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&one)) + + // Remove label from the deployment --> Verify the Integration is deleted + ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration-", "-n", ns)) + Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil()) + + // Add label back to the deployment --> Verify the Integration is created + ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns)) + Eventually(IntegrationPhase(ns, "my-it"), 
TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning)) + Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) + Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&one)) + // Scale the deployment --> verify replicas are correctly set + ExpectExecSucceed(t, Kubectl("scale", "deploy", "my-camel-sb-svc", "--replicas", "2", "-n", ns)) + two := int32(2) + Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&two)) + + // Delete Integration and deployments --> verify no Integration exists any longer + Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed()) + ExpectExecSucceed(t, Kubectl("delete", "deploy", "my-camel-sb-svc", "-n", ns)) + Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil()) + }) +} diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 4b67c6dc65..2743b050cc 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -481,6 +481,14 @@ func MakeWithContext(ctx context.Context, rule string, args ...string) *exec.Cmd return exec.Command("make", args...) } +func Kubectl(args ...string) *exec.Cmd { + return KubectlWithContext(TestContext, args...) +} + +func KubectlWithContext(ctx context.Context, args ...string) *exec.Cmd { + return exec.Command("kubectl", args...) 
+} + // ============================================================================= // Curried utility functions for testing // ============================================================================= diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml index b8e709b80d..40ef9742ac 100644 --- a/helm/camel-k/templates/operator-role.yaml +++ b/helm/camel-k/templates/operator-role.yaml @@ -54,6 +54,7 @@ rules: - camel.apache.org resources: - builds + - integrations verbs: - delete - apiGroups: diff --git a/pkg/apis/camel/v1/integration_types.go b/pkg/apis/camel/v1/integration_types.go index 78dd40a8cd..9f293f6a63 100644 --- a/pkg/apis/camel/v1/integration_types.go +++ b/pkg/apis/camel/v1/integration_types.go @@ -156,6 +156,8 @@ const ( // IntegrationPhaseError --. IntegrationPhaseError IntegrationPhase = "Error" + // IntegrationConditionReady --. + IntegrationConditionReady IntegrationConditionType = "Ready" // IntegrationConditionKitAvailable --. IntegrationConditionKitAvailable IntegrationConditionType = "IntegrationKitAvailable" // IntegrationConditionPlatformAvailable --. @@ -178,10 +180,9 @@ const ( IntegrationConditionJolokiaAvailable IntegrationConditionType = "JolokiaAvailable" // IntegrationConditionProbesAvailable --. IntegrationConditionProbesAvailable IntegrationConditionType = "ProbesAvailable" - // IntegrationConditionReady --. - IntegrationConditionReady IntegrationConditionType = "Ready" // IntegrationConditionTraitInfo --. IntegrationConditionTraitInfo IntegrationConditionType = "TraitInfo" + // IntegrationConditionKitAvailableReason --. IntegrationConditionKitAvailableReason string = "IntegrationKitAvailable" // IntegrationConditionPlatformAvailableReason --. @@ -220,7 +221,8 @@ const ( IntegrationConditionJolokiaAvailableReason string = "JolokiaAvailable" // IntegrationConditionProbesAvailableReason --. 
IntegrationConditionProbesAvailableReason string = "ProbesAvailable" - + // IntegrationConditionMonitoringPodsAvailableReason used to specify that the Pods generated are available for monitoring. + IntegrationConditionMonitoringPodsAvailableReason string = "MonitoringPodsAvailable" // IntegrationConditionKnativeServiceReadyReason --. IntegrationConditionKnativeServiceReadyReason string = "KnativeServiceReady" // IntegrationConditionDeploymentReadyReason --. @@ -239,18 +241,18 @@ const ( IntegrationConditionRuntimeNotReadyReason string = "RuntimeNotReady" // IntegrationConditionErrorReason --. IntegrationConditionErrorReason string = "Error" - // IntegrationConditionInitializationFailedReason --. IntegrationConditionInitializationFailedReason string = "InitializationFailed" // IntegrationConditionUnsupportedLanguageReason --. IntegrationConditionUnsupportedLanguageReason string = "UnsupportedLanguage" - // IntegrationConditionKameletsAvailable --. IntegrationConditionKameletsAvailable IntegrationConditionType = "KameletsAvailable" // IntegrationConditionKameletsAvailableReason --. IntegrationConditionKameletsAvailableReason string = "KameletsAvailable" // IntegrationConditionKameletsNotAvailableReason --. IntegrationConditionKameletsNotAvailableReason string = "KameletsNotAvailable" + // IntegrationConditionImportingKindAvailableReason used (as false) if we're trying to import an unsupported kind. + IntegrationConditionImportingKindAvailableReason string = "ImportingKindAvailable" ) // IntegrationCondition describes the state of a resource at a certain point. diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index ef24e207ba..3342be76a6 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -25,8 +25,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// IntegrationLabel is used to tag k8s object created by a given Integration. 
const IntegrationLabel = "camel.apache.org/integration" +// IntegrationSyntheticLabel is used to tag k8s synthetic Integrations. +const IntegrationSyntheticLabel = "camel.apache.org/is-synthetic" + +// IntegrationImportedKindLabel specifies from what kind of resource an Integration was imported. +const IntegrationImportedKindLabel = "camel.apache.org/imported-from-kind" + +// IntegrationImportedNameLabel specifies from what resource an Integration was imported. +const IntegrationImportedNameLabel = "camel.apache.org/imported-from-name" + func NewIntegration(namespace string, name string) Integration { return Integration{ TypeMeta: metav1.TypeMeta{ @@ -283,6 +293,11 @@ func (in *Integration) SetReadyConditionError(err string) { in.SetReadyCondition(corev1.ConditionFalse, IntegrationConditionErrorReason, err) } +// IsSynthetic returns true for synthetic Integrations (non managed, likely imported from external deployments). +func (in *Integration) IsSynthetic() bool { + return in.Annotations[IntegrationSyntheticLabel] == "true" +} + // GetCondition returns the condition with the provided type. 
func (in *IntegrationStatus) GetCondition(condType IntegrationConditionType) *IntegrationCondition { for i := range in.Conditions { diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 04b5ea8b23..f4bf99b820 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -59,6 +59,7 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/controller" + "github.com/apache/camel-k/v2/pkg/controller/synthetic" "github.com/apache/camel-k/v2/pkg/event" "github.com/apache/camel-k/v2/pkg/install" "github.com/apache/camel-k/v2/pkg/platform" @@ -231,6 +232,13 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log) exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform") + synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS") + if synth && synthEnvVal == "true" { + log.Info("Starting the synthetic Integration manager") + exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error") + } else { + log.Info("Synthetic Integration manager not configured, skipping") + } log.Info("Starting the manager") exitOnError(mgr.Start(ctx), "manager exited non-zero") } diff --git a/pkg/controller/integration/initialize.go b/pkg/controller/integration/initialize.go index a08dd28c66..ad8891647f 100644 --- a/pkg/controller/integration/initialize.go +++ b/pkg/controller/integration/initialize.go @@ -19,6 +19,7 @@ package integration import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -53,6 +54,10 @@ func (action *initializeAction) CanHandle(integration *v1.Integration) bool { func (action *initializeAction) Handle(ctx context.Context, 
integration *v1.Integration) (*v1.Integration, error) { action.L.Info("Initializing Integration") + if integration.Annotations[v1.IntegrationImportedNameLabel] != "" { + return action.importFromExternalApp(integration) + } + if _, err := trait.Apply(ctx, action.client, integration, nil); err != nil { integration.Status.Phase = v1.IntegrationPhaseError integration.SetReadyCondition(corev1.ConditionFalse, @@ -91,3 +96,85 @@ func (action *initializeAction) Handle(ctx context.Context, integration *v1.Inte return integration, nil } + +func (action *initializeAction) importFromExternalApp(integration *v1.Integration) (*v1.Integration, error) { + readyMessage := fmt.Sprintf( + "imported from %s %s", + integration.Annotations[v1.IntegrationImportedNameLabel], + integration.Annotations[v1.IntegrationImportedKindLabel], + ) + // We need to set the condition for which this Integration is imported (required later by monitoring) + integration.Status.SetConditions( + getCamelAppImportingCondition( + integration.Annotations[v1.IntegrationImportedKindLabel], + readyMessage, + )..., + ) + // If it's ready, then we can safely assume the integration is running + if integration.IsConditionTrue(v1.IntegrationConditionReady) { + integration.Status.Phase = v1.IntegrationPhaseRunning + } else { + integration.Status.Phase = v1.IntegrationPhaseError + } + + return integration, nil +} + +func getCamelAppImportingCondition(kind, message string) []v1.IntegrationCondition { + switch kind { + case "Deployment": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionDeploymentAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentAvailableReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentReadyReason, + Message: message, + }, + } + case "CronJob": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionCronJobAvailable, + Status: 
corev1.ConditionTrue, + Reason: v1.IntegrationConditionCronJobCreatedReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionDeploymentReadyReason, + Message: message, + }, + } + case "KnativeService": + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionKnativeServiceAvailable, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionKnativeServiceAvailableReason, + Message: message, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + Reason: v1.IntegrationConditionKnativeServiceReadyReason, + Message: message, + }, + } + default: + return []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionFalse, + Reason: v1.IntegrationConditionImportingKindAvailableReason, + Message: fmt.Sprintf("Unsupported %s import kind", kind), + }, + } + } +} diff --git a/pkg/controller/integration/initialize_test.go b/pkg/controller/integration/initialize_test.go new file mode 100644 index 0000000000..2beaaa798b --- /dev/null +++ b/pkg/controller/integration/initialize_test.go @@ -0,0 +1,189 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package integration + +import ( + "context" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + + "github.com/apache/camel-k/v2/pkg/util/log" + "github.com/apache/camel-k/v2/pkg/util/test" + + "github.com/stretchr/testify/assert" +) + +func TestCamelImportDeployment(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-deploy", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "Deployment", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + } + c, err := test.NewFakeClient(importedIt) + assert.Nil(t, err) + + a := initializeAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionDeploymentReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "imported from my-deploy Deployment", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) + // Deployment condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionDeploymentAvailable).Status) + assert.Equal(t, v1.IntegrationConditionDeploymentAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionDeploymentAvailable).Reason) + assert.Equal(t, "imported from my-deploy Deployment", 
handledIt.Status.GetCondition(v1.IntegrationConditionDeploymentAvailable).Message) +} + +func TestCamelImportCronJob(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-cron", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "CronJob", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + } + c, err := test.NewFakeClient(importedIt) + assert.Nil(t, err) + + a := initializeAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionDeploymentReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "imported from my-cron CronJob", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) + // CronJob condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionCronJobAvailable).Status) + assert.Equal(t, v1.IntegrationConditionCronJobCreatedReason, handledIt.Status.GetCondition(v1.IntegrationConditionCronJobAvailable).Reason) + assert.Equal(t, "imported from my-cron CronJob", handledIt.Status.GetCondition(v1.IntegrationConditionCronJobAvailable).Message) +} + +func TestCamelImportKnativeService(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: 
metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-ksvc", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "KnativeService", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + } + c, err := test.NewFakeClient(importedIt) + assert.Nil(t, err) + + a := initializeAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionKnativeServiceReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "imported from my-ksvc KnativeService", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) + // Knative Service condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionKnativeServiceAvailable).Status) + assert.Equal(t, v1.IntegrationConditionKnativeServiceAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionKnativeServiceAvailable).Reason) + assert.Equal(t, "imported from my-ksvc KnativeService", handledIt.Status.GetCondition(v1.IntegrationConditionKnativeServiceAvailable).Message) +} + +func TestCamelImportUnsupportedKind(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-kind", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "SomeKind", + 
}, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + } + c, err := test.NewFakeClient(importedIt) + assert.Nil(t, err) + + a := initializeAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "initialize", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseError, handledIt.Status.Phase) + // Ready condition + assert.Equal(t, corev1.ConditionFalse, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionImportingKindAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "Unsupported SomeKind import kind", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) +} diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index a70a8713b4..5129f6ade5 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -32,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/builder" ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -324,30 +323,47 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. // Evaluates to false if the object has been confirmed deleted return !e.DeleteStateUnknown }, - })). - // Watch for IntegrationKit phase transitioning to ready or error, and - // enqueue requests for any integration that matches the kit, in building - // or running phase. 
- Watches(&v1.IntegrationKit{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { - kit, ok := a.(*v1.IntegrationKit) - if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") - return []reconcile.Request{} - } + })) + // Watch for all the resources + watchIntegrationResources(c, b) + // Watch for the CronJob conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + watchCronJobResources(b) + } + // Watch for the Knative Services conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil { + return err + } else if ok { + if err = watchKnativeResources(ctx, c, b); err != nil { + return err + } + } - return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit) - })). + return b.Complete(r) +} + +func watchIntegrationResources(c client.Client, b *builder.Builder) { + // Watch for IntegrationKit phase transitioning to ready or error, and + // enqueue requests for any integration that matches the kit, in building + // or running phase. + b.Watches(&v1.IntegrationKit{}, + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { + kit, ok := a.(*v1.IntegrationKit) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationKit") + return []reconcile.Request{} + } + return integrationKitEnqueueRequestsFromMapFunc(ctx, c, kit) + })). 
// Watch for IntegrationPlatform phase transitioning to ready and enqueue // requests for any integrations that are in phase waiting for platform Watches(&v1.IntegrationPlatform{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { p, ok := a.(*v1.IntegrationPlatform) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integrations") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve IntegrationPlatform") return []reconcile.Request{} } - return integrationPlatformEnqueueRequestsFromMapFunc(ctx, c, p) })). // Watch for Configmaps or Secret used in the Integrations for updates @@ -355,30 +371,29 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { cm, ok := a.(*corev1.ConfigMap) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve Configmap") return []reconcile.Request{} } - return configmapEnqueueRequestsFromMapFunc(ctx, c, cm) })). Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { secret, ok := a.(*corev1.Secret) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve integration list") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve Secret") return []reconcile.Request{} } - return secretEnqueueRequestsFromMapFunc(ctx, c, secret) })). - // Watch for the owned Deployments - Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})). 
- // Watch for the Integration Pods + // Watch for the Integration Pods belonging to managed Integrations Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { pod, ok := a.(*corev1.Pod) if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to list integration pods") + log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve Pod") + return []reconcile.Request{} + } + if pod.Labels[v1.IntegrationLabel] == "" { return []reconcile.Request{} } return []reconcile.Request{ @@ -389,13 +404,17 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. }, }, } - })) + })). + // Watch for the owned Deployments + Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})) +} + +func watchCronJobResources(b *builder.Builder) { + // Watch for the owned CronJobs + b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) +} + +func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error { + // Watch for the owned Knative Services conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil { + return err @@ -410,15 +429,15 @@ func add(ctx context.Context, mgr manager.Manager, c client.Client, r reconcile. b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) } else { log.Info(` KnativeService resources installed in the cluster. However Camel K operator has not the required RBAC privileges. You can't use Knative features. 
- Make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for Camel K managed Knative Services.`) + Make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for Camel K managed Knative Services.`) } } else { log.Info(`KnativeService resources are not installed in the cluster. You can't use Knative features. If you install Knative Serving resources after the - Camel K operator, make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for - Camel K managed Knative Services.`) + Camel K operator, make sure to apply the required RBAC privileges and restart the Camel K Operator Pod to be able to watch for + Camel K managed Knative Services.`) } - return b.Complete(r) + return nil } var _ reconcile.Reconciler = &reconcileIntegration{} @@ -476,7 +495,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. NewPlatformSetupAction(), NewInitializeAction(), newBuildKitAction(), - NewMonitorAction(), + } + + if instance.IsSynthetic() { + actions = append(actions, NewMonitorSyntheticAction()) + } else { + actions = append(actions, NewMonitorAction()) } for _, a := range actions { diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 9a6208fcb9..048136d911 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -43,6 +43,7 @@ import ( utilResource "github.com/apache/camel-k/v2/pkg/util/resource" ) +// NewMonitorAction is an action used to monitor managed Integrations. 
func NewMonitorAction() Action { return &monitorAction{} } @@ -124,6 +125,37 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return nil, err } + return action.monitorPods(ctx, environment, integration) +} + +func (action *monitorAction) monitorPods(ctx context.Context, environment *trait.Environment, integration *v1.Integration) (*v1.Integration, error) { + controller, err := action.newController(environment, integration) + if err != nil { + return nil, err + } + + // In order to simplify the monitoring and have a minor resource requirement, we will watch only those Pods + // which are labeled with `camel.apache.org/integration`. This is a design choice that requires the user to + // voluntarily add a label to their Pods (via template, possibly) in order to monitor the non managed Camel applications. + + if !controller.hasTemplateIntegrationLabel() { + // This is happening when the Deployment, CronJob, etc resources + // miss the Integration label, required to identify sibling Pods. + integration.Status.SetConditions( + v1.IntegrationCondition{ + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionFalse, + Reason: v1.IntegrationConditionMonitoringPodsAvailableReason, + Message: fmt.Sprintf( + "Could not find `camel.apache.org/integration: %s` label in the %s template. Make sure to include this label in the template for Pod monitoring purposes.", + integration.GetName(), + controller.getControllerName(), + ), + }, + ) + return integration, nil + } + // Enforce the scale sub-resource label selector. // It is used by the HPA that queries the scale sub-resource endpoint, // to list the pods owned by the integration. 
@@ -161,7 +193,7 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra integration.Status.Phase = v1.IntegrationPhaseRunning } if err = action.updateIntegrationPhaseAndReadyCondition( - ctx, environment, integration, pendingPods.Items, runningPods.Items, + ctx, controller, environment, integration, pendingPods.Items, runningPods.Items, ); err != nil { return nil, err } @@ -255,6 +287,8 @@ type controller interface { checkReadyCondition(ctx context.Context) (bool, error) getPodSpec() corev1.PodSpec updateReadyCondition(readyPods int) bool + hasTemplateIntegrationLabel() bool + getControllerName() string } func (action *monitorAction) newController(env *trait.Environment, integration *v1.Integration) (controller, error) { @@ -311,13 +345,9 @@ func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object { } func (action *monitorAction) updateIntegrationPhaseAndReadyCondition( - ctx context.Context, environment *trait.Environment, integration *v1.Integration, + ctx context.Context, controller controller, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod, ) error { - controller, err := action.newController(environment, integration) - if err != nil { - return err - } if done, err := controller.checkReadyCondition(ctx); done || err != nil { // There may be pods that are not ready but still probable for getting error messages. // Ignore returned error from probing as it's expected when the ctrl obj is not ready. 
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go index 1620a66c31..caa0a67a77 100644 --- a/pkg/controller/integration/monitor_cronjob.go +++ b/pkg/controller/integration/monitor_cronjob.go @@ -110,3 +110,11 @@ func (c *cronJobController) updateReadyCondition(readyPods int) bool { return false } + +func (c *cronJobController) hasTemplateIntegrationLabel() bool { + return c.obj.Spec.JobTemplate.Spec.Template.Labels[v1.IntegrationLabel] != "" +} + +func (c *cronJobController) getControllerName() string { + return fmt.Sprintf("CronJob/%s", c.obj.Name) +} diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go index e2f823c16f..08b0c35e93 100644 --- a/pkg/controller/integration/monitor_deployment.go +++ b/pkg/controller/integration/monitor_deployment.go @@ -91,3 +91,11 @@ func (c *deploymentController) updateReadyCondition(readyPods int) bool { return false } + +func (c *deploymentController) hasTemplateIntegrationLabel() bool { + return c.obj.Spec.Template.Labels[v1.IntegrationLabel] != "" +} + +func (c *deploymentController) getControllerName() string { + return fmt.Sprintf("Deployment/%s", c.obj.Name) +} diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go index 06b7dc82bf..1c70195987 100644 --- a/pkg/controller/integration/monitor_knative.go +++ b/pkg/controller/integration/monitor_knative.go @@ -19,6 +19,7 @@ package integration import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" @@ -63,3 +64,11 @@ func (c *knativeServiceController) updateReadyCondition(readyPods int) bool { return false } + +func (c *knativeServiceController) hasTemplateIntegrationLabel() bool { + return c.obj.Spec.Template.Labels[v1.IntegrationLabel] != "" +} + +func (c *knativeServiceController) getControllerName() string { + return fmt.Sprintf("KnativeService/%s", c.obj.Name) +} diff --git 
a/pkg/controller/integration/monitor_synthetic.go b/pkg/controller/integration/monitor_synthetic.go new file mode 100644 index 0000000000..beb736b1d7 --- /dev/null +++ b/pkg/controller/integration/monitor_synthetic.go @@ -0,0 +1,69 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/trait" + k8serrors "k8s.io/apimachinery/pkg/api/errors" +) + +// NewMonitorSyntheticAction is an action used to monitor synthetic Integrations. +func NewMonitorSyntheticAction() Action { + return &monitorSyntheticAction{} +} + +type monitorSyntheticAction struct { + monitorAction +} + +func (action *monitorSyntheticAction) Name() string { + return "monitor-synthetic" +} + +func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { + environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil) + if err != nil { + // Importing application no longer available + if k8serrors.IsNotFound(err) { + // Application was deleted. 
The GC will take care of the cleanup. + return nil, nil + } + // other reasons, likely some error to report + integration.Status.Phase = v1.IntegrationPhaseError + integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error()) + return integration, err + } + + if environment == nil { + // The application which generated the Integration no longer has the importing label. We may have missed the + // delete event, therefore we need to perform the operation here. + err := action.client.Delete(ctx, integration) + action.L.Infof("Deleting synthetic Integration %s", integration.Name) + if err != nil { + return integration, err + } + return nil, nil + } + + return action.monitorPods(ctx, environment, integration) +} diff --git a/pkg/controller/integration/monitor_synthetic_test.go b/pkg/controller/integration/monitor_synthetic_test.go new file mode 100644 index 0000000000..b1cf8a66c2 --- /dev/null +++ b/pkg/controller/integration/monitor_synthetic_test.go @@ -0,0 +1,486 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package integration + +import ( + "context" + "testing" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + + "github.com/apache/camel-k/v2/pkg/util/log" + "github.com/apache/camel-k/v2/pkg/util/test" + + "github.com/stretchr/testify/assert" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +func TestMonitorSyntheticIntegrationImportingKindUnavailable(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-deploy", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "SomeKind", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseRunning, + }, + } + c, err := test.NewFakeClient(importedIt) + assert.Nil(t, err) + + a := monitorSyntheticAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor-synthetic", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.NotNil(t, err) + assert.Equal(t, v1.IntegrationPhaseError, handledIt.Status.Phase) + assert.Equal(t, corev1.ConditionFalse, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionImportingKindAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "cannot create a synthetic environment for SomeKind kind", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) +} + +func TestMonitorSyntheticIntegrationCannotMonitorPods(t *testing.T) { + importedIt 
:= &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-deploy", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "Deployment", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseRunning, + Conditions: []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionDeploymentAvailable, + Status: corev1.ConditionTrue, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-deploy", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + } + c, err := test.NewFakeClient(importedIt, deploy) + assert.Nil(t, err) + + a := monitorSyntheticAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor-synthetic", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, corev1.ConditionFalse, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + // Check monitoring pods condition + assert.Equal(t, v1.IntegrationConditionMonitoringPodsAvailableReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "Could not find `camel.apache.org/integration: my-imported-it` label in the Deployment/my-deploy template. 
Make sure to include this label in the template for Pod monitoring purposes.", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) +} + +func TestMonitorSyntheticIntegrationDeployment(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-deploy", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "Deployment", + }, + }, + Spec: v1.IntegrationSpec{ + Traits: v1.Traits{ + Container: &trait.ContainerTrait{ + Name: "my-cnt", + }, + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseRunning, + Conditions: []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionDeploymentAvailable, + Status: corev1.ConditionTrue, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-deploy", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: appsv1.DeploymentSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + } + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pod", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", 
+ }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + c, err := test.NewFakeClient(importedIt, deploy, pod) + assert.Nil(t, err) + + a := monitorSyntheticAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor-synthetic", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + assert.Equal(t, int32(1), *handledIt.Status.Replicas) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionDeploymentReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "1/1 ready replicas", handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) + + // Remove label from deployment + deploy.Labels = nil + c, err = test.NewFakeClient(importedIt, deploy) + assert.Nil(t, err) + a.InjectClient(c) + handledIt, err = a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Nil(t, handledIt) +} + +func TestMonitorSyntheticIntegrationCronJob(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-cron", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "CronJob", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseRunning, + Conditions: []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionCronJobAvailable, + Status: corev1.ConditionTrue, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + 
}, + }, + }, + } + cron := &batchv1.CronJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "CronJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-cron", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: batchv1.CronJobSpec{ + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + }, + }, + } + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pod", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + c, err := test.NewFakeClient(importedIt, cron, pod) + assert.Nil(t, err) + + a := monitorSyntheticAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor-synthetic", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + assert.Equal(t, int32(1), *handledIt.Status.Replicas) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionCronJobCreatedReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) + assert.Equal(t, "cronjob created", 
handledIt.Status.GetCondition(v1.IntegrationConditionReady).Message) +} + +func TestMonitorSyntheticIntegrationKnativeService(t *testing.T) { + importedIt := &v1.Integration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: v1.IntegrationKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-imported-it", + Annotations: map[string]string{ + v1.IntegrationImportedNameLabel: "my-ksvc", + v1.IntegrationSyntheticLabel: "true", + v1.IntegrationImportedKindLabel: "KnativeService", + }, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseRunning, + Conditions: []v1.IntegrationCondition{ + { + Type: v1.IntegrationConditionKnativeServiceAvailable, + Status: corev1.ConditionTrue, + }, + { + Type: v1.IntegrationConditionReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + ksvc := &servingv1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-ksvc", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: servingv1.ServiceSpec{ + ConfigurationSpec: servingv1.ConfigurationSpec{ + Template: servingv1.RevisionTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: servingv1.RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + }, + }, + Status: servingv1.ServiceStatus{ + Status: duckv1.Status{ + Conditions: duckv1.Conditions{ + apis.Condition{ + Type: servingv1.ServiceConditionReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + } + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pod", + Labels: map[string]string{ + v1.IntegrationLabel: 
"my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + c, err := test.NewFakeClient(importedIt, ksvc, pod) + assert.Nil(t, err) + + a := monitorSyntheticAction{} + a.InjectLogger(log.Log) + a.InjectClient(c) + assert.Equal(t, "monitor-synthetic", a.Name()) + assert.True(t, a.CanHandle(importedIt)) + handledIt, err := a.Handle(context.TODO(), importedIt) + assert.Nil(t, err) + assert.Equal(t, v1.IntegrationPhaseRunning, handledIt.Status.Phase) + assert.Equal(t, int32(1), *handledIt.Status.Replicas) + // Ready condition + assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status) + assert.Equal(t, v1.IntegrationConditionKnativeServiceReadyReason, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Reason) +} diff --git a/pkg/controller/pipe/pipe_controller.go b/pkg/controller/pipe/pipe_controller.go index 36da7fca1a..5b174e435e 100644 --- a/pkg/controller/pipe/pipe_controller.go +++ b/pkg/controller/pipe/pipe_controller.go @@ -66,7 +66,7 @@ func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { } func add(mgr manager.Manager, r reconcile.Reconciler) error { - c, err := controller.New("kamelet-binding-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.New("pipe-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } diff --git a/pkg/controller/synthetic/synthetic.go b/pkg/controller/synthetic/synthetic.go new file mode 100644 index 0000000000..974a2eb05f --- /dev/null +++ b/pkg/controller/synthetic/synthetic.go @@ -0,0 +1,294 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package synthetic + +import ( + "context" + "fmt" + "reflect" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + "github.com/apache/camel-k/v2/pkg/util/log" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientgocache "k8s.io/client-go/tools/cache" + "knative.dev/serving/pkg/apis/serving" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + controller = true + blockOwnerDeletion = true +) + +// ManageSyntheticIntegrations is the controller for synthetic Integrations. Consider that the lifecycle of the objects are driven +// by the way we are monitoring them. Since we're filtering by `camel.apache.org/integration` label in the cached client, +// you must consider an add, update or delete +// accordingly, ie, when the user label the resource, then it is considered as an add, when it removes the label, it is considered as a delete. 
+// We must filter only non managed objects in order to avoid to conflict with the reconciliation loop of managed objects (owned by an Integration). +func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cache.Cache) error { + informers, err := getInformers(ctx, c, cache) + if err != nil { + return err + } + for _, informer := range informers { + _, err := informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ctrlObj, ok := obj.(ctrl.Object) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on add event") + return + } + if !isManagedObject(ctrlObj) { + integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel] + it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName) + if err != nil { + if k8serrors.IsNotFound(err) { + adapter, err := nonManagedCamelApplicationFactory(ctrlObj) + if err != nil { + log.Errorf(err, "Some error happened while creating a Camel application adapter for %s", integrationName) + } + if err = createSyntheticIntegration(ctx, c, adapter.Integration()); err != nil { + log.Errorf(err, "Some error happened while creating a synthetic Integration %s", integrationName) + } + log.Infof("Created a synthetic Integration %s after %s resource object", it.GetName(), ctrlObj.GetName()) + } else { + log.Errorf(err, "Some error happened while loading a synthetic Integration %s", integrationName) + } + } else { + log.Infof("Synthetic Integration %s is in phase %s. 
Skipping.", integrationName, it.Status.Phase) + } + } + }, + DeleteFunc: func(obj interface{}) { + ctrlObj, ok := obj.(ctrl.Object) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on delete event") + return + } + if !isManagedObject(ctrlObj) { + integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel] + // Importing label removed + if err = deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); err != nil { + log.Errorf(err, "Some error happened while deleting a synthetic Integration %s", integrationName) + } + log.Infof("Deleted synthetic Integration %s", integrationName) + } + }, + }) + if err != nil { + return err + } + } + + return nil +} + +func getInformers(ctx context.Context, cl client.Client, c cache.Cache) ([]cache.Informer, error) { + deploy, err := c.GetInformer(ctx, &appsv1.Deployment{}) + if err != nil { + return nil, err + } + informers := []cache.Informer{deploy} + // Watch for the CronJob conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(cl, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + cron, err := c.GetInformer(ctx, &batchv1.CronJob{}) + if err != nil { + return nil, err + } + informers = append(informers, cron) + } + // Watch for the Knative Services conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(cl, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); ok && err == nil { + if ok, err := kubernetes.CheckPermission(ctx, cl, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); ok && err == nil { + ksvc, err := c.GetInformer(ctx, &servingv1.Service{}) + if err != nil { + return nil, err + } + informers = append(informers, ksvc) + } + } + + return informers, nil +} + +func getSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) (*v1.Integration, error) { + it := v1.NewIntegration(namespace, name) + err := 
c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it) + return &it, err +} + +func createSyntheticIntegration(ctx context.Context, c client.Client, it *v1.Integration) error { + return c.Create(ctx, it, ctrl.FieldOwner("camel-k-operator")) +} + +func deleteSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) error { + // As the Integration label was removed, we don't know which is the Synthetic integration to remove + it := v1.NewIntegration(namespace, name) + return c.Delete(ctx, &it) +} + +// isManagedObject returns true if the object is managed by an Integration. +func isManagedObject(obj ctrl.Object) bool { + for _, mr := range obj.GetOwnerReferences() { + if mr.APIVersion == "camel.apache.org/v1" && + mr.Kind == "Integration" { + return true + } + } + return false +} + +// nonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle. +type nonManagedCamelApplicationAdapter interface { + // Integration return an Integration resource fed by the Camel application adapter. + Integration() *v1.Integration +} + +func nonManagedCamelApplicationFactory(obj ctrl.Object) (nonManagedCamelApplicationAdapter, error) { + deploy, ok := obj.(*appsv1.Deployment) + if ok { + return &nonManagedCamelDeployment{deploy: deploy}, nil + } + cronjob, ok := obj.(*batchv1.CronJob) + if ok { + return &NonManagedCamelCronjob{cron: cronjob}, nil + } + ksvc, ok := obj.(*servingv1.Service) + if ok { + return &NonManagedCamelKnativeService{ksvc: ksvc}, nil + } + return nil, fmt.Errorf("unsupported %s object kind", obj.GetName()) +} + +// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle. +type nonManagedCamelDeployment struct { + deploy *appsv1.Deployment +} + +// Integration return an Integration resource fed by the Camel application adapter. 
+func (app *nonManagedCamelDeployment) Integration() *v1.Integration { + it := v1.NewIntegration(app.deploy.Namespace, app.deploy.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.deploy.Name, + v1.IntegrationImportedKindLabel: "Deployment", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{ + Container: &trait.ContainerTrait{ + Name: app.getContainerNameFromDeployment(), + }, + }, + } + references := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: app.deploy.Name, + UID: app.deploy.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + it.SetOwnerReferences(references) + return &it +} + +// getContainerNameFromDeployment returns the container name which is running the Camel application. +func (app *nonManagedCamelDeployment) getContainerNameFromDeployment() string { + firstContainerName := "" + for _, ct := range app.deploy.Spec.Template.Spec.Containers { + // set as fallback if no container is named as the deployment + if firstContainerName == "" { + firstContainerName = ct.Name + } + if ct.Name == app.deploy.Name { + return app.deploy.Name + } + } + return firstContainerName +} + +// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelCronjob struct { + cron *batchv1.CronJob +} + +// Integration return an Integration resource fed by the Camel application adapter. 
+func (app *NonManagedCamelCronjob) Integration() *v1.Integration { + it := v1.NewIntegration(app.cron.Namespace, app.cron.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.cron.Name, + v1.IntegrationImportedKindLabel: "CronJob", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{}, + } + references := []metav1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: app.cron.Name, + UID: app.cron.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + it.SetOwnerReferences(references) + return &it +} + +// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelKnativeService struct { + ksvc *servingv1.Service +} + +// Integration return an Integration resource fed by the Camel application adapter. +func (app *NonManagedCamelKnativeService) Integration() *v1.Integration { + it := v1.NewIntegration(app.ksvc.Namespace, app.ksvc.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.ksvc.Name, + v1.IntegrationImportedKindLabel: "KnativeService", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{}, + } + references := []metav1.OwnerReference{ + { + APIVersion: servingv1.SchemeGroupVersion.String(), + Kind: "Service", + Name: app.ksvc.Name, + UID: app.ksvc.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + it.SetOwnerReferences(references) + return &it +} diff --git a/pkg/controller/synthetic/synthetic_test.go b/pkg/controller/synthetic/synthetic_test.go new file mode 100644 index 0000000000..fcc15077af --- /dev/null +++ b/pkg/controller/synthetic/synthetic_test.go @@ -0,0 +1,253 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package synthetic + +import ( + "testing" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + + "github.com/stretchr/testify/assert" +) + +func TestNonManagedUnsupported(t *testing.T) { + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-pod", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + nilAdapter, err := nonManagedCamelApplicationFactory(pod) + assert.NotNil(t, err) + assert.Equal(t, "unsupported my-pod object kind", err.Error()) + assert.Nil(t, nilAdapter) +} + +func TestNonManagedDeployment(t *testing.T) { + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: 
appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-deploy", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: appsv1.DeploymentSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + } + + expectedIt := v1.NewIntegration("ns", "my-imported-it") + expectedIt.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: "my-deploy", + v1.IntegrationImportedKindLabel: "Deployment", + v1.IntegrationSyntheticLabel: "true", + }) + expectedIt.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{ + Container: &trait.ContainerTrait{ + Name: "my-cnt", + }, + }, + } + references := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: deploy.Name, + UID: deploy.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + expectedIt.SetOwnerReferences(references) + + deploymentAdapter, err := nonManagedCamelApplicationFactory(deploy) + assert.Nil(t, err) + assert.NotNil(t, deploymentAdapter) + assert.Equal(t, expectedIt, *deploymentAdapter.Integration()) +} + +func TestNonManagedCronJob(t *testing.T) { + cron := &batchv1.CronJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: "CronJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-cron", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: batchv1.CronJobSpec{ + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: 
"my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + }, + }, + } + + expectedIt := v1.NewIntegration("ns", "my-imported-it") + expectedIt.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: "my-cron", + v1.IntegrationImportedKindLabel: "CronJob", + v1.IntegrationSyntheticLabel: "true", + }) + references := []metav1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "CronJob", + Name: cron.Name, + UID: cron.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + expectedIt.SetOwnerReferences(references) + cronJobAdapter, err := nonManagedCamelApplicationFactory(cron) + assert.Nil(t, err) + assert.NotNil(t, cronJobAdapter) + assert.Equal(t, expectedIt, *cronJobAdapter.Integration()) +} + +func TestNonManagedKnativeService(t *testing.T) { + ksvc := &servingv1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: servingv1.SchemeGroupVersion.String(), + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "my-ksvc", + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: servingv1.ServiceSpec{ + ConfigurationSpec: servingv1.ConfigurationSpec{ + Template: servingv1.RevisionTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.IntegrationLabel: "my-imported-it", + }, + }, + Spec: servingv1.RevisionSpec{ + PodSpec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "my-cnt", + Image: "my-img", + }, + }, + }, + }, + }, + }, + }, + } + + expectedIt := v1.NewIntegration("ns", "my-imported-it") + expectedIt.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: "my-ksvc", + v1.IntegrationImportedKindLabel: "KnativeService", + v1.IntegrationSyntheticLabel: "true", + }) + references := []metav1.OwnerReference{ + { + APIVersion: servingv1.SchemeGroupVersion.String(), + Kind: "Service", + Name: ksvc.Name, + UID: ksvc.UID, + Controller: &controller, + BlockOwnerDeletion: &blockOwnerDeletion, + }, + } + 
expectedIt.SetOwnerReferences(references) + + knativeServiceAdapter, err := nonManagedCamelApplicationFactory(ksvc) + assert.Nil(t, err) + assert.NotNil(t, knativeServiceAdapter) + assert.Equal(t, expectedIt, *knativeServiceAdapter.Integration()) +} diff --git a/pkg/trait/camel.go b/pkg/trait/camel.go index 2a4e7b3f4d..71a24550e4 100644 --- a/pkg/trait/camel.go +++ b/pkg/trait/camel.go @@ -64,7 +64,8 @@ func (t *camelTrait) Configure(e *Environment) (bool, *TraitCondition, error) { t.RuntimeVersion = determineRuntimeVersion(e) } - return true, nil, nil + // Don't run this trait for a synthetic Integration + return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil } func (t *camelTrait) Apply(e *Environment) error { diff --git a/pkg/trait/platform.go b/pkg/trait/platform.go index 58e5455977..ec3fb1e04f 100644 --- a/pkg/trait/platform.go +++ b/pkg/trait/platform.go @@ -73,7 +73,8 @@ func (t *platformTrait) Configure(e *Environment) (bool, *TraitCondition, error) } } - return true, nil, nil + // Don't run this trait for a synthetic Integration + return e.Integration == nil || !e.Integration.IsSynthetic(), nil, nil } func (t *platformTrait) Apply(e *Environment) error { diff --git a/pkg/trait/trait.go b/pkg/trait/trait.go index 33676616f1..059b294f14 100644 --- a/pkg/trait/trait.go +++ b/pkg/trait/trait.go @@ -24,13 +24,15 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/platform" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/log" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + serving "knative.dev/serving/pkg/apis/serving/v1" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" ) func Apply(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, 
error) { @@ -97,7 +99,7 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr return nil, errors.New("neither integration nor kit are set") } - var obj k8sclient.Object + var obj ctrl.Object if integration != nil { obj = integration } else if kit != nil { @@ -134,3 +136,73 @@ func newEnvironment(ctx context.Context, c client.Client, integration *v1.Integr return &env, nil } + +// NewSyntheticEnvironment creates an environment suitable for a synthetic Integration. If the application which generated the synthetic Integration +// has no longer the label, it will return a nil result. +func NewSyntheticEnvironment(ctx context.Context, c client.Client, integration *v1.Integration, kit *v1.IntegrationKit) (*Environment, error) { + if integration == nil && kit == nil { + return nil, errors.New("neither integration nor kit are set") + } + + env := Environment{ + Ctx: ctx, + Platform: nil, + Client: c, + IntegrationKit: kit, + Integration: integration, + ExecutedTraits: make([]Trait, 0), + Resources: kubernetes.NewCollection(), + EnvVars: make([]corev1.EnvVar, 0), + ApplicationProperties: make(map[string]string), + } + + catalog := NewCatalog(c) + // set the catalog + env.Catalog = catalog + // we need to simulate the execution of the traits to fill certain values used later by monitoring + _, err := catalog.apply(&env) + if err != nil { + return nil, fmt.Errorf("error during trait customization: %w", err) + } + camelApp, err := getCamelAppObject( + ctx, + c, + integration.Annotations[v1.IntegrationImportedKindLabel], + integration.Namespace, + integration.Annotations[v1.IntegrationImportedNameLabel], + ) + if err != nil { + return nil, err + } + // Verify if the application has still the expected label. If not, return nil. 
+ if camelApp.GetLabels()[v1.IntegrationLabel] != integration.Name { + return nil, nil + } + env.Resources.Add(camelApp) + + return &env, nil +} + +func getCamelAppObject(ctx context.Context, c client.Client, kind, namespace, name string) (ctrl.Object, error) { + switch kind { + case "Deployment": + return c.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + case "CronJob": + return c.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{}) + case "KnativeService": + ksvc := &serving.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: serving.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc) + return ksvc, err + default: + return nil, fmt.Errorf("cannot create a synthetic environment for %s kind", kind) + } +} diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go index fef78d2b62..9105719ebf 100644 --- a/pkg/util/test/client.go +++ b/pkg/util/test/client.go @@ -30,6 +30,7 @@ import ( camelv1alpha1 "github.com/apache/camel-k/v2/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1" "github.com/apache/camel-k/v2/pkg/util" autoscalingv1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -55,7 +56,19 @@ func NewFakeClient(initObjs ...runtime.Object) (client.Client, error) { return nil, err } - c := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(initObjs...).Build() + c := fake. + NewClientBuilder(). + WithScheme(scheme). + WithIndex( + &corev1.Pod{}, + "status.phase", + func(obj controller.Object) []string { + pod, _ := obj.(*corev1.Pod) + return []string{string(pod.Status.Phase)} + }, + ). + WithRuntimeObjects(initObjs...). 
+ Build() camelClientset := fakecamelclientset.NewSimpleClientset(filterObjects(scheme, initObjs, func(gvk schema.GroupVersionKind) bool { return strings.Contains(gvk.Group, "camel")