From f57d64c5034bd969f03a015c6ba5991fb02111d4 Mon Sep 17 00:00:00 2001 From: Fan Yang Date: Fri, 30 Aug 2024 18:54:42 +0800 Subject: [PATCH] test: copy binlog replication tests from Dolt (#17) * make TestBinlogReplicationSanityCheck work --- .../binlog_replication_alltypes_test.go | 626 ++++++++++ .../binlog_replication_filters_test.go | 191 +++ .../binlog_replication_multidb_test.go | 206 ++++ .../binlog_replication_reconnect_test.go | 227 ++++ .../binlog_replication_restart_test.go | 83 ++ binlogreplication/binlog_replication_test.go | 1031 +++++++++++++++++ executor.go | 21 +- go.mod | 19 +- go.sum | 50 +- main.go | 7 + meta/table.go | 48 + replication.go | 16 +- 12 files changed, 2508 insertions(+), 17 deletions(-) create mode 100644 binlogreplication/binlog_replication_alltypes_test.go create mode 100644 binlogreplication/binlog_replication_filters_test.go create mode 100644 binlogreplication/binlog_replication_multidb_test.go create mode 100644 binlogreplication/binlog_replication_reconnect_test.go create mode 100644 binlogreplication/binlog_replication_restart_test.go create mode 100644 binlogreplication/binlog_replication_test.go diff --git a/binlogreplication/binlog_replication_alltypes_test.go b/binlogreplication/binlog_replication_alltypes_test.go new file mode 100644 index 00000000..c62c240f --- /dev/null +++ b/binlogreplication/binlog_replication_alltypes_test.go @@ -0,0 +1,626 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package binlogreplication + +import ( + "encoding/json" + "fmt" + "math/rand" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestBinlogReplicationForAllTypes tests that operations (inserts, updates, and deletes) on all SQL +// data types can be successfully replicated. +func TestBinlogReplicationForAllTypes(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Set the session's timezone to UTC, to avoid TIMESTAMP test values changing + // when they are converted to UTC for storage. + primaryDatabase.MustExec("SET @@time_zone = '+0:00';") + + // Create the test table + tableName := "alltypes" + createTableStatement := generateCreateTableStatement(tableName) + primaryDatabase.MustExec(createTableStatement) + + // Make inserts on the primary – small, large, and null values + primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 0)) + primaryDatabase.MustExec(generateInsertValuesStatement(tableName, 1)) + primaryDatabase.MustExec(generateInsertNullValuesStatement(tableName)) + + // Verify inserts on replica + waitForReplicaToCatchUp(t) + rows, err := replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "1", row["pk"]) + assertValues(t, 0, row) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "2", row["pk"]) + assertValues(t, 1, row) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "3", row["pk"]) + assertNullValues(t, row) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) + + // Make updates on the primary + primaryDatabase.MustExec(generateUpdateToNullValuesStatement(tableName, 1)) + 
primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 2, 0)) + primaryDatabase.MustExec(generateUpdateValuesStatement(tableName, 3, 1)) + + // Verify updates on the replica + waitForReplicaToCatchUp(t) + replicaDatabase.MustExec("use db01;") + rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;") + require.NoError(t, err) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "1", row["pk"]) + assertNullValues(t, row) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "2", row["pk"]) + assertValues(t, 0, row) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "3", row["pk"]) + assertValues(t, 1, row) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) + + // Make deletes on the primary + primaryDatabase.MustExec("delete from alltypes where pk=1;") + primaryDatabase.MustExec("delete from alltypes where pk=2;") + primaryDatabase.MustExec("delete from alltypes where pk=3;") + + // Verify deletes on the replica + waitForReplicaToCatchUp(t) + rows, err = replicaDatabase.Queryx("select * from db01.alltypes order by pk asc;") + require.NoError(t, err) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +// --------------------- +// Test Data +// --------------------- + +type typeDescriptionAssertion struct { + Value interface{} + ExpectedValue interface{} +} + +func newTypeDescriptionAssertion(v interface{}) typeDescriptionAssertion { + return typeDescriptionAssertion{Value: v} +} + +func newTypeDescriptionAssertionWithExpectedValue(v interface{}, x interface{}) typeDescriptionAssertion { + return typeDescriptionAssertion{Value: v, ExpectedValue: x} +} + +func (tda *typeDescriptionAssertion) getExpectedValue() interface{} { + if tda.ExpectedValue != nil { + return tda.ExpectedValue + } + + if valueString, isString := tda.Value.(string); isString { + removedPrefixes := []string{"DATE", "TIMESTAMP", "TIME"} + 
lowercaseValue := strings.ToUpper(valueString) + for _, prefix := range removedPrefixes { + if strings.HasPrefix(lowercaseValue, prefix) { + return valueString[len(prefix)+2 : len(valueString)-2] + } + } + } + + return tda.Value +} + +type typeDescription struct { + TypeDefinition string + Assertions [2]typeDescriptionAssertion +} + +func (td *typeDescription) ColumnName() string { + name := "_" + strings.ReplaceAll(td.TypeDefinition, "(", "_") + name = strings.ReplaceAll(name, ")", "_") + name = strings.ReplaceAll(name, " ", "_") + name = strings.ReplaceAll(name, ",", "_") + name = strings.ReplaceAll(name, "\"", "") + name = strings.ReplaceAll(name, "'", "") + return name +} + +func (td *typeDescription) IsStringType() bool { + def := strings.ToLower(td.TypeDefinition) + switch { + case strings.Contains(def, "char"), + strings.Contains(def, "binary"), + strings.Contains(def, "blob"), + strings.Contains(def, "text"), + strings.Contains(def, "enum"), + strings.Contains(def, "set"), + strings.Contains(def, "json"): + return true + default: + return false + } +} + +// allTypes contains test data covering all SQL types. +// +// TODO: TypeWireTests contains most of the test data we need. I found it after implementing this, but we +// could simplify this test code by converting to use TypeWireTests and enhancing it with the additional +// test cases we need to cover (e.g. NULL values). 
+var allTypes = []typeDescription{ + // Bit types + { + TypeDefinition: "bit", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertionWithExpectedValue("0", []uint8{0}), + newTypeDescriptionAssertionWithExpectedValue("1", []uint8{1}), + }, + }, + { + TypeDefinition: "bit(64)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertionWithExpectedValue("0", []byte{0, 0, 0, 0, 0, 0, 0, 0}), + newTypeDescriptionAssertionWithExpectedValue("1", []byte{0, 0, 0, 0, 0, 0, 0, 1}), + }, + }, + + // Integer types + { + TypeDefinition: "tinyint", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-128"), + newTypeDescriptionAssertion("127"), + }, + }, + { + TypeDefinition: "tinyint unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("255"), + }, + }, + { + TypeDefinition: "bool", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("1"), + }, + }, + { + TypeDefinition: "smallint", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-32768"), + newTypeDescriptionAssertion("32767"), + }, + }, + { + TypeDefinition: "smallint unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("65535"), + }, + }, + { + TypeDefinition: "mediumint", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-32768"), + newTypeDescriptionAssertion("32767"), + }, + }, + { + TypeDefinition: "mediumint unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("65535"), + }, + }, + { + TypeDefinition: "int", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-32768"), + newTypeDescriptionAssertion("32767"), + }, + }, + { + TypeDefinition: "int unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + 
newTypeDescriptionAssertion("65535"), + }, + }, + { + TypeDefinition: "bigint", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-32768"), + newTypeDescriptionAssertion("32767"), + }, + }, + { + TypeDefinition: "bigint unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("65535"), + }, + }, + { + TypeDefinition: "decimal", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("1234567890"), + }, + }, + { + TypeDefinition: "decimal(10,2)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0.00"), + newTypeDescriptionAssertion("12345678.00"), + }, + }, + { + TypeDefinition: "decimal(20,8)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-1234567890.12345678"), + newTypeDescriptionAssertion("999999999999.00000001"), + }, + }, + + // Floating point types + { + TypeDefinition: "float", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-3.40282e+38"), + newTypeDescriptionAssertion("-1.17549e-38"), + }, + }, + { + TypeDefinition: "float unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("1.17549e-38"), + newTypeDescriptionAssertion("3.40282e+38"), + }, + }, + { + TypeDefinition: "double", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("-1.7976931348623157e+308"), + newTypeDescriptionAssertion("-2.2250738585072014e-308"), + }, + }, + { + TypeDefinition: "double unsigned", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("2.2250738585072014e-308"), + newTypeDescriptionAssertion("1.7976931348623157e+308"), + }, + }, + + // String types + { + TypeDefinition: "char(1)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion(""), + newTypeDescriptionAssertion("0"), + }, + }, + { + TypeDefinition: "char(10)", + Assertions: [2]typeDescriptionAssertion{ + 
newTypeDescriptionAssertion(""), + newTypeDescriptionAssertion("0123456789"), + }, + }, + { + TypeDefinition: "varchar(255)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion(""), + newTypeDescriptionAssertion(generateTestDataString(255)), + }, + }, + { + TypeDefinition: "char(1) binary", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("1"), + }, + }, + { + TypeDefinition: "binary(1)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("1"), + }, + }, + { + TypeDefinition: "binary(255)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion(generateTestDataString(255)), + newTypeDescriptionAssertion(generateTestDataString(255)), + }, + }, + { + TypeDefinition: "varbinary(1)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion("1"), + }, + }, + { + TypeDefinition: "varbinary(255)", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion(generateTestDataString(0)), + newTypeDescriptionAssertion(generateTestDataString(255)), + }, + }, + + // Blob/Text types + { + TypeDefinition: "tinyblob", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(255)), + }, + }, + { + TypeDefinition: "blob", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(10_000)), + }, + }, + { + TypeDefinition: "mediumblob", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(15_000)), + }, + }, + { + TypeDefinition: "longblob", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(20_000)), + }, + }, + { + TypeDefinition: "tinytext", + Assertions: 
[2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(255)), + }, + }, + { + TypeDefinition: "text", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(10_000)), + }, + }, + { + TypeDefinition: "mediumtext", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(15_000)), + }, + }, + { + TypeDefinition: "longtext", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("0"), + newTypeDescriptionAssertion(generateTestDataString(20_000)), + }, + }, + + // Enum and Set types + { + TypeDefinition: "ENUM(\"\",\"a\",\"b\",\"c\")", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion(""), + newTypeDescriptionAssertion("c"), + }, + }, + { + TypeDefinition: "SET(\"a\",\"b\",\"c\")", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("a"), + newTypeDescriptionAssertion("a,b,c"), + }, + }, + + // Date types + { + TypeDefinition: "date", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("DATE('1981-02-16')"), + newTypeDescriptionAssertion("DATE('1981-02-16')"), + }, + }, + { + TypeDefinition: "time", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("TIME('01:02:03')"), + newTypeDescriptionAssertion("TIME('01:02:03')"), + }, + }, + { + TypeDefinition: "datetime", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("TIMESTAMP('1981-02-16 12:13:14')"), + newTypeDescriptionAssertion("TIMESTAMP('1981-02-16 12:13:14')"), + }, + }, + { + TypeDefinition: "timestamp", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("TIMESTAMP('1981-02-16 12:13:14')"), + newTypeDescriptionAssertion("TIMESTAMP('1981-02-16 12:13:14')"), + }, + }, + { + TypeDefinition: "year", + Assertions: [2]typeDescriptionAssertion{ + 
newTypeDescriptionAssertion("1981"), + newTypeDescriptionAssertion("2020"), + }, + }, + + // Spatial types + { + TypeDefinition: "geometry", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertionWithExpectedValue("POINT(18, 23)", + "\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x002@\x00\x00\x00\x00\x00\x007@"), + newTypeDescriptionAssertionWithExpectedValue("LINESTRING(POINT(0,0),POINT(1,2),POINT(2,4))", + "\x00\x00\x00\x00\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"+ + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00"+ + "\x00\x00\x00@\x00\x00\x00\x00\x00\x00\x10@"), + }, + }, + + // JSON types + { + TypeDefinition: "json", + Assertions: [2]typeDescriptionAssertion{ + newTypeDescriptionAssertion("{}"), + newTypeDescriptionAssertion("{\"os\":\"Mac\",\"name\":\"BillyBob\",\"resolution\":{\"x\":1920,\"y\":1080}}"), + }, + }, +} + +// --------------------- +// Test Helper Functions +// --------------------- + +func assertValues(t *testing.T, assertionIndex int, row map[string]interface{}) { + for _, typeDesc := range allTypes { + assertion := typeDesc.Assertions[assertionIndex] + expectedValue := assertion.getExpectedValue() + + actualValue := "" + if row[typeDesc.ColumnName()] != nil { + actualValue = fmt.Sprintf("%v", row[typeDesc.ColumnName()]) + } + if typeDesc.TypeDefinition == "json" { + // LD_1 and DOLT storage formats return JSON strings slightly differently; DOLT removes spaces + // while LD_1 add whitespace, so for json comparison, we sanitize by removing whitespace. 
+ var actual interface{} + json.Unmarshal([]byte(actualValue), &actual) + var expected interface{} + json.Unmarshal([]byte(expectedValue.(string)), &expected) + require.EqualValues(t, expected, actual, + "Failed on assertion %d for for column %q", assertionIndex, typeDesc.ColumnName()) + } else { + require.EqualValues(t, expectedValue, actualValue, + "Failed on assertion %d for for column %q", assertionIndex, typeDesc.ColumnName()) + } + } +} + +func assertNullValues(t *testing.T, row map[string]interface{}) { + for _, typeDesc := range allTypes { + require.Nil(t, row[typeDesc.ColumnName()], + "Failed on NULL value for for column %q", typeDesc.ColumnName()) + } +} + +func generateCreateTableStatement(tableName string) string { + sb := strings.Builder{} + sb.WriteString("create table " + tableName) + sb.WriteString("(pk int primary key auto_increment") + for _, typeDesc := range allTypes { + sb.WriteString(fmt.Sprintf(", %s %s", + typeDesc.ColumnName(), typeDesc.TypeDefinition)) + } + sb.WriteString(");") + return sb.String() +} + +func generateInsertValuesStatement(tableName string, assertionIndex int) string { + sb := strings.Builder{} + sb.WriteString("insert into " + tableName) + sb.WriteString(" values (DEFAULT") + for _, typeDesc := range allTypes { + assertion := typeDesc.Assertions[assertionIndex] + value := assertion.Value + if typeDesc.IsStringType() { + value = fmt.Sprintf("'%s'", value) + } + sb.WriteString(", " + fmt.Sprintf("%v", value)) + } + sb.WriteString(");") + + return sb.String() +} + +func generateInsertNullValuesStatement(tableName string) string { + sb := strings.Builder{} + sb.WriteString("insert into " + tableName) + sb.WriteString(" values (DEFAULT") + for range allTypes { + sb.WriteString(", null") + } + sb.WriteString(");") + + return sb.String() +} + +func generateUpdateToNullValuesStatement(tableName string, pk int) string { + sb := strings.Builder{} + sb.WriteString("update " + tableName + " set ") + for i, typeDesc := range allTypes 
{ + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString(fmt.Sprintf("%s=NULL", typeDesc.ColumnName())) + } + sb.WriteString(fmt.Sprintf(" where pk=%d;", pk)) + + return sb.String() +} + +func generateUpdateValuesStatement(tableName string, pk int, assertionIndex int) string { + sb := strings.Builder{} + sb.WriteString("update " + tableName + " set ") + for i, typeDesc := range allTypes { + if i > 0 { + sb.WriteString(", ") + } + assertion := typeDesc.Assertions[assertionIndex] + value := assertion.Value + if typeDesc.IsStringType() { + value = fmt.Sprintf("'%s'", value) + } + sb.WriteString(fmt.Sprintf("%s=%v", typeDesc.ColumnName(), value)) + } + sb.WriteString(fmt.Sprintf(" where pk=%d;", pk)) + + return sb.String() +} + +func generateTestDataString(length uint) string { + sb := strings.Builder{} + for ; length > 0; length-- { + sb.WriteRune(rune(rand.Intn(90-48) + 48)) + } + + return sb.String() +} diff --git a/binlogreplication/binlog_replication_filters_test.go b/binlogreplication/binlog_replication_filters_test.go new file mode 100644 index 00000000..4d6adc00 --- /dev/null +++ b/binlogreplication/binlog_replication_filters_test.go @@ -0,0 +1,191 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 

package binlogreplication

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestBinlogReplicationFilters_ignoreTablesOnly tests that the ignoreTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) {
	defer teardown(t)
	startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
	startReplicationAndCreateTestDb(t, mySqlPort)

	// Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten
	// by the second, and that db and table names are case-insensitive.
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t1);")
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(DB01.T2);")

	// Assert that status shows replication filters
	status := showReplicaStatus(t)
	require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])
	require.Equal(t, "", status["Replicate_Do_Table"])

	// Make changes on the primary
	primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
	primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
	for i := 1; i < 12; i++ {
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
	}
	primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
	primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
	primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
	primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

	// Pause to let the replica catch up
	waitForReplicaToCatchUp(t)

	// Verify that all changes from t1 were applied on the replica
	rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
	require.NoError(t, err)
	row := convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "10", row["count"])
	require.Equal(t, "0", row["min"])
	require.Equal(t, "9", row["max"])
	require.NoError(t, rows.Close())

	// Verify that no changes from t2 were applied on the replica
	rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
	require.NoError(t, err)
	row = convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "0", row["count"])
	require.Equal(t, nil, row["min"])
	require.Equal(t, nil, row["max"])
	require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_doTablesOnly tests that the doTables replication
// filtering option is correctly applied and honored.
func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) {
	defer teardown(t)
	startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
	startReplicationAndCreateTestDb(t, mySqlPort)

	// Do replication events for db01.t1. Also tests that the first filter setting is overwritten
	// by the second, and that db and table names are case-insensitive.
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t2);")
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(DB01.T1);")

	// Assert that status shows replication filters
	status := showReplicaStatus(t)
	require.Equal(t, "db01.t1", status["Replicate_Do_Table"])
	require.Equal(t, "", status["Replicate_Ignore_Table"])

	// Make changes on the primary
	primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
	primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
	for i := 1; i < 12; i++ {
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
	}
	primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
	primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
	primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
	primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

	// Pause to let the replica catch up
	waitForReplicaToCatchUp(t)

	// Verify that all changes from t1 were applied on the replica
	rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
	require.NoError(t, err)
	row := convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "10", row["count"])
	require.Equal(t, "0", row["min"])
	require.Equal(t, "9", row["max"])
	require.NoError(t, rows.Close())

	// Verify that no changes from t2 were applied on the replica
	rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
	require.NoError(t, err)
	row = convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "0", row["count"])
	require.Equal(t, nil, row["min"])
	require.Equal(t, nil, row["max"])
	require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_doTablesAndIgnoreTables tests that the doTables and ignoreTables
// replication filtering options are correctly applied and honored when used together.
func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) {
	defer teardown(t)
	startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
	startReplicationAndCreateTestDb(t, mySqlPort)

	// Do replication events for db01.t1, and db01.t2
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);")
	// Ignore replication events for db01.t2 (the ignore filter takes precedence over the do filter)
	replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(db01.t2);")

	// Assert that replica status shows replication filters
	status := showReplicaStatus(t)
	require.True(t, status["Replicate_Do_Table"] == "db01.t1,db01.t2" ||
		status["Replicate_Do_Table"] == "db01.t2,db01.t1")
	require.Equal(t, "db01.t2", status["Replicate_Ignore_Table"])

	// Make changes on the primary
	primaryDatabase.MustExec("CREATE TABLE db01.t1 (pk INT PRIMARY KEY);")
	primaryDatabase.MustExec("CREATE TABLE db01.t2 (pk INT PRIMARY KEY);")
	for i := 1; i < 12; i++ {
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t1 VALUES (%d);", i))
		primaryDatabase.MustExec(fmt.Sprintf("INSERT INTO db01.t2 VALUES (%d);", i))
	}
	primaryDatabase.MustExec("UPDATE db01.t1 set pk = pk-1;")
	primaryDatabase.MustExec("UPDATE db01.t2 set pk = pk-1;")
	primaryDatabase.MustExec("DELETE FROM db01.t1 WHERE pk = 10;")
	primaryDatabase.MustExec("DELETE FROM db01.t2 WHERE pk = 10;")

	// Pause to let the replica catch up
	waitForReplicaToCatchUp(t)

	// Verify that all changes from t1 were applied on the replica
	rows, err := replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t1;")
	require.NoError(t, err)
	row := convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "10", row["count"])
	require.Equal(t, "0", row["min"])
	require.Equal(t, "9", row["max"])
	require.NoError(t, rows.Close())

	// Verify that no changes from t2 were applied on the replica
	rows, err = replicaDatabase.Queryx("SELECT COUNT(pk) as count, MIN(pk) as min, MAX(pk) as max from db01.t2;")
	require.NoError(t, err)
	row = convertMapScanResultToStrings(readNextRow(t, rows))
	require.Equal(t, "0", row["count"])
	require.Equal(t, nil, row["min"])
	require.Equal(t, nil, row["max"])
	require.NoError(t, rows.Close())
}

// TestBinlogReplicationFilters_errorCases tests returned errors for various error cases.
func TestBinlogReplicationFilters_errorCases(t *testing.T) {
	defer teardown(t)
	startSqlServers(t)

	// All tables must be qualified with a database
	_, err := replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(t1);")
	require.Error(t, err)
	require.ErrorContains(t, err, "no database specified for table")

	_, err = replicaDatabase.Queryx("CHANGE REPLICATION FILTER REPLICATE_IGNORE_TABLE=(t1);")
	require.Error(t, err)
	require.ErrorContains(t, err, "no database specified for table")
}
diff --git a/binlogreplication/binlog_replication_multidb_test.go b/binlogreplication/binlog_replication_multidb_test.go
new file mode 100644
index 00000000..61436050
--- /dev/null
+++ b/binlogreplication/binlog_replication_multidb_test.go
@@ -0,0 +1,206 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+ +package binlogreplication + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestBinlogReplicationMultiDb tests that binlog events spanning multiple databases are correctly +// applied by a replica. +func TestBinlogReplicationMultiDb(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Make changes on the primary to db01 and db02 + primaryDatabase.MustExec("create database db02;") + primaryDatabase.MustExec("use db01;") + primaryDatabase.MustExec("create table t01 (pk int primary key, c1 int default (0))") + primaryDatabase.MustExec("use db02;") + primaryDatabase.MustExec("create table t02 (pk int primary key, c1 int default (0))") + primaryDatabase.MustExec("use db01;") + primaryDatabase.MustExec("insert into t01 (pk) values (1), (3), (5), (8), (9);") + primaryDatabase.MustExec("use db02;") + primaryDatabase.MustExec("insert into t02 (pk) values (2), (4), (6), (7), (10);") + primaryDatabase.MustExec("use db01;") + primaryDatabase.MustExec("delete from t01 where pk=9;") + primaryDatabase.MustExec("delete from db02.t02 where pk=10;") + primaryDatabase.MustExec("use db02;") + primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;") + primaryDatabase.MustExec("update t02 set pk=8 where pk=7;") + + // Verify the changes in db01 on the replica + waitForReplicaToCatchUp(t) + rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "1", row["pk"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "3", row["pk"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "5", row["pk"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "7", row["pk"]) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) + require.NoError(t, 
rows.Close())
+
+    // Verify db01.dolt_diff
+    replicaDatabase.MustExec("use db01;")
+    rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "1", row["data_change"])
+    require.EqualValues(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "1", row["data_change"])
+    require.EqualValues(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "1", row["data_change"])
+    require.EqualValues(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "0", row["data_change"])
+    require.EqualValues(t, "1", row["schema_change"])
+    require.False(t, rows.Next())
+    // FIX(review): the original called rows.Close() twice in a row here; the
+    // redundant duplicate call was removed (Rows.Close is idempotent, so this
+    // is a cleanup, not a behavior change).
+    require.NoError(t, rows.Close())
+
+    // Verify the changes in db02 on the replica
+    replicaDatabase.MustExec("use db02;")
+    rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "2", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "4", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "6", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "8", row["pk"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+
+    // Verify db02.dolt_diff
+    rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "1", row["data_change"])
+    require.Equal(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "1", row["data_change"])
+    require.Equal(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "1", row["data_change"])
+    require.Equal(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "0", row["data_change"])
+    require.Equal(t, "1", row["schema_change"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+}
+
+// TestBinlogReplicationMultiDbTransactions tests that binlog events for transactions that span
+// multiple DBs are applied correctly to a replica.
+func TestBinlogReplicationMultiDbTransactions(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+    startReplicationAndCreateTestDb(t, mySqlPort)
+
+    // Make changes on the primary to db01 and db02
+    primaryDatabase.MustExec("create database db02;")
+    primaryDatabase.MustExec("create table db01.t01 (pk int primary key, c1 int default (0))")
+    primaryDatabase.MustExec("create table db02.t02 (pk int primary key, c1 int default (0))")
+    // BUG FIX(review): the original executed "set @autocommit = 0;", which only
+    // assigns a user variable named @autocommit and does NOT disable autocommit.
+    // Disabling autocommit requires setting the system variable.
+    primaryDatabase.MustExec("set autocommit = 0;")
+
+    // One explicit transaction touching both databases, exercising inserts,
+    // deletes, and updates across db01 and db02 before a single commit.
+    primaryDatabase.MustExec("start transaction;")
+    primaryDatabase.MustExec("insert into db01.t01 (pk) values (1), (3), (5), (8), (9);")
+    primaryDatabase.MustExec("insert into db02.t02 (pk) values (2), (4), (6), (7), (10);")
+    primaryDatabase.MustExec("delete from db01.t01 where pk=9;")
+    primaryDatabase.MustExec("delete from db02.t02 where pk=10;")
+    primaryDatabase.MustExec("update db01.t01 set pk=7 where pk=8;")
+    primaryDatabase.MustExec("update db02.t02 set pk=8 where pk=7;")
+    primaryDatabase.MustExec("commit;")
+
+    // Verify the changes in db01 on the replica
+    waitForReplicaToCatchUp(t)
+    rows, err := replicaDatabase.Queryx("select * from db01.t01 order by pk asc;")
+    require.NoError(t, err)
+    row := convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "1", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "3", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "5", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "7", row["pk"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+
+    // Verify db01.dolt_diff
+    replicaDatabase.MustExec("use db01;")
+    rows, err = replicaDatabase.Queryx("select * from db01.dolt_diff;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "1", row["data_change"])
+    require.EqualValues(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t01", row["table_name"])
+    require.EqualValues(t, "0", row["data_change"])
+    require.EqualValues(t, "1", row["schema_change"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+
+    // Verify the changes in db02 on the replica
+    waitForReplicaToCatchUp(t)
+    replicaDatabase.MustExec("use db02;")
+    rows, err = replicaDatabase.Queryx("select * from db02.t02 order by pk asc;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "2", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "4", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "6", row["pk"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "8", row["pk"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+
+    // Verify db02.dolt_diff
+    rows, err = replicaDatabase.Queryx("select * from db02.dolt_diff;")
+    require.NoError(t, err)
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "1", row["data_change"])
+    require.Equal(t, "0", row["schema_change"])
+    row = convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "t02", row["table_name"])
+    require.Equal(t, "0", row["data_change"])
+    require.Equal(t, "1", row["schema_change"])
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+}
diff --git a/binlogreplication/binlog_replication_reconnect_test.go b/binlogreplication/binlog_replication_reconnect_test.go
new file mode 100644
index 00000000..75455672
--- /dev/null
+++ b/binlogreplication/binlog_replication_reconnect_test.go
@@ -0,0 +1,227 @@
+// Copyright 2022 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package binlogreplication
+
+import (
+    "fmt"
+    "strconv"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/Shopify/toxiproxy/v2"
+    toxiproxyclient "github.com/Shopify/toxiproxy/v2/client"
+    "github.com/prometheus/client_golang/prometheus"
+    "github.com/rs/zerolog"
+    "github.com/stretchr/testify/require"
+)
+
+// Toxiproxy state shared by the reconnection tests: a control-plane client, the
+// proxy that sits between the Dolt replica and the MySQL primary, and its port.
+var toxiClient *toxiproxyclient.Client
+var mysqlProxy *toxiproxyclient.Proxy
+var proxyPort int
+
+// TestBinlogReplicationAutoReconnect tests that the replica's connection to the primary is correctly
+// reestablished if it drops.
+func TestBinlogReplicationAutoReconnect(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+    configureToxiProxy(t)
+    configureFastConnectionRetry(t)
+    // Note: replication is pointed at the Toxiproxy port, not directly at MySQL,
+    // so the proxy can sever the stream on demand.
+    startReplicationAndCreateTestDb(t, proxyPort)
+
+    // Get the replica started up and ensure it's in sync with the primary before turning on the limit_data toxic
+    testInitialReplicaStatus(t)
+    primaryDatabase.MustExec("create table reconnect_test(pk int primary key, c1 varchar(255));")
+    waitForReplicaToCatchUp(t)
+    turnOnLimitDataToxic(t)
+
+    // Insert far more than 1KB of row data so that the limit_data toxic is
+    // guaranteed to cut the replication stream partway through.
+    for i := 0; i < 1000; i++ {
+        value := "foobarbazbashfoobarbazbashfoobarbazbashfoobarbazbashfoobarbazbash"
+        primaryDatabase.MustExec(fmt.Sprintf("insert into reconnect_test values (%v, %q)", i, value))
+    }
+    // Remove the limit_data toxic so that a connection can be reestablished
+    err := mysqlProxy.RemoveToxic("limit_data")
+    require.NoError(t, err)
+    t.Logf("Toxiproxy proxy limit_data toxic removed")
+
+    // Assert that all records get written to the table
+    waitForReplicaToCatchUp(t)
+
+    rows, err := replicaDatabase.Queryx("select min(pk) as min, max(pk) as max, count(pk) as count from db01.reconnect_test;")
+    require.NoError(t, err)
+
+    row := convertMapScanResultToStrings(readNextRow(t, rows))
+    require.Equal(t, "0", row["min"])
+    require.Equal(t, "999", row["max"])
+    require.Equal(t, "1000", row["count"])
+    require.NoError(t, rows.Close())
+
+    // Assert that show replica status show reconnection IO error
+    // (1158 is MySQL's ER_NET_READ_ERROR — an error reading communication packets)
+    status := showReplicaStatus(t)
+    require.Equal(t, "1158", status["Last_IO_Errno"])
+    require.True(t, strings.Contains(status["Last_IO_Error"].(string), "EOF"))
+    requireRecentTimeString(t, status["Last_IO_Error_Timestamp"])
+}
+
+// configureFastConnectionRetry configures the replica to retry a failed connection after 5s, instead of the default 60s
+// connection retry interval. This is used for testing connection retry logic without waiting the full default period.
+func configureFastConnectionRetry(_ *testing.T) {
+    replicaDatabase.MustExec(
+        "change replication source to SOURCE_CONNECT_RETRY=5;")
+}
+
+// testInitialReplicaStatus tests the data returned by SHOW REPLICA STATUS and errors
+// out if any values are not what is expected for a replica that has just connected
+// to a MySQL primary.
+func testInitialReplicaStatus(t *testing.T) {
+    status := showReplicaStatus(t)
+
+    // Positioning settings
+    require.Equal(t, "1", status["Auto_Position"])
+
+    // Connection settings
+    // Connect_Retry is "5" because configureFastConnectionRetry ran earlier.
+    require.Equal(t, "5", status["Connect_Retry"])
+    require.Equal(t, "86400", status["Source_Retry_Count"])
+    require.Equal(t, "localhost", status["Source_Host"])
+    require.NotEmpty(t, status["Source_Port"])
+    require.NotEmpty(t, status["Source_User"])
+
+    // Error status
+    require.Equal(t, "0", status["Last_Errno"])
+    require.Equal(t, "", status["Last_Error"])
+    require.Equal(t, "0", status["Last_IO_Errno"])
+    require.Equal(t, "", status["Last_IO_Error"])
+    require.Equal(t, "", status["Last_IO_Error_Timestamp"])
+    require.Equal(t, "0", status["Last_SQL_Errno"])
+    require.Equal(t, "", status["Last_SQL_Error"])
+    require.Equal(t, "", status["Last_SQL_Error_Timestamp"])
+
+    // Empty filter configuration
+    require.Equal(t, "", status["Replicate_Do_Table"])
+    require.Equal(t, "", status["Replicate_Ignore_Table"])
+
+    // Thread status: the IO thread may still be connecting right after startup.
+    require.True(t,
+        status["Replica_IO_Running"] == "Yes" ||
+            status["Replica_IO_Running"] == "Connecting")
+    require.Equal(t, "Yes", status["Replica_SQL_Running"])
+
+    // Unsupported fields
+    require.Equal(t, "INVALID", status["Source_Log_File"])
+    require.Equal(t, "Ignored", status["Source_SSL_Allowed"])
+    require.Equal(t, "None", status["Until_Condition"])
+    require.Equal(t, "0", status["SQL_Delay"])
+    require.Equal(t, "0", status["SQL_Remaining_Delay"])
+    require.Equal(t, "0", status["Seconds_Behind_Source"])
+}
+
+// requireRecentTimeString asserts that the specified |datetime| is a non-nil timestamp string
+// with a
+// value less than five minutes ago.
+func requireRecentTimeString(t *testing.T, datetime interface{}) {
+    require.NotNil(t, datetime)
+    datetimeString := datetime.(string)
+
+    // Timestamps in replica status output use the Unix date format.
+    datetime, err := time.Parse(time.UnixDate, datetimeString)
+    require.NoError(t, err)
+    require.LessOrEqual(t, time.Now().Add(-5*time.Minute), datetime)
+    require.GreaterOrEqual(t, time.Now(), datetime)
+}
+
+// showReplicaStatus returns a map with the results of SHOW REPLICA STATUS, keyed by the
+// name of each column.
+func showReplicaStatus(t *testing.T) map[string]interface{} {
+    rows, err := replicaDatabase.Queryx("show replica status;")
+    require.NoError(t, err)
+    defer rows.Close()
+    return convertMapScanResultToStrings(readNextRow(t, rows))
+}
+
+// configureToxiProxy starts a Toxiproxy control plane on a free port and creates a
+// "mysql" proxy (stored in the package-level mysqlProxy/proxyPort vars) that forwards
+// traffic to the MySQL primary on mySqlPort.
+func configureToxiProxy(t *testing.T) {
+    toxiproxyPort := findFreePort()
+
+    metrics := toxiproxy.NewMetricsContainer(prometheus.NewRegistry())
+    toxiproxyServer := toxiproxy.NewServer(metrics, zerolog.Nop())
+    // NOTE(review): Listen's error is discarded and readiness is assumed after a
+    // 500ms sleep; if the control plane fails to bind, CreateProxy below is the
+    // first place the failure surfaces — consider checking the error explicitly.
+    go func() {
+        toxiproxyServer.Listen("localhost:" + strconv.Itoa(toxiproxyPort))
+    }()
+    time.Sleep(500 * time.Millisecond)
+    t.Logf("Toxiproxy control plane running on port %d", toxiproxyPort)
+
+    toxiClient = toxiproxyclient.NewClient(fmt.Sprintf("localhost:%d", toxiproxyPort))
+
+    proxyPort = findFreePort()
+    var err error
+    mysqlProxy, err = toxiClient.CreateProxy("mysql",
+        fmt.Sprintf("localhost:%d", proxyPort), // downstream
+        fmt.Sprintf("localhost:%d", mySqlPort)) // upstream
+    if err != nil {
+        panic(fmt.Sprintf("unable to create toxiproxy: %v", err.Error()))
+    }
+    t.Logf("Toxiproxy proxy started on port %d", proxyPort)
+}
+
+// turnOnLimitDataToxic adds a limit_data toxic to the active Toxiproxy, which prevents more than 1KB of data
+// from being sent from the primary through the proxy to the replica. Callers MUST call configureToxiProxy
+// before calling this function.
+func turnOnLimitDataToxic(t *testing.T) {
+    require.NotNil(t, mysqlProxy)
+    _, err := mysqlProxy.AddToxic("limit_data", "limit_data", "downstream", 1.0, toxiproxyclient.Attributes{
+        "bytes": 1_000,
+    })
+    require.NoError(t, err)
+    t.Logf("Toxiproxy proxy with limit_data toxic (1KB) started on port %d", proxyPort)
+}
+
+// convertMapScanResultToStrings converts each value in the specified map |m| into a string.
+// This is necessary because MapScan doesn't honor (or know about) the correct underlying SQL types – it
+// gets results back as strings, typed as []byte. Results also get returned as int64, which are converted to strings
+// for ease of testing.
+// More info at the end of this issue: https://github.com/jmoiron/sqlx/issues/225
+// Note: the map is mutated in place; the same map is returned for convenience.
+func convertMapScanResultToStrings(m map[string]interface{}) map[string]interface{} {
+    for key, value := range m {
+        switch v := value.(type) {
+        case []uint8:
+            m[key] = string(v)
+        case int64:
+            m[key] = strconv.FormatInt(v, 10)
+        case uint64:
+            m[key] = strconv.FormatUint(v, 10)
+        }
+    }
+
+    return m
+}
+
+// convertSliceScanResultToStrings returns a new slice, formed by converting each value in the slice |ss| into a string.
+// This is necessary because SliceScan doesn't honor (or know about) the correct underlying SQL types –it
+// gets results back as strings, typed as []byte, or as int64 values.
+// More info at the end of this issue: https://github.com/jmoiron/sqlx/issues/225
+// Unlike convertMapScanResultToStrings, the input slice is not mutated.
+func convertSliceScanResultToStrings(ss []any) []any {
+    row := make([]any, len(ss))
+    for i, value := range ss {
+        switch v := value.(type) {
+        case []uint8:
+            row[i] = string(v)
+        case int64:
+            row[i] = strconv.FormatInt(v, 10)
+        case uint64:
+            row[i] = strconv.FormatUint(v, 10)
+        default:
+            row[i] = v
+        }
+    }
+
+    return row
+}
diff --git a/binlogreplication/binlog_replication_restart_test.go b/binlogreplication/binlog_replication_restart_test.go
new file mode 100644
index 00000000..70810019
--- /dev/null
+++ b/binlogreplication/binlog_replication_restart_test.go
@@ -0,0 +1,83 @@
+// Copyright 2023 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package binlogreplication
+
+import (
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+)
+
+// TestBinlogReplicationServerRestart tests that a replica can be configured and started, then the
+// server process can be restarted and replica can be restarted without problems.
+func TestBinlogReplicationServerRestart(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + primaryDatabase.MustExec("create table t (pk int auto_increment primary key)") + + // Launch a goroutine that inserts data for 5 seconds + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + limit := 5 * time.Second + for startTime := time.Now(); time.Now().Sub(startTime) <= limit; { + primaryDatabase.MustExec("insert into t values (DEFAULT);") + time.Sleep(100 * time.Millisecond) + } + }() + + // Let the replica process a few transactions, then stop the server and pause a second + waitForReplicaToReachGtid(t, 3) + stopDoltSqlServer(t) + time.Sleep(1000 * time.Millisecond) + + var err error + doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + require.NoError(t, err) + + // Check replication status on the replica and assert configuration persisted + status := showReplicaStatus(t) + // The default Connect_Retry interval is 60s; but some tests configure a faster connection retry interval + require.True(t, status["Connect_Retry"] == "5" || status["Connect_Retry"] == "60") + require.Equal(t, "86400", status["Source_Retry_Count"]) + require.Equal(t, "localhost", status["Source_Host"]) + require.NotEmpty(t, status["Source_Port"]) + require.NotEmpty(t, status["Source_User"]) + + // Restart replication on replica + // TODO: For now, we have to set server_id each time we start the service. 
+ // Turn this into a persistent sys var + replicaDatabase.MustExec("set @@global.server_id=123;") + replicaDatabase.MustExec("START REPLICA") + + // Assert that all changes have replicated from the primary + wg.Wait() + waitForReplicaToCatchUp(t) + countMaxQuery := "SELECT COUNT(pk) AS count, MAX(pk) as max FROM db01.t;" + primaryRows, err := primaryDatabase.Queryx(countMaxQuery) + require.NoError(t, err) + replicaRows, err := replicaDatabase.Queryx(countMaxQuery) + require.NoError(t, err) + primaryRow := convertMapScanResultToStrings(readNextRow(t, primaryRows)) + replicaRow := convertMapScanResultToStrings(readNextRow(t, replicaRows)) + require.Equal(t, primaryRow["count"], replicaRow["count"]) + require.Equal(t, primaryRow["max"], replicaRow["max"]) + require.NoError(t, replicaRows.Close()) +} diff --git a/binlogreplication/binlog_replication_test.go b/binlogreplication/binlog_replication_test.go new file mode 100644 index 00000000..4914e1dc --- /dev/null +++ b/binlogreplication/binlog_replication_test.go @@ -0,0 +1,1031 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package binlogreplication
+
+import (
+    "bufio"
+    "fmt"
+    "io"
+    "math/rand"
+    "net"
+    "os"
+    "os/exec"
+    "path/filepath"
+    "reflect"
+    "regexp"
+    "runtime"
+    "slices"
+    "strconv"
+    "strings"
+    "syscall"
+    "testing"
+    "time"
+
+    _ "github.com/go-sql-driver/mysql"
+    "github.com/jmoiron/sqlx"
+    "github.com/stretchr/testify/require"
+
+    "github.com/dolthub/go-mysql-server/sql/binlogreplication"
+)
+
+// Shared harness state: the MySQL primary runs in a Docker container and the
+// Dolt replica runs as a child process; handles, ports, and log paths live here.
+var mySqlContainer string
+var mySqlPort, doltPort int
+var primaryDatabase, replicaDatabase *sqlx.DB
+var doltProcess *os.Process
+var doltLogFilePath, oldDoltLogFilePath string
+var doltLogFile, mysqlLogFile *os.File
+var testDir string
+var originalWorkingDir string
+
+// doltReplicaSystemVars are the common system variables that need
+// to be set on a Dolt replica before replication is turned on.
+var doltReplicaSystemVars = map[string]string{
+    "server_id": "42",
+}
+
+// teardown stops the MySQL container and the Dolt server process, closes log
+// files, dumps server logs when the test failed (and removes temp files when it
+// passed), and deletes any Toxiproxy proxies left over from reconnection tests.
+func teardown(t *testing.T) {
+    if mySqlContainer != "" {
+        if t.Failed() {
+            fmt.Println("\nMySQL server log:")
+            if out, err := exec.Command("docker", "logs", mySqlContainer).Output(); err == nil {
+                fmt.Print(string(out))
+            } else {
+                t.Log(err)
+            }
+        }
+        stopMySqlServer(t)
+    }
+    if doltProcess != nil {
+        stopDoltSqlServer(t)
+    }
+    if mysqlLogFile != nil {
+        mysqlLogFile.Close()
+    }
+    if doltLogFile != nil {
+        doltLogFile.Close()
+    }
+
+    // Output server logs on failure for easier debugging
+    if t.Failed() {
+        // oldDoltLogFilePath is set when the Dolt server was restarted mid-test.
+        if oldDoltLogFilePath != "" {
+            fmt.Printf("\nDolt server log from %s:\n", oldDoltLogFilePath)
+            printFile(oldDoltLogFilePath)
+        }
+
+        fmt.Printf("\nDolt server log from %s:\n", doltLogFilePath)
+        printFile(doltLogFilePath)
+    } else {
+        // clean up temp files on clean test runs
+        defer os.RemoveAll(testDir)
+    }
+
+    // Best-effort cleanup of any proxies created by configureToxiProxy.
+    if toxiClient != nil {
+        proxies, _ := toxiClient.Proxies()
+        for _, value := range proxies {
+            value.Delete()
+        }
+    }
+}
+
+// TestBinlogReplicationSanityCheck performs the simplest possible binlog replication test.
+// It starts up a MySQL primary and a Dolt replica, and asserts that a CREATE TABLE statement properly replicates to the
+// Dolt replica, along with simple insert, update, and delete statements.
+func TestBinlogReplicationSanityCheck(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+    startReplicationAndCreateTestDb(t, mySqlPort)
+
+    // Create a table on the primary and verify on the replica
+    primaryDatabase.MustExec("create table tableT (pk int primary key)")
+    waitForReplicaToCatchUp(t)
+    assertCreateTableStatement(t, replicaDatabase, "tableT",
+        "CREATE TABLE tableT ( pk int NOT NULL, PRIMARY KEY (pk)) "+
+            "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin")
+
+    // Insert/Update/Delete on the primary
+    primaryDatabase.MustExec("insert into tableT values(100), (200)")
+    waitForReplicaToCatchUp(t)
+    requireReplicaResults(t, "select * from db01.tableT", [][]any{{"100"}, {"200"}})
+    primaryDatabase.MustExec("delete from tableT where pk = 100")
+    waitForReplicaToCatchUp(t)
+    requireReplicaResults(t, "select * from db01.tableT", [][]any{{"200"}})
+    primaryDatabase.MustExec("update tableT set pk = 300")
+    waitForReplicaToCatchUp(t)
+    requireReplicaResults(t, "select * from db01.tableT", [][]any{{"300"}})
+}
+
+// TestAutoRestartReplica tests that a Dolt replica automatically starts up replication if
+// replication was running when the replica was shut down.
+func TestAutoRestartReplica(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + + // Assert that replication is not running yet + status := queryReplicaStatus(t) + require.Equal(t, "0", status["Last_IO_Errno"]) + require.Equal(t, "", status["Last_IO_Error"]) + require.Equal(t, "0", status["Last_SQL_Errno"]) + require.Equal(t, "", status["Last_SQL_Error"]) + require.Equal(t, "No", status["Replica_IO_Running"]) + require.Equal(t, "No", status["Replica_SQL_Running"]) + + // Start up replication and replicate some test data + startReplicationAndCreateTestDb(t, mySqlPort) + primaryDatabase.MustExec("create table db01.autoRestartTest(pk int primary key);") + waitForReplicaToCatchUp(t) + primaryDatabase.MustExec("insert into db01.autoRestartTest values (100);") + waitForReplicaToCatchUp(t) + requireReplicaResults(t, "select * from db01.autoRestartTest;", [][]any{{"100"}}) + + // Test for the presence of the replica-running state file + require.True(t, fileExists(filepath.Join(testDir, "dolt", ".doltcfg", "replica-running"))) + + // Restart the Dolt replica + stopDoltSqlServer(t) + var err error + doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + require.NoError(t, err) + + // Assert that some test data replicates correctly + primaryDatabase.MustExec("insert into db01.autoRestartTest values (200);") + waitForReplicaToCatchUp(t) + requireReplicaResults(t, "select * from db01.autoRestartTest;", + [][]any{{"100"}, {"200"}}) + + // SHOW REPLICA STATUS should show that replication is running, with no errors + status = queryReplicaStatus(t) + require.Equal(t, "0", status["Last_IO_Errno"]) + require.Equal(t, "", status["Last_IO_Error"]) + require.Equal(t, "0", status["Last_SQL_Errno"]) + require.Equal(t, "", status["Last_SQL_Error"]) + require.Equal(t, "Yes", status["Replica_IO_Running"]) + require.Equal(t, "Yes", status["Replica_SQL_Running"]) + + // Stop replication and assert the replica-running marker file is 
removed + replicaDatabase.MustExec("stop replica") + require.False(t, fileExists(filepath.Join(testDir, "dolt", ".doltcfg", "replica-running"))) + + // Restart the Dolt replica + stopDoltSqlServer(t) + doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + require.NoError(t, err) + + // SHOW REPLICA STATUS should show that replication is NOT running, with no errors + status = queryReplicaStatus(t) + require.Equal(t, "0", status["Last_IO_Errno"]) + require.Equal(t, "", status["Last_IO_Error"]) + require.Equal(t, "0", status["Last_SQL_Errno"]) + require.Equal(t, "", status["Last_SQL_Error"]) + require.Equal(t, "No", status["Replica_IO_Running"]) + require.Equal(t, "No", status["Replica_SQL_Running"]) +} + +// TestBinlogSystemUserIsLocked tests that the binlog applier user is locked and cannot be used to connect to the server. +func TestBinlogSystemUserIsLocked(t *testing.T) { + defer teardown(t) + startSqlServers(t) + + dsn := fmt.Sprintf("%s@tcp(127.0.0.1:%v)/", binlogApplierUser, doltPort) + db, err := sqlx.Open("mysql", dsn) + require.NoError(t, err) + + // Before starting replication, the system account does not exist + err = db.Ping() + require.Error(t, err) + require.ErrorContains(t, err, "User not found") + + // After starting replication, the system account is locked + startReplicationAndCreateTestDb(t, mySqlPort) + err = db.Ping() + require.Error(t, err) + require.ErrorContains(t, err, "Access denied for user") +} + +// TestFlushLogs tests that binary logs can be flushed on the primary, which forces a new binlog file to be written, +// including sending new Rotate and FormatDescription events to the replica. This is a simple sanity tests that we can +// process the events without errors. 
+func TestFlushLogs(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+    startReplicationAndCreateTestDb(t, mySqlPort)
+
+    // Make changes on the primary and verify on the replica
+    primaryDatabase.MustExec("create table t (pk int primary key)")
+    waitForReplicaToCatchUp(t)
+    expectedStatement := "CREATE TABLE t ( pk int NOT NULL, PRIMARY KEY (pk)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin"
+    assertCreateTableStatement(t, replicaDatabase, "t", expectedStatement)
+
+    // Force a binlog rotation, then verify events after the rotation still apply.
+    primaryDatabase.MustExec("flush binary logs;")
+    waitForReplicaToCatchUp(t)
+
+    primaryDatabase.MustExec("insert into t values (1), (2), (3);")
+    waitForReplicaToCatchUp(t)
+
+    requireReplicaResults(t, "select * from db01.t;", [][]any{
+        {"1"}, {"2"}, {"3"},
+    })
+}
+
+// TestResetReplica tests that "RESET REPLICA" and "RESET REPLICA ALL" correctly clear out
+// replication configuration and metadata.
+func TestResetReplica(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+    startReplicationAndCreateTestDb(t, mySqlPort)
+
+    // RESET REPLICA returns an error if replication is running
+    _, err := replicaDatabase.Queryx("RESET REPLICA")
+    require.Error(t, err)
+    require.ErrorContains(t, err, "unable to reset replica while replication is running")
+
+    // Calling RESET REPLICA clears out any errors
+    replicaDatabase.MustExec("STOP REPLICA;")
+    rows, err := replicaDatabase.Queryx("RESET REPLICA;")
+    require.NoError(t, err)
+    require.NoError(t, rows.Close())
+
+    status := queryReplicaStatus(t)
+    require.Equal(t, "0", status["Last_Errno"])
+    require.Equal(t, "", status["Last_Error"])
+    require.Equal(t, "0", status["Last_IO_Errno"])
+    require.Equal(t, "", status["Last_IO_Error"])
+    require.Equal(t, "", status["Last_IO_Error_Timestamp"])
+    require.Equal(t, "0", status["Last_SQL_Errno"])
+    require.Equal(t, "", status["Last_SQL_Error"])
+    require.Equal(t, "", status["Last_SQL_Error_Timestamp"])
+
+    // Calling RESET REPLICA ALL clears out all replica configuration
+    rows, err = replicaDatabase.Queryx("RESET REPLICA ALL;")
+    require.NoError(t, err)
+    require.NoError(t, rows.Close())
+    status = queryReplicaStatus(t)
+    require.Equal(t, "", status["Source_Host"])
+    require.Equal(t, "", status["Source_User"])
+    require.Equal(t, "No", status["Replica_IO_Running"])
+    require.Equal(t, "No", status["Replica_SQL_Running"])
+
+    // The persisted source metadata should be gone as well.
+    rows, err = replicaDatabase.Queryx("select * from mysql.slave_master_info;")
+    require.NoError(t, err)
+    require.False(t, rows.Next())
+    require.NoError(t, rows.Close())
+
+    // Start replication again and verify that we can still query replica status
+    startReplicationAndCreateTestDb(t, mySqlPort)
+    replicaStatus := showReplicaStatus(t)
+    require.Equal(t, "0", replicaStatus["Last_Errno"])
+    require.Equal(t, "", replicaStatus["Last_Error"])
+    require.True(t, replicaStatus["Replica_IO_Running"] == binlogreplication.ReplicaIoRunning ||
+        replicaStatus["Replica_IO_Running"] == binlogreplication.ReplicaIoConnecting)
+}
+
+// TestStartReplicaErrors tests that the "START REPLICA" command returns appropriate responses
+// for various error conditions.
+func TestStartReplicaErrors(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+
+    // START REPLICA returns an error when no replication source is configured
+    _, err := replicaDatabase.Queryx("START REPLICA;")
+    require.Error(t, err)
+    require.ErrorContains(t, err, ErrServerNotConfiguredAsReplica.Error())
+
+    // For an incomplete source configuration, throw an error as early as possible to make sure the user notices it.
+    replicaDatabase.MustExec("CHANGE REPLICATION SOURCE TO SOURCE_PORT=1234, SOURCE_HOST='localhost';")
+    rows, err := replicaDatabase.Queryx("START REPLICA;")
+    require.Error(t, err)
+    require.ErrorContains(t, err, "Invalid (empty) username")
+    require.Nil(t, rows)
+
+    // SOURCE_AUTO_POSITION cannot be disabled – we only support GTID positioning
+    rows, err = replicaDatabase.Queryx("CHANGE REPLICATION SOURCE TO SOURCE_PORT=1234, " +
+        "SOURCE_HOST='localhost', SOURCE_USER='replicator', SOURCE_AUTO_POSITION=0;")
+    require.Error(t, err)
+    require.ErrorContains(t, err, "Error 1105 (HY000): SOURCE_AUTO_POSITION cannot be disabled")
+    require.Nil(t, rows)
+
+    // START REPLICA logs a warning if replication is already running
+    startReplicationAndCreateTestDb(t, mySqlPort)
+    replicaDatabase.MustExec("START REPLICA;")
+    assertWarning(t, replicaDatabase, 3083, "Replication thread(s) for channel '' are already running.")
+}
+
+// TestShowReplicaStatus tests various cases "SHOW REPLICA STATUS" that aren't covered by other tests.
+func TestShowReplicaStatus(t *testing.T) {
+    defer teardown(t)
+    startSqlServers(t)
+
+    // Assert that very long hostnames are handled correctly
+    longHostname := "really.really.really.really.long.host.name.012345678901234567890123456789012345678901234567890123456789.com"
+    replicaDatabase.MustExec(fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s';", longHostname))
+    status := showReplicaStatus(t)
+    require.Equal(t, longHostname, status["Source_Host"])
+}
+
+// TestStopReplica tests that STOP REPLICA correctly stops the replication process, and that
+// warnings are logged when STOP REPLICA is invoked when replication is not running.
+func TestStopReplica(t *testing.T) {
+    defer teardown(t)
+    startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars)
+
+    // STOP REPLICA logs a warning if replication is not running
+    replicaDatabase.MustExec("STOP REPLICA;")
+    assertWarning(t, replicaDatabase, 3084, "Replication thread(s) for channel '' are already stopped.")
+
+    // Start replication with bad connection params
+    replicaDatabase.MustExec("CHANGE REPLICATION SOURCE TO SOURCE_HOST='doesnotexist', SOURCE_PORT=111, SOURCE_USER='nobody';")
+    replicaDatabase.MustExec("START REPLICA;")
+    // Give the IO thread a moment to attempt (and fail) the connection.
+    time.Sleep(200 * time.Millisecond)
+    status := showReplicaStatus(t)
+    require.Equal(t, "Connecting", status["Replica_IO_Running"])
+    require.Equal(t, "Yes", status["Replica_SQL_Running"])
+
+    // STOP REPLICA works when replication cannot establish a connection
+    replicaDatabase.MustExec("STOP REPLICA;")
+    status = showReplicaStatus(t)
+    require.Equal(t, "No", status["Replica_IO_Running"])
+    require.Equal(t, "No", status["Replica_SQL_Running"])
+
+    // START REPLICA and verify status
+    startReplicationAndCreateTestDb(t, mySqlPort)
+    time.Sleep(100 * time.Millisecond)
+    status = showReplicaStatus(t)
+    require.True(t, status["Replica_IO_Running"] == "Connecting" || status["Replica_IO_Running"] == "Yes")
+    require.Equal(t, "Yes", status["Replica_SQL_Running"])
+
+    // STOP REPLICA stops replication when it is running and connected to the source
+    replicaDatabase.MustExec("STOP REPLICA;")
+    status = showReplicaStatus(t)
+    require.Equal(t, "No", status["Replica_IO_Running"])
+    require.Equal(t, "No", status["Replica_SQL_Running"])
+
+    // STOP REPLICA logs a warning if replication is not running
+    replicaDatabase.MustExec("STOP REPLICA;")
+    assertWarning(t, replicaDatabase, 3084, "Replication thread(s) for channel '' are already stopped.")
+}
+
+// TestForeignKeyChecks tests that foreign key constraints replicate correctly when foreign key checks are
+// enabled and disabled.
+func TestForeignKeyChecks(t *testing.T) { + t.SkipNow() + + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Test that we can execute statement-based replication that requires foreign_key_checks + // being turned off (referenced table doesn't exist yet). + primaryDatabase.MustExec("SET foreign_key_checks = 0;") + primaryDatabase.MustExec("CREATE TABLE t1 (pk int primary key, color varchar(100), FOREIGN KEY (color) REFERENCES colors(name));") + primaryDatabase.MustExec("CREATE TABLE colors (name varchar(100) primary key);") + primaryDatabase.MustExec("SET foreign_key_checks = 1;") + + // Insert a record with foreign key checks enabled + primaryDatabase.MustExec("START TRANSACTION;") + primaryDatabase.MustExec("INSERT INTO colors VALUES ('green'), ('red'), ('blue');") + primaryDatabase.MustExec("INSERT INTO t1 VALUES (1, 'red'), (2, 'green');") + primaryDatabase.MustExec("COMMIT;") + + // Test the Insert path with foreign key checks turned off + primaryDatabase.MustExec("START TRANSACTION;") + primaryDatabase.MustExec("SET foreign_key_checks = 0;") + primaryDatabase.MustExec("INSERT INTO t1 VALUES (3, 'not-a-color');") + primaryDatabase.MustExec("COMMIT;") + + // Test the Update and Delete paths with foreign key checks turned off + primaryDatabase.MustExec("START TRANSACTION;") + primaryDatabase.MustExec("DELETE FROM colors WHERE name='red';") + primaryDatabase.MustExec("UPDATE t1 SET color='still-not-a-color' WHERE pk=2;") + primaryDatabase.MustExec("COMMIT;") + + // Verify the changes on the replica + waitForReplicaToCatchUp(t) + rows, err := replicaDatabase.Queryx("select * from db01.t1 order by pk;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "1", row["pk"]) + require.Equal(t, "red", row["color"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "2", row["pk"]) + require.Equal(t, 
"still-not-a-color", row["color"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "3", row["pk"]) + require.Equal(t, "not-a-color", row["color"]) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) + + rows, err = replicaDatabase.Queryx("select * from db01.colors order by name;") + require.NoError(t, err) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "blue", row["name"]) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "green", row["name"]) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +// TestCharsetsAndCollations tests that we can successfully replicate data using various charsets and collations. +func TestCharsetsAndCollations(t *testing.T) { + t.SkipNow() + + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) + startReplicationAndCreateTestDb(t, mySqlPort) + + // Use non-default charset/collations to create data on the primary + primaryDatabase.MustExec("CREATE TABLE t1 (pk int primary key, c1 varchar(255) COLLATE ascii_general_ci, c2 varchar(255) COLLATE utf16_general_ci);") + primaryDatabase.MustExec("insert into t1 values (1, \"one\", \"one\");") + + // Verify on the replica + waitForReplicaToCatchUp(t) + rows, err := replicaDatabase.Queryx("show create table db01.t1;") + require.NoError(t, err) + row := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Contains(t, row["Create Table"], "ascii_general_ci") + require.Contains(t, row["Create Table"], "utf16_general_ci") + require.NoError(t, rows.Close()) + + rows, err = replicaDatabase.Queryx("select * from db01.t1;") + require.NoError(t, err) + row = convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, "one", row["c1"]) + require.Equal(t, "\x00o\x00n\x00e", row["c2"]) + require.NoError(t, rows.Close()) +} + +// +// Test Helper Functions +// + +// waitForReplicaToCatchUp waits (up to 30s) for the replica to catch up 
with the primary database. The +// lag is measured by checking that gtid_executed is the same on the primary and replica. +func waitForReplicaToCatchUp(t *testing.T) { + timeLimit := 30 * time.Second + + endTime := time.Now().Add(timeLimit) + for time.Now().Before(endTime) { + replicaGtid := queryGtid(t, replicaDatabase) + primaryGtid := queryGtid(t, primaryDatabase) + + if primaryGtid == replicaGtid { + return + } else { + fmt.Printf("primary and replica not in sync yet... (primary: %s, replica: %s)\n", primaryGtid, replicaGtid) + time.Sleep(250 * time.Millisecond) + } + } + + // Log some status of the replica, before failing the test + outputShowReplicaStatus(t) + t.Fatal("primary and replica did not synchronize within " + timeLimit.String()) +} + +// outputShowReplicaStatus prints out replica status information. This is useful for debugging +// replication failures in tests since status will show whether the replica is successfully connected, +// any recent errors, and what GTIDs have been executed. +func outputShowReplicaStatus(t *testing.T) { + newRows, err := replicaDatabase.Queryx("show replica status;") + require.NoError(t, err) + allNewRows := readAllRowsIntoMaps(t, newRows) + fmt.Printf("\n\nSHOW REPLICA STATUS: %v\n", allNewRows) +} + +// waitForReplicaToReachGtid waits (up to 10s) for the replica's @@gtid_executed sys var to show that +// it has executed the |target| gtid transaction number. 
+func waitForReplicaToReachGtid(t *testing.T, target int) { + timeLimit := 10 * time.Second + endTime := time.Now().Add(timeLimit) + for time.Now().Before(endTime) { + time.Sleep(250 * time.Millisecond) + replicaGtid := queryGtid(t, replicaDatabase) + + if replicaGtid != "" { + components := strings.Split(replicaGtid, ":") + require.Equal(t, 2, len(components)) + sourceGtid := components[1] + if strings.Contains(sourceGtid, "-") { + gtidRange := strings.Split(sourceGtid, "-") + require.Equal(t, 2, len(gtidRange)) + sourceGtid = gtidRange[1] + } + + i, err := strconv.Atoi(sourceGtid) + require.NoError(t, err) + if i >= target { + return + } + } + + fmt.Printf("replica has not reached transaction %d yet; currently at: %s \n", target, replicaGtid) + } + + t.Fatal("replica did not reach target GTID within " + timeLimit.String()) +} + +// assertWarning asserts that the specified |database| has a warning with |code| and |message|, +// otherwise it will fail the current test. +func assertWarning(t *testing.T, database *sqlx.DB, code int, message string) { + rows, err := database.Queryx("SHOW WARNINGS;") + require.NoError(t, err) + warning := convertMapScanResultToStrings(readNextRow(t, rows)) + require.Equal(t, strconv.Itoa(code), warning["Code"]) + require.Equal(t, message, warning["Message"]) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func queryGtid(t *testing.T, database *sqlx.DB) string { + rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;") + require.NoError(t, err) + defer rows.Close() + row := convertMapScanResultToStrings(readNextRow(t, rows)) + if row["gtid_executed"] == nil { + t.Fatal("no value for @@GLOBAL.gtid_executed") + } + return row["gtid_executed"].(string) +} + +func readNextRow(t *testing.T, rows *sqlx.Rows) map[string]interface{} { + row := make(map[string]interface{}) + require.True(t, rows.Next()) + err := rows.MapScan(row) + require.NoError(t, err) + return row +} + +// readAllRowsIntoMaps 
reads all data from |rows| and returns a slice of maps, where each key +// in the map is the field name, and each value is the string representation of the field value. +func readAllRowsIntoMaps(t *testing.T, rows *sqlx.Rows) []map[string]interface{} { + result := make([]map[string]interface{}, 0) + for { + row := make(map[string]interface{}) + if rows.Next() == false { + return result + } + err := rows.MapScan(row) + require.NoError(t, err) + row = convertMapScanResultToStrings(row) + result = append(result, row) + } +} + +// readAllRowsIntoSlices reads all data from |rows| and returns a slice of slices, with +// all values converted to strings. +func readAllRowsIntoSlices(t *testing.T, rows *sqlx.Rows) [][]any { + result := make([][]any, 0) + for { + if rows.Next() == false { + return result + } + row, err := rows.SliceScan() + require.NoError(t, err) + row = convertSliceScanResultToStrings(row) + result = append(result, row) + } +} + +// startSqlServers starts a MySQL server and a Dolt sql-server for use in tests. +func startSqlServers(t *testing.T) { + startSqlServersWithDoltSystemVars(t, nil) +} + +// startSqlServersWithDoltSystemVars starts a MySQL server and a Dolt sql-server for use in tests. Before the +// Dolt sql-server is started, the specified |doltPersistentSystemVars| are persisted in the Dolt sql-server's +// local configuration. These are useful when you need to set system variables that must be available when the +// sql-server starts up, such as replication system variables. 
+func startSqlServersWithDoltSystemVars(t *testing.T, doltPersistentSystemVars map[string]string) {
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping binlog replication integ tests on Windows OS")
+	} else if runtime.GOOS == "darwin" && os.Getenv("CI") == "true" {
+		t.Skip("Skipping binlog replication integ tests in CI environment on Mac OS")
+	}
+
+	testDir = filepath.Join(os.TempDir(), fmt.Sprintf("%s-%v", t.Name(), time.Now().Unix()))
+	err := os.MkdirAll(testDir, 0777)
+
+	cmd := exec.Command("chmod", "777", testDir)
+	_, err = cmd.Output()
+	if err != nil {
+		panic(err)
+	}
+
+	require.NoError(t, err)
+	fmt.Printf("temp dir: %v \n", testDir)
+
+	// Start up primary and replica databases
+	mySqlPort, mySqlContainer, err = startMySqlServer(testDir)
+	require.NoError(t, err)
+	doltPort, doltProcess, err = startDoltSqlServer(testDir, doltPersistentSystemVars)
+	require.NoError(t, err)
+}
+
+// stopMySqlServer stops the running MySQL server. If any errors are encountered while stopping
+// the MySQL server, an error is returned.
+func stopMySqlServer(t *testing.T) error {
+	cmd := exec.Command("docker", "rm", "-f", mySqlContainer)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("unable to stop MySQL container: %v - %s", err, output)
+	}
+	return nil
+}
+
+// stopDoltSqlServer stops the running Dolt sql-server. If any errors are encountered while
+// stopping the Dolt sql-server, this function will fail the current test.
+func stopDoltSqlServer(t *testing.T) {
+	// Use the negative process ID so that we grab the entire process group.
+	// This is necessary to kill all the processes the child spawns.
+	// Note that we use os.FindProcess, instead of syscall.Kill, since syscall.Kill
+	// is not available on windows.
+ p, err := os.FindProcess(-doltProcess.Pid) + require.NoError(t, err) + + err = p.Signal(syscall.SIGKILL) + require.NoError(t, err) + time.Sleep(250 * time.Millisecond) + + // Remove the sql-server lock file so that we can restart cleanly + lockFilepath := filepath.Join(testDir, "dolt", "db01", ".dolt", "sql-server.lock") + stat, _ := os.Stat(lockFilepath) + if stat != nil { + err = os.Remove(lockFilepath) + require.NoError(t, err) + } + // Remove the global sql-server lock file as well + lockFilepath = filepath.Join(testDir, "dolt", ".dolt", "sql-server.lock") + stat, _ = os.Stat(lockFilepath) + if stat != nil { + err = os.Remove(lockFilepath) + require.NoError(t, err) + } +} + +// startReplication configures the replication source on the replica and runs the START REPLICA statement. +func startReplication(_ *testing.T, port int) { + replicaDatabase.MustExec( + fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+ + "SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+ + "SOURCE_PORT=%v, SOURCE_AUTO_POSITION=1, SOURCE_CONNECT_RETRY=5;", port)) + + replicaDatabase.MustExec("start replica;") +} + +// startReplicationAndCreateTestDb starts up replication on the replica, connecting to |port| on the primary, +// creates the test database, db01, on the primary, and ensures it gets replicated to the replica. +func startReplicationAndCreateTestDb(t *testing.T, port int) { + startReplicationAndCreateTestDbWithDelay(t, port, 100*time.Millisecond) +} + +// startReplicationAndCreateTestDbWithDelay starts up replication on the replica, connecting to |port| on the primary, +// pauses for |delay| before creating the test database, db01, on the primary, and ensures it +// gets replicated to the replica. +func startReplicationAndCreateTestDbWithDelay(t *testing.T, port int, delay time.Duration) { + startReplication(t, port) + time.Sleep(delay) + + // Look to see if the test database, db01, has been created yet. 
If not, create it and wait for it to + // replicate to the replica. Note that when re-starting replication in certain tests, we can't rely on + // the replica to contain all GTIDs (i.e. Dolt -> MySQL replication when restarting the replica, since + // Dolt doesn't yet resend events that occurred while the replica wasn't connected). + dbNames := mustListDatabases(t, primaryDatabase) + if !slices.Contains(dbNames, "db01") { + primaryDatabase.MustExec("create database db01;") + waitForReplicaToCatchUp(t) + } + primaryDatabase.MustExec("use db01;") + _, _ = replicaDatabase.Exec("use db01;") +} + +func assertCreateTableStatement(t *testing.T, database *sqlx.DB, table string, expectedStatement string) { + rows, err := database.Queryx("show create table db01." + table + ";") + require.NoError(t, err) + var actualTable, actualStatement string + require.True(t, rows.Next()) + err = rows.Scan(&actualTable, &actualStatement) + require.NoError(t, err) + require.Equal(t, table, actualTable) + require.NotNil(t, actualStatement) + actualStatement = sanitizeCreateTableString(actualStatement) + require.Equal(t, expectedStatement, actualStatement) +} + +func sanitizeCreateTableString(statement string) string { + statement = strings.ReplaceAll(statement, "`", "") + statement = strings.ReplaceAll(statement, "\n", "") + regex := regexp.MustCompile("\\s+") + return regex.ReplaceAllString(statement, " ") +} + +// findFreePort returns an available port that can be used for a server. If any errors are +// encountered, this function will panic and fail the current test. 
+func findFreePort() int {
+	listener, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("unable to find available TCP port: %v", err.Error()))
+	}
+	freePort := listener.Addr().(*net.TCPAddr).Port
+	err = listener.Close()
+	if err != nil {
+		panic(fmt.Sprintf("unable to find available TCP port: %v", err.Error()))
+	}
+
+	if freePort < 0 {
+		panic(fmt.Sprintf("unable to find available TCP port; found port %v", freePort))
+	}
+
+	return freePort
+}
+
+// startMySqlServer configures and starts a fresh MySQL server instance in a Docker container
+// and returns the port it is running on. If unable to start up the MySQL server, an error is returned.
+func startMySqlServer(dir string) (int, string, error) {
+	mySqlPort = findFreePort()
+
+	// Use a random name for the container to avoid conflicts
+	mySqlContainer = "mysql-test-" + strconv.Itoa(rand.Int())
+
+	// Build the Docker command to start the MySQL container
+	cmd := exec.Command("docker", "run",
+		"--rm", // Remove the container when it stops
+		"-d",   // Run in detached mode
+		"-p", fmt.Sprintf("%d:3306", mySqlPort), // Map the container's port 3306 to the host's mySqlPort
+		"-e", "MYSQL_ROOT_PASSWORD=password", // Set the root password
+		"-v", fmt.Sprintf("%s:/var/lib/mysql", dir), // Mount a volume for data persistence
+		"--name", mySqlContainer, // Give the container a name
+		"mysql:latest", // Use the latest MySQL image
+		"mysqld",
+		"--gtid_mode=ON",
+		"--enforce-gtid-consistency=ON",
+	)
+
+	// Execute the Docker command
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return -1, "", fmt.Errorf("unable to start MySQL container: %v - %s", err, output)
+	}
+
+	// Wait for the MySQL server to be ready
+	dsn := fmt.Sprintf("root:password@tcp(127.0.0.1:%v)/", mySqlPort)
+	primaryDatabase = sqlx.MustOpen("mysql", dsn)
+
+	err = waitForSqlServerToStart(primaryDatabase)
+	if err != nil {
+		return -1, "", err
+	}
+
+	// Ensure the replication user exists with the right grants
+	
mustCreateReplicatorUser(primaryDatabase) + + fmt.Printf("MySQL server started in container %s on port %v \n", mySqlContainer, mySqlPort) + + return mySqlPort, mySqlContainer, nil +} + +// directoryExists returns true if the specified |path| is to a directory that exists, otherwise, +// if the path doesn't exist or isn't a directory, false is returned. +func directoryExists(path string) bool { + info, err := os.Stat(path) + if os.IsNotExist(err) { + return false + } + return info.IsDir() +} + +var cachedDoltDevBuildPath = "" + +func initializeDevDoltBuild(dir string, goDirPath string) string { + if cachedDoltDevBuildPath != "" { + return cachedDoltDevBuildPath + } + + // If we're not in a CI environment, don't worry about building a dev build + if os.Getenv("CI") != "true" { + return "" + } + + basedir := filepath.Dir(filepath.Dir(dir)) + fullpath := filepath.Join(basedir, fmt.Sprintf("devDolt-%d", os.Getpid())) + + _, err := os.Stat(fullpath) + if err == nil { + return fullpath + } + + fmt.Printf("building dolt dev build at: %s \n", fullpath) + cmd := exec.Command("go", "build", "-o", fullpath) + cmd.Dir = goDirPath + + output, err := cmd.CombinedOutput() + if err != nil { + panic("unable to build dolt for binlog integration tests: " + err.Error() + "\nFull output: " + string(output) + "\n") + } + cachedDoltDevBuildPath = fullpath + + return cachedDoltDevBuildPath +} + +// startDoltSqlServer starts a Dolt sql-server on a free port from the specified directory |dir|. If +// |doltPeristentSystemVars| is populated, then those system variables will be set, persistently, for +// the Dolt database, before the Dolt sql-server is started. +func startDoltSqlServer(dir string, doltPersistentSystemVars map[string]string) (int, *os.Process, error) { + dir = filepath.Join(dir, "dolt") + err := os.MkdirAll(dir, 0777) + if err != nil { + return -1, nil, err + } + + // If we already assigned a port, re-use it. 
This is useful when testing restarting a primary, since + // we want the primary to come back up on the same port, so the replica can reconnect. + if doltPort < 1 { + doltPort = findFreePort() + } + fmt.Printf("Starting Dolt sql-server on port: %d, with data dir %s\n", doltPort, dir) + + // take the CWD and move up four directories to find the go directory + if originalWorkingDir == "" { + var err error + originalWorkingDir, err = os.Getwd() + if err != nil { + panic(err) + } + } + goDirPath := filepath.Join(originalWorkingDir, "..") + err = os.Chdir(goDirPath) + if err != nil { + panic(err) + } + + args := []string{"go", "run", ".", + // "--loglevel=TRACE", + fmt.Sprintf("--port=%v", doltPort), + } + + // If we're running in CI, use a precompiled dolt binary instead of go run + devDoltPath := initializeDevDoltBuild(dir, goDirPath) + if devDoltPath != "" { + args[2] = devDoltPath + args = args[2:] + } + cmd := exec.Command(args[0], args[1:]...) + + // Set a unique process group ID so that we can cleanly kill this process, as well as + // any spawned child processes later. Mac/Unix can set the "Setpgid" field directly, but + // on windows, this field isn't present, so we need to use reflection so that this code + // can still compile for windows, even though we don't run it there. + procAttr := &syscall.SysProcAttr{} + ps := reflect.ValueOf(procAttr) + s := ps.Elem() + f := s.FieldByName("Setpgid") + f.SetBool(true) + cmd.SysProcAttr = procAttr + + // Some tests restart the Dolt sql-server, so if we have a current log file, save a reference + // to it so we can print the results later if the test fails. 
+ if doltLogFilePath != "" { + oldDoltLogFilePath = doltLogFilePath + } + + doltLogFilePath = filepath.Join(dir, fmt.Sprintf("dolt-%d.out.log", time.Now().Unix())) + doltLogFile, err = os.Create(doltLogFilePath) + if err != nil { + return -1, nil, err + } + fmt.Printf("dolt sql-server logs at: %s \n", doltLogFilePath) + cmd.Stdout = doltLogFile + cmd.Stderr = doltLogFile + err = cmd.Start() + if err != nil { + return -1, nil, fmt.Errorf("unable to execute command %v: %v", cmd.String(), err.Error()) + } + + fmt.Printf("Dolt CMD: %s\n", cmd.String()) + + dsn := fmt.Sprintf("%s@tcp(127.0.0.1:%v)/", "root", doltPort) + replicaDatabase = sqlx.MustOpen("mysql", dsn) + + err = waitForSqlServerToStart(replicaDatabase) + if err != nil { + return -1, nil, err + } + + mustCreateReplicatorUser(replicaDatabase) + fmt.Printf("Dolt server started on port %v \n", doltPort) + + return doltPort, cmd.Process, nil +} + +// mustCreateReplicatorUser creates the replicator user on the specified |db| and grants them replication slave privs. +func mustCreateReplicatorUser(db *sqlx.DB) { + db.MustExec("CREATE USER if not exists 'replicator'@'%' IDENTIFIED BY 'Zqr8_blrGm1!';") + db.MustExec("GRANT REPLICATION SLAVE ON *.* TO 'replicator'@'%';") +} + +// waitForSqlServerToStart polls the specified database to wait for it to become available, pausing +// between retry attempts, and returning an error if it is not able to verify that the database is +// available. +func waitForSqlServerToStart(database *sqlx.DB) error { + fmt.Printf("Waiting for server to start...\n") + for counter := 0; counter < 30; counter++ { + if database.Ping() == nil { + return nil + } + fmt.Printf("not up yet; waiting...\n") + time.Sleep(500 * time.Millisecond) + } + + return database.Ping() +} + +// printFile opens the specified filepath |path| and outputs the contents of that file to stdout. 
+func printFile(path string) {
+	file, err := os.Open(path)
+	if err != nil {
+		fmt.Printf("Unable to open file: %s \n", err)
+		return
+	}
+	defer file.Close()
+
+	reader := bufio.NewReader(file)
+	for {
+		s, err := reader.ReadString(byte('\n'))
+		if err != nil {
+			if err == io.EOF {
+				break
+			} else {
+				panic(err)
+			}
+		}
+		fmt.Print(s)
+	}
+	fmt.Println()
+}
+
+// assertRepoStateFileExists asserts that the repo_state.json file is present for the specified
+// database |db|.
+func assertRepoStateFileExists(t *testing.T, db string) {
+	repoStateFile := filepath.Join(testDir, "dolt", db, ".dolt", "repo_state.json")
+
+	_, err := os.Stat(repoStateFile)
+	require.NoError(t, err)
+}
+
+// requireReplicaResults runs the specified |query| on the replica database and asserts that the results match
+// |expectedResults|. Note that the actual results are converted to string values in almost all cases, due to
+// limitations in the SQL library we use to query the replica database, so |expectedResults| should generally
+// be expressed in strings.
+func requireReplicaResults(t *testing.T, query string, expectedResults [][]any) {
+	requireResults(t, replicaDatabase, query, expectedResults)
+}
+
+// requirePrimaryResults runs the specified |query| on the primary database and asserts that the results match
+// |expectedResults|. Note that the actual results are converted to string values in almost all cases, due to
+// limitations in the SQL library we use to query the replica database, so |expectedResults| should generally
+// be expressed in strings.
+func requirePrimaryResults(t *testing.T, query string, expectedResults [][]any) { + requireResults(t, primaryDatabase, query, expectedResults) +} + +func requireResults(t *testing.T, db *sqlx.DB, query string, expectedResults [][]any) { + rows, err := db.Queryx(query) + require.NoError(t, err) + allRows := readAllRowsIntoSlices(t, rows) + require.Equal(t, len(expectedResults), len(allRows), "Expected %v, got %v", expectedResults, allRows) + for i := range expectedResults { + require.Equal(t, expectedResults[i], allRows[i], "Expected %v, got %v", expectedResults[i], allRows[i]) + } + require.NoError(t, rows.Close()) +} + +// queryReplicaStatus returns the results of `SHOW REPLICA STATUS` as a map, for the replica +// database. If any errors are encountered, this function will fail the current test. +func queryReplicaStatus(t *testing.T) map[string]any { + rows, err := replicaDatabase.Queryx("SHOW REPLICA STATUS;") + require.NoError(t, err) + status := convertMapScanResultToStrings(readNextRow(t, rows)) + require.NoError(t, rows.Close()) + return status +} + +// mustListDatabases returns a string slice of the databases (i.e. schemas) available on the specified |db|. If +// any errors are encountered, this function will fail the current test. 
+func mustListDatabases(t *testing.T, db *sqlx.DB) []string { + rows, err := db.Queryx("show databases;") + require.NoError(t, err) + allRows := readAllRowsIntoSlices(t, rows) + dbNames := make([]string, len(allRows)) + for i, row := range allRows { + dbNames[i] = row[0].(string) + } + return dbNames +} diff --git a/executor.go b/executor.go index 2ac676e3..8346d813 100644 --- a/executor.go +++ b/executor.go @@ -20,7 +20,9 @@ import ( "sync" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/expression" "github.com/dolthub/go-mysql-server/sql/plan" + "github.com/dolthub/go-mysql-server/sql/transform" "github.com/dolthub/go-mysql-server/sql/types" "github.com/sirupsen/logrus" ) @@ -79,8 +81,6 @@ func (b *DuckBuilder) Build(ctx *sql.Context, root sql.Node, r sql.Row) (sql.Row // Handle special queries switch ctx.Query() { - case "select @@version_comment limit 1": - return b.base.Build(ctx, root, r) case "SELECT DATABASE()": return b.base.Build(ctx, root, r) } @@ -88,7 +88,22 @@ func (b *DuckBuilder) Build(ctx *sql.Context, root sql.Node, r sql.Row) (sql.Row switch n.(type) { case *plan.CreateDB, *plan.DropDB, *plan.DropTable, *plan.RenameTable, *plan.CreateTable, *plan.AddColumn, *plan.RenameColumn, *plan.DropColumn, *plan.ModifyColumn, - *plan.ShowTables, *plan.ShowCreateTable: + *plan.ShowTables, *plan.ShowCreateTable, + *plan.ShowBinlogs, *plan.ShowBinlogStatus, *plan.ShowWarnings: + return b.base.Build(ctx, root, r) + } + + // Fallback to the base builder if the plan contains system/user variables + foundVariable := false + transform.InspectExpressions(n, func(e sql.Expression) bool { + switch e.(type) { + case *expression.SystemVar, *expression.UserVar: + foundVariable = true + return false + } + return true + }) + if foundVariable { return b.base.Build(ctx, root, r) } diff --git a/go.mod b/go.mod index 29ec38d1..f723e28a 100644 --- a/go.mod +++ b/go.mod @@ -4,17 +4,25 @@ go 1.22.4 require ( github.com/DATA-DOG/go-sqlmock v1.5.2 
+ github.com/Shopify/toxiproxy/v2 v2.9.0 github.com/dolthub/doltgresql v0.11.1 github.com/dolthub/go-mysql-server v0.18.2-0.20240815142344-761713e36043 github.com/dolthub/vitess v0.0.0-20240807181005-71d735078e24 + github.com/go-sql-driver/mysql v1.8.1 + github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/marcboeker/go-duckdb v1.7.1 + github.com/prometheus/client_golang v1.19.0 + github.com/rs/zerolog v1.33.0 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.9.0 + gopkg.in/src-d/go-errors.v1 v1.0.0 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/apd/v2 v2.0.3-0.20200518165714-d020e156310a // indirect github.com/cockroachdb/errors v1.7.5 // indirect @@ -33,19 +41,26 @@ require ( github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lestrrat-go/strftime v1.0.4 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pierrre/geohash v1.0.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect + github.com/prometheus/common v0.50.0 // indirect + github.com/prometheus/procfs v0.13.0 // indirect github.com/rogpeppe/go-internal 
v1.11.0 // indirect + github.com/rs/xid v1.5.0 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/tetratelabs/wazero v1.1.0 // indirect github.com/twpayne/go-geom v1.3.6 // indirect @@ -65,6 +80,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.34.2 // indirect - gopkg.in/src-d/go-errors.v1 v1.0.0 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index da2c9351..64f43b1b 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8 github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/Shopify/toxiproxy/v2 v2.9.0 h1:DIaDZG2/r/kv3Em6UxYBUVnnWl1mHlYTGFv+sTPV7VI= +github.com/Shopify/toxiproxy/v2 v2.9.0/go.mod h1:2uPRyxR46fsx2yUr9i8zcejzdkWfK7p6G23jV/X6YNs= github.com/TomiHiltunen/geohash-golang v0.0.0-20150112065804-b3e4e625abfb h1:wumPkzt4zaxO4rHPBrjDK8iZMR41C1qs7njNqlacwQg= github.com/TomiHiltunen/geohash-golang v0.0.0-20150112065804-b3e4e625abfb/go.mod h1:QiYsIBRQEO+Z4Rz7GoI+dsHVneZNONvhczuA+llOZNM= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= @@ -49,6 +51,7 @@ github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZw github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/broady/gogeohash v0.0.0-20120525094510-7b2c40d64042 h1:iEdmkrNMLXbM7ecffOAtZJQOQUTE4iMonxrb5opUgE4= @@ -83,6 +86,7 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -150,14 +154,15 @@ github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-sql-driver/mysql v1.7.2-0.20231213112541-0004702b931d h1:QQP1nE4qh5aHTGvI1LgOFxZYVxYoGeMfbNHikogPyoA= -github.com/go-sql-driver/mysql v1.7.2-0.20231213112541-0004702b931d/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= 
+github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -210,6 +215,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -253,6 +260,8 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/ 
github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -283,8 +292,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -307,12 +316,20 @@ github.com/marcboeker/go-duckdb v1.7.1 h1:m9/nKfP7cG9AptcQ95R1vfacRuhtrZE5pZF8BP github.com/marcboeker/go-duckdb v1.7.1/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is= github.com/mattn/go-colorable v0.0.9/go.mod 
h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= @@ -348,8 +365,6 @@ github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7 github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod 
h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -389,6 +404,7 @@ github.com/pierrre/geohash v1.0.0 h1:f/zfjdV4rVofTCz1FhP07T+EMQAvcMM2ioGZVt+zqjI github.com/pierrre/geohash v1.0.0/go.mod h1:atytaeVa21hj5F6kMebHYPf8JbIrGxK2FSzN2ajKXms= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -401,25 +417,37 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model 
v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= +github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ= +github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/prometheus/procfs v0.13.0 h1:GqzLlQyfsPbaEHaQkO7tbDlriv/4o5Hudv6OXHGKX7o= +github.com/prometheus/procfs v0.13.0/go.mod h1:cd4PFCR54QLnGKPaKGA6l+cfuNXtht43ZKY6tow0Y1g= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= +github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -602,7 +630,10 @@ golang.org/x/sys v0.0.0-20200121082415-34d275377bf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -687,8 +718,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= -gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -699,6 +730,7 @@ gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3M gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/src-d/go-errors.v1 v1.0.0 h1:cooGdZnCjYbeS1zb1s6pVAAimTdKceRrpn7aKOnNIfc= gopkg.in/src-d/go-errors.v1 v1.0.0/go.mod h1:q1cBlomlw2FnDBDNGlnh6X0jPihy+QxZfMMNxPCbdYg= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod 
h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/main.go b/main.go index cf4033ea..ec6d18f2 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ package main import ( + "flag" "fmt" "github.com/apecloud/myduckserver/meta" @@ -46,7 +47,13 @@ func checkDependencies() { } } +func init() { + flag.StringVar(&address, "address", address, "The address to bind to.") + flag.IntVar(&port, "port", port, "The port to bind to.") +} + func main() { + flag.Parse() checkDependencies() provider, err := meta.NewDBProvider(dbFile) diff --git a/meta/table.go b/meta/table.go index 86ca7e47..6d7ddb39 100644 --- a/meta/table.go +++ b/meta/table.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/dolthub/go-mysql-server/sql" + "github.com/marcboeker/go-duckdb" ) type Table struct { @@ -15,6 +16,7 @@ type Table struct { } var _ sql.Table = (*Table)(nil) +var _ sql.PrimaryKeyTable = (*Table)(nil) var _ sql.AlterableTable = (*Table)(nil) var _ sql.InsertableTable = (*Table)(nil) var _ sql.UpdatableTable = (*Table)(nil) @@ -52,6 +54,12 @@ func (t *Table) Schema() sql.Schema { t.mu.RLock() defer t.mu.RUnlock() + schema := t.schema() + setPrimaryKeyColumns(schema, t.primaryKeyOrdinals()) + return schema +} + +func (t *Table) schema() sql.Schema { rows, err := t.db.engine.Query(` SELECT column_name, data_type, is_nullable, column_default, numeric_precision, numeric_scale FROM duckdb_columns() WHERE schema_name = ? AND table_name = ? `, t.db.name, t.name) @@ -94,11 +102,51 @@ func (t *Table) Schema() sql.Schema { return schema } +func setPrimaryKeyColumns(schema sql.Schema, ordinals []int) { + for _, idx := range ordinals { + schema[idx].PrimaryKey = true + } +} + // String implements sql.Table. func (t *Table) String() string { return t.name } +// PrimaryKeySchema implements sql.PrimaryKeyTable. 
+func (t *Table) PrimaryKeySchema() sql.PrimaryKeySchema { + t.mu.RLock() + defer t.mu.RUnlock() + + schema := t.schema() + ordinals := t.primaryKeyOrdinals() + setPrimaryKeyColumns(schema, ordinals) + return sql.NewPrimaryKeySchema(schema, ordinals...) +} + +func (t *Table) primaryKeyOrdinals() []int { + rows, err := t.db.engine.Query(` + SELECT constraint_column_indexes FROM duckdb_constraints() WHERE schema_name = ? AND table_name = ? AND constraint_type = 'PRIMARY KEY' LIMIT 1 + `, t.db.name, t.name) + if err != nil { + panic(ErrDuckDB.New(err)) + } + defer rows.Close() + + var ordinals []int + if rows.Next() { + var arr duckdb.Composite[[]int] + if err := rows.Scan(&arr); err != nil { + panic(ErrDuckDB.New(err)) + } + ordinals = arr.Get() + } + if err := rows.Err(); err != nil { + panic(ErrDuckDB.New(err)) + } + return ordinals +} + // AddColumn implements sql.AlterableTable. func (t *Table) AddColumn(ctx *sql.Context, column *sql.Column, order *sql.ColumnOrder) error { t.mu.Lock() diff --git a/replication.go b/replication.go index 0834aaff..dddb0d6c 100644 --- a/replication.go +++ b/replication.go @@ -125,6 +125,15 @@ func (ta *tableAppender) Close() error { return ta.appender.Close() } +func isPkUpdate(schema sql.Schema, identifyColumns, dataColumns mysql.Bitmap) bool { + for i, c := range schema { + if c.PrimaryKey && identifyColumns.Bit(i) && dataColumns.Bit(i) { + return true + } + } + return false +} + func (twp *tableWriterProvider) newTableUpdater( ctx *sql.Context, databaseName, tableName string, @@ -156,7 +165,7 @@ func (twp *tableWriterProvider) newTableUpdater( } } case binlogreplication.UpdateEvent: - if keyCount < columnCount || dataCount < columnCount { + if keyCount < columnCount || dataCount < columnCount || isPkUpdate(schema, identifyColumns, dataColumns) { sql = "UPDATE " + fullTableName + " SET " count := 0 for i := range columnCount { @@ -242,9 +251,10 @@ func (tu *tableUpdater) Update(ctx *sql.Context, keys sql.Row, values sql.Row) e if 
tu.replace { return tu.Insert(ctx, values) } + // UPDATE t SET col1 = ?, col2 = ? WHERE key1 = ? AND key2 = ? args := make([]interface{}, len(keys)+len(values)) - copy(args, keys) - copy(args[len(keys):], values) + copy(args, values) + copy(args[len(values):], keys) _, err := tu.stmt.ExecContext(ctx.Context, args...) return err }