Skip to content

Commit

Permalink
Merge pull request #16 from vimeo/make-full-options-available
Browse files Browse the repository at this point in the history
Make Full ListOptions Available
  • Loading branch information
sergiosalvatore authored May 9, 2024
2 parents 5ba38e2 + 79bab27 commit f08850d
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions k8s_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (i *InitialListComplete) Continues() bool {
type PodWatcher struct {
cs kubernetes.Interface
k8sNamespace string
selector string
listOpts k8smeta.ListOptions

cbs []EventCallback

Expand All @@ -189,13 +189,14 @@ type Logger interface {
Printf(string, ...interface{})
}

// NewPodWatcher constructs a new PodWatcher. Each callback in the variadic
// callback list is called in its own goroutine.
func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher {
// NewPodWatcherOptions constructs a new PodWatcher with a potentially more
// complicated selector. Each callback in the variadic callback list is called
// in its own goroutine.
func NewPodWatcherOptions(cs kubernetes.Interface, k8sNamespace string, options k8smeta.ListOptions, cbs ...EventCallback) *PodWatcher {
return &PodWatcher{
cs: cs,
k8sNamespace: k8sNamespace,
selector: selector,
listOpts: options,
cbs: cbs,
tracker: podTracker{
lastStatus: map[string]*k8score.Pod{},
Expand All @@ -204,17 +205,20 @@ func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs .
}
}

// NewPodWatcher constructs a new PodWatcher. The `selector` selects by labels
// only. Each callback in the variadic callback list is called in its own
// goroutine.
func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher {
return NewPodWatcherOptions(cs, k8sNamespace, k8smeta.ListOptions{LabelSelector: selector}, cbs...)
}

func (p *PodWatcher) logf(format string, args ...interface{}) {
if p.Logger == nil {
return
}
p.Logger.Printf(format, args...)
}

func (p *PodWatcher) listOpts() k8smeta.ListOptions {
return k8smeta.ListOptions{LabelSelector: p.selector}
}

func setContinues(ev PodEvent, continues bool) {
switch pe := ev.(type) {
case *CreatePod:
Expand All @@ -231,7 +235,7 @@ func setContinues(ev PodEvent, continues bool) {
// returns the new version ID (or an error)
func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (string, error) {
initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List(
ctx, p.listOpts())
ctx, p.listOpts)
if initListerr != nil {
return "", fmt.Errorf("failed pod listing: %w", initListerr)
}
Expand Down Expand Up @@ -278,7 +282,7 @@ func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (str
// returns the number of pods, resource version and (optionally) an error
func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) {
initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List(
ctx, p.listOpts())
ctx, p.listOpts)
if initListerr != nil {
return -1, "", fmt.Errorf("failed initial pod listing: %w", initListerr)
}
Expand Down Expand Up @@ -391,7 +395,7 @@ func (p *PodWatcher) Run(ctx context.Context) error {

lastwatchStart := time.Now()
for {
watchOpt := p.listOpts()
watchOpt := p.listOpts
watchOpt.ResourceVersion = version
podWatch, watchStartErr := p.cs.CoreV1().Pods(p.k8sNamespace).Watch(ctx, watchOpt)
if watchStartErr != nil {
Expand Down

0 comments on commit f08850d

Please sign in to comment.