From bfd73e8474ac051256b08be5553c5541d9726b68 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 9 May 2024 10:45:05 +0530 Subject: [PATCH] chore: major refactor of E2E testing components (#1725) Signed-off-by: Yashash H L --- Makefile | 25 ++ config/apps/kafka/kafka-minimal.yaml | 36 +- config/apps/kafka/kustomization.yaml | 3 + config/apps/nats/kustomization.yaml | 4 + .../apps/nats}/nats-auth-fake-token.yaml | 0 config/apps/redis/kustomization.yaml | 3 + test/e2e-api/http.go | 86 ++-- test/e2e-api/kafka.go | 387 ++++++++++-------- test/e2e-api/main.go | 28 ++ test/e2e-api/nats.go | 109 ++--- test/e2e-api/redis.go | 134 ++---- test/fixtures/e2e_suite.go | 4 + test/fixtures/kafka.go | 24 +- test/idle-source-e2e/idle_source_test.go | 9 +- test/kafka-e2e/kafka_test.go | 23 +- test/nats-e2e/nats_test.go | 3 - 16 files changed, 471 insertions(+), 407 deletions(-) rename {test/nats-e2e/testdata => config/apps/nats}/nats-auth-fake-token.yaml (100%) diff --git a/Makefile b/Makefile index 0f950d7ef2..478228c887 100644 --- a/Makefile +++ b/Makefile @@ -120,6 +120,9 @@ test-diamond-e2e: test-sideinputs-e2e: test-%: $(MAKE) cleanup-e2e + $(MAKE) deploy-nats + $(MAKE) deploy-kafka + $(MAKE) deploy-redis $(MAKE) image e2eapi-image $(MAKE) restart-control-plane-components cat test/manifests/e2e-api-pod.yaml | sed 's@quay.io/numaproj/@$(IMAGE_NAMESPACE)/@' | sed 's/:latest/:$(VERSION)/' | kubectl -n numaflow-system apply -f - @@ -136,6 +139,25 @@ restart-control-plane-components: kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-ux,app.kubernetes.io/part-of=numaflow --ignore-not-found=true kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=numaflow-webhook,app.kubernetes.io/part-of=numaflow --ignore-not-found=true +.PHONY: deploy-nats +deploy-nats: + kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true + kubectl -n numaflow-system apply -k config/apps/nats + kubectl wait --for=condition=ready pod -l app=nats -n numaflow-system --timeout=120s + +.PHONY: deploy-kafka +deploy-kafka: + kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true + kubectl -n numaflow-system apply -k config/apps/kafka + kubectl wait --for=condition=ready pod -l app=kafka-broker -n numaflow-system --timeout=120s + kubectl wait --for=condition=ready pod -l app=zookeeper -n numaflow-system --timeout=120s + +.PHONY: deploy-redis +deploy-redis: + kubectl -n numaflow-system delete statefulset redis --ignore-not-found=true + kubectl apply -k config/apps/redis -n numaflow-system + kubectl wait --for=condition=ready pod -l app=redis -n numaflow-system --timeout=120s + .PHONY: cleanup-e2e cleanup-e2e: kubectl -n numaflow-system delete svc -lnumaflow-e2e=true --ignore-not-found=true @@ -148,6 +170,9 @@ cleanup-e2e: # To run just one of the e2e tests by name (i.e. 
'make TestCreateSimplePipeline'): Test%: $(MAKE) cleanup-e2e + $(MAKE) deploy-nats + $(MAKE) deploy-kafka + $(MAKE) deploy-redis $(MAKE) image e2eapi-image kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true diff --git a/config/apps/kafka/kafka-minimal.yaml b/config/apps/kafka/kafka-minimal.yaml index a2332f3ddc..053a77f50c 100644 --- a/config/apps/kafka/kafka-minimal.yaml +++ b/config/apps/kafka/kafka-minimal.yaml @@ -1,4 +1,3 @@ -# http://www.smartjava.org/content/minimal-kafka-instance-for-k8s/ --- apiVersion: v1 kind: Service @@ -8,10 +7,6 @@ spec: ports: - name: client port: 2181 - - name: follower - port: 2888 - - name: leader - port: 3888 selector: app: zookeeper @@ -31,7 +26,7 @@ spec: spec: containers: - name: main - image: wurstmeister/zookeeper + image: bitnami/zookeeper imagePullPolicy: IfNotPresent ports: - containerPort: 2181 @@ -40,6 +35,8 @@ spec: value: "1" - name: ZOOKEEPER_SERVER_1 value: zoo + - name: ALLOW_ANONYMOUS_LOGIN + value: "yes" readinessProbe: tcpSocket: port: 2181 @@ -67,26 +64,31 @@ spec: spec: containers: - name: main - image: wurstmeister/kafka:2.13-2.8.1 + image: bitnami/kafka:3.7 imagePullPolicy: IfNotPresent ports: - containerPort: 9092 env: - - name: KAFKA_ADVERTISED_PORT - value: "9092" - - name: KAFKA_ADVERTISED_HOST_NAME - value: kafka-broker - - name: KAFKA_ZOOKEEPER_CONNECT - value: zookeeper:2181 - - name: KAFKA_BROKER_ID + - name: KAFKA_CFG_BROKER_ID value: "0" - - name: KAFKA_CREATE_TOPICS - value: "kafka-topic:2:1,input-topic:1:1,middle-topic:1:1,output-topic:1:1" + - name: KAFKA_CFG_ZOOKEEPER_CONNECT + value: zookeeper:2181 + - name: KAFKA_CFG_LISTENERS + value: PLAINTEXT://:9092 + - name: KAFKA_CFG_ADVERTISED_LISTENERS + value: PLAINTEXT://kafka-broker:9092 + - name: ALLOW_PLAINTEXT_LISTENER + value: "yes" + - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE + value: "true" + - name: KAFKA_CFG_DELETE_TOPIC_ENABLE + value: "true" + - name: KAFKA_CFG_NUM_PARTITIONS + value: "2" readinessProbe: tcpSocket: port: 9092 selector: matchLabels: app: kafka-broker - serviceName: kafka-broker diff --git a/config/apps/kafka/kustomization.yaml b/config/apps/kafka/kustomization.yaml index 138f0cb50d..b59416a95f 100644 --- a/config/apps/kafka/kustomization.yaml +++ b/config/apps/kafka/kustomization.yaml @@ -1,3 +1,6 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + resources: - kafka-minimal.yaml diff --git a/config/apps/nats/kustomization.yaml b/config/apps/nats/kustomization.yaml index 4e455279bf..9d42b1a927 100644 --- a/config/apps/nats/kustomization.yaml +++ b/config/apps/nats/kustomization.yaml @@ -1,5 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + resources: - nats.yaml + - nats-auth-fake-token.yaml commonLabels: "numaflow-e2e": "true" diff --git a/test/nats-e2e/testdata/nats-auth-fake-token.yaml b/config/apps/nats/nats-auth-fake-token.yaml similarity index 100% rename from test/nats-e2e/testdata/nats-auth-fake-token.yaml rename to config/apps/nats/nats-auth-fake-token.yaml diff --git a/config/apps/redis/kustomization.yaml b/config/apps/redis/kustomization.yaml index 861423b93f..488495a419 100644 --- a/config/apps/redis/kustomization.yaml +++ b/config/apps/redis/kustomization.yaml @@ -1,3 +1,6 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + # redis-minimal.yaml is used in E2E testing to create a redis instance before we start a test pipeline which 
writes to redis. resources: - redis-minimal.yaml diff --git a/test/e2e-api/http.go b/test/e2e-api/http.go index 03377b1710..4ed85512f0 100644 --- a/test/e2e-api/http.go +++ b/test/e2e-api/http.go @@ -28,51 +28,57 @@ import ( "github.com/numaproj/numaflow/test/fixtures" ) -var httpClient *http.Client +type HttpController struct { + client *http.Client +} -func init() { - httpClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +func NewHttpController() *HttpController { + return &HttpController{ + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, }, } +} - // send-message API is used to post data to a http source vertex pod. - // The API takes in two parameters(host and vertexName) and constructs the target url as - // https://{host}:8443/vertices/{vertexName}. - http.HandleFunc("/http/send-message", func(w http.ResponseWriter, r *http.Request) { - host := r.URL.Query().Get("host") - vertexName := r.URL.Query().Get("vertexName") - reqBytes, err := io.ReadAll(r.Body) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } +func (h *HttpController) SendMessage(w http.ResponseWriter, r *http.Request) { + host := r.URL.Query().Get("host") + vertexName := r.URL.Query().Get("vertexName") + reqBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - var req fixtures.HttpPostRequest - err = json.Unmarshal(reqBytes, &req) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } + var req fixtures.HttpPostRequest + err = json.Unmarshal(reqBytes, &req) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - postReq, err := http.NewRequest("POST", fmt.Sprintf("https://%s:8443/vertices/%s", host, vertexName), bytes.NewBuffer(req.Body)) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + postReq, err := http.NewRequest("POST", fmt.Sprintf("https://%s:8443/vertices/%s", host, vertexName), bytes.NewBuffer(req.Body)) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + for k, v := range req.Header { + postReq.Header.Add(k, v) + } + _, err = h.client.Do(postReq) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} - for k, v := range req.Header { - postReq.Header.Add(k, v) - } - _, err = httpClient.Do(postReq) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - }) +// Close closes the http client +func (h *HttpController) Close() { + h.client.CloseIdleConnections() } diff --git a/test/e2e-api/kafka.go b/test/e2e-api/kafka.go index d3c5ad26b0..28c11371d0 100644 --- a/test/e2e-api/kafka.go +++ b/test/e2e-api/kafka.go @@ -22,201 +22,254 @@ import ( "log" "net/http" "strconv" + "sync" "time" "github.com/IBM/sarama" ) -func init() { - const bootstrapServers = "kafka-broker:9092" +const bootstrapServers = "kafka-broker:9092" + +type KafkaController struct { + brokers []string + adminClient sarama.ClusterAdmin + producer sarama.SyncProducer + consumer sarama.Consumer +} + +func NewKafkaController() *KafkaController { + // initialize Kafka handlers var brokers = []string{bootstrapServers} - http.HandleFunc("/kafka/create-topic", func(w 
http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig()) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer admin.Close() - if err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, true); err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(201) - }) - http.HandleFunc("/kafka/delete-topic", func(w http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - admin, err := sarama.NewClusterAdmin(brokers, sarama.NewConfig()) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer admin.Close() - if err = admin.DeleteTopic(topic); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(201) - }) - http.HandleFunc("/kafka/list-topics", func(w http.ResponseWriter, r *http.Request) { - consumer, err := sarama.NewConsumer(brokers, sarama.NewConfig()) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer consumer.Close() - topics, err := consumer.Topics() - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(200) - _, _ = fmt.Fprintf(w, "Total topics : %s", topics) - }) + config := sarama.NewConfig() + config.Producer.Return.Successes = true - http.HandleFunc("/kafka/count-topic", func(w http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - count, err := strconv.Atoi(r.URL.Query().Get("count")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - consumer, err := sarama.NewConsumer(brokers, sarama.NewConfig()) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer consumer.Close() - pConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) + // if partition is specified, use manual partitioner + config.Producer.Partitioner = sarama.NewManualPartitioner + + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Fatalf("Failed to start Kafka producer: %v", err) + } + + consumer, err := sarama.NewConsumer(brokers, config) + if err != nil { + log.Fatalf("Failed to start Kafka consumer: %v", err) + } + + adminClient, err := sarama.NewClusterAdmin(brokers, config) + if err != nil { + log.Fatalf("Failed to start Kafka admin client: %v", err) + } + return &KafkaController{ + brokers: brokers, + adminClient: adminClient, + producer: producer, + consumer: consumer, + } +} + +func (kh *KafkaController) CreateTopicHandler(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + partitions, err := strconv.Atoi(r.URL.Query().Get("partitions")) + if err != nil { + log.Println(err) + http.Error(w, "Invalid number of partitions", http.StatusBadRequest) + return + } + if err = kh.adminClient.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: int32(partitions), ReplicationFactor: 1}, true); err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(201) +} + +func (kh *KafkaController) DeleteTopicHandler(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + if err := kh.adminClient.DeleteTopic(topic); err != 
nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(201) +} + +func (kh *KafkaController) ListTopicsHandler(w http.ResponseWriter, r *http.Request) { + topics, err := kh.adminClient.ListTopics() + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(200) + for topic, details := range topics { + _, _ = fmt.Fprintf(w, "Topic: %s, Partitions: %d\n", topic, details.NumPartitions) + } +} + +func (kh *KafkaController) CountTopicHandler(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + count, err := strconv.Atoi(r.URL.Query().Get("count")) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + partitions, err := kh.consumer.Partitions(topic) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + msgCount := 0 + msgs := make(chan *sarama.ConsumerMessage, 256) + doneCh := make(chan struct{}) + errCh := make(chan error) + var wg sync.WaitGroup + + var consumers []sarama.PartitionConsumer + + for _, partition := range partitions { + pConsumer, err := kh.consumer.ConsumePartition(topic, partition, sarama.OffsetOldest) if err != nil { log.Println(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - msgCount := 0 - for msgCount < count { - select { - case msg := <-pConsumer.Messages(): - msgCount++ - log.Println("Received messages: ", string(msg.Key), string(msg.Value), msg.Offset, msgCount) - case consumerError := <-pConsumer.Errors(): - log.Println("Received consumerError.", string(consumerError.Topic), string(consumerError.Partition), consumerError.Err) - } - } - w.WriteHeader(200) - _, _ = w.Write([]byte(fmt.Sprint(count))) - }) - - http.HandleFunc("/kafka/produce-topic", func(w http.ResponseWriter, r *http.Request) { - var ( - partition int - err error - ) - topic := r.URL.Query().Get("topic") - key := r.URL.Query().Get("key") - queryPartition := r.URL.Query().Get("partition") - - config := sarama.NewConfig() - config.Producer.Return.Successes = true - if queryPartition == "" { - partition = 0 - } else { - partition, err = strconv.Atoi(queryPartition) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + + consumers = append(consumers, pConsumer) + wg.Add(1) + log.Println("Starting consumer for partition: ", partition) + go func(pc sarama.PartitionConsumer) { + defer wg.Done() + for { + select { + case <-doneCh: + return + case msg := <-pc.Messages(): + select { + case msgs <- msg: + case <-doneCh: + return + } + case consumerError := <-pc.Errors(): + errCh <- consumerError.Err + return + } } - // if partition is specified, use manual partitioner - config.Producer.Partitioner = sarama.NewManualPartitioner - } - buf, err := io.ReadAll(r.Body) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + }(pConsumer) + } + +readLoop: + for msgCount < count { + select { + case msg := <-msgs: + msgCount++ + log.Println("Received messages: ", string(msg.Key), string(msg.Value), msg.Offset, msgCount, " partition - ", msg.Partition) + case consumerError := <-errCh: + log.Println("Received consumerError - ", consumerError.Error()) + break readLoop } + } - syncProducer, err := sarama.NewSyncProducer(brokers, config) + close(doneCh) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } 
- defer syncProducer.Close() - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(buf), - Key: sarama.ByteEncoder([]byte(key)), - Partition: int32(partition), - } - if _, _, err := syncProducer.SendMessage(message); err != nil { - _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) - } - }) + for _, c := range consumers { + _ = c.Close() + } + + w.WriteHeader(200) + _, _ = w.Write([]byte(fmt.Sprint(count))) +} + +func (kh *KafkaController) ProduceTopicHandler(w http.ResponseWriter, r *http.Request) { + var ( + partition int + err error + ) + topic := r.URL.Query().Get("topic") + key := r.URL.Query().Get("key") + queryPartition := r.URL.Query().Get("partition") - http.HandleFunc("/kafka/pump-topic", func(w http.ResponseWriter, r *http.Request) { - topic := r.URL.Query().Get("topic") - mf := newMessageFactory(r.URL.Query()) - duration, err := time.ParseDuration(r.URL.Query().Get("sleep")) + if queryPartition == "" { + partition = 0 + } else { + partition, err = strconv.Atoi(queryPartition) if err != nil { log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), http.StatusInternalServerError) return } + } + buf, err := io.ReadAll(r.Body) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - ns := r.URL.Query().Get("n") - if ns == "" { - ns = "-1" - } - n, err := strconv.Atoi(ns) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(buf), + Key: sarama.ByteEncoder(key), + Partition: int32(partition), + } + if _, _, err := kh.producer.SendMessage(message); err != nil { + _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) + } +} - w.Header().Set("Content-Type", "application/octet-stream") - w.WriteHeader(200) +func (kh *KafkaController) PumpTopicHandler(w http.ResponseWriter, r *http.Request) { + topic := r.URL.Query().Get("topic") + mf := newMessageFactory(r.URL.Query()) + duration, err := time.ParseDuration(r.URL.Query().Get("sleep")) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - config := sarama.NewConfig() - config.Producer.Return.Successes = true + ns := r.URL.Query().Get("n") + if ns == "" { + ns = "-1" + } + n, err := strconv.Atoi(ns) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - syncProducer, err := sarama.NewSyncProducer(brokers, config) + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(200) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) + start := time.Now() + _, _ = fmt.Fprintf(w, "sending %d messages of size %d to %q\n", n, mf.size, topic) + + for i := 0; i < n || n < 0; i++ { + select { + case <-r.Context().Done(): return - } - defer syncProducer.Close() - - start := time.Now() - _, _ = fmt.Fprintf(w, "sending %d messages of size %d to %q\n", n, mf.size, topic) - - for i := 0; i < n || n < 0; i++ { - select { - case <-r.Context().Done(): - return - default: - message := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.ByteEncoder(mf.newMessage(i)), - Key: sarama.ByteEncoder([]byte(strconv.Itoa(i))), - } - _, _, err := syncProducer.SendMessage(message) - if err != nil { - _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) - } - time.Sleep(duration) + default: + message := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(mf.newMessage(i)), + Key: 
sarama.ByteEncoder(strconv.Itoa(i)), + Partition: int32(0), + } + _, _, err := kh.producer.SendMessage(message) + if err != nil { + _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) } + time.Sleep(duration) } - _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, mf.size, float64(n)/time.Since(start).Seconds(), topic) - }) + } + _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, mf.size, float64(n)/time.Since(start).Seconds(), topic) +} + +func (kh *KafkaController) Close() { + _ = kh.producer.Close() + _ = kh.consumer.Close() + _ = kh.adminClient.Close() } diff --git a/test/e2e-api/main.go b/test/e2e-api/main.go index b244aeec8a..410ea70f13 100644 --- a/test/e2e-api/main.go +++ b/test/e2e-api/main.go @@ -21,7 +21,35 @@ import ( ) func main() { + + // initialize kafka handlers + kafkaController := NewKafkaController() + http.HandleFunc("/kafka/create-topic", kafkaController.CreateTopicHandler) + http.HandleFunc("/kafka/delete-topic", kafkaController.DeleteTopicHandler) + http.HandleFunc("/kafka/list-topics", kafkaController.ListTopicsHandler) + http.HandleFunc("/kafka/count-topic", kafkaController.CountTopicHandler) + http.HandleFunc("/kafka/produce-topic", kafkaController.ProduceTopicHandler) + http.HandleFunc("/kafka/pump-topic", kafkaController.PumpTopicHandler) + + // initialize Redis handlers + redisController := NewRedisController() + http.HandleFunc("/redis/get-msg-count-contains", redisController.GetMsgCountContains) + + // initialize http handlers + httpController := NewHttpController() + http.HandleFunc("/http/send-message", httpController.SendMessage) + + // initialize NATS handler + natsController := NewNatsController("nats", "testingtoken") + http.HandleFunc("/nats/pump-subject", natsController.PumpSubject) + if err := http.ListenAndServe(":8378", nil); err != nil { panic(err) } + + // close all controllers + kafkaController.Close() + redisController.Close() + httpController.Close() + natsController.Close() } diff --git a/test/e2e-api/nats.go b/test/e2e-api/nats.go index 59537c6be6..abf4b3f359 100644 --- a/test/e2e-api/nats.go +++ b/test/e2e-api/nats.go @@ -26,64 +26,69 @@ import ( natslib "github.com/nats-io/nats.go" ) -func init() { - url := "nats" - testingToken := "testingtoken" +type NatsController struct { + client *natslib.Conn +} - http.HandleFunc("/nats/pump-subject", func(w http.ResponseWriter, r *http.Request) { - subject := r.URL.Query().Get("subject") - msg := r.URL.Query().Get("msg") - size, err := strconv.Atoi(r.URL.Query().Get("size")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - duration, err := time.ParseDuration(r.URL.Query().Get("sleep")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } +func NewNatsController(url string, token string) *NatsController { + opts := []natslib.Option{natslib.Token(token)} + nc, err := natslib.Connect(url, opts...) 
+ if err != nil { + log.Fatal(err) + } + return &NatsController{ + client: nc, + } +} - ns := r.URL.Query().Get("n") - if ns == "" { - ns = "-1" - } - n, err := strconv.Atoi(ns) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } +func (n *NatsController) PumpSubject(w http.ResponseWriter, r *http.Request) { + subject := r.URL.Query().Get("subject") + msg := r.URL.Query().Get("msg") + size, err := strconv.Atoi(r.URL.Query().Get("size")) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + duration, err := time.ParseDuration(r.URL.Query().Get("sleep")) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - w.Header().Set("Content-Type", "application/octet-stream") - w.WriteHeader(200) + ns := r.URL.Query().Get("n") + if ns == "" { + ns = "-1" + } + nCount, err := strconv.Atoi(ns) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } - opts := []natslib.Option{natslib.Token(testingToken)} - nc, err := natslib.Connect(url, opts...) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer nc.Close() + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(200) - start := time.Now() - _, _ = fmt.Fprintf(w, "sending %d messages of size %d to %q\n", n, size, subject) + start := time.Now() + _, _ = fmt.Fprintf(w, "sending %d messages of size %d to %q\n", nCount, size, subject) - for i := 0; i < n || n < 0; i++ { - select { - case <-r.Context().Done(): - return - default: - err := nc.Publish(subject, []byte(msg)) - if err != nil { - _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) - } - time.Sleep(duration) + for i := 0; i < nCount || nCount < 0; i++ { + select { + case <-r.Context().Done(): + return + default: + err := n.client.Publish(subject, []byte(msg)) + if err != nil { + _, _ = fmt.Fprintf(w, "ERROR: %v\n", err) } + time.Sleep(duration) } - _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, size, float64(n)/time.Since(start).Seconds(), subject) - }) + } + _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", nCount, size, float64(nCount)/time.Since(start).Seconds(), subject) +} + +func (n *NatsController) Close() { + n.client.Close() } diff --git a/test/e2e-api/redis.go b/test/e2e-api/redis.go index 4735c88c5c..fb0076c7fc 100644 --- a/test/e2e-api/redis.go +++ b/test/e2e-api/redis.go @@ -18,117 +18,55 @@ package main import ( "context" - "encoding/json" "fmt" "log" "net/http" "net/url" - "strconv" - "time" "github.com/redis/go-redis/v9" ) -var redisClient *redis.Client - -func init() { +type RedisController struct { + client *redis.Client +} +func NewRedisController() *RedisController { // When we use this API to validate e2e test result, we always assume a redis UDSink is used // to persist data to a redis instance listening on port 6379. - redisClient = redis.NewClient(&redis.Options{ - Addr: "redis:6379", - }) - - // get-msg-count-contains returns no. of occurrences of the targetStr in redis that are written by pipelineName and sinkName. 
- http.HandleFunc("/redis/get-msg-count-contains", func(w http.ResponseWriter, r *http.Request) { - pipelineName := r.URL.Query().Get("pipelineName") - sinkName := r.URL.Query().Get("sinkName") - targetStr, err := url.QueryUnescape(r.URL.Query().Get("targetStr")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - count, err := redisClient.HGet(context.Background(), fmt.Sprintf("%s:%s", pipelineName, sinkName), targetStr).Result() - - if err != nil { - log.Println(err) - // If targetStr doesn't exist in the hash, HGet returns an error, meaning count is 0. - w.WriteHeader(200) - _, _ = w.Write([]byte("0")) - return - } - - w.WriteHeader(200) - _, _ = w.Write([]byte(count)) - }) - - http.HandleFunc("/redis/pump-stream", func(w http.ResponseWriter, r *http.Request) { - stream := r.URL.Query().Get("stream") - keysValuesJsonEncoded := r.URL.Query().Get("keysvalues") - keysValuesJson, err := url.QueryUnescape(keysValuesJsonEncoded) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - var keysValues map[string]string - err = json.Unmarshal([]byte(keysValuesJson), &keysValues) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - valueMap := make(map[string]interface{}) - for k, v := range keysValues { - valueMap[k] = interface{}(v) - } - - size, err := strconv.Atoi(r.URL.Query().Get("size")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - duration, err := time.ParseDuration(r.URL.Query().Get("sleep")) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - ns := r.URL.Query().Get("n") - if ns == "" { - ns = "-1" - } - n, err := strconv.Atoi(ns) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } + return &RedisController{ + client: redis.NewClient(&redis.Options{ + Addr: "redis:6379", + }), + } +} - w.Header().Set("Content-Type", "application/octet-stream") +func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Request) { + pipelineName := r.URL.Query().Get("pipelineName") + sinkName := r.URL.Query().Get("sinkName") + targetStr, err := url.QueryUnescape(r.URL.Query().Get("targetStr")) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + count, err := h.client.HGet(context.Background(), fmt.Sprintf("%s:%s", pipelineName, sinkName), targetStr).Result() + + if err != nil { + log.Println(err) + // If targetStr doesn't exist in the hash, HGet returns an error, meaning count is 0. w.WriteHeader(200) + _, _ = w.Write([]byte("0")) + return + } - start := time.Now() - _, _ = fmt.Fprintf(w, "sending %d messages of size %d to %q\n", n, size, stream) + w.WriteHeader(200) + _, _ = w.Write([]byte(count)) +} - for i := 0; i < n || n < 0; i++ { - select { - case <-r.Context().Done(): - return - default: - result := redisClient.XAdd(r.Context(), &redis.XAddArgs{Stream: stream, Values: valueMap}) - if result.Err() != nil { - http.Error(w, result.Err().Error(), http.StatusFailedDependency) - return - } - time.Sleep(duration) - } - } - _, _ = fmt.Fprintf(w, "sent %d messages of size %d at %.0f TPS to %q\n", n, size, float64(n)/time.Since(start).Seconds(), stream) - }) +// Close closes the Redis client. 
+func (h *RedisController) Close() { + if err := h.client.Close(); err != nil { + log.Println(err) + } } diff --git a/test/fixtures/e2e_suite.go b/test/fixtures/e2e_suite.go index 5a10a221bf..e37e44dc13 100644 --- a/test/fixtures/e2e_suite.go +++ b/test/fixtures/e2e_suite.go @@ -58,6 +58,8 @@ var ( kind: InterStepBufferService metadata: name: default + labels: + numaflow-e2e: "true" spec: redis: native: @@ -67,6 +69,8 @@ spec: kind: InterStepBufferService metadata: name: default + labels: + numaflow-e2e: "true" spec: jetstream: version: latest diff --git a/test/fixtures/kafka.go b/test/fixtures/kafka.go index e68bc4c177..d06213baf1 100644 --- a/test/fixtures/kafka.go +++ b/test/fixtures/kafka.go @@ -26,17 +26,22 @@ import ( "k8s.io/apimachinery/pkg/util/rand" ) -func CreateKafkaTopic() string { - topic := fmt.Sprintf("e2e-topic-%s", rand.String(5)) - log.Printf("create Kafka topic %q\n", topic) - InvokeE2EAPI("/kafka/create-topic?topic=%s", topic) - return topic +func GenerateKafkaTopicName() string { + return fmt.Sprintf("e2e-topic-%s", rand.String(5)) } -func DeleteKafkaTopic(topic string) string { +func CreateKafkaTopic(topicName string, partitions int32) { + log.Printf("create Kafka topic %q\n", topicName) + InvokeE2EAPI("/kafka/create-topic?topic=%s&partitions=%d", topicName, partitions) +} + +func DeleteKafkaTopic(topic string) { log.Printf("delete Kafka topic %q\n", topic) InvokeE2EAPI("/kafka/delete-topic?topic=%s", topic) - return topic +} + +func ListKafkaTopics() { + InvokeE2EAPI("/kafka/list-topics") } func PumpKafkaTopic(topic string, n int, opts ...interface{}) { @@ -93,9 +98,4 @@ func GetKafkaCount(topic string, count int) int { func SendMessage(topic string, key string, message string, partition int) { InvokeE2EAPIPOST("/kafka/produce-topic?topic=%s&key=%s&partition=%d", message, topic, key, partition) - -} - -func ValidateMessage(topic string, key string, position int) { - } diff --git a/test/idle-source-e2e/idle_source_test.go b/test/idle-source-e2e/idle_source_test.go index 5e2ba90efa..190d6c37bc 100644 --- a/test/idle-source-e2e/idle_source_test.go +++ b/test/idle-source-e2e/idle_source_test.go @@ -21,11 +21,6 @@ Once "threshold" reached to 5s(configurable) and if source is found as idle, the 3s(configurable) after waiting for stepInterval of 2s(configurable). 
*/ -//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true -//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system -// Wait for zookeeper to come up -//go:generate sleep 60 - package idle_source_e2e import ( @@ -105,7 +100,9 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipelineWithKafkaSource() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + // create kafka topic with 2 partitions topic := "kafka-topic" + CreateKafkaTopic(topic, 2) w := is.Given().Pipeline("@testdata/kafka-pipeline.yaml").When().CreatePipelineAndWait() defer w.DeletePipelineAndWait() @@ -159,6 +156,6 @@ func generateMsg(msg string, t time.Time) string { return string(jsonBytes) } -func TestReduceSuite(t *testing.T) { +func TestIdleSourceSuite(t *testing.T) { suite.Run(t, new(IdleSourceSuite)) } diff --git a/test/kafka-e2e/kafka_test.go b/test/kafka-e2e/kafka_test.go index 07d302ab10..18395bfd88 100644 --- a/test/kafka-e2e/kafka_test.go +++ b/test/kafka-e2e/kafka_test.go @@ -30,18 +30,15 @@ import ( "github.com/numaproj/numaflow/test/fixtures" ) -//go:generate kubectl -n numaflow-system delete statefulset zookeeper kafka-broker --ignore-not-found=true -//go:generate kubectl apply -k ../../config/apps/kafka -n numaflow-system -// Wait for zookeeper to come up -//go:generate sleep 60 - type KafkaSuite struct { fixtures.E2ESuite } func (ks *KafkaSuite) TestKafkaSink() { - outputTopic := fixtures.CreateKafkaTopic() - defer fixtures.DeleteKafkaTopic(outputTopic) + topicName := fixtures.GenerateKafkaTopicName() + fixtures.CreateKafkaTopic(topicName, 1) + defer fixtures.DeleteKafkaTopic(topicName) + pipeline := &dfv1.Pipeline{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-sink-e2e", @@ -70,7 +67,7 @@ func (ks *KafkaSuite) TestKafkaSink() { AbstractSink: dfv1.AbstractSink{ Kafka: &dfv1.KafkaSink{ Brokers: []string{"kafka-broker:9092"}, - Topic: outputTopic, + Topic: topicName, }, }, }, @@ -92,13 +89,15 @@ func (ks *KafkaSuite) TestKafkaSink() { When(). CreatePipelineAndWait() defer w.DeletePipelineAndWait() - fixtures.ExpectKafkaTopicCount(outputTopic, 15, 3*time.Second) - + fixtures.ExpectKafkaTopicCount(topicName, 15, 3*time.Second) } func (ks *KafkaSuite) TestKafkaSourceSink() { - inputTopic := fixtures.CreateKafkaTopic() - outputTopic := fixtures.CreateKafkaTopic() + inputTopic := fixtures.GenerateKafkaTopicName() + fixtures.CreateKafkaTopic(inputTopic, 1) + + outputTopic := fixtures.GenerateKafkaTopicName() + fixtures.CreateKafkaTopic(outputTopic, 1) pipeline := &dfv1.Pipeline{ ObjectMeta: metav1.ObjectMeta{ Name: "kafka-sink-e2e", diff --git a/test/nats-e2e/nats_test.go b/test/nats-e2e/nats_test.go index a3a2eefd01..e909095c70 100644 --- a/test/nats-e2e/nats_test.go +++ b/test/nats-e2e/nats_test.go @@ -27,9 +27,6 @@ import ( . "github.com/numaproj/numaflow/test/fixtures" ) -//go:generate kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true -//go:generate kubectl apply -k ../../config/apps/nats -n numaflow-system -//go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system type NatsSuite struct { E2ESuite }
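
For reference, a minimal sketch (not part of the patch) of how a suite test can drive the refactored Kafka fixtures after this change: topic-name generation is now a separate step from topic creation, and creation takes an explicit partition count. It assumes Kafka and the e2e-api pod are already deployed (e.g. via the new `make deploy-kafka` target), uses only helpers that appear in this patch, and the package and test names are illustrative.

package kafka_e2e_sketch

import (
	"testing"
	"time"

	"github.com/numaproj/numaflow/test/fixtures"
)

// Illustrative only: exercises the refactored Kafka fixture helpers, assuming
// the kafka-broker statefulset and the e2e-api pod are running in numaflow-system.
func TestKafkaFixturesSketch(t *testing.T) {
	// Name generation is now decoupled from creation, and the partition
	// count is passed explicitly to /kafka/create-topic.
	topic := fixtures.GenerateKafkaTopicName()
	fixtures.CreateKafkaTopic(topic, 2)
	defer fixtures.DeleteKafkaTopic(topic)

	// Produce a single keyed message to partition 0 through the e2e-api pod.
	fixtures.SendMessage(topic, "test-key", "test-message", 0)

	// Validate the topic message count, mirroring the usage in TestKafkaSink.
	fixtures.ExpectKafkaTopicCount(topic, 1, 30*time.Second)
}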