Cherry-pick 39a0ddd with conflicts
vitess-bot[bot] authored and vitess-bot committed Jan 28, 2025
1 parent b71e9ac commit c768679
Showing 9 changed files with 992 additions and 34 deletions.
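This cherry-pick was committed with its merge conflicts unresolved: the <<<<<<< HEAD / ======= / >>>>>>> markers are still present in the diffs below, which is exactly what the Go parser trips over in the CI annotations further down. As a rough illustration only (not part of this commit; the file layout and names are hypothetical), a minimal Go sketch of a pre-commit scan that flags leftover markers:

package main

import (
	"bufio"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// markers are the seven-character strings git leaves behind in a
// conflicted merge or cherry-pick.
var markers = []string{"<<<<<<<", "=======", ">>>>>>>"}

func main() {
	root := "."
	if len(os.Args) > 1 {
		root = os.Args[1]
	}
	_ = filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil || d.IsDir() || !strings.HasSuffix(path, ".go") {
			return nil
		}
		f, err := os.Open(path)
		if err != nil {
			return nil
		}
		defer f.Close()
		sc := bufio.NewScanner(f)
		for n := 1; sc.Scan(); n++ {
			for _, m := range markers {
				if strings.HasPrefix(sc.Text(), m) {
					fmt.Printf("%s:%d: leftover conflict marker %q\n", path, n, m)
				}
			}
		}
		return nil
	})
}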
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
@@ -882,7 +882,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
tablets := make(map[string]*cluster.VttabletProcess)
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
if tablet.Vttablet.GetTabletStatus() == "SERVING" {
if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
tablets[tablet.Name] = tablet.Vttablet
}
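The one-line change above tightens the tablet filter: a tablet is now returned only when it is SERVING and, when a tablet type was requested, its type matches case-insensitively; an empty tabletType keeps the old behavior of matching everything. A self-contained sketch of that predicate (the names are illustrative, not from the Vitess codebase):

package main

import (
	"fmt"
	"strings"
)

// matchesTablet mirrors the filter above: the status must be SERVING, and
// an empty wantType acts as a wildcard, otherwise the comparison is
// case-insensitive.
func matchesTablet(status, tabletType, wantType string) bool {
	return status == "SERVING" &&
		(wantType == "" || strings.EqualFold(tabletType, wantType))
}

func main() {
	fmt.Println(matchesTablet("SERVING", "PRIMARY", "primary")) // true
	fmt.Println(matchesTablet("SERVING", "REPLICA", ""))        // true (wildcard)
	fmt.Println(matchesTablet("NOT_SERVING", "PRIMARY", ""))    // false
}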
211 changes: 211 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
@@ -797,6 +797,19 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
commit, _ = vc.startQuery(t, openTxQuery)
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
<<<<<<< HEAD
=======
shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards))
for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)

testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
workflow, workflowType)

// Now let's confirm that it works as expected with an error.
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
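The shardNames loop in the new block ranges over maps.Keys, which in the Go 1.23+ standard-library maps package returns an iterator (iter.Seq) rather than a slice; ranging directly over the map is the equivalent shorter form. A tiny sketch of both, using a hypothetical shard map:

package main

import (
	"fmt"
	"maps"
)

func main() {
	shards := map[string]int{"-80": 1, "80-": 2} // hypothetical shard map

	// Via the Go 1.23 maps.Keys iterator, as in the test above.
	names := make([]string, 0, len(shards))
	for name := range maps.Keys(shards) {
		names = append(names, name)
	}

	// Equivalent direct form: ranging over a map yields its keys.
	for name := range shards {
		_ = name
	}

	fmt.Println(names) // key order is randomized across runs
}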
@@ -1028,6 +1041,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))

tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
var sourceTablets, targetTablets []*cluster.VttabletProcess

// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
for _, tablet := range tablets {
@@ -1040,9 +1054,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
targetTablets = append(targetTablets, tab)
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
sourceTablets = append(sourceTablets, tab)
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
@@ -1056,6 +1072,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
if dryRunResultSwitchWrites != nil {
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
}
if tableName == "customer" {
testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
}
// Now let's confirm that it works as expected with an error.
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
for tabletName, count := range counts {
@@ -1605,6 +1625,197 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

<<<<<<< HEAD
=======
// testSwitchTrafficPermissionChecks confirms that for the SwitchTraffic command, the
// necessary permissions are checked properly on the source keyspace's primary tablets.
// This ensures that we can create and manage the reverse vreplication workflow.
func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
applyPrivileges := func(query string) {
for _, shard := range sourceShards {
primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
_, err := primary.QueryTablet(query, primary.Keyspace, false)
require.NoError(t, err)
}
}
runDryRunCmd := func(expectErr bool) {
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
}

defer func() {
// Put the default global privs back in place.
applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
}()

t.Run("test switch traffic permission checks", func(t *testing.T) {
t.Run("test without global privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
runDryRunCmd(true)
})

t.Run("test with db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global or db level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})

t.Run("test with table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global, db, or table level privileges", func(t *testing.T) {
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})
})
}

// testSwitchWritesErrorHandling confirms that switching writes works as expected
// in the face of vreplication lag (canSwitch() precheck) and when canceling the
// switch due to replication failing to catch up in time.
// The workflow MUST be migrating the customer table from the source to the
// target keyspace AND the workflow must currently have reads switched but not
// writes.
func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
t.Run("validate switch writes error handling", func(t *testing.T) {
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
require.NotZero(t, len(sourceTablets), "no source tablets provided")
require.NotZero(t, len(targetTablets), "no target tablets provided")
sourceKs := sourceTablets[0].Keyspace
targetKs := targetTablets[0].Keyspace
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
var err error
sourceConns := make([]*mysql.Conn, len(sourceTablets))
for i, tablet := range sourceTablets {
sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer sourceConns[i].Close()
}
targetConns := make([]*mysql.Conn, len(targetTablets))
for i, tablet := range targetTablets {
targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
require.NoError(t, err)
defer targetConns[i].Close()
}
startingTestRowID := 10000000
numTestRows := 100
addTestRows := func() {
for i := 0; i < numTestRows; i++ {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
startingTestRowID+i))
}
}
deleteTestRows := func() {
execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
}
addIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "set session sql_mode=''")
execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
}
}
dropIndex := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "alter table customer drop index name_idx")
}
}
lockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "lock table customer read")
}
}
unlockTargetTable := func() {
for _, targetConn := range targetConns {
execQuery(t, targetConn, "unlock tables")
}
}
cleanupTestData := func() {
dropIndex()
deleteTestRows()
}
restartWorkflow := func() {
err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
require.NoError(t, err, "failed to start workflow: %v", err)
}
waitForTargetToCatchup := func() {
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
waitForNoWorkflowLag(t, vc, targetKs, workflow)
}

// First let's test that the prechecks work as expected. We ALTER
// the table on the target shards to add a unique index on the name
// field.
addIndex()
// Then we replicate some test rows across the target shards by
// inserting them in the source keyspace.
addTestRows()
// Now the workflow should go into the error state and the lag should
// start to climb. So we sleep for three times the max lag duration that
// we will set for the SwitchTraffic call.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 3)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
// It should fail in the canSwitch() precheck.
require.Error(t, err)
require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
workflow, lagDuration.String()), out)
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
cleanupTestData()
// We have to restart the workflow again as the duplicate key error
// is a permanent/terminal one.
restartWorkflow()
waitForTargetToCatchup()

// Now let's test that the cancel works by setting the command timeout
// to a fraction (6s) of the default max repl lag duration (30s). First
// we lock the customer table on the target tablets so that we cannot
// apply the INSERTs and catch up.
lockTargetTable()
addTestRows()
timeout := lagDuration * 2 // 6s
// Use the default max-replication-lag-allowed value of 30s.
// We run the command in a goroutine so that we can unblock things
// after the timeout is reached -- as the vplayer query is blocking
// on the table lock in the MySQL layer.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
"SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
}()
time.Sleep(timeout)
// Now we can unblock things and let it continue.
unlockTargetTable()
wg.Wait()
// It should fail due to the command context timeout and we should
// successfully cancel.
require.Error(t, err)
require.Contains(t, out, "failed to sync up replication between the source and target")
require.NotContains(t, out, "cancel migration failed")
// Confirm that queries still work fine.
execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
deleteTestRows()
waitForTargetToCatchup()
})
}

>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))
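The cancel scenario in testSwitchWritesErrorHandling above leans on a coordination pattern worth spelling out: run the blocking command in a goroutine, sleep past the command's own timeout, remove the blockage, then wg.Wait() before reading the results. A minimal standalone sketch of the same shape, with a channel standing in for the MySQL table lock:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	lock := make(chan struct{}) // stands in for the "lock table customer read" hold
	timeout := 200 * time.Millisecond

	var out string
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stands in for the blocking SwitchTraffic call: it cannot
		// return until the "table lock" is released.
		<-lock
		out = "failed to sync up replication between the source and target"
	}()

	time.Sleep(timeout) // let the command's timeout elapse first
	close(lock)         // unlock, so the blocked call can unwind
	wg.Wait()           // only now is it safe to read out

	fmt.Println(out)
}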
// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
48 changes: 35 additions & 13 deletions go/vt/vtctl/workflow/server.go
@@ -3356,8 +3356,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError("failed to migrate the workflow streams", err)
}
if cancel {
sw.cancelMigration(ctx, sm)
return 0, sw.logs(), nil
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return 0, sw.logs(), err
}

// We stop writes on the source before stopping the source streams so that the catchup time
@@ -3369,7 +3371,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

@@ -3387,7 +3391,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

@@ -3397,7 +3403,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// the tablet's deny list check and the first mysqld side table lock.
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
}
// No need to UNLOCK the tables as the connection was closed once the locks were acquired
@@ -3407,26 +3415,39 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

ts.Logger().Infof("Waiting for streams to catchup")
<<<<<<< HEAD

Check failure on line 3418 in go/vt/vtctl/workflow/server.go (GitHub Actions / Code Coverage): syntax error: unexpected <<, expected }
if err := sw.waitForCatchup(ctx, timeout); err != nil {
sw.cancelMigration(ctx, sm)
=======

Check failure on line 3421 in go/vt/vtctl/workflow/server.go (GitHub Actions / Code Coverage): syntax error: unexpected ==, expected }
if err := sw.waitForCatchup(ctx, waitTimeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616))

Check failure on line 3426 in go/vt/vtctl/workflow/server.go (GitHub Actions / Code Coverage): syntax error: unexpected >>, expected }; invalid character U+0023 '#'
return handleError("failed to sync up replication between the source and target", err)
}

ts.Logger().Infof("Migrating streams")

Check failure on line 3430 in go/vt/vtctl/workflow/server.go (GitHub Actions / Code Coverage): syntax error: non-declaration statement outside function body
if err := sw.migrateStreams(ctx, sm); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to migrate the workflow streams", err)
}

ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to reset the sequences", err)
}

ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError("failed to create the reverse vreplication streams", err)
}

Expand All @@ -3439,7 +3460,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
sw.cancelMigration(ctx, sm)
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
}
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
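The recurring edit in this file is that cancelMigration now returns an error, and every call site folds a cancellation failure into the primary error before passing it to handleError. vterrors and vtrpcpb are Vitess-specific; as a rough standard-library analogue (an assumption, not the project's code), errors.Join expresses the same keep-both-failures intent:

package main

import (
	"errors"
	"fmt"
)

// cancelMigration stands in for sw.cancelMigration(ctx, sm) in the diff.
func cancelMigration() error {
	return errors.New("cancel migration failed: could not restart workflow streams")
}

func main() {
	err := errors.New("failed to stop writes in the source keyspace")
	if cerr := cancelMigration(); cerr != nil {
		// Keep the original failure and the cancellation failure together,
		// mirroring vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
		// in the diff above.
		err = errors.Join(err, cerr)
	}
	fmt.Println(err)
}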
@@ -3492,15 +3515,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
if err != nil {
return "", err
}
if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
}
for _, stream := range wf.ShardStreams {
for _, st := range stream.GetStreams() {
if st.Message == Frozen {
return cannotSwitchFrozen, nil
}
// If no new events have been replicated after the copy phase then it will be 0.
if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
}
switch st.State {
case binlogdatapb.VReplicationWorkflowState_Copying.String():
return cannotSwitchCopyIncomplete, nil
