broker: tear down and restart a demoted replica #415

Merged · 3 commits · Jan 17, 2025
61 changes: 37 additions & 24 deletions allocator/allocator.go
@@ -60,9 +60,8 @@ func Allocate(args AllocateArgs) error {
return nil
}

// Response of the last transaction we applied. We'll ensure we've minimally
// watched through its revision before driving further action.
var txnResponse *clientv3.TxnResponse
// `next` Etcd revision we must read through before proceeding.
var next = ks.Header.Revision + 1

if state.isLeader() {

@@ -104,7 +103,13 @@ func Allocate(args AllocateArgs) error {
// Converge the current state towards |desired|.
var err error
if err = converge(txn, state, desired); err == nil {
txnResponse, err = txn.Commit()
err = txn.Flush()
}

// We must read through any Etcd transactions applied by `txn`,
// even if it subsequently encountered an error.
if r := txn.Revision(); r > next {
next = r
}

if err != nil {
@@ -118,19 +123,13 @@
allocatorNumItemSlots.Set(float64(state.ItemSlots))

if args.TestHook != nil {
args.TestHook(round, txn.noop)
args.TestHook(round, txn.Revision() == 0)
}
round++
}
}

// Await the next KeySpace change. If we completed a transaction,
// ensure we read through its revision before iterating again.
var next = ks.Header.Revision + 1

if txnResponse != nil && txnResponse.Header.Revision > next {
next = txnResponse.Header.Revision
}
// Await the next known Etcd revision affecting our KeySpace.
if err := ks.WaitForRevision(ctx, next); err != nil {
return err
}
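
The two hunks above capture the loop's new invariant: every Etcd revision applied by the transaction must be read through before the next allocation round, even when converge or Flush fails partway. A condensed sketch of that flow follows; it assumes the package's existing imports, elides KeySpace locking, and passes `converge` as a closure standing in for the allocator's internals:

func allocateLoopSketch(ctx context.Context, ks *keyspace.KeySpace, etcd clientv3.KV,
	converge func(txn checkpointTxn) error) error {

	for {
		// `next` Etcd revision we must read through before proceeding.
		var next = ks.Header.Revision + 1

		var txn = newBatchedTxn(ctx, etcd)
		var err = converge(txn)
		if err == nil {
			err = txn.Flush()
		}
		// An earlier checkpoint may have been applied even though a later
		// one failed. We must still read through its revision.
		if r := txn.Revision(); r > next {
			next = r
		}
		if err != nil {
			return err
		}
		// Await the next known Etcd revision affecting our KeySpace.
		if err = ks.WaitForRevision(ctx, next); err != nil {
			return err
		}
	}
}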
@@ -244,15 +243,24 @@ func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp {
// - It allows If and Then to be called multiple times.
// - It removes Else, as incompatible with the checkpoint model. As such,
// a Txn which does not succeed becomes an error.
//
// If Checkpoint() or Flush() return an error, that error is terminal.
// However, a preceding transaction may have been applied.
// The caller must consult Revision() to determine the Etcd revision
// to read through before proceeding; if the transaction was a no-op,
// Revision() is zero.
type checkpointTxn interface {
If(...clientv3.Cmp) checkpointTxn
Then(...clientv3.Op) checkpointTxn
Commit() (*clientv3.TxnResponse, error)

// Checkpoint ensures that all If and Then invocations since the last
// Checkpoint are issued in the same underlying Txn. It may partially
// flush the transaction to Etcd.
Checkpoint() error
// Flush a pending checkpoint to Etcd.
Flush() error
// Revision known to this checkpointTxn which should be read through.
Revision() int64
}

// batchedTxn implements the checkpointTxn interface, potentially queuing across
@@ -270,8 +278,8 @@ type batchedTxn struct {
nextOps []clientv3.Op
// Cmps which should be asserted on every underlying Txn.
fixedCmps []clientv3.Cmp
// Flags whether no operations have committed with this batchedTxn.
noop bool
// Applied revision to be read through.
revision int64
}

// newBatchedTxn returns a batchedTxn using the given Context and KV. It will
@@ -290,7 +298,7 @@ func newBatchedTxn(ctx context.Context, kv clientv3.KV, fixedCmps ...clientv3.Cm
}
},
fixedCmps: fixedCmps,
noop: true,
revision: 0,
}
}

@@ -322,7 +330,7 @@ func (b *batchedTxn) Checkpoint() error {
b.nextCmps, b.nextOps = b.nextCmps[:0], b.nextOps[:0]

if lc, lo := len(b.cmps)+len(nc), len(b.ops)+len(no); lc > maxTxnOps || lo > maxTxnOps {
if _, err := b.Commit(); err != nil {
if err := b.Flush(); err != nil {
return err
}
b.cmps = append(b.cmps, b.fixedCmps...)
@@ -333,30 +341,35 @@
return nil
}

func (b *batchedTxn) Commit() (*clientv3.TxnResponse, error) {
func (b *batchedTxn) Flush() error {
if len(b.nextCmps) != 0 || len(b.nextOps) != 0 {
panic("must call Checkpoint before Commit")
} else if len(b.ops) == 0 {
return nil, nil // No-op.
return nil // No-op.
}

var response, err = b.txnDo(clientv3.OpTxn(b.cmps, b.ops, nil))

if log.GetLevel() >= log.DebugLevel {
b.debugLogTxn(response, err)
}

if err != nil {
return nil, err
return err
} else if !response.Succeeded {
return response, fmt.Errorf("transaction checks did not succeed")
// Don't retain the response revision because it may be outside our
// KeySpace, and we'd block indefinitely attempting to await it.
return fmt.Errorf("transaction checks did not succeed")
} else {
b.noop = false
b.revision = response.Header.Revision
b.cmps, b.ops = b.cmps[:0], b.ops[:0]
return response, nil
return nil
}
}

func (b *batchedTxn) Revision() int64 {
return b.revision
}

func (b *batchedTxn) debugLogTxn(response *clientv3.TxnResponse, err error) {
var dbgCmps, dbgOps []string
for _, c := range b.cmps {
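
Taken together, the allocator.go changes replace Commit's (*clientv3.TxnResponse, error) result with the Flush/Revision pair documented on checkpointTxn. A minimal caller sketch under the new contract (the existence check and key/value map are illustrative only, not part of this PR):

// putIfExists puts each key/value pair, requiring that the key already
// exists, and batches across underlying Etcd transactions via Checkpoint.
// It returns the revision to read through (zero if nothing was applied).
func putIfExists(ctx context.Context, kv clientv3.KV, keyValues map[string]string) (int64, error) {
	var txn checkpointTxn = newBatchedTxn(ctx, kv)

	for k, v := range keyValues {
		txn.If(clientv3.Compare(clientv3.Version(k), ">", 0)).
			Then(clientv3.OpPut(k, v))

		// Checkpoint guarantees this If/Then pair lands in a single
		// underlying Txn, and may flush earlier checkpoints to Etcd
		// if maxTxnOps would otherwise be exceeded.
		if err := txn.Checkpoint(); err != nil {
			return txn.Revision(), err
		}
	}
	// Flush the final pending checkpoint. Even on error, Revision()
	// reflects any transaction which was already applied.
	var err = txn.Flush()
	return txn.Revision(), err
}

The `update` and `markAllConsistent` helpers in scenarios_test.go below follow the same pattern, with markAllConsistent using a zero Revision() to detect that no progress was made.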
12 changes: 3 additions & 9 deletions allocator/allocator_test.go
@@ -159,9 +159,7 @@ func (s *AllocatorSuite) TestTxnBatching(c *gc.C) {
))

// Final Flush. Expect it applies the last checkpoint.
var r, err = txn.Commit()
c.Check(r, gc.Equals, txnResp)
c.Check(err, gc.IsNil)
c.Check(txn.Flush(), gc.IsNil)

c.Check(txnOp, gc.DeepEquals, clientv3.OpTxn(
[]clientv3.Cmp{fixedCmp, testCmp},
@@ -172,17 +170,13 @@
// Empty Checkpoint, then Flush. Expect it's treated as a no-op.
c.Check(txn.Checkpoint(), gc.IsNil)

r, err = txn.Commit()
c.Check(r, gc.IsNil)
c.Check(err, gc.IsNil)
c.Check(txn.Flush(), gc.IsNil)

// Non-empty Flush that fails checks. Expect it's mapped to an error.
c.Check(txn.Then(testOp).Checkpoint(), gc.IsNil)
txnResp.Succeeded = false

r, err = txn.Commit()
c.Check(r, gc.Equals, txnResp)
c.Check(err, gc.ErrorMatches, "transaction checks did not succeed")
c.Check(txn.Flush(), gc.ErrorMatches, "transaction checks did not succeed")
}

var _ = gc.Suite(&AllocatorSuite{})
9 changes: 5 additions & 4 deletions allocator/item_state_test.go
@@ -424,9 +424,10 @@ type mockTxnBuilder struct {
ops []clientv3.Op
}

func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b }
func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b }
func (b *mockTxnBuilder) Checkpoint() error { return nil }
func (b *mockTxnBuilder) Commit() (*clientv3.TxnResponse, error) { panic("not supported") }
func (b *mockTxnBuilder) If(c ...clientv3.Cmp) checkpointTxn { b.cmps = append(b.cmps, c...); return b }
func (b *mockTxnBuilder) Then(o ...clientv3.Op) checkpointTxn { b.ops = append(b.ops, o...); return b }
func (b *mockTxnBuilder) Checkpoint() error { return nil }
func (b *mockTxnBuilder) Flush() error { panic("not supported") }
func (b *mockTxnBuilder) Revision() int64 { panic("not supported") }

var _ = gc.Suite(&ItemStateSuite{})
10 changes: 4 additions & 6 deletions allocator/scenarios_test.go
@@ -890,8 +890,7 @@ func insert(ctx context.Context, client *clientv3.Client, keyValues ...string) e
return err
}
}
var _, err = txn.Commit()
return err
return txn.Flush()
}

// update updates keys with values, requiring that the key already exist.
@@ -906,8 +905,7 @@ func update(ctx context.Context, client *clientv3.Client, keyValues ...string) e
return err
}
}
var _, err = txn.Commit()
return err
return txn.Flush()
}

// markAllConsistent updates all Assignments to have a value of "consistent".
@@ -928,9 +926,9 @@ func markAllConsistent(ctx context.Context, client *clientv3.Client, ks *keyspac
}
}

if _, err := txn.Commit(); err != nil {
if err := txn.Flush(); err != nil {
return err
} else if txn.noop {
} else if txn.Revision() == 0 {
return io.ErrNoProgress
} else {
return nil
6 changes: 1 addition & 5 deletions broker/append_fsm.go
@@ -154,11 +154,7 @@ func (b *appendFSM) onResolve() {
b.state = stateError
} else if b.resolved.ProcessId != b.resolved.localID {
// If we hold the pipeline from a previous resolution but are no longer
// primary, we must tear it down to release replica spools.
if b.pln != nil {
go b.pln.shutdown(false)
b.pln = nil
}
// primary, we must release it.
b.returnPipeline()
b.state = stateAwaitDesiredReplicas // We must proxy.
} else if b.plnReturnCh != nil {
25 changes: 20 additions & 5 deletions broker/resolver.go
@@ -31,6 +31,7 @@ type resolver struct {
type resolverReplica struct {
*replica
assignments keyspace.KeyValues
primary bool
signalCh chan struct{}
}

@@ -227,12 +228,22 @@ func (r *resolver) updateResolutions() {
for _, li := range r.state.LocalItems {
var item = li.Item.Decoded.(allocator.Item)
var name = pb.Journal(item.ID)

var primary = li.Assignments[li.Index].Decoded.(allocator.Assignment).Slot == 0
var replica, ok = r.replicas[name]
if !ok {

if _, ok := next[name]; ok {
// TODO(johnny): This SHOULD not happen, but sometimes does.
// If it does, we don't want to create extra replicas that are unlinked.
continue
}

// If the replica is not found, or if it's `primary` but we have been demoted,
// then create a new replica and tear down the old (if there is one).
if !ok || !primary && replica.primary {
r.wg.Add(1)
replica = &resolverReplica{
replica: r.newReplica(name), // Newly assigned journal.
primary: primary,
assignments: li.Assignments.Copy(),
signalCh: make(chan struct{}),
}
@@ -241,11 +252,12 @@
pbx.Init(&rt, li.Assignments)

log.WithFields(log.Fields{
"name": replica.journal,
"route": rt,
"name": replica.journal,
"primary": primary,
}).Info("starting local journal replica")

} else {
replica.primary = primary
delete(r.replicas, name)
}
next[name] = replica
@@ -277,7 +289,10 @@ func (r *resolver) stopServingLocalReplicas() {

func (r *resolver) cancelReplicas(m map[pb.Journal]*resolverReplica) {
for _, replica := range m {
log.WithField("name", replica.journal).Info("stopping local journal replica")
log.WithFields(log.Fields{
"name": replica.journal,
"primary": replica.primary,
}).Info("stopping local journal replica")

// Close |signalCh| to unblock any Replicate or Append RPCs which would
// otherwise race shutDownReplica() to the |spoolCh| or |pipelineCh|.
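
The updated predicate, `!ok || !primary && replica.primary`, is the heart of this PR: demotion from primary forces a teardown and restart of the replica, while promotion simply reuses it. A distilled restatement of that decision, assuming the resolverReplica fields shown above:

// replicaNeedsRestart reports whether a cached journal replica must be
// torn down and re-created for its current assignment.
func replicaNeedsRestart(cached *resolverReplica, found, primary bool) bool {
	if !found {
		return true // No cached replica: create one.
	}
	// Demoted: the replica was primary but no longer is. Restart it so
	// that state held while primary (such as replica spools) is released.
	return !primary && cached.primary
}

Note the asymmetry: a replica promoted to primary is retained and its `primary` flag updated in place, while a demoted one is replaced and the old instance cancelled.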
23 changes: 22 additions & 1 deletion broker/resolver_test.go
@@ -158,7 +158,6 @@ func TestResolverLocalReplicaStopping(t *testing.T) {
var peer = newMockBroker(t, etcd, pb.ProcessSpec_ID{Zone: "peer", Suffix: "broker"})

setTestJournal(broker, pb.JournalSpec{Name: "a/journal", Replication: 1}, broker.id)
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id)

// Precondition: journal & replica resolve as per expectation.
var r, _ = broker.svc.resolver.resolve(ctx, allClaims, "a/journal", resolveOpts{})
@@ -167,6 +166,28 @@
require.NotNil(t, r.replica)
require.NoError(t, r.replica.ctx.Err())

// Initially, we're primary for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, broker.id)

r, _ = broker.svc.resolver.resolve(ctx, allClaims, "peer/journal", resolveOpts{})
require.Equal(t, pb.Status_OK, r.status)
require.Equal(t, broker.id, r.Header.ProcessId)
require.NotNil(t, r.replica)

var prevReplica = r.replica

// We're demoted to replica for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id, broker.id)

r, _ = broker.svc.resolver.resolve(ctx, allClaims, "peer/journal", resolveOpts{})
require.Equal(t, pb.Status_OK, r.status)
require.Equal(t, broker.id, r.Header.ProcessId)
require.NotNil(t, r.replica)
require.False(t, prevReplica == r.replica) // Expect a new replica was created.

// Peer is wholly responsible for `peer/journal`.
setTestJournal(broker, pb.JournalSpec{Name: "peer/journal", Replication: 1}, peer.id)

broker.svc.resolver.stopServingLocalReplicas()

// Expect a route invalidation occurred immediately, to wake any awaiting RPCs.