diff --git a/server/apis/v1/handler.go b/server/apis/v1/handler.go index 672ce0b0bd..0ff96d6d0d 100644 --- a/server/apis/v1/handler.go +++ b/server/apis/v1/handler.go @@ -30,6 +30,7 @@ import ( "time" "github.com/gin-gonic/gin" + lru "github.com/hashicorp/golang-lru" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,10 +58,11 @@ const ( ) type handler struct { - kubeClient kubernetes.Interface - metricsClient *metricsversiond.Clientset - numaflowClient dfv1clients.NumaflowV1alpha1Interface - dexObj *DexObject + kubeClient kubernetes.Interface + metricsClient *metricsversiond.Clientset + numaflowClient dfv1clients.NumaflowV1alpha1Interface + daemonClientsCache *lru.Cache + dexObj *DexObject } // NewHandler is used to provide a new instance of the handler type @@ -79,11 +81,15 @@ func NewHandler(dexObj *DexObject) (*handler, error) { } metricsClient := metricsversiond.NewForConfigOrDie(k8sRestConfig) numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1() + daemonClientsCache, _ := lru.NewWithEvict(100, func(key, value interface{}) { + _ = value.(*daemonclient.DaemonClient).Close() + }) return &handler{ - kubeClient: kubeClient, - metricsClient: metricsClient, - numaflowClient: numaflowClient, - dexObj: dexObj, + kubeClient: kubeClient, + metricsClient: metricsClient, + numaflowClient: numaflowClient, + daemonClientsCache: daemonClientsCache, + dexObj: dexObj, }, nil } @@ -313,13 +319,11 @@ func (h *handler) GetPipeline(c *gin.Context) { } // get pipeline lag - // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error())) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil || client == nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } - defer client.Close() var ( minWM int64 = math.MaxInt64 @@ -421,6 +425,9 @@ func (h *handler) DeletePipeline(c *gin.Context) { return } + // cleanup client after successfully deleting pipeline + h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline)) + c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil)) } @@ -580,13 +587,11 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) { func (h *handler) ListPipelineBuffers(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the Inter-Step buffers for pipeline %q: %s", pipeline, err.Error())) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil || client == nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } - defer client.Close() buffers, err := client.ListPipelineBuffers(context.Background(), pipeline) if err != nil { @@ -601,13 +606,11 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) { func (h *handler) GetPipelineWatermarks(c *gin.Context) { ns, pipeline := c.Param("namespace"), c.Param("pipeline") - // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the watermarks for pipeline %q: %s", pipeline, err.Error())) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil || client == nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } - defer client.Close() watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline) if err != nil { @@ -681,13 +684,11 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) { return } - // TODO: the client should be cached. - client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) - if err != nil { - h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: failed to get demon service client for namespace %q pipeline %q: %s", ns, pipeline, err.Error())) + client, err := h.getDaemonClient(ns, pipeline) + if err != nil || client == nil { + h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error())) return } - defer client.Close() var results = make(map[string][]*daemon.VertexMetrics) for _, vertex := range pl.Spec.Vertices { @@ -975,3 +976,24 @@ func validateNamespace(h *handler, pipeline *dfv1.Pipeline, ns string) error { func daemonSvcAddress(ns, pipeline string) string { return fmt.Sprintf("%s.%s.svc:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort) } + +func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClient, error) { + dClient, ok := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline)) + if !ok { + dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline)) + if err != nil { + return nil, err + } + if dClient == nil { + return nil, fmt.Errorf("nil client") + } + h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient) + } + + client, ok := dClient.(*daemonclient.DaemonClient) + if !ok { + return nil, fmt.Errorf("failed to get client") + } + + return client, nil +}