Skip to content

Commit

Permalink
Merge pull request #143 from 9corp/dselans/post-overwatch-fixes
Browse files Browse the repository at this point in the history
Dselans/post overwatch fixes
  • Loading branch information
dselans authored Apr 19, 2017
2 parents 397106c + 2243776 commit b12e3b5
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 27 deletions.
5 changes: 0 additions & 5 deletions alerter/alerter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"strings"

log "github.com/Sirupsen/logrus"
"github.com/relistan/go-director"
gouuid "github.com/satori/go.uuid"

"github.com/9corp/9volt/base"
Expand All @@ -41,7 +40,6 @@ type Alerter struct {
Config *config.Config
Alerters map[string]IAlerter
MessageChannel <-chan *Message
Looper director.Looper

base.Component
}
Expand All @@ -63,7 +61,6 @@ func New(cfg *config.Config, messageChannel <-chan *Message) *Alerter {
MemberID: cfg.MemberID,
Config: cfg,
MessageChannel: messageChannel,
Looper: director.NewFreeLooper(director.FOREVER, make(chan error)),
Component: base.Component{
Identifier: "alerter",
},
Expand Down Expand Up @@ -113,8 +110,6 @@ OUTER:
log.Debugf("%v-run: Received message (%v) from checker '%v' -> '%v'", msg.uuid, a.Identifier, msg.Source, msg.Key)

go a.handleMessage(msg)

return nil
case <-a.Component.Ctx.Done():
log.Debugf("%v-run: Asked to shutdown", a.Identifier)
break OUTER
Expand Down
32 changes: 27 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path"
Expand Down Expand Up @@ -88,6 +87,7 @@ type Cluster struct {
DirectorHeartbeatLooper looper.Looper
MemberHeartbeatLooper looper.Looper
restarted bool
shutdown bool // provide a way for basic (non-director) loopers to exit

base.Component
}
Expand Down Expand Up @@ -146,6 +146,7 @@ func (c *Cluster) Start() error {
log.Infof("%v: Launching cluster engine components...", c.Identifier)

c.Component.Ctx, c.Component.Cancel = context.WithCancel(context.Background())
c.shutdown = false

go c.runDirectorMonitor()
go c.runDirectorHeartbeat()
Expand All @@ -169,6 +170,9 @@ func (c *Cluster) Stop() error {
// stop the director heartbeat send
c.DirectorHeartbeatLooper.Quit()

// alert memberMonitor to stop if we're not a director
c.shutdown = true

// stop memberMonitor
if c.Component.Cancel == nil {
log.Warningf("%v: Looks like .Cancel is nil; is this expected?", c.Identifier)
Expand Down Expand Up @@ -217,13 +221,21 @@ func (c *Cluster) runDirectorHeartbeat() {
c.DirectorHeartbeatLooper.Loop(func() error {
if !c.amDirector() {
// log.Debugf("%v-directorHeartbeat: Not a director - nothing to do", c.Identifier)
return errors.New("Not a director, nothing to do")
return nil
}

// update */director with current state data
if err := c.sendDirectorHeartbeat(); err != nil {
c.Config.EQClient.AddWithErrorLog("error", fmt.Sprintf("%v-directorHeartbeat: %v", c.Identifier, err.Error()))
return err

// Let overwatch decide what to do in this case
c.OverwatchChan <- &overwatch.Message{
Error: fmt.Errorf("Potential etcd write error: %v", err),
Source: fmt.Sprintf("%v.runDirectorHeartbeat", c.Identifier),
ErrorType: overwatch.ETCD_GENERIC_ERROR,
}

return nil
} else {
log.Debugf("%v-directorHeartbeat: Successfully sent periodic heartbeat (MemberID: %v)",
c.Identifier, c.MemberID)
Expand Down Expand Up @@ -263,6 +275,16 @@ func (c *Cluster) runMemberMonitor() {
watcher := c.DalClient.NewWatcher(membersDir, true)

for {
// This for loop cannot be a director looper -- happy path is to block
// indefinitely on the watch - with a director looper, another execution
// would be triggered when the interval is reached.
//
// This could be a channel, but this seems easy albeit a bit hacky
if c.shutdown {
log.Debugf("%v-runMemberMonitor: Received a (non-context based) notice to shutdown", c.Identifier)
break
}

if !c.amDirector() {
time.Sleep(time.Duration(c.Config.HeartbeatInterval))
continue
Expand Down Expand Up @@ -379,7 +401,7 @@ func (c *Cluster) runMemberHeartbeat() {
c.Config.EQClient.AddWithErrorLog("error",
fmt.Sprintf("%v-runMemberHeartbeat: Unable to generate member JSON (retrying in %v): %v",
c.Identifier, c.Config.HeartbeatInterval.String(), err.Error()))
return err
return nil
}

// set status key (could fail)
Expand All @@ -404,7 +426,7 @@ func (c *Cluster) runMemberHeartbeat() {
ErrorType: overwatch.ETCD_GENERIC_ERROR,
}

return err
return nil
}

// refresh dir
Expand Down
12 changes: 6 additions & 6 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ var _ = Describe("cluster", func() {
})

It("should not do anything if not director", func() {
err := <-looperChan

Expect(err).To(Equal(errors.New("Not a director, nothing to do")))
Expect(fakeDAL.UpdateDirectorStateCallCount()).To(Equal(0))
})
})
Expand All @@ -193,14 +190,17 @@ var _ = Describe("cluster", func() {
fakeDAL.UpdateDirectorStateReturns(errors.New("generic error"))
})

It("should return error + add event log", func() {
err := <-looperChan
It("should add event log and send message to overwatch", func() {
k, v := fakeEventClient.AddWithErrorLogArgsForCall(0)

Expect(err.Error()).To(ContainSubstring("Unable to update director heartbeat"))
Expect(fakeDAL.UpdateDirectorStateCallCount()).To(Equal(1))
Expect(k).To(Equal("error"))
Expect(v).To(ContainSubstring("directorHeartbeat"))

time.Sleep(100 * time.Millisecond)
overwatchMsg := <-overwatchChan

Expect(overwatchMsg.Error.Error()).To(ContainSubstring("Potential etcd write error"))
})
})
})
Expand Down
20 changes: 19 additions & 1 deletion dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,25 @@ func (d *Dal) FetchAllMemberRefs() (map[string]string, []string, error) {
Recurse: true,
})
if err != nil {
return nil, nil, fmt.Errorf("Problem fetching refs for '%v': %v", memberID, err.Error())
// This is a precaution, it _shouldn't_ happen -- this case should've only
// been a problem *before* the introduction of overwatch.
//
// Previously, if etcd went away, members wouldn't be able to refresh
// their memberdir, eventually causing the memberdir to automatically
// disappear (since they have TTLs). When etcd would come back, the
// member would resume sending heartbeats but its config dir would
// no longer exist.
//
// With overwatch, this should no longer be an issue -- when etcd
// experiences a problem, all components are properly shut down; then
// when etcd recovers, overwatch will start the components back up,
// which will recreate the member's directory structure.
if client.IsKeyNotFound(err) {
log.Debugf("dal: FetchAllMemberRefs() - member '%v' does not have a config dir, is this expected?", memberID)
refs = make(map[string]string, 0)
} else {
return nil, nil, fmt.Errorf("Problem fetching refs for '%v': %v", memberID, err.Error())
}
}

if len(refs) == 0 {
Expand Down
6 changes: 0 additions & 6 deletions manager/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// TODO: If our 'member dir' disappears, should we stop all monitors?
// TODO: Does a state change mean we should cease monitoring?

package manager

import (
Expand All @@ -10,7 +7,6 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/client"
"github.com/relistan/go-director"

"github.com/9corp/9volt/alerter"
"github.com/9corp/9volt/base"
Expand All @@ -23,7 +19,6 @@ import (
type Manager struct {
MemberID string
Config *config.Config
Looper director.Looper
Monitor *monitor.Monitor
OverwatchChan chan<- *overwatch.Message

Expand All @@ -34,7 +29,6 @@ func New(cfg *config.Config, messageChannel chan *alerter.Message, stateChannel
return &Manager{
MemberID: cfg.MemberID,
Config: cfg,
Looper: director.NewFreeLooper(director.FOREVER, make(chan error)),
Monitor: monitor.New(cfg, messageChannel, stateChannel),
OverwatchChan: overwatchChan,
Component: base.Component{
Expand Down
4 changes: 2 additions & 2 deletions overwatch/overwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package overwatch

import (
"context"
"errors"
// "errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -81,7 +81,7 @@ func (o *Overwatch) runListener() error {

if o.activeWatch {
log.Debugf("%v: Watcher already activated, nothing else left to do", o.Identifier)
return errors.New("Watcher already activated, nothing else left to do")
return nil
}

o.activeWatch = true
Expand Down
4 changes: 2 additions & 2 deletions overwatch/overwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ var _ = Describe("overwatch", func() {
o.activeWatch = true

o.runListener()
err := o.Looper.Wait()

Expect(err).To(Equal(errors.New("Watcher already activated, nothing else left to do")))
// stopTheWorld() should not have been called
Expect(fakeComponent.StopCallCount()).To(Equal(0))
})

It("sets active watch to true and stops the world", func() {
Expand Down

0 comments on commit b12e3b5

Please sign in to comment.