From c334edd8e3de5332cde6a9503c984fcfe02c9337 Mon Sep 17 00:00:00 2001 From: Omer Aplatony Date: Thu, 2 Jan 2025 17:04:18 +0200 Subject: [PATCH 1/2] refactor gcp cloud tasks scaler (#6406) Signed-off-by: Omer Aplatony --- pkg/scalers/gcp_cloud_tasks_scaler.go | 75 +++---------- pkg/scalers/gcp_cloud_tasks_scaler_test.go | 125 ++++++++++++++++----- 2 files changed, 114 insertions(+), 86 deletions(-) diff --git a/pkg/scalers/gcp_cloud_tasks_scaler.go b/pkg/scalers/gcp_cloud_tasks_scaler.go index 3c92633d1e7..90a250693de 100644 --- a/pkg/scalers/gcp_cloud_tasks_scaler.go +++ b/pkg/scalers/gcp_cloud_tasks_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -16,8 +15,6 @@ import ( const ( cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth" - - cloudTaskDefaultValue = 100 ) type gcpCloudTasksScaler struct { @@ -28,12 +25,12 @@ type gcpCloudTasksScaler struct { } type gcpCloudTaskMetadata struct { - value float64 - activationValue float64 - filterDuration int64 + Value float64 `keda:"name=value, order=triggerMetadata, optional, default=100"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"` + FilterDuration int64 `keda:"name=filterDuration, order=triggerMetadata, optional"` - queueName string - projectID string + QueueName string `keda:"name=queueName, order=triggerMetadata"` + ProjectID string `keda:"name=projectID, order=triggerMetadata"` gcpAuthorization *gcp.AuthorizationMetadata triggerIndex int } @@ -60,61 +57,19 @@ func NewGcpCloudTasksScaler(config *scalersconfig.ScalerConfig) (Scaler, error) } func parseGcpCloudTasksMetadata(config *scalersconfig.ScalerConfig) (*gcpCloudTaskMetadata, error) { - meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue} - - value, valuePresent := config.TriggerMetadata["value"] - - if valuePresent { - triggerValue, err := strconv.ParseFloat(value, 64) - if err != nil { - return nil, fmt.Errorf("value parsing error %w", err) - } - meta.value = triggerValue - } - - if val, ok := config.TriggerMetadata["queueName"]; ok { - if val == "" { - return nil, fmt.Errorf("no queue name given") - } - meta.queueName = val - } else { - return nil, fmt.Errorf("no queue name given") - } - - if val, ok := config.TriggerMetadata["filterDuration"]; ok { - filterDuration, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("filterDuration parsing error %w", err) - } - meta.filterDuration = filterDuration - } - - meta.activationValue = 0 - if val, ok := config.TriggerMetadata["activationValue"]; ok { - activationValue, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationValue parsing error %w", err) - } - meta.activationValue = activationValue - } - - if val, ok := config.TriggerMetadata["projectID"]; ok { - if val == "" { - return nil, fmt.Errorf("no project id given") - } - - meta.projectID = val - } else { - return nil, fmt.Errorf("no project id given") + meta := &gcpCloudTaskMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing Gcp cloud task metadata: %w", err) } auth, err := gcp.GetGCPAuthorization(config) if err != nil { return nil, err } + meta.gcpAuthorization = auth meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func (s *gcpCloudTasksScaler) Close(context.Context) error { @@ -132,9 +87,9 @@ func (s *gcpCloudTasksScaler) Close(context.Context) error { func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.QueueName))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } // Create the metric spec for the HPA @@ -158,7 +113,7 @@ func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricN metric := GenerateMetricInMili(metricName, value) - return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationValue, nil } func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error { @@ -185,9 +140,9 @@ func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string) return -1, err } } - filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"` + filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.QueueName + `"` // Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them. // See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks - return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil, nil, s.metadata.filterDuration) + return s.client.GetMetrics(ctx, filter, s.metadata.ProjectID, nil, nil, s.metadata.FilterDuration) } diff --git a/pkg/scalers/gcp_cloud_tasks_scaler_test.go b/pkg/scalers/gcp_cloud_tasks_scaler_test.go index 61772272e0a..4f3b6d84b62 100644 --- a/pkg/scalers/gcp_cloud_tasks_scaler_test.go +++ b/pkg/scalers/gcp_cloud_tasks_scaler_test.go @@ -2,10 +2,12 @@ package scalers import ( "context" + "reflect" "testing" "github.com/go-logr/logr" + "github.com/kedacore/keda/v2/pkg/scalers/gcp" "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" ) @@ -17,6 +19,8 @@ type parseGcpCloudTasksMetadataTestData struct { authParams map[string]string metadata map[string]string isError bool + expected *gcpCloudTaskMetadata + comment string } type gcpCloudTasksMetricIdentifier struct { @@ -26,25 +30,82 @@ type gcpCloudTasksMetricIdentifier struct { } var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{ - {map[string]string{}, map[string]string{}, true}, - // all properly formed - {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false}, - // missing subscriptionName - {nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, - // missing credentials - {nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true}, - // malformed subscriptionSize - {nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, - // malformed mode - {nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, - // malformed activationTargetValue - {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true}, - // Credentials from AuthParams - {map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false}, - // Credentials from AuthParams with empty creds - {map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true}, - // properly formed float value and activationTargetValue - {nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false}, + + {map[string]string{}, map[string]string{}, true, nil, "erro case"}, + + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false, &gcpCloudTaskMetadata{ + Value: 7, + ActivationValue: 5, + FilterDuration: 0, + QueueName: "myQueue", + ProjectID: "myproject", + gcpAuthorization: &gcp.AuthorizationMetadata{ + GoogleApplicationCredentials: "{}", + PodIdentityProviderEnabled: false, + }, + triggerIndex: 0}, "all properly formed"}, + + {nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "missing subscriptionName"}, + + {nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true, nil, "missing credentials"}, + + {nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed subscriptionSize"}, + + {nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed mode"}, + + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true, nil, "malformed activationTargetValue"}, + + {map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{ + Value: 7, + ActivationValue: 0, + FilterDuration: 0, + QueueName: "myQueue", + ProjectID: "myproject", + gcpAuthorization: &gcp.AuthorizationMetadata{ + GoogleApplicationCredentials: "Creds", + PodIdentityProviderEnabled: false, + }, + triggerIndex: 0}, "Credentials from AuthParams"}, + + {map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true, nil, "Credentials from AuthParams with empty creds"}, + + {nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{ + Value: 7.1, + ActivationValue: 2.1, + FilterDuration: 0, + QueueName: "mysubscription", + ProjectID: "myproject", + gcpAuthorization: &gcp.AuthorizationMetadata{ + GoogleApplicationCredentials: "{}", + PodIdentityProviderEnabled: false, + }, + triggerIndex: 0}, "properly formed float value and activationTargetValue"}, + + {nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS"}, false, &gcpCloudTaskMetadata{ + Value: 100, + ActivationValue: 0, + FilterDuration: 0, + QueueName: "myQueue", + ProjectID: "myProject", + gcpAuthorization: &gcp.AuthorizationMetadata{ + GoogleApplicationCredentials: "{}", + PodIdentityProviderEnabled: false, + }, + triggerIndex: 0}, "test default value (100) when value is not provided"}, + + {nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false, &gcpCloudTaskMetadata{ + Value: 100, + ActivationValue: 5, + FilterDuration: 0, + QueueName: "myQueue", + ProjectID: "myProject", + gcpAuthorization: &gcp.AuthorizationMetadata{ + GoogleApplicationCredentials: "{}", + PodIdentityProviderEnabled: false, + }, + triggerIndex: 0}, "test default value with specified activationVal"}, + + {nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "filterDuration": "invalid"}, true, nil, "test invalid filterDuration with default values"}, } var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ @@ -54,13 +115,25 @@ var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ func TestGcpCloudTasksParseMetadata(t *testing.T) { for _, testData := range testGcpCloudTasksMetadata { - _, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv}) - if err != nil && !testData.isError { - t.Error("Expected success but got error", err) - } - if testData.isError && err == nil { - t.Error("Expected error but got success") - } + t.Run(testData.comment, func(t *testing.T) { + metadata, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{ + AuthParams: testData.authParams, + TriggerMetadata: testData.metadata, + ResolvedEnv: testGcpCloudTasksResolvedEnv, + }) + + if err != nil && !testData.isError { + t.Errorf("Expected success but got error") + } + + if testData.isError && err == nil { + t.Errorf("Expected error but got success") + } + + if !testData.isError && !reflect.DeepEqual(testData.expected, metadata) { + t.Fatalf("Expected %#v but got %+#v", testData.expected, metadata) + } + }) } } From 720af45b1d3fc28891f9d9ea64e092f60b34ee04 Mon Sep 17 00:00:00 2001 From: Shane Date: Thu, 2 Jan 2025 23:40:28 +0800 Subject: [PATCH 2/2] Fix testLogBug and changeSomeVariableName (#6420) Signed-off-by: Shane --- apis/keda/v1alpha1/scaledobject_types.go | 2 +- apis/keda/v1alpha1/scaledobject_webhook.go | 14 +++++++------- tests/internals/events/events_test.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index 2ffbcc4932e..ce810528485 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -75,7 +75,7 @@ const ( // HealthStatusFailing means the status of the health object is failing HealthStatusFailing HealthStatusType = "Failing" - // Composite metric name used for scalingModifiers composite metric + // CompositeMetricName is used for scalingModifiers composite metric CompositeMetricName string = "composite-metric" defaultHPAMinReplicas int32 = 1 diff --git a/apis/keda/v1alpha1/scaledobject_webhook.go b/apis/keda/v1alpha1/scaledobject_webhook.go index 41bd0355596..62325038866 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook.go +++ b/apis/keda/v1alpha1/scaledobject_webhook.go @@ -231,8 +231,8 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { return err } - var incomingSoGckr GroupVersionKindResource - incomingSoGckr, err = ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind) + var incomingSoGvkr GroupVersionKindResource + incomingSoGvkr, err = ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind) if err != nil { scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind) return err @@ -245,13 +245,13 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { val, _ := json.MarshalIndent(hpa, "", " ") scaledobjectlog.V(1).Info(fmt.Sprintf("checking hpa %s: %v", hpa.Name, string(val))) - hpaGckr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) + hpaGvkr, err := ParseGVKR(restMapper, hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind) if err != nil { scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from HPA", "hpaName", hpa.Name, "apiVersion", hpa.Spec.ScaleTargetRef.APIVersion, "kind", hpa.Spec.ScaleTargetRef.Kind) return err } - if hpaGckr.GVKString() == incomingSoGckr.GVKString() && + if hpaGvkr.GVKString() == incomingSoGvkr.GVKString() && hpa.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name { owned := false for _, owner := range hpa.OwnerReferences { @@ -268,7 +268,7 @@ func verifyHpas(incomingSo *ScaledObject, action string, _ bool) error { incomingSo.Spec.Advanced.HorizontalPodAutoscalerConfig.Name == hpa.Name { scaledobjectlog.Info(fmt.Sprintf("%s hpa ownership being transferred to %s", hpa.Name, incomingSo.Name)) } else { - err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), hpa.Name) + err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGvkr.GVKString(), hpa.Name) scaledobjectlog.Error(err, "validation error") metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa") return err @@ -363,13 +363,13 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool Namespace: incomingSo.Namespace, Name: incomingSo.Spec.ScaleTargetRef.Name, } - incomingSoGckr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind) + incomingSoGvkr, err := ParseGVKR(restMapper, incomingSo.Spec.ScaleTargetRef.APIVersion, incomingSo.Spec.ScaleTargetRef.Kind) if err != nil { scaledobjectlog.Error(err, "Failed to parse Group, Version, Kind, Resource from incoming ScaledObject", "apiVersion", incomingSo.Spec.ScaleTargetRef.APIVersion, "kind", incomingSo.Spec.ScaleTargetRef.Kind) return err } - switch incomingSoGckr.GVKString() { + switch incomingSoGvkr.GVKString() { case "apps/v1.Deployment": deployment := &appsv1.Deployment{} if err := getFromCacheOrDirect(context.Background(), key, deployment); err != nil { diff --git a/tests/internals/events/events_test.go b/tests/internals/events/events_test.go index 3f9122d81fe..d6c5f197087 100644 --- a/tests/internals/events/events_test.go +++ b/tests/internals/events/events_test.go @@ -324,12 +324,12 @@ func getTemplateData() (templateData, []Template) { }, []Template{} } -func checkingEvent(t *testing.T, namespace string, scaledObject string, index int, eventreason string, message string) { +func checkingEvent(t *testing.T, namespace string, scaledObject string, index int, eventReason string, message string) { result, err := ExecuteCommand(fmt.Sprintf("kubectl get events -n %s --field-selector involvedObject.name=%s --sort-by=.metadata.creationTimestamp -o jsonpath=\"{.items[%d].reason}:{.items[%d].message}\"", namespace, scaledObject, index, index)) assert.NoError(t, err) lastEventMessage := strings.Trim(string(result), "\"") - assert.Equal(t, lastEventMessage, eventreason+":"+message) + assert.Equal(t, eventReason+":"+message, lastEventMessage) } func testNormalEvent(t *testing.T, kc *kubernetes.Clientset, data templateData) {