diff --git a/k8s_pod_watcher.go b/k8s_pod_watcher.go index f628975..55e518e 100644 --- a/k8s_pod_watcher.go +++ b/k8s_pod_watcher.go @@ -174,7 +174,7 @@ func (i *InitialListComplete) Continues() bool { type PodWatcher struct { cs kubernetes.Interface k8sNamespace string - selector string + listOpts k8smeta.ListOptions cbs []EventCallback @@ -189,13 +189,14 @@ type Logger interface { Printf(string, ...interface{}) } -// NewPodWatcher constructs a new PodWatcher. Each callback in the variadic -// callback list is called in its own goroutine. -func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher { +// NewPodWatcherOptions constructs a new PodWatcher with a potentially more +// complicated selector. Each callback in the variadic callback list is called +// in its own goroutine. +func NewPodWatcherOptions(cs kubernetes.Interface, k8sNamespace string, options k8smeta.ListOptions, cbs ...EventCallback) *PodWatcher { return &PodWatcher{ cs: cs, k8sNamespace: k8sNamespace, - selector: selector, + listOpts: options, cbs: cbs, tracker: podTracker{ lastStatus: map[string]*k8score.Pod{}, @@ -204,6 +205,13 @@ func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs . } } +// NewPodWatcher constructs a new PodWatcher. The `selector` selects by labels +// only. Each callback in the variadic callback list is called in its own +// goroutine. +func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher { + return NewPodWatcherOptions(cs, k8sNamespace, k8smeta.ListOptions{LabelSelector: selector}, cbs...) +} + func (p *PodWatcher) logf(format string, args ...interface{}) { if p.Logger == nil { return @@ -211,10 +219,6 @@ func (p *PodWatcher) logf(format string, args ...interface{}) { p.Logger.Printf(format, args...) } -func (p *PodWatcher) listOpts() k8smeta.ListOptions { - return k8smeta.ListOptions{LabelSelector: p.selector} -} - func setContinues(ev PodEvent, continues bool) { switch pe := ev.(type) { case *CreatePod: @@ -231,7 +235,7 @@ func setContinues(ev PodEvent, continues bool) { // returns the new version ID (or an error) func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (string, error) { initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List( - ctx, p.listOpts()) + ctx, p.listOpts) if initListerr != nil { return "", fmt.Errorf("failed pod listing: %w", initListerr) } @@ -278,7 +282,7 @@ func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (str // returns the number of pods, resource version and (optionally) an error func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) { initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List( - ctx, p.listOpts()) + ctx, p.listOpts) if initListerr != nil { return -1, "", fmt.Errorf("failed initial pod listing: %w", initListerr) } @@ -391,7 +395,7 @@ func (p *PodWatcher) Run(ctx context.Context) error { lastwatchStart := time.Now() for { - watchOpt := p.listOpts() + watchOpt := p.listOpts watchOpt.ResourceVersion = version podWatch, watchStartErr := p.cs.CoreV1().Pods(p.k8sNamespace).Watch(ctx, watchOpt) if watchStartErr != nil {