Skip to content

Commit

Permalink
broker: tear down and restart a demoted replica
Browse files Browse the repository at this point in the history
It's possible for a broker to transition from primary => replica, if it
doesn't happen to observe a keyspace update where it's assignment is
removed prior to it being re-added by the allocator.

If we were the primary, we need to be sure to tear down the replication
pipeline we may be holding. Handle this scenario by cancelling the
current replica and building a new one.

Also add a (hopefully) temporary work-around for the multiple assignment
issue we're currently tracking down.
  • Loading branch information
jgraettinger committed Jan 17, 2025
1 parent 41c8dbe commit e0f2f4a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
25 changes: 20 additions & 5 deletions broker/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type resolver struct {
type resolverReplica struct {
*replica
assignments keyspace.KeyValues
primary bool
signalCh chan struct{}
}

Expand Down Expand Up @@ -227,12 +228,22 @@ func (r *resolver) updateResolutions() {
for _, li := range r.state.LocalItems {
var item = li.Item.Decoded.(allocator.Item)
var name = pb.Journal(item.ID)

var primary = li.Assignments[li.Index].Decoded.(allocator.Assignment).Slot == 0
var replica, ok = r.replicas[name]
if !ok {

if _, ok := next[name]; ok {
// TODO(johnny): This SHOULD not happen, but sometimes does.
// If it does, we don't want to create extra replicas that are unlinked.
continue
}

// If the replica is not found, or if it's `primary` but we have been demoted,
// then create a new replica and tear down the old (if there is one).
if !ok || !primary && replica.primary {
r.wg.Add(1)
replica = &resolverReplica{
replica: r.newReplica(name), // Newly assigned journal.
primary: primary,
assignments: li.Assignments.Copy(),
signalCh: make(chan struct{}),
}
Expand All @@ -241,11 +252,12 @@ func (r *resolver) updateResolutions() {
pbx.Init(&rt, li.Assignments)

log.WithFields(log.Fields{
"name": replica.journal,
"route": rt,
"name": replica.journal,
"primary": primary,
}).Info("starting local journal replica")

} else {
replica.primary = primary
delete(r.replicas, name)
}
next[name] = replica
Expand Down Expand Up @@ -277,7 +289,10 @@ func (r *resolver) stopServingLocalReplicas() {

func (r *resolver) cancelReplicas(m map[pb.Journal]*resolverReplica) {
for _, replica := range m {
log.WithField("name", replica.journal).Info("stopping local journal replica")
log.WithFields(log.Fields{
"name": replica.journal,
"primary": replica.primary,
}).Info("stopping local journal replica")

// Close |signalCh| to unblock any Replicate or Append RPCs which would
// otherwise race shutDownReplica() to the |spoolCh| or |pipelineCh|.
Expand Down
23 changes: 22 additions & 1 deletion broker/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func TestResolverLocalReplicaStopping(t *testing.T) {
var peer = newMockBroker(t, etcd, pb.ProcessSpec_ID{Zone: "peer", Suffix: "broker"})

setTestJournal(broker, pb.JournalSpec{Name: "a/journal", Replication: 1}, broker.id)
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id)

// Precondition: journal & replica resolve as per expectation.
var r, _ = broker.svc.resolver.resolve(ctx, allClaims, "a/journal", resolveOpts{})
Expand All @@ -167,6 +166,28 @@ func TestResolverLocalReplicaStopping(t *testing.T) {
require.NotNil(t, r.replica)
require.NoError(t, r.replica.ctx.Err())

// Initially, we're primary for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, broker.id)

r, _ = broker.svc.resolver.resolve(ctx, allClaims, "peer/journal", resolveOpts{})
require.Equal(t, pb.Status_OK, r.status)
require.Equal(t, broker.id, r.Header.ProcessId)
require.NotNil(t, r.replica)

var prevReplica = r.replica

// We're demoted to replica for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id, broker.id)

r, _ = broker.svc.resolver.resolve(ctx, allClaims, "peer/journal", resolveOpts{})
require.Equal(t, pb.Status_OK, r.status)
require.Equal(t, broker.id, r.Header.ProcessId)
require.NotNil(t, r.replica)
require.NotEqual(t, prevReplica, r.replica) // Expect a new replica was created.

// Peer is wholly responsible for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id)

broker.svc.resolver.stopServingLocalReplicas()

// Expect a route invalidation occurred immediately, to wake any awaiting RPCs.
Expand Down

0 comments on commit e0f2f4a

Please sign in to comment.