Skip to content

Commit

Permalink
Merge pull request #17 from ncode/juliano/updates
Browse files Browse the repository at this point in the history
updates
  • Loading branch information
ncode authored Feb 13, 2024
2 parents 054c8e3 + 802cb67 commit 7451a30
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 24 deletions.
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ Consul based leader election with tagging support and hooks

Consul lacks a built-in feature for leader election among registered services. This tool is designed to fill that gap. It functions by designating a leader among multiple services, marking the chosen leader with a specified tag. Additionally, it allows for the execution of a script whenever a leader election occurs.

### How do I test it?
### How does it work?

Ballot uses Consul's session API to create a session for each service. The session is then used to create a lock on a key. The service that successfully creates the lock is elected as the leader. The leader is then tagged with a specified tag. The leader election is monitored and the leader is updated if the current leader is no longer healthy.
More info about the sessions here [https://developer.hashicorp.com/consul/tutorials/developer-configuration/application-leader-elections](https://developer.hashicorp.com/consul/tutorials/developer-configuration/application-leader-elections).

### How do I use it?

1. Install Ballot
```bash
Expand Down Expand Up @@ -53,10 +58,33 @@ $PORT # Port of the service
$SESSIONID # Current SessionID of the elected master
```

### Configuration

The configuration file is a yaml file with the following structure:

```yaml
consul:
token: # Consul token
election:
enabled:
- my-service-name # Name of the service enabled for election
services:
my-service-name: # Name of the service
id: my-service-name # ID of the service
key: my-service-name # Key to be used for the lock in Consul, this should be the same across all nodes
token: # Token to be used for the session in Consul
serviceChecks: # List of checks to be used to determine the health of the service
- ping # Name of the check
primaryTag: primary # Tag to be used to mark the leader
execOnPromote: '/bin/echo primary' # Command to be executed when the service is elected as leader
execOnDemote: '/bin/echo secondary' # Command to be executed when the service is demoted as leader
ttl: 10s # TTL for the session
lockDelay: 5s # Lock delay for the session
```
### TODO:
- Write tests
- Write more tests
- Add more examples
- Re-enable the hooks on state change
- Allow to pre-define the preferred leader
- Update the docks with the lock delays and timeouts
4 changes: 2 additions & 2 deletions examples/config/ballot1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ election:
serviceChecks:
- service:election1
primaryTag: primary
execOnPromote: '/usr/bin/say I am da boss'
execOnDemote: '/usr/bin/say I am no longer da boss'
execOnPromote: '/usr/bin/say primary'
execOnDemote: '/usr/bin/say secondary'
4 changes: 2 additions & 2 deletions examples/config/ballot2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ election:
serviceChecks:
- service:election2
primaryTag: primary
execOnPromote: '/usr/bin/say I am da boss'
execOnDemote: '/usr/bin/say I am no longer da boss'
execOnPromote: '/usr/bin/say primary'
execOnDemote: '/usr/bin/say secondary'
6 changes: 1 addition & 5 deletions examples/consul/my-service1.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,5 @@
"Interval": "10s",
"Timeout": "5s"
},
"EnableTagOverride": true,
"Weights": {
"Passing": 10,
"Warning": 1
}
"EnableTagOverride": true
}
6 changes: 1 addition & 5 deletions examples/consul/my-service2.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,5 @@
"Interval": "10s",
"Timeout": "5s"
},
"EnableTagOverride": true,
"Weights": {
"Passing": 10,
"Warning": 1
}
"EnableTagOverride": true
}
67 changes: 61 additions & 6 deletions internal/ballot/ballot.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type ElectionPayload struct {
SessionID string
}

type commandExecutor struct{}

func (c *commandExecutor) Command(name string, arg ...string) *exec.Cmd {
return exec.Command(name, arg...)
}

// New returns a new Ballot instance.
func New(ctx context.Context, name string) (b *Ballot, err error) {
if ctx == nil {
Expand All @@ -55,6 +61,7 @@ func New(ctx context.Context, name string) (b *Ballot, err error) {
b.leader.Store(false)
b.Token = consulConfig.Token
b.ctx = ctx
b.executor = &commandExecutor{}

b.Name = name
if b.LockDelay == 0 {
Expand Down Expand Up @@ -84,7 +91,7 @@ type Ballot struct {
leader atomic.Bool `mapstructure:"-"`
client ConsulClient `mapstructure:"-"`
ctx context.Context `mapstructure:"-"`
exec CommandExecutor `mapstructure:"-"`
executor CommandExecutor `mapstructure:"-"`
}

// Copy *api.AgentService to *api.AgentServiceRegistration
Expand Down Expand Up @@ -154,15 +161,15 @@ func (b *Ballot) runCommand(command string, electionPayload *ElectionPayload) ([
if err != nil {
return nil, err
}
cmd := b.exec.Command(args[0], args[1:]...)
cmd := b.executor.Command(args[0], args[1:]...)
cmd.Env = append(cmd.Env, fmt.Sprintf("ADDRESS=%s", electionPayload.Address))
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", electionPayload.Port))
cmd.Env = append(cmd.Env, fmt.Sprintf("SESSIONID=%s", electionPayload.SessionID))
return cmd.Output()
}

// updateServiceTags updates the service tags.
func (b *Ballot) updateServiceTags() error {
func (b *Ballot) updateServiceTags(isLeader bool) error {
service, _, err := b.getService()
if err != nil {
return err
Expand All @@ -175,10 +182,10 @@ func (b *Ballot) updateServiceTags() error {
hasPrimaryTag := slices.Contains(registration.Tags, b.PrimaryTag)

// Update tags based on leadership status
if b.IsLeader() && !hasPrimaryTag {
if isLeader && !hasPrimaryTag {
// Add primary tag if not present and this node is the leader
registration.Tags = append(registration.Tags, b.PrimaryTag)
} else if !b.IsLeader() && hasPrimaryTag {
} else if !isLeader && hasPrimaryTag {
// Remove primary tag if present and this node is not the leader
index := slices.Index(registration.Tags, b.PrimaryTag)
registration.Tags = append(registration.Tags[:index], registration.Tags[index+1:]...)
Expand All @@ -187,6 +194,35 @@ func (b *Ballot) updateServiceTags() error {
return nil
}

// Run the command associated with the new leadership status
var command string
if isLeader {
command = b.ExecOnPromote
} else {
command = b.ExecOnDemote
}
if command != "" && b.executor != nil {
go func(isLeader bool, command string) {
// Run the command in a separate goroutine
ctx, cancel := context.WithTimeout(b.ctx, (b.TTL+b.LockDelay)*2)
defer cancel()
payload, err := b.waitForNextValidSessionData(ctx)
output, err := b.runCommand(command, payload)
if err != nil {
log.WithFields(log.Fields{
"caller": "updateLeadershipStatus",
"isLeader": isLeader,
"error": err,
}).Error("failed to run command")
}
log.WithFields(log.Fields{
"caller": "updateLeadershipStatus",
"isLeader": isLeader,
"output": string(output),
}).Info("ran command")
}(isLeader, command)
}

// Log the updated tags
log.WithFields(log.Fields{
"caller": "updateServiceTags",
Expand Down Expand Up @@ -345,7 +381,7 @@ func (b *Ballot) updateLeadershipStatus(isLeader bool) error {
b.leader.Store(isLeader)

// Update service tags based on leadership status
err := b.updateServiceTags()
err := b.updateServiceTags(isLeader)
if err != nil {
return err
}
Expand Down Expand Up @@ -447,6 +483,25 @@ func (b *Ballot) IsLeader() bool {
return b.leader.Load() && b.sessionID.Load() != nil
}

func (b *Ballot) waitForNextValidSessionData(ctx context.Context) (data *ElectionPayload, err error) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
data, err := b.getSessionData()
if err != nil {
return data, err
}
if data != nil {
return data, nil
}
case <-ctx.Done():
return data, ctx.Err()
}
}
}

func (b *Ballot) getSessionData() (data *ElectionPayload, err error) {
sessionKey, _, err := b.client.KV().Get(b.Key, nil)
if err != nil {
Expand Down
26 changes: 25 additions & 1 deletion internal/ballot/ballot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestRunCommand(t *testing.T) {

// Create a Ballot instance with the mock executor
b := &Ballot{
exec: mockExecutor,
executor: mockExecutor,
}

// Define the command to run
Expand Down Expand Up @@ -178,3 +178,27 @@ func TestIsLeader(t *testing.T) {
assert.False(t, b.IsLeader())
})
}

type MockConsulClient struct {
mock.Mock
}

func (m *MockConsulClient) Agent() *api.Agent {
args := m.Called()
return args.Get(0).(*api.Agent)
}

func (m *MockConsulClient) Catalog() *api.Catalog {
args := m.Called()
return args.Get(0).(*api.Catalog)
}

func (m *MockConsulClient) KV() *api.KV {
args := m.Called()
return args.Get(0).(*api.KV)
}

func (m *MockConsulClient) Session() *api.Session {
args := m.Called()
return args.Get(0).(*api.Session)
}

0 comments on commit 7451a30

Please sign in to comment.