Skip to content

Commit

Permalink
test: copy binlog replication tests from Dolt (apecloud#17)
Browse files Browse the repository at this point in the history
* make TestBinlogReplicationSanityCheck work
  • Loading branch information
fanyang01 authored Aug 30, 2024
1 parent f2ec24c commit f57d64c
Show file tree
Hide file tree
Showing 12 changed files with 2,508 additions and 17 deletions.
626 changes: 626 additions & 0 deletions binlogreplication/binlog_replication_alltypes_test.go

Large diffs are not rendered by default.

191 changes: 191 additions & 0 deletions binlogreplication/binlog_replication_filters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
require.Equal(t, "", status["Replicate_Do_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
require.Equal(t, "0", row["min"])
require.Equal(t, "9", row["max"])
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.Equal(t, nil, row["min"])
require.Equal(t, nil, row["max"])
require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_doTablesOnly tests that the doTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Do replication events for db01.t1. Also tests that the first filter setting is overwritten by
// the second and that db and that db and table names are case-insensitive.
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")

// Assert that status shows replication filters
status := showReplicaStatus(t)
require.Equal(t, "db01.t1", status["Replicate_Do_Table"])
require.Equal(t, "", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
require.Equal(t, "0", row["min"])
require.Equal(t, "9", row["max"])
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.Equal(t, nil, row["min"])
require.Equal(t, nil, row["max"])
require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_doTablesAndIgnoreTables tests that the doTables and ignoreTables
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Do replication events for db01.t1, and db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
// Ignore replication events for db01.t2
replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")

// Assert that replica status shows replication filters
status := showReplicaStatus(t)
require.True(t, status["Replicate_Do_Table"] == "db01.t1,db01.t2" ||
status["Replicate_Do_Table"] == "db01.t2,db01.t1")
require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])

// Make changes on the primary
primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
for i := 1; i < 12; i++ {
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
}
primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

// Pause to let the replica catch up
waitForReplicaToCatchUp(t)

// Verify that all changes from t1 were applied on the replica
rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "10", row["count"])
require.Equal(t, "0", row["min"])
require.Equal(t, "9", row["max"])
require.NoError(t, rows.Close())

// Verify that no changes from t2 were applied on the replica
rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "0", row["count"])
require.Equal(t, nil, row["min"])
require.Equal(t, nil, row["max"])
require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_errorCases test returned errors for various error cases.
func TestBinlogReplicationFilters_errorCases(t *testing.T) {
defer teardown(t)
startSqlServers(t)

// All tables must be qualified with a database
_, err := replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")

_, err = replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
require.Error(t, err)
require.ErrorContains(t, err, "no database specified for table")
}
206 changes: 206 additions & 0 deletions binlogreplication/binlog_replication_multidb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestBinlogReplicationMultiDb tests that binlog events spanning multiple databases are correctly
// applied by a replica.
func TestBinlogReplicationMultiDb(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Make changes on the primary to db01 and db02
primaryDatabase.MustExec("create database db02;")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("create table t01 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("create table t02 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("insert into t01 (pk) values (1), (3), (5), (8), (9);")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("insert into t02 (pk) values (2), (4), (6), (7), (10);")
primaryDatabase.MustExec("use db01;")
primaryDatabase.MustExec("delete from t01 where pk=9;")
primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
primaryDatabase.MustExec("use db02;")
primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
primaryDatabase.MustExec("update t02 set pk=8 where pk=7;")

// Verify the changes in db01 on the replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "3", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "5", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "7", row["pk"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())
require.NoError(t, rows.Close())

// Verify db01.dolt_diff
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "1", row["data_change"])
require.EqualValues(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "1", row["data_change"])
require.EqualValues(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "1", row["data_change"])
require.EqualValues(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "0", row["data_change"])
require.EqualValues(t, "1", row["schema_change"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())
require.NoError(t, rows.Close())

// Verify the changes in db02 on the replica
replicaDatabase.MustExec("use db02;")
rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "2", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "4", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "6", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "8", row["pk"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())

// Verify db02.dolt_diff
rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "1", row["data_change"])
require.Equal(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "1", row["data_change"])
require.Equal(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "1", row["data_change"])
require.Equal(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "0", row["data_change"])
require.Equal(t, "1", row["schema_change"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())
}

// TestBinlogReplicationMultiDbTransactions tests that binlog events for transactions that span
// multiple DBs are applied correctly to a replica.
func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
defer teardown(t)
startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Make changes on the primary to db01 and db02
primaryDatabase.MustExec("create database db02;")
primaryDatabase.MustExec("create table db01.t01 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("create table db02.t02 (pk int primary key, c1 int default (0))")
primaryDatabase.MustExec("set @autocommit = 0;")

primaryDatabase.MustExec("start transaction;")
primaryDatabase.MustExec("insert into db01.t01 (pk) values (1), (3), (5), (8), (9);")
primaryDatabase.MustExec("insert into db02.t02 (pk) values (2), (4), (6), (7), (10);")
primaryDatabase.MustExec("delete from db01.t01 where pk=9;")
primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
primaryDatabase.MustExec("update db02.t02 set pk=8 where pk=7;")
primaryDatabase.MustExec("commit;")

// Verify the changes in db01 on the replica
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
require.NoError(t, err)
row := convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "1", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "3", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "5", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "7", row["pk"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())

// Verify db01.dolt_diff
replicaDatabase.MustExec("use db01;")
rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "1", row["data_change"])
require.EqualValues(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t01", row["table_name"])
require.EqualValues(t, "0", row["data_change"])
require.EqualValues(t, "1", row["schema_change"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())

// Verify the changes in db02 on the replica
waitForReplicaToCatchUp(t)
replicaDatabase.MustExec("use db02;")
rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "2", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "4", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "6", row["pk"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "8", row["pk"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())

// Verify db02.dolt_diff
rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
require.NoError(t, err)
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "1", row["data_change"])
require.Equal(t, "0", row["schema_change"])
row = convertMapScanResultToStrings(readNextRow(t, rows))
require.Equal(t, "t02", row["table_name"])
require.Equal(t, "0", row["data_change"])
require.Equal(t, "1", row["schema_change"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())
}
Loading

0 comments on commit f57d64c

Please sign in to comment.