From 79bab27afea059924a3f7e59b8929bea3bd78b86 Mon Sep 17 00:00:00 2001 From: Sergio Salvatore Date: Thu, 9 May 2024 12:53:14 -0400 Subject: [PATCH] Make Full ListOptions Available Create an alternate constructor that allows us to set all the fields of the ListOptions so we can filter on Fields as well as Labels. Also, delegate to the new, more flexible, constructor from the old one. --- k8s_pod_watcher.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/k8s_pod_watcher.go b/k8s_pod_watcher.go index f628975..55e518e 100644 --- a/k8s_pod_watcher.go +++ b/k8s_pod_watcher.go @@ -174,7 +174,7 @@ func (i *InitialListComplete) Continues() bool { type PodWatcher struct { cs kubernetes.Interface k8sNamespace string - selector string + listOpts k8smeta.ListOptions cbs []EventCallback @@ -189,13 +189,14 @@ type Logger interface { Printf(string, ...interface{}) } -// NewPodWatcher constructs a new PodWatcher. Each callback in the variadic -// callback list is called in its own goroutine. -func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher { +// NewPodWatcherOptions constructs a new PodWatcher with a potentially more +// complicated selector. Each callback in the variadic callback list is called +// in its own goroutine. +func NewPodWatcherOptions(cs kubernetes.Interface, k8sNamespace string, options k8smeta.ListOptions, cbs ...EventCallback) *PodWatcher { return &PodWatcher{ cs: cs, k8sNamespace: k8sNamespace, - selector: selector, + listOpts: options, cbs: cbs, tracker: podTracker{ lastStatus: map[string]*k8score.Pod{}, @@ -204,6 +205,13 @@ func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs . } } +// NewPodWatcher constructs a new PodWatcher. The `selector` selects by labels +// only. Each callback in the variadic callback list is called in its own +// goroutine. +func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher { + return NewPodWatcherOptions(cs, k8sNamespace, k8smeta.ListOptions{LabelSelector: selector}, cbs...) +} + func (p *PodWatcher) logf(format string, args ...interface{}) { if p.Logger == nil { return @@ -211,10 +219,6 @@ func (p *PodWatcher) logf(format string, args ...interface{}) { p.Logger.Printf(format, args...) } -func (p *PodWatcher) listOpts() k8smeta.ListOptions { - return k8smeta.ListOptions{LabelSelector: p.selector} -} - func setContinues(ev PodEvent, continues bool) { switch pe := ev.(type) { case *CreatePod: @@ -231,7 +235,7 @@ func setContinues(ev PodEvent, continues bool) { // returns the new version ID (or an error) func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (string, error) { initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List( - ctx, p.listOpts()) + ctx, p.listOpts) if initListerr != nil { return "", fmt.Errorf("failed pod listing: %w", initListerr) } @@ -278,7 +282,7 @@ func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (str // returns the number of pods, resource version and (optionally) an error func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) { initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List( - ctx, p.listOpts()) + ctx, p.listOpts) if initListerr != nil { return -1, "", fmt.Errorf("failed initial pod listing: %w", initListerr) } @@ -391,7 +395,7 @@ func (p *PodWatcher) Run(ctx context.Context) error { lastwatchStart := time.Now() for { - watchOpt := p.listOpts() + watchOpt := p.listOpts watchOpt.ResourceVersion = version podWatch, watchStartErr := p.cs.CoreV1().Pods(p.k8sNamespace).Watch(ctx, watchOpt) if watchStartErr != nil {