From ee386bcb37a5104b744728dec70b0aabcf01d885 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 23 Oct 2023 10:10:52 -0700 Subject: [PATCH 01/12] feat: cache client Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 59 ++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index c4120c57f1..b307d57be0 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -57,6 +57,7 @@ type handler struct { kubeClient kubernetes.Interface metricsClient *metricsversiond.Clientset numaflowClient dfv1clients.NumaflowV1alpha1Interface + daemonClients map[string]*daemonclient.DaemonClient } // NewHandler is used to provide a new instance of the handler type @@ -75,10 +76,12 @@ func NewHandler() (*handler, error) { } metricsClient := metricsversiond.NewForConfigOrDie(k8sRestConfig) numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1() + daemonClients := make(map[string]*daemonclient.DaemonClient) return &handler{ kubeClient: kubeClient, metricsClient: metricsClient, numaflowClient: numaflowClient, + daemonClients: daemonClients, }, nil } @@ -238,12 +241,15 @@ func (h *handler) GetPipeline(c *gin.Context) { // get pipeline lag // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) - return + if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { + daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + return + } + h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient } - defer client.Close() + client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] var ( minWM int64 = math.MaxInt64 @@ -345,6 +351,10 @@ func (h *handler) DeletePipeline(c *gin.Context) { return } + // cleanup client after successfully deleting pipeline + h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)].Close() + delete(h.daemonClients, fmt.Sprintf("%s/%s", ns, pipeline)) + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) } @@ -500,12 +510,15 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the Inter-Step buffers for pipeline %q: %s", pipeline, err.Error())) - return + if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { + daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + return + } + h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient } - defer client.Close() + client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) if err != nil { @@ -521,12 +534,15 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the watermarks for pipeline %q: %s", pipeline, err.Error())) - return + if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { + daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + return + } + h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient } - defer client.Close() + client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) if err != nil { @@ -601,12 +617,15 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { } // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: failed to get demon service client for namespace %q pipeline %q: %s", ns, pipeline, err.Error())) - return + if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { + daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + return + } + h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient } - defer client.Close() + client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] var results = make(map[string][]*daemon.VertexMetrics) for _, vertex := range pl.Spec.Vertices { From 57ffc8aa67af7d94ccb85dce17c99feee35f8ae9 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 26 Oct 2023 10:57:05 -0700 Subject: [PATCH 02/12] feat: lru cache Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 72 ++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 34cdaaa63a..04acd505c2 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -29,6 +29,7 @@ import ( "time" "github.com/gin-gonic/gin" + lru "github.com/hashicorp/golang-lru" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,10 +55,10 @@ const ( ) type handler struct { - kubeClient kubernetes.Interface - metricsClient *metricsversiond.Clientset - numaflowClient dfv1clients.NumaflowV1alpha1Interface - daemonClients map[string]*daemonclient.DaemonClient + kubeClient kubernetes.Interface + metricsClient *metricsversiond.Clientset + numaflowClient dfv1clients.NumaflowV1alpha1Interface + daemonClientsCache *lru.Cache } // NewHandler is used to provide a new instance of the handler type @@ -76,12 +77,14 @@ func NewHandler() (*handler, error) { } metricsClient := metricsversiond.NewForConfigOrDie(k8sRestConfig) numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1() - daemonClients := make(map[string]*daemonclient.DaemonClient) + daemonClientsCache, _ := lru.NewWithEvict(100, func(key, value interface{}) { + _ = value.(*daemonclient.DaemonClient).Close() + }) return &handler{ - kubeClient: kubeClient, - metricsClient: metricsClient, - numaflowClient: numaflowClient, - daemonClients: daemonClients, + kubeClient: kubeClient, + metricsClient: metricsClient, + numaflowClient: numaflowClient, + daemonClientsCache: daemonClientsCache, }, nil } @@ -246,16 +249,16 @@ func (h *handler) GetPipeline(c *gin.Context) { } // get pipeline lag - // TODO: the client should be cached. - if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { - daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if dClient == nil { + dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) return } - h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient + h.daemonClientsCache.Add(pl.GetDaemonServiceURL(), dClient) } - client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] + client := dClient.(*daemonclient.DaemonClient) var ( minWM int64 = math.MaxInt64 @@ -358,8 +361,7 @@ func (h *handler) DeletePipeline(c *gin.Context) { } // cleanup client after successfully deleting pipeline - h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)].Close() - delete(h.daemonClients, fmt.Sprintf("%s/%s", ns, pipeline)) + h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline)) c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) } @@ -520,16 +522,16 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) { func (h *handler) ListPipelineBuffers(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - // TODO: the client should be cached. - if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { - daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if dClient == nil { + dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) return } - h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } - client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] + client := dClient.(*daemonclient.DaemonClient) buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) if err != nil { @@ -544,16 +546,16 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { func (h *handler) GetPipelineWatermarks(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - // TODO: the client should be cached. - if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { - daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if dClient == nil { + dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) return } - h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } - client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] + client := dClient.(*daemonclient.DaemonClient) watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) if err != nil { @@ -627,16 +629,16 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { return } - // TODO: the client should be cached. - if _, ok := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)]; !ok { - daemonClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if dClient == nil { + dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) return } - h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] = daemonClient + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } - client := h.daemonClients[fmt.Sprintf("%s/%s", ns, pipeline)] + client := dClient.(*daemonclient.DaemonClient) var results = make(map[string][]*daemon.VertexMetrics) for _, vertex := range pl.Spec.Vertices { From 660db26d2f852ae31ce38ac0ec38b75b757ac1bd Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 26 Oct 2023 11:25:28 -0700 Subject: [PATCH 03/12] chore: lint Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 13192ea65e..182059c70c 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -274,7 +274,7 @@ func (h *handler) GetPipeline(c *gin.Context) { if dClient == nil { dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } h.daemonClientsCache.Add(pl.GetDaemonServiceURL(), dClient) @@ -547,7 +547,7 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { if dClient == nil { dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) @@ -571,7 +571,7 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) { if dClient == nil { dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) @@ -654,7 +654,7 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { if dClient == nil { dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %s, %w", pipeline, err.Error())) + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) From bf56e10da2602ffedd12a4d7ce0f032b4af7d548 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 26 Oct 2023 12:14:14 -0700 Subject: [PATCH 04/12] feat: use daemonSvcAddress method Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 182059c70c..650c9cd919 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -277,7 +277,7 @@ func (h *handler) GetPipeline(c *gin.Context) { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } - h.daemonClientsCache.Add(pl.GetDaemonServiceURL(), dClient) + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } client := dClient.(*daemonclient.DaemonClient) From b2a4f76ffb4797f104b29a62d15a99d5a4b2551b Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 27 Oct 2023 09:43:26 -0700 Subject: [PATCH 05/12] feat: refactor to daemonClient method Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 61 ++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 650c9cd919..9045f703d4 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -270,16 +270,10 @@ func (h *handler) GetPipeline(c *gin.Context) { } // get pipeline lag - dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) - if dClient == nil { - dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) - return - } - h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) } - client := dClient.(*daemonclient.DaemonClient) var ( minWM int64 = math.MaxInt64 @@ -543,16 +537,10 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) { func (h *handler) ListPipelineBuffers(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) - if dClient == nil { - dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) - return - } - h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) } - client := dClient.(*daemonclient.DaemonClient) buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) if err != nil { @@ -567,16 +555,10 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { func (h *handler) GetPipelineWatermarks(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) - if dClient == nil { - dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) - return - } - h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) } - client := dClient.(*daemonclient.DaemonClient) watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) if err != nil { @@ -650,16 +632,10 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { return } - dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) - if dClient == nil { - dClient, err = daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) - return - } - h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) } - client := dClient.(*daemonclient.DaemonClient) var results = make(map[string][]*daemon.VertexMetrics) for _, vertex := range pl.Spec.Vertices { @@ -946,3 +922,16 @@ func validateNamespace(h *handler, pipeline *dfv1.Pipeline, ns string) error { func daemonSvcAddress(ns, pipeline string) string { return fmt.Sprintf("%s.%s.svc:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort) } + +func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClient, error) { + dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if dClient == nil { + dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + return nil, err + } + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + } + + return dClient.(*daemonclient.DaemonClient), nil +} From c5051549821e1ff264ef00143027f337cd47731d Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 30 Oct 2023 14:34:21 -0700 Subject: [PATCH 06/12] feat: add removed returns Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index d6cc6f6128..d7ced36ff5 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -273,6 +273,7 @@ func (h *handler) GetPipeline(c *gin.Context) { client, err := h.getDaemonClient(ns, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) + return } var ( @@ -540,6 +541,7 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { client, err := h.getDaemonClient(ns, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) + return } buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) @@ -558,6 +560,7 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) { client, err := h.getDaemonClient(ns, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) + return } watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) @@ -635,6 +638,7 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { client, err := h.getDaemonClient(ns, pipeline) if err != nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) + return } var results = make(map[string][]*daemon.VertexMetrics) From fa66a00ce9d6ff9965f8ca3895333f28c8952c34 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 30 Oct 2023 15:48:48 -0700 Subject: [PATCH 07/12] feat: nil check Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index d7ced36ff5..4dbb73e81c 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -271,7 +271,7 @@ func (h *handler) GetPipeline(c *gin.Context) { // get pipeline lag client, err := h.getDaemonClient(ns, pipeline) - if err != nil { + if err != nil || client == nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } @@ -539,7 +539,7 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") client, err := h.getDaemonClient(ns, pipeline) - if err != nil { + if err != nil || client == nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } @@ -558,7 +558,7 @@ func (h *handler) GetPipelineWatermarks(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") client, err := h.getDaemonClient(ns, pipeline) - if err != nil { + if err != nil || client == nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } @@ -636,7 +636,7 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { } client, err := h.getDaemonClient(ns, pipeline) - if err != nil { + if err != nil || client == nil { h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } @@ -937,5 +937,10 @@ func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClie h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } - return dClient.(*daemonclient.DaemonClient), nil + client, ok := dClient.(*daemonclient.DaemonClient) + if !ok { + return nil, nil + } + + return client, nil } From 8de5597799c8d330fc4f455c14687b1cd37905a3 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Tue, 31 Oct 2023 14:25:44 -0700 Subject: [PATCH 08/12] feat: explicitly close client when deleting pl Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 28f75cff07..9f02235d18 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -377,6 +377,12 @@ func (h *handler) DeletePipeline(c *gin.Context) { } // cleanup client after successfully deleting pipeline + client, err := h.getDaemonClient(ns, pipeline) + if err != nil || client == nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) + return + } + client.Close() h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline)) c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) From 88256abeac7769a946994a6f3fd153d9c18f8ff1 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Tue, 31 Oct 2023 15:13:59 -0700 Subject: [PATCH 09/12] feat: restore change to deleting pl Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 9f02235d18..28f75cff07 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -377,12 +377,6 @@ func (h *handler) DeletePipeline(c *gin.Context) { } // cleanup client after successfully deleting pipeline - client, err := h.getDaemonClient(ns, pipeline) - if err != nil || client == nil { - h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) - return - } - client.Close() h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline)) c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) From 5fc5c5b59894109503900a5c89981102d3b66241 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Wed, 1 Nov 2023 10:18:48 -0700 Subject: [PATCH 10/12] chore: codegen Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index ac2d3cc8e0..d4d2806ba4 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -89,7 +89,7 @@ func NewHandler(dexObj *DexObject) (*handler, error) { metricsClient: metricsClient, numaflowClient: numaflowClient, daemonClientsCache: daemonClientsCache, - dexObj: dexObj, + dexObj: dexObj, }, nil } From e42dd87ac89d72df10a97c1bcea094904b8568d9 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Wed, 1 Nov 2023 15:29:32 -0700 Subject: [PATCH 11/12] feat: change nil check Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index d4d2806ba4..271407d3e7 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -978,12 +978,15 @@ func daemonSvcAddress(ns, pipeline string) string { } func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClient, error) { - dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) - if dClient == nil { + dClient, ok := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if !ok { dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) if err != nil { return nil, err } + if dClient == nil { + return nil, fmt.Errorf("nil client") + } h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) } From 9c801b32dc4f6e4ee90902f7af757d66b039cf2b Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Wed, 1 Nov 2023 17:08:32 -0700 Subject: [PATCH 12/12] feat: err msg Signed-off-by: Dillen Padhiar --- server/apis/v1/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 271407d3e7..0ff96d6d0d 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -992,7 +992,7 @@ func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClie client, ok := dClient.(*daemonclient.DaemonClient) if !ok { - return nil, nil + return nil, fmt.Errorf("failed to get client") } return client, nil