Skip to content

Commit

Permalink
Merge pull request #6 from vimeo/gone_b_gone
Browse files Browse the repository at this point in the history
PodWatcher: handle Gone Errors in Watch stream
  • Loading branch information
dfinkel authored Jul 28, 2020
2 parents ce281ad + 2354fc7 commit 51cdb00
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsC
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
24 changes: 23 additions & 1 deletion k8s_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) {
// channel was closed.
var ErrResultsClosed = errors.New("k8s result channel closed")

var errVersionGone = errors.New("k8s GONE status; resync required")

const backoffResetThreshold = time.Hour

// Run starts a watcher loop, will generate CreatePod events for all existing
Expand Down Expand Up @@ -279,7 +281,9 @@ func (p *PodWatcher) Run(ctx context.Context) error {
}

rv, err := p.watch(ctx, podWatch, version, cbChans)
if err != ErrResultsClosed {
switch err {
case ErrResultsClosed, errVersionGone:
default:
return err
}
version = rv
Expand All @@ -306,6 +310,14 @@ func (p *PodWatcher) cbRunner(ctx context.Context, cb EventCallback, wg *sync.Wa
}
}

func errorEventIsGone(ev watch.Event) bool {
errObj, isStatus := ev.Object.(*k8smeta.Status)
if !isStatus {
return false
}
return errObj.Reason == k8smeta.StatusReasonGone
}

func (p *PodWatcher) watch(ctx context.Context, podWatch watch.Interface, rv string, cbChans []chan<- PodEvent) (string, error) {
lastRV := rv
resCh := podWatch.ResultChan()
Expand All @@ -325,6 +337,16 @@ func (p *PodWatcher) watch(ctx context.Context, podWatch watch.Interface, rv str
if !ok {
if ev.Type == watch.Error {
p.logf("received error event: %s", ev)
// if the error has reason "Gone",
// return and let the outr loop
// reconnect.
if errorEventIsGone(ev) {
podWatch.Stop()
// drain the result channel then exit
for range resCh {
}
return lastRV, errVersionGone
}
}
continue
}
Expand Down
33 changes: 27 additions & 6 deletions k8s_pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
pi podInfo
vers string
eventType watch.EventType
errObj runtime.Object
}
type watchRet struct {
watch []watchEvent
Expand Down Expand Up @@ -767,6 +768,21 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
&DeletePod{name: "foobar"},
},
},
{
name: "one_ready_then_dies_one_reconnect_change_error",
listRets: []listRetPI{{pi: []podInfo{{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
ready: true, phase: k8score.PodRunning}}}, {pi: []podInfo{}}},
watchRets: []watchRet{
{watch: []watchEvent{{errObj: &k8smeta.Status{Code: 410, Reason: k8smeta.StatusReasonGone}, eventType: watch.Error}}},
{err: goneErr{}},
{watch: []watchEvent{}},
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning)},
&DeletePod{name: "foobar"},
},
},
{
name: "one_ready_then_dies_one_reconnect_dedup_delete",
listRets: []listRetPI{{pi: []podInfo{{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
Expand All @@ -776,7 +792,7 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
{err: goneErr{}},
{watch: []watchEvent{
{pi: podInfo{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
ready: false, phase: k8score.PodFailed}},
ready: false, phase: k8score.PodFailed}, eventType: watch.Modified},
}},
},
expectedEvents: []PodEvent{
Expand Down Expand Up @@ -877,11 +893,16 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
// all events
fw := watch.NewFakeWithChanSize(len(eventSlice), false)
for _, ev := range eventSlice {
p := genPod(ev.pi.name, ev.pi.ip,
ev.pi.labels, ev.pi.ready,
ev.pi.phase)
p.ResourceVersion = ev.vers
fw.Action(ev.eventType, p)
switch ev.eventType {
case watch.Added, watch.Modified, watch.Deleted:
p := genPod(ev.pi.name, ev.pi.ip,
ev.pi.labels, ev.pi.ready,
ev.pi.phase)
p.ResourceVersion = ev.vers
fw.Action(ev.eventType, p)
case watch.Error:
fw.Action(ev.eventType, ev.errObj)
}
}
// Stop the watcher so it closes the internal
// channel
Expand Down

0 comments on commit 51cdb00

Please sign in to comment.