Skip to content

Commit

Permalink
Fix another bug in cancel: reverting the denied tables list
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 23, 2025
1 parent 45adff7 commit 7019af3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
17 changes: 14 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,16 +918,27 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
addTestRows()
timeout := lagDuration * 2 // 6s
// Use the default max-replication-lag-allowed value of 30s.
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
// We run the command in a goroutine so that we can unblock things
// after the timeout is reached -- as the vplayer query is blocking
// on the table lock.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
}()
time.Sleep(timeout)
// Now we can unblock things and let it continue.
unlockTargetTable()
wg.Wait()
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
unlockTargetTable()
deleteTestRows()
waitForTargetToCatchup()
})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2951,7 +2951,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
}
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}
Expand All @@ -2963,7 +2963,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
}
Expand Down
14 changes: 11 additions & 3 deletions go/vt/vtctl/workflow/stream_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
}

// CancelStreamMigrations cancels the stream migrations.
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
if sm.streams == nil {
return
return nil
}
errs := &concurrency.AllErrorRecorder{}

_ = sm.deleteTargetStreams(ctx)
if err := sm.deleteTargetStreams(ctx); err != nil {
errs.RecordError(err)
}

// Restart the source streams, but leave the Reshard workflow's reverse
// variant stopped.
Expand All @@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
return err
})
if err != nil {
errs.RecordError(err)
sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
}
if errs.HasErrors() {
return errs.AggrError(vterrors.Aggregate)
}
return nil
}

// MigrateStreams migrates N streams
Expand Down
31 changes: 21 additions & 10 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri

func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
return ts.switchDeniedTables(ctx)
return ts.switchDeniedTables(ctx, false)
}
return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites)
}
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
var err error
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(ctx)
err = ts.switchDeniedTables(ctx, false)
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
}
Expand All @@ -1075,16 +1075,25 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {

// switchDeniedTables switches the denied tables rules for the traffic switch.
// They are added on the source side and removed on the target side.
func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
// If backward is true, then we swap this logic, removing on the source side
// and adding on the target side. You would want to do that e.g. when canceling
// a failed (and currently partial) traffic switch as the source and target
// have already been switched in the trafficSwitcher.
func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
return nil
}

rmsource, rmtarget := false, true
if backward {
rmsource, rmtarget = true, false
}

egrp, ectx := errgroup.WithContext(ctx)
egrp.Go(func() error {
return ts.ForAllSources(func(source *MigrationSource) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables())
}); err != nil {
return err
}
Expand All @@ -1107,7 +1116,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
egrp.Go(func() error {
return ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables())
}); err != nil {
return err
}
Expand Down Expand Up @@ -1153,12 +1162,12 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
// canceled by the parent context.
wcCtx := context.WithoutCancel(ctx)
// Now we create a child context from that which has a timeout.
cmTimeout := 60 * time.Second
cmTimeout := 2 * time.Minute
cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(cmCtx)
err = ts.switchDeniedTables(cmCtx, true /* revert */)
} else {
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
Expand All @@ -1167,7 +1176,10 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

sm.CancelStreamMigrations(cmCtx)
if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
Expand All @@ -1180,8 +1192,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

err = ts.deleteReverseVReplication(cmCtx)
if err != nil {
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}
Expand Down

0 comments on commit 7019af3

Please sign in to comment.