From f9aec36994850e42a1886cb315365b0a23c542ed Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Jan 2025 16:28:36 -0500 Subject: [PATCH] Fix unit tests Signed-off-by: Matt Lord --- .../tabletmanager/rpc_vreplication_test.go | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index c220ec7e9f9..d01604563c8 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -320,6 +320,7 @@ func TestMoveTablesUnsharded(t *testing.T) { topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY, } + trxTimestamp := time.Now().Unix() tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard}) defer tenv.close() @@ -418,14 +419,14 @@ func TestMoveTablesUnsharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -449,7 +450,7 @@ func TestMoveTablesUnsharded(t *testing.T) { "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs), ), nil) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getLatestCopyState, vreplID, vreplID), &sqltypes.Result{}) } @@ -478,28 +479,28 @@ func TestMoveTablesUnsharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) } @@ -519,7 +520,7 @@ func TestMoveTablesUnsharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, targetKs), )) } addInvariants(sourceTablet.vrdbClient, vreplID, sourceTabletUID, position, workflow.ReverseWorkflowName(wf), tenv.cells[0]) @@ -528,21 +529,22 @@ func TestMoveTablesUnsharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs), )) sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), + vreplID, bls, position, trxTimestamp, sourceKs), ), nil) sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs), )) _, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{ @@ -578,6 +580,7 @@ func TestMoveTablesSharded(t *testing.T) { topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY, } + trxTimestamp := time.Now().Unix() tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard}) defer tenv.close() @@ -678,14 +681,14 @@ func TestMoveTablesSharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Stopped||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -709,7 +712,7 @@ func TestMoveTablesSharded(t *testing.T) { "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs), ), nil) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getLatestCopyState, vreplID, vreplID), &sqltypes.Result{}) } @@ -737,28 +740,28 @@ func TestMoveTablesSharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, targetKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", wf, vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) ftc.vrdbClient.AddInvariant(binlogplayer.TestGetWorkflowQueryId1, sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys|options", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64|varchar", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1|{}", vreplID, bls, position, trxTimestamp, targetKs), )) } @@ -778,7 +781,7 @@ func TestMoveTablesSharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, targetKs), ), nil) } addInvariants(sourceTablet.vrdbClient, vreplID, sourceTabletUID, position, workflow.ReverseWorkflowName(wf), tenv.cells[0]) @@ -787,14 +790,14 @@ func TestMoveTablesSharded(t *testing.T) { "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", vreplID, bls, position, sourceKs), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", vreplID, bls, position, trxTimestamp, sourceKs), ), nil) sourceTablet.vrdbClient.AddInvariant(fmt.Sprintf(readWorkflowsLimited, tenv.dbName, workflow.ReverseWorkflowName(wf)), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "workflow|id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", "workflow|int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", ), - fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|0|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, sourceKs), + fmt.Sprintf("%s|%d|%s|%s|NULL|0|0|||1686577659|%d|Running||%s|1||0|0|0||0|1", workflow.ReverseWorkflowName(wf), vreplID, bls, position, trxTimestamp, sourceKs), )) sourceTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), &sqltypes.Result{}, nil)