Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cache daemon client for each pipeline #1276

Merged
merged 25 commits into from
Nov 2, 2023
Merged
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ee386bc
feat: cache client
dpadhiar Oct 23, 2023
a309070
Merge branch 'main' into cache-client
dpadhiar Oct 26, 2023
57ffc8a
feat: lru cache
dpadhiar Oct 26, 2023
6f94395
Merge branch 'main' into cache-client
dpadhiar Oct 26, 2023
660db26
chore: lint
dpadhiar Oct 26, 2023
bf56e10
feat: use daemonSvcAddress method
dpadhiar Oct 26, 2023
b35a643
Merge branch 'main' into cache-client
dpadhiar Oct 26, 2023
55eec90
Merge branch 'cache-client' of https://github.com/dpadhiar/numaflow i…
dpadhiar Oct 26, 2023
b2a4f76
feat: refactor to daemonClient method
dpadhiar Oct 27, 2023
0c451c7
Merge branch 'main' into cache-client
dpadhiar Oct 27, 2023
2361ba8
Merge branch 'main' into cache-client
dpadhiar Oct 27, 2023
4bc2a43
Merge branch 'main' into cache-client
dpadhiar Oct 30, 2023
c505154
feat: add removed returns
dpadhiar Oct 30, 2023
fa66a00
feat: nil check
dpadhiar Oct 30, 2023
ec3e022
Merge branch 'main' into cache-client
dpadhiar Oct 30, 2023
944dd4f
Merge branch 'main' into cache-client
dpadhiar Oct 31, 2023
ed20082
Merge branch 'main' into cache-client
dpadhiar Oct 31, 2023
8de5597
feat: explicitly close client when deleting pl
dpadhiar Oct 31, 2023
88256ab
feat: restore change to deleting pl
dpadhiar Oct 31, 2023
b1dfa98
Merge branch 'main' into cache-client
dpadhiar Nov 1, 2023
5fc5c5b
chore: codegen
dpadhiar Nov 1, 2023
e42dd87
feat: change nil check
dpadhiar Nov 1, 2023
ebc947c
Merge branch 'main' into cache-client
dpadhiar Nov 1, 2023
9c801b3
feat: err msg
dpadhiar Nov 2, 2023
5f24fb0
Merge branch 'main' into cache-client
whynowy Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 47 additions & 28 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/gin-gonic/gin"
lru "github.com/hashicorp/golang-lru"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -57,10 +58,11 @@ const (
)

type handler struct {
kubeClient kubernetes.Interface
metricsClient *metricsversiond.Clientset
numaflowClient dfv1clients.NumaflowV1alpha1Interface
dexObj *DexObject
kubeClient kubernetes.Interface
metricsClient *metricsversiond.Clientset
numaflowClient dfv1clients.NumaflowV1alpha1Interface
daemonClientsCache *lru.Cache
dexObj *DexObject
}

// NewHandler is used to provide a new instance of the handler type
Expand All @@ -79,11 +81,15 @@ func NewHandler(dexObj *DexObject) (*handler, error) {
}
metricsClient := metricsversiond.NewForConfigOrDie(k8sRestConfig)
numaflowClient := dfv1versiond.NewForConfigOrDie(k8sRestConfig).NumaflowV1alpha1()
daemonClientsCache, _ := lru.NewWithEvict(100, func(key, value interface{}) {
_ = value.(*daemonclient.DaemonClient).Close()
})
return &handler{
kubeClient: kubeClient,
metricsClient: metricsClient,
numaflowClient: numaflowClient,
dexObj: dexObj,
kubeClient: kubeClient,
metricsClient: metricsClient,
numaflowClient: numaflowClient,
daemonClientsCache: daemonClientsCache,
dexObj: dexObj,
}, nil
}

Expand Down Expand Up @@ -313,13 +319,11 @@ func (h *handler) GetPipeline(c *gin.Context) {
}

// get pipeline lag
// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch pipeline: failed to calculate lag for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
jy4096 marked this conversation as resolved.
Show resolved Hide resolved
return
}
defer client.Close()

var (
minWM int64 = math.MaxInt64
Expand Down Expand Up @@ -421,6 +425,9 @@ func (h *handler) DeletePipeline(c *gin.Context) {
return
}

// cleanup client after successfully deleting pipeline
h.daemonClientsCache.Remove(daemonSvcAddress(ns, pipeline))

c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil))
}

Expand Down Expand Up @@ -580,13 +587,11 @@ func (h *handler) DeleteInterStepBufferService(c *gin.Context) {
func (h *handler) ListPipelineBuffers(c *gin.Context) {
ns, pipeline := c.Param("namespace"), c.Param("pipeline")

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the Inter-Step buffers for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

buffers, err := client.ListPipelineBuffers(context.Background(), pipeline)
if err != nil {
Expand All @@ -601,13 +606,11 @@ func (h *handler) ListPipelineBuffers(c *gin.Context) {
func (h *handler) GetPipelineWatermarks(c *gin.Context) {
ns, pipeline := c.Param("namespace"), c.Param("pipeline")

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the watermarks for pipeline %q: %s", pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

watermarks, err := client.GetPipelineWatermarks(context.Background(), pipeline)
if err != nil {
Expand Down Expand Up @@ -681,13 +684,11 @@ func (h *handler) GetVerticesMetrics(c *gin.Context) {
return
}

// TODO: the client should be cached.
client, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the vertices metrics: failed to get demon service client for namespace %q pipeline %q: %s", ns, pipeline, err.Error()))
client, err := h.getDaemonClient(ns, pipeline)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for pipeline %q, %s", pipeline, err.Error()))
return
}
defer client.Close()

var results = make(map[string][]*daemon.VertexMetrics)
for _, vertex := range pl.Spec.Vertices {
Expand Down Expand Up @@ -975,3 +976,21 @@ func validateNamespace(h *handler, pipeline *dfv1.Pipeline, ns string) error {
func daemonSvcAddress(ns, pipeline string) string {
return fmt.Sprintf("%s.%s.svc:%d", fmt.Sprintf("%s-daemon-svc", pipeline), ns, dfv1.DaemonServicePort)
}

func (h *handler) getDaemonClient(ns, pipeline string) (*daemonclient.DaemonClient, error) {
dClient, _ := h.daemonClientsCache.Get(daemonSvcAddress(ns, pipeline))
if dClient == nil {
dClient, err := daemonclient.NewDaemonServiceClient(daemonSvcAddress(ns, pipeline))
if err != nil {
return nil, err
}
h.daemonClientsCache.Add(daemonSvcAddress(ns, pipeline), dClient)
}

client, ok := dClient.(*daemonclient.DaemonClient)
if !ok {
return nil, nil
}

return client, nil
}
Loading