Skip to content

Commit

Permalink
feat: cache daemon client for each pipeline (#1276)
Browse files Browse the repository at this point in the history
Signed-off-by: Dillen Padhiar <[email protected]>
  • Loading branch information
dpadhiar authored Nov 2, 2023
1 parent a727042 commit 42f81df
Showing 1 changed file with 50 additions and 28 deletions.
78 changes: 50 additions & 28 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/gin-gonic/gin"
lru "github.com/hashicorp/golang-lru"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,10 +58,11 @@ const (
)

type handler struct {
kubeClient kubernetes.Interface
metricsClient *metricsversiond.Clientset
numaflowClient dfv1clients.NumaflowV1alpha1Interface
dexObj *DexObject
kubeClient kubernetes.Interface
metricsClient *metricsversiond.Clientset
numaflowClient dfv1clients.NumaflowV1alpha1Interface
daemonClientsCache *lru.Cache
dexObj *DexObject
}

// NewHandler is used to provide a new instance of the handler type
Expand All @@ -79,11 +81,15 @@ func NewHandler(dexObj *DexObject) (*handler, error) {
}
metricsClient := metricsversiond.NewForConfigOrDie(k8sRestConfig)
numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1()
daemonClientsCache, _ := lru.NewWithEvict(100, func(key, value interface{}) {
_ = value.(*daemonclient.DaemonClient).Close()
})
return &handler{
kubeClient: kubeClient,
metricsClient: metricsClient,
numaflowClient: numaflowClient,
dexObj: dexObj,
kubeClient: kubeClient,
metricsClient: metricsClient,
numaflowClient: numaflowClient,
daemonClientsCache: daemonClientsCache,
dexObj: dexObj,
}, nil
}

Expand Down Expand Up @@ -313,13 +319,11 @@ func (h *handler) GetPipeline(c *gin.Context) {
}

// get pipeline lag
// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

var (
minWM int64 = math.MaxInt64
Expand Down Expand Up @@ -421,6 +425,9 @@ func (h *handler) DeletePipeline(c *gin.Context) {
return
}

// cleanup client after successfully deleting pipeline
h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline))

c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil))
}

Expand Down Expand Up @@ -580,13 +587,11 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) {
func (h *handler) ListPipelineBuffers(c *gin.Context) {
ns, pipeline := c.Param("namespace"), c.Param("pipeline")

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the Inter-Step buffers for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

buffers, err := client.ListPipelineBuffers(context.Background(), pipeline)
if err != nil {
Expand All @@ -601,13 +606,11 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) {
func (h *handler) GetPipelineWatermarks(c *gin.Context) {
ns, pipeline := c.Param("namespace"), c.Param("pipeline")

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the watermarks for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline)
if err != nil {
Expand Down Expand Up @@ -681,13 +684,11 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) {
return
}

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: failed to get demon service client for namespace %q pipeline %q: %s", ns, pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

var results = make(map[string][]*daemon.VertexMetrics)
for _, vertex := range pl.Spec.Vertices {
Expand Down Expand Up @@ -975,3 +976,24 @@ func validateNamespace(h *handler, pipeline *dfv1.Pipeline, ns string) error {
func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}

func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClient, error) {
dClient, ok := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline))
if !ok {
dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
return nil, err
}
if dClient == nil {
return nil, fmt.Errorf("nil client")
}
h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient)
}

client, ok := dClient.(*daemonclient.DaemonClient)
if !ok {
return nil, fmt.Errorf("failed to get client")
}

return client, nil
}

0 comments on commit 42f81df

Please sign in to comment.