diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index ab995ec14b1..4afbaa1a639 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -85,10 +85,15 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st // We don't use the watch event except to know that we should // re-read the shard record, and to know if the watch dies. log.Info("Change in shard record") - if event.Err != nil { - // The watch failed. Stop it so we start a new one if needed. - log.Errorf("Shard watch failed: %v", event.Err) - shardWatch.stop() + + if event != nil { + if event.Err != nil { + // The watch failed. Stop it so we start a new one if needed. + log.Errorf("Shard watch failed: %v", event.Err) + shardWatch.stop() + } + } else { + log.Infof("Got a nil event from the shard watcher for %s. This should not happen.", tm.tabletAlias) } case <-ctx.Done(): // Our context was cancelled. Terminate the loop. diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 20c1501989e..7b5e1311066 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -83,7 +83,7 @@ type controller struct { TableDiffPhaseTimings *stats.Timings } -func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, +func newController(row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, ts *topo.Server, vde *Engine, options *tabletmanagerdata.VDiffOptions) (*controller, error) { log.Infof("VDiff controller initializing for %+v", row) @@ -104,9 +104,6 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } - ctx, ct.cancel = context.WithCancel(ctx) - go ct.run(ctx) - return ct, nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index b2285a070fa..cd3771accfd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -146,6 +146,8 @@ func (vde *Engine) openLocked(ctx context.Context) error { vde.resetControllers() } + globalStats.initControllerStats() + // At this point the tablet has no controllers running. So // we want to start any VDiffs that have not been explicitly // stopped or otherwise finished. @@ -153,12 +155,12 @@ func (vde *Engine) openLocked(ctx context.Context) error { if err != nil { return err } + vde.ctx, vde.cancel = context.WithCancel(ctx) vde.isOpen = true // now we are open and have things to close if err := vde.initControllers(rows); err != nil { return err } - vde.updateStats() // At this point we've fully and successfully opened so begin // retrying error'd VDiffs until the engine is closed. @@ -212,7 +214,7 @@ func (vde *Engine) retry(ctx context.Context, err error) { // addController creates a new controller using the given vdiff record and adds it to the engine. // You must already have the main engine mutex (mu) locked before calling this. func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletmanagerdata.VDiffOptions) error { - ct, err := newController(vde.ctx, row, vde.dbClientFactoryDba, vde.ts, vde, options) + ct, err := newController(row, vde.dbClientFactoryDba, vde.ts, vde, options) if err != nil { return fmt.Errorf("controller could not be initialized for stream %+v on tablet %v", row, vde.thisTablet.Alias) @@ -221,6 +223,10 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman globalStats.mu.Lock() defer globalStats.mu.Unlock() globalStats.controllers[ct.id] = ct + + controllerCtx, cancel := context.WithCancel(vde.ctx) + ct.cancel = cancel + go ct.run(controllerCtx) return nil } @@ -395,16 +401,4 @@ func (vde *Engine) resetControllers() { ct.Stop() } vde.controllers = make(map[int64]*controller) - vde.updateStats() -} - -// updateStats must only be called while holding the engine lock. -func (vre *Engine) updateStats() { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - globalStats.controllers = make(map[int64]*controller, len(vre.controllers)) - for id, ct := range vre.controllers { - globalStats.controllers[id] = ct - } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 43aa76894d4..f6ca201e042 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -651,3 +651,39 @@ func (tvde *testVDiffEnv) addTablet(id int, keyspace, shard string, tabletType t tstenv.SchemaEngine.Reload(context.Background()) return tvde.tablets[id] } +<<<<<<< HEAD +======= + +func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller { + controllerQR := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdiffTestCols, + vdiffTestColTypes, + ), + fmt.Sprintf("%d|%s|%s|%s|%s|%s|%s|%s|", id, uuid.New(), tvde.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS), + ) + tvde.dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %d", id), noResults, nil) + ct := tvde.newController(t, controllerQR) + ct.sources = map[string]*migrationSource{ + tstenv.ShardName: { + vrID: 1, + shardStreamer: &shardStreamer{ + tablet: tvde.vde.thisTablet, + shard: tstenv.ShardName, + }, + }, + } + ct.sourceKeyspace = tstenv.KeyspaceName + + return ct +} + +func (tvde *testVDiffEnv) newController(t *testing.T, controllerQR *sqltypes.Result) *controller { + ctx := context.Background() + ct, err := newController(controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts) + require.NoError(t, err) + ctx2, cancel := context.WithCancel(ctx) + ct.cancel = cancel + go ct.run(ctx2) + return ct +} +>>>>>>> aebc4b82f9 (VDiff: fix race when a vdiff resumes on vttablet restart (#17638)) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 04cda6ac0c1..ae59884e6c2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -44,11 +44,18 @@ type vdiffStats struct { RowsDiffedCount *stats.Counter } +func (vds *vdiffStats) initControllerStats() { + vds.mu.Lock() + defer vds.mu.Unlock() + vds.controllers = make(map[int64]*controller) +} + func (vds *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table") globalStats.RowsDiffedCount = stats.NewCounter("", "") + globalStats.initControllerStats() stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index d4f9ddb001d..a57bd621f79 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -17,7 +17,6 @@ limitations under the License. package vdiff import ( - "context" "fmt" "strings" "testing" @@ -49,8 +48,22 @@ func TestBuildPlanSuccess(t *testing.T) { ) vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil) +<<<<<<< HEAD ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts) require.NoError(t, err) +======= + ct := vdenv.newController(t, controllerQR) + ct.sources = map[string]*migrationSource{ + tstenv.ShardName: { + vrID: 1, + shardStreamer: &shardStreamer{ + tablet: vdenv.vde.thisTablet, + shard: tstenv.ShardName, + }, + }, + } + ct.sourceKeyspace = tstenv.KeyspaceName +>>>>>>> aebc4b82f9 (VDiff: fix race when a vdiff resumes on vttablet restart (#17638)) testcases := []struct { input *binlogdatapb.Rule @@ -674,9 +687,7 @@ func TestBuildPlanFailure(t *testing.T) { fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS), ) vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil) - ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts) - require.NoError(t, err) - + ct := vdenv.newController(t, controllerQR) testcases := []struct { input *binlogdatapb.Rule err string