Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Address SwitchWrites bugs around replication lag and cancel on error #17613

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,35 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
shardNames = append(shardNames, shardName)
}
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
switchWrites(t, workflowType, ksWorkflow, false)

// Confirm that switching writes works as expected in the face of
// vreplication lag and cancelling the switch.
t.Run("validate switch writes", func(t *testing.T) {
// First let's test that the pre-checks work as expected.
// Lock the customer table so that we can't replicate the coming
// sentinal row.
productConn, err := productTab.TabletConn("product", true)
require.NoError(t, err)
defer productConn.Close()
customerConn, err := customerTab1.TabletConn("customer", true)
require.NoError(t, err)
execQuery(t, customerConn, "lock tables customer read")
// Insert that sentinal row on the source.
execQuery(t, productConn, "insert into customer(cid, name) values(9999999, 'laggingCustomer')")
// Sleep twice as long as we will set the max replication lag to.
lagDuration := 3 * time.Second
time.Sleep(lagDuration * 2)
_, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--tablet-types=primary", "--workflow", workflow,
"--target-keyspace", targetKs, "SwitchWrites", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
require.Error(t, err)
t.Logf("expected error: %v", err)
execQuery(t, customerConn, "unlock tables")

// Now confirm that it works as expected.
catchup(t, customerTab1, workflow, workflowType)
catchup(t, customerTab2, workflow, workflowType)
switchWrites(t, workflowType, ksWorkflow, false)
})

checkThatVDiffFails(t, targetKs, workflow)

Expand Down Expand Up @@ -920,14 +948,14 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl

execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'")
waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 2)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 3)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4)

query = "insert into customer (name, cid) values('george', 5)"
execVtgateQuery(t, vtgateConn, "customer", query)
waitForRowCountInTablet(t, customerTab1, "customer", "customer", 1)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 3)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 4)
waitForRowCountInTablet(t, customerTab2, "customer", "customer", 4)
waitForRowCount(t, vtgateConn, "customer", "customer.customer", 5)
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3075,7 +3075,8 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, maxAllowedR
return cannotSwitchFrozen, nil
}
// If no new events have been replicated after the copy phase then it will be 0.
if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
vreplLag := int64(getVReplicationTrxLag(st.TransactionTimestamp, st.TimeUpdated, binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[st.State])))
if vreplLag > maxAllowedReplLagSecs {
return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
}
switch st.State {
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,8 +1148,11 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat

// We create a new context while canceling the migration, so that we are independent of the original
// context being cancelled prior to or during the cancel operation.
// Create a child context that cannot be canceled by the parent, so that we maintain the locks.
cctx := context.WithoutCancel(ctx)
// Now create a child context from that which has a timeout.
cmTimeout := 60 * time.Second
cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
cmCtx, cmCancel := context.WithTimeout(cctx, cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
Expand Down
63 changes: 34 additions & 29 deletions go/vt/vtctl/workflow/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,35 +446,11 @@ func (wf *workflowFetcher) scanWorkflow(
workflow.WorkflowSubType = res.WorkflowSubType.String()
workflow.DeferSecondaryKeys = res.DeferSecondaryKeys

// MaxVReplicationTransactionLag estimates the actual statement processing lag
// between the source and the target. If we are still processing source events it
// is the difference b/w current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target.
// We don't allow switching during the copy phase, so in that case we just return
// a large lag. All timestamps are in seconds since epoch.
if rstream.TransactionTimestamp == nil {
rstream.TransactionTimestamp = &vttimepb.Time{}
}
lastTransactionTime := rstream.TransactionTimestamp.Seconds
if rstream.TimeHeartbeat == nil {
rstream.TimeHeartbeat = &vttimepb.Time{}
}
lastHeartbeatTime := rstream.TimeHeartbeat.Seconds
if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
meta.maxVReplicationTransactionLag = math.MaxInt64
} else {
if lastTransactionTime == 0 /* no new events after copy */ ||
lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {

lastTransactionTime = lastHeartbeatTime
}
now := time.Now().Unix() /* seconds since epoch */
transactionReplicationLag := float64(now - lastTransactionTime)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
// MaxVReplicationTransactionLag estimates the max statement processing lag
// between the source and the target across all of the workflow streams.
transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.State)
if transactionReplicationLag > meta.maxVReplicationTransactionLag {
meta.maxVReplicationTransactionLag = transactionReplicationLag
}
}

Expand Down Expand Up @@ -670,3 +646,32 @@ func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerd
}
return rstream.State.String()
}

// getVReplicationTrxLag estimates the actual statement processing lag between the
// source and the target. If we are still processing source events it is the
// difference between current time and the timestamp of the last event. If
// heartbeats are more recent than the last event, then the lag is the time since
// the last heartbeat as there can be an actual event immediately after the
// heartbeat, but which has not yet been processed on the target. We don't allow
// switching during the copy phase, so in that case we just return a large lag.
// All timestamps are in seconds since epoch.
func getVReplicationTrxLag(trxTs, updatedTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
if trxTs == nil {
trxTs = &vttimepb.Time{}
}
lastTransactionTime := trxTs.Seconds
if updatedTs == nil {
updatedTs = &vttimepb.Time{}
}
lastUpdateTime := updatedTs.Seconds
if state == binlogdatapb.VReplicationWorkflowState_Copying {
return math.MaxInt64
}
if lastTransactionTime == 0 /* no new events after copy */ ||
lastUpdateTime > lastTransactionTime /* no recent transactions, so all caught up */ {

lastTransactionTime = lastUpdateTime
}
now := time.Now().Unix() /* seconds since epoch */
return float64(now - lastTransactionTime)
}
Loading