diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 64075cfbf93..45695c54ec8 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -918,8 +918,20 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		addTestRows()
 		timeout := lagDuration * 2 // 6s
 		// Use the default max-replication-lag-allowed value of 30s.
-		out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
-			"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		// We run the command in a goroutine so that we can unblock things
+		// after the timeout is reached -- as the vplayer query is blocking
+		// on the table lock.
+		wg := sync.WaitGroup{}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+				"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+		}()
+		time.Sleep(timeout)
+		// Now we can unblock things and let it continue.
+		unlockTargetTable()
+		wg.Wait()
 		// It should fail due to the command context timeout and we should
 		// successfully cancel.
 		require.Error(t, err)
@@ -927,7 +939,6 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 		require.NotContains(t, out, "cancel migration failed")
 		// Confirm that queries still work fine.
 		execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
-		unlockTargetTable()
 		deleteTestRows()
 		waitForTargetToCatchup()
 	})
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 42f6a114b13..1e86761c71b 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -2951,7 +2951,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			}
 		}
 		if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
-			err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+			err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
 		}
 		return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
@@ -2963,7 +2963,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 			if err := ts.executeLockTablesOnSource(ctx); err != nil {
 				if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
-					err = vterrors.Wrap(err, fmt.Sprintf("(%v)", cerr))
+					err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
 				}
 				return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 			}
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index a700a1338dd..47d5a48c4f9 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -203,12 +203,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
 }
 
 // CancelStreamMigrations cancels the stream migrations.
-func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
+func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
 	if sm.streams == nil {
-		return
+		return nil
 	}
+	errs := &concurrency.AllErrorRecorder{}
 
-	_ = sm.deleteTargetStreams(ctx)
+	if err := sm.deleteTargetStreams(ctx); err != nil {
+		errs.RecordError(err)
+	}
 
 	// Restart the source streams, but leave the Reshard workflow's reverse
 	// variant stopped.
@@ -221,8 +224,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
 		return err
 	})
 	if err != nil {
+		errs.RecordError(err)
 		sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
 	}
+	if errs.HasErrors() {
+		return errs.AggrError(vterrors.Aggregate)
+	}
+	return nil
 }
 
 // MigrateStreams migrates N streams
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 2a5e982270e..c8bf6c8eb68 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -754,7 +754,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri
 
 func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error {
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		return ts.switchDeniedTables(ctx)
+		return ts.switchDeniedTables(ctx, false)
 	}
 	return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites)
 }
@@ -1062,7 +1062,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
 func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 	var err error
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(ctx)
+		err = ts.switchDeniedTables(ctx, false)
 	} else {
 		err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
 	}
@@ -1075,16 +1075,25 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
 
 // switchDeniedTables switches the denied tables rules for the traffic switch.
 // They are removed on the source side and added on the target side.
-func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
+// If backward is true, then we swap this logic, removing on the target side
+// and adding on the source side. You would want to do that e.g. when canceling
+// a failed (and currently partial) traffic switch as the source and target
+// have already been switched in the trafficSwitcher.
+func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error {
 	if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
 		return nil
 	}
 
+	rmsource, rmtarget := false, true
+	if backward {
+		rmsource, rmtarget = true, false
+	}
+
 	egrp, ectx := errgroup.WithContext(ctx)
 	egrp.Go(func() error {
 		return ts.ForAllSources(func(source *MigrationSource) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
+				return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1107,7 +1116,7 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
 	egrp.Go(func() error {
 		return ts.ForAllTargets(func(target *MigrationTarget) error {
 			if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
-				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
+				return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables())
 			}); err != nil {
 				return err
 			}
@@ -1153,12 +1162,12 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	// canceled by the parent context.
 	wcCtx := context.WithoutCancel(ctx)
 	// Now we create a child context from that which has a timeout.
-	cmTimeout := 60 * time.Second
+	cmTimeout := 2 * time.Minute
 	cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
 	defer cmCancel()
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(cmCtx)
+		err = ts.switchDeniedTables(cmCtx, true /* revert */)
 	} else {
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
@@ -1167,7 +1176,10 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
 	}
 
-	sm.CancelStreamMigrations(cmCtx)
+	if err := sm.CancelStreamMigrations(cmCtx); err != nil {
+		cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
+		ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+	}
 
 	err = ts.ForAllTargets(func(target *MigrationTarget) error {
 		query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
@@ -1180,8 +1192,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
 	}
 
-	err = ts.deleteReverseVReplication(cmCtx)
-	if err != nil {
+	if err := ts.deleteReverseVReplication(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
 		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
 	}