Skip to content

Commit

Permalink
chore: e2e tests for metrics APIs (#2277)
Browse files Browse the repository at this point in the history
  • Loading branch information
adarsh0728 authored Dec 17, 2024
1 parent fc14696 commit 7450e16
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 15 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -118,7 +119,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/handlers v1.5.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1164,4 +1164,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
21 changes: 21 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,27 @@ func (h *handler) GetMonoVertex(c *gin.Context) {
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, monoVertexResp))
}

// DeleteMonoVertex is used to delete a mono vertex
func (h *handler) DeleteMonoVertex(c *gin.Context) {
ns, monoVertex := c.Param("namespace"), c.Param("mono-vertex")

// Check if the mono vertex exists
_, err := h.numaflowClient.MonoVertices(ns).Get(c, monoVertex, metav1.GetOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch mono vertex %q in namespace %q, %s", monoVertex, ns, err.Error()))
return
}

// Delete the mono vertex
err = h.numaflowClient.MonoVertices(ns).Delete(c, monoVertex, metav1.DeleteOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to delete mono vertex %q in namespace %q, %s", monoVertex, ns, err.Error()))
return
}

c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil))
}

// CreateMonoVertex is used to create a mono vertex
func (h *handler) CreateMonoVertex(c *gin.Context) {
if h.opts.readonly {
Expand Down
51 changes: 43 additions & 8 deletions server/apis/v1/promql_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -38,13 +40,46 @@ func (m *MockPrometheusAPI) QueryRange(ctx context.Context, query string, r v1.R
return mockResponse, nil, nil
}

func compareFilters(query1, query2 string) bool {
//Extract the filter portions of the queries
filters1 := extractfilters(query1)
filters2 := extractfilters(query2)
return reflect.DeepEqual(filters1, filters2)
}

// comparePrometheusQueries compares two Prometheus queries, ignoring the order of filters within the curly braces
func comparePrometheusQueries(query1, query2 string) bool {
// Extract the filter portions of the queries
//Extract the filter portions of the queries
filters1 := extractfilters(query1)
filters2 := extractfilters(query2)
// Compare the filter portions using reflect.DeepEqual, which ignores order
return reflect.DeepEqual(filters1, filters2)
//Compare the filter portions using reflect.DeepEqual, which ignores order
if !reflect.DeepEqual(filters1, filters2) {
return false // Filters don't match
}

//Remove filter portions from the queries
query1 = removeFilters(query1)
query2 = removeFilters(query2)

//Normalize the remaining parts of the queries
query1 = normalizeQuery(query1)
query2 = normalizeQuery(query2)

//Compare the normalized queries
return cmp.Equal(query1, query2, cmpopts.IgnoreUnexported(struct{}{}))

}

func normalizeQuery(query string) string {
// Remove extra whitespace and normalize case
query = strings.TrimSpace(strings.ToLower(query))
return query
}

// remove filters within {}
func removeFilters(query string) string {
re := regexp.MustCompile(`\{(.*?)\}`)
return re.ReplaceAllString(query, "")
}

// extractfilters extracts the key-value pairs within the curly braces
Expand Down Expand Up @@ -97,7 +132,7 @@ func Test_PopulateReqMap(t *testing.T) {
assert.Equal(t, actualMap["$quantile"], expectedMap["$quantile"])
assert.Equal(t, actualMap["$duration"], expectedMap["$duration"])
assert.Equal(t, actualMap["$dimension"], expectedMap["$dimension"])
if !comparePrometheusQueries(expectedMap["$filters"], actualMap["$filters"]) {
if !compareFilters(expectedMap["$filters"], actualMap["$filters"]) {
t.Errorf("filters do not match")
}
})
Expand All @@ -121,7 +156,7 @@ func Test_PopulateReqMap(t *testing.T) {
assert.Equal(t, actualMap["$duration"], expectedMap["$duration"])
assert.Equal(t, actualMap["$dimension"], expectedMap["$dimension"])

if !comparePrometheusQueries(expectedMap["$filters"], actualMap["$filters"]) {
if !compareFilters(expectedMap["$filters"], actualMap["$filters"]) {
t.Errorf("filters do not match")
}
})
Expand Down Expand Up @@ -160,7 +195,7 @@ func Test_PromQueryBuilder(t *testing.T) {
"pod": "test-pod",
},
},
expectedQuery: `histogram_quantile(0.90, sum by(test_dimension,le) (rate(test_bucket{namespace= "test_namespace", mvtx_name= "test-mono-vertex", pod= "test-pod"}[5m])))`,
expectedQuery: `histogram_quantile(0.90, sum by(test_dimension,le) (rate(test_metric{namespace= "test_namespace", mvtx_name= "test-mono-vertex", pod= "test-pod"}[5m])))`,
},
{
name: "Missing placeholder in req",
Expand Down Expand Up @@ -283,7 +318,7 @@ func Test_PromQueryBuilder(t *testing.T) {
},
Expression: map[string]map[string]string{
"monovtx_pending": {
"mono-vertex": "$metric_name{$filters}",
"mono-vertex": "sum($metric_name{$filters}) by ($dimension, period)",
},
},
}
Expand Down Expand Up @@ -347,7 +382,7 @@ func Test_PromQueryBuilder(t *testing.T) {
},
Expression: map[string]map[string]string{
"vertex_pending_messages": {
"vertex": "$metric_name{$filters}",
"vertex": "sum($metric_name{$filters}) by ($dimension, period)",
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse
r.GET("/namespaces/:namespace/mono-vertices", handler.ListMonoVertices)
// Get the mono vertex information.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.GetMonoVertex)
// Delete a mono-vertex.
r.DELETE("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.DeleteMonoVertex)
// Get all the pods of a mono vertex.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex/pods", handler.ListMonoVertexPods)
// Create a mono vertex.
Expand Down
78 changes: 75 additions & 3 deletions test/api-e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type APISuite struct {
E2ESuite
}

func TestAPISuite(t *testing.T) {
suite.Run(t, new(APISuite))
}

func (s *APISuite) TestGetSysInfo() {
defer s.Given().When().UXServerPodPortForward(8043, 8443).TerminateAllPodPortForwards()

Expand Down Expand Up @@ -209,9 +213,17 @@ func (s *APISuite) TestAPIsForIsbAndPipelineAndMonoVertex() {
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), listMonoVertexBody, testMonoVertex1Name)

// deletes a mono-vertex
deleteMonoVertex := HTTPExpect(s.T(), "https://localhost:8145").DELETE(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s", Namespace, testMonoVertex1Name)).
Expect().
Status(200).Body().Raw()
var deleteMonoVertexSuccessExpect = `"data":null`
assert.Contains(s.T(), deleteMonoVertex, deleteMonoVertexSuccessExpect)

}

func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() {
func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPodsForPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

Expand Down Expand Up @@ -275,8 +287,68 @@ func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() {
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), getVerticesPodsBody, `simple-pipeline-input-0`)

// Call the DiscoverMetrics API for the vertex object
discoverMetricsBodyForVertex := HTTPExpect(s.T(), "https://localhost:8146").GET("/api/v1/metrics-discovery/object/vertex").
Expect().
Status(200).Body().Raw()

// Check that the response contains expected metrics for vertex object
assert.Contains(s.T(), discoverMetricsBodyForVertex, "forwarder_data_read_total")

// Call the API to get input vertex pods info
getVertexPodsInfoBody := HTTPExpect(s.T(), "https://localhost:8146").
GET(fmt.Sprintf("/api/v1/namespaces/%s/pipelines/%s/vertices/%s/pods-info", Namespace, pipelineName, "input")).
Expect().
Status(200).Body().Raw()

// Check that the response contains expected pod details
assert.Contains(s.T(), getVertexPodsInfoBody, `"name":`) // Check for pod name
assert.Contains(s.T(), getVertexPodsInfoBody, `"status":`) // Check for pod status
assert.Contains(s.T(), getVertexPodsInfoBody, `"totalCPU":`) // Check for pod's cpu usage
assert.Contains(s.T(), getVertexPodsInfoBody, `"totalMemory":`) // Check for pod's memory usage
assert.Contains(s.T(), getVertexPodsInfoBody, `"containerDetailsMap":`) // Check for pod's containers
}

func TestAPISuite(t *testing.T) {
suite.Run(t, new(APISuite))
func (s *APISuite) TestMetricsAPIsForMonoVertex() {
_, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

w := s.Given().MonoVertex("@testdata/mono-vertex.yaml").
When().
CreateMonoVertexAndWait()
defer w.DeleteMonoVertexAndWait()

monoVertexName := "mono-vertex"

defer w.UXServerPodPortForward(8149, 8443).TerminateAllPodPortForwards()

w.Expect().MonoVertexPodsRunning()
// Expect the messages to reach the sink.
w.Expect().RedisSinkContains("mono-vertex", "199")
w.Expect().RedisSinkContains("mono-vertex", "200")

// Call the API to get mono vertex pods info
getMonoVertexPodsInfoBody := HTTPExpect(s.T(), "https://localhost:8149").
GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s/pods-info", Namespace, monoVertexName)).
Expect().
Status(200).Body().Raw()

// Check that the response contains expected pod details
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"name":`) // Check for pod name
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"status":`) // Check for pod status
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"totalCPU":`) // Check for pod's cpu usage
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"totalMemory":`) // Check for pod's memory usage
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"containerDetailsMap":`) // Check for pod's containers

// Call the DiscoverMetrics API for mono-vertex
discoverMetricsBodyForMonoVertex := HTTPExpect(s.T(), "https://localhost:8149").GET("/api/v1/metrics-discovery/object/mono-vertex").
Expect().
Status(200).Body().Raw()

// Check that the response contains expected metrics for mono-vertex
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_processing_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_sink_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_read_total")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_pending")
}
4 changes: 2 additions & 2 deletions test/api-e2e/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ var (
"source": {
"udsource": {
"container": {
"image": "quay.io/numaio/numaflow-java/source-simple-source:stable"
"image": "quay.io/numaio/numaflow-rs/simple-source:stable"
}
},
"transformer": {
Expand All @@ -180,7 +180,7 @@ var (
"sink": {
"udsink": {
"container": {
"image": "quay.io/numaio/numaflow-java/simple-sink:stable"
"image": "quay.io/numaio/numaflow-rs/sink-log:stable"
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions test/api-e2e/testdata/mono-vertex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: mono-vertex
spec:
scale:
min: 1
source:
udsource:
container:
image: quay.io/numaio/numaflow-go/source-simple-source:stable
imagePullPolicy: Always
transformer:
container:
image: quay.io/numaio/numaflow-go/mapt-assign-event-time:stable
imagePullPolicy: Always
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis_sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# Use the name of the mono vertex as the key
value: "mono-vertex"

0 comments on commit 7450e16

Please sign in to comment.