Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 28, 2025
1 parent 481d480 commit f9aec36
Showing 1 changed file with 24 additions and 21 deletions.
45 changes: 24 additions & 21 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func TestMoveTablesUnsharded(t *testing.T) {
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
trxTimestamp := time.Now().Unix()

tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard})
defer tenv.close()
Expand Down Expand Up @@ -418,14 +419,14 @@ func TestMoveTablesUnsharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand All @@ -449,7 +450,7 @@ func TestMoveTablesUnsharded(t *testing.T) {
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs),
), nil)
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getLatestCopyState, vreplID, vreplID), &sqltypes.Result{})
}
Expand Down Expand Up @@ -478,28 +479,28 @@ func TestMoveTablesUnsharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, wf), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
}

Expand All @@ -519,7 +520,7 @@ func TestMoveTablesUnsharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, targetKs),
))
}
addInvariants(sourceTablet.vrdbClient, vreplID, sourceTabletUID, position, workflow.ReverseWorkflowName(wf), tenv.cells[0])
Expand All @@ -528,21 +529,22 @@ func TestMoveTablesUnsharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs),
))
sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf),
vreplID, bls, position, trxTimestamp, sourceKs),
), nil)
sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs),
))

_, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{
Expand Down Expand Up @@ -578,6 +580,7 @@ func TestMoveTablesSharded(t *testing.T) {
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
trxTimestamp := time.Now().Unix()

tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard})
defer tenv.close()
Expand Down Expand Up @@ -678,14 +681,14 @@ func TestMoveTablesSharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand All @@ -709,7 +712,7 @@ func TestMoveTablesSharded(t *testing.T) {
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs),
), nil)
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getLatestCopyState, vreplID, vreplID), &sqltypes.Result{})
}
Expand Down Expand Up @@ -737,28 +740,28 @@ func TestMoveTablesSharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, wf), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs),
))
}

Expand All @@ -778,7 +781,7 @@ func TestMoveTablesSharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, targetKs),
), nil)
}
addInvariants(sourceTablet.vrdbClient, vreplID, sourceTabletUID, position, workflow.ReverseWorkflowName(wf), tenv.cells[0])
Expand All @@ -787,14 +790,14 @@ func TestMoveTablesSharded(t *testing.T) {
"id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs),
fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs),
), nil)
sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys",
"workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64",
),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs),
fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, trxTimestamp, sourceKs),
))
sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), &sqltypes.Result{}, nil)

Expand Down

0 comments on commit f9aec36

Please sign in to comment.