diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 561edfe8b7e..f931bed8fcc 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -336,9 +336,7 @@ func getVDiffInfo(json string) *vdiffInfo { } func encodeString(in string) string { - var buf strings.Builder - sqltypes.NewVarChar(in).EncodeSQL(&buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } // generateMoreCustomers creates additional test data for better tests diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 05685a54d3e..5046724e14f 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -541,7 +541,7 @@ func (blp *BinlogPlayer) setVReplicationState(state binlogdatapb.VReplicationWor }) } blp.blplStats.State.Store(state.String()) - query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state.String(), encodeString(MessageTruncate(message)), blp.uid) + query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(MessageTruncate(message)), blp.uid) if _, err := blp.dbClient.ExecuteFetch(query, 1); err != nil { return fmt.Errorf("could not set state: %v: %v", query, err) } @@ -622,9 +622,9 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys, options) "+ - "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v, %s)", + "values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %s)", encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, - timeUpdated, binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(dbName), workflowType, + timeUpdated, encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(dbName), workflowType, workflowSubType, deferSecondaryKeys, encodeString("{}")) } @@ -634,9 +634,9 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, options) "+ - "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %s)", + "values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %s)", encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, - throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state.String(), encodeString(dbName), + throttler.ReplicationLagModuleDisabled, time.Now().Unix(), encodeString(state.String()), encodeString(dbName), workflowType, workflowSubType, encodeString("{}")) } @@ -679,15 +679,15 @@ func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentTh // StartVReplicationUntil returns a statement to start the replication with a stop position. func StartVReplicationUntil(uid int32, pos string) string { return fmt.Sprintf( - "update _vt.vreplication set state='%v', stop_pos=%v where id=%v", - binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(pos), uid) + "update _vt.vreplication set state=%v, stop_pos=%v where id=%v", + encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(pos), uid) } // StopVReplication returns a statement to stop the replication. func StopVReplication(uid int32, message string) string { return fmt.Sprintf( - "update _vt.vreplication set state='%v', message=%v where id=%v", - binlogdatapb.VReplicationWorkflowState_Stopped.String(), encodeString(MessageTruncate(message)), uid) + "update _vt.vreplication set state=%v, message=%v where id=%v", + encodeString(binlogdatapb.VReplicationWorkflowState_Stopped.String()), encodeString(MessageTruncate(message)), uid) } // DeleteVReplication returns a statement to delete the replication. @@ -702,9 +702,7 @@ func MessageTruncate(msg string) string { } func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } // ReadVReplicationPos returns a statement to query the gtid for a diff --git a/go/vt/vtctl/vdiff_env_test.go b/go/vt/vtctl/vdiff_env_test.go index fdcf29367cc..9b2fade3204 100644 --- a/go/vt/vtctl/vdiff_env_test.go +++ b/go/vt/vtctl/vdiff_env_test.go @@ -128,7 +128,7 @@ func newTestVDiffEnv(t testing.TB, ctx context.Context, sourceShards, targetShar // But this is one statement per stream. env.tmc.setVRResults( primary.tablet, - fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", vdiffSourceGtid, j+1), + fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=%s, message='synchronizing for vdiff' where id=%d", sqltypes.EncodeStringSQL(vdiffSourceGtid), j+1), &sqltypes.Result{}, ) } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 956693a20fb..14890a26b00 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -817,8 +817,8 @@ func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string, } if ts.optCells != "" || ts.optTabletTypes != "" { - query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s', options = '%s' where workflow = '%s' and db_name = '%s'", - ts.optCells, ts.optTabletTypes, options, ts.ReverseWorkflowName(), dbname) + query := fmt.Sprintf("update _vt.vreplication set cell = %s, tablet_types = %s, options = %s where workflow = %s and db_name = %s", + sqltypes.EncodeStringSQL(ts.optCells), sqltypes.EncodeStringSQL(ts.optTabletTypes), sqltypes.EncodeStringSQL(options), sqltypes.EncodeStringSQL(ts.ReverseWorkflowName()), sqltypes.EncodeStringSQL(dbname)) return query } return "" @@ -900,8 +900,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error // For non-reference tables we return an error if there's no primary // vindex as it's not clear what to do. if len(vtable.ColumnVindexes) > 0 && len(vtable.ColumnVindexes[0].Columns) > 0 { - inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), - ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange)) + inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', %s)", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), + ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, encodeString(key.KeyRangeString(source.GetShard().KeyRange))) } else { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary vindex found for the %s table in the %s keyspace", vtable.Name.String(), ts.SourceKeyspaceName()) @@ -1156,7 +1156,7 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { // re-invoked after a freeze, it will skip all the previous steps err := ts.ForAllTargets(func(target *MigrationTarget) error { ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName()) - query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen, + query := fmt.Sprintf("update _vt.vreplication set message = %s where db_name=%s and workflow=%s", encodeString(Frozen), encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) return err diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 9cedf01733e..d0c619cc969 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -17,7 +17,6 @@ limitations under the License. package workflow import ( - "bytes" "context" "encoding/json" "fmt" @@ -612,9 +611,7 @@ func ReverseWorkflowName(workflow string) string { // this public, but it doesn't belong in package workflow. Maybe package sqltypes, // or maybe package sqlescape? func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } func getRenameFileName(tableName string) string { diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 997ab222255..92980ff9e44 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -17,7 +17,6 @@ limitations under the License. package endtoend import ( - "bytes" "context" "errors" "fmt" @@ -472,9 +471,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] } func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } func validateSchemaInserted(client *framework.QueryClient, ddl string) bool { diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index aa685b82e53..84cd5ff368b 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -1660,8 +1660,8 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem { // temporary hack. todo: this should be done when inserting any _vt.vreplication record across all workflow types - query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = '%s'", - binlogdatapb.VReplicationWorkflowType_OnlineDDL, v.workflow) + query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = %s", + binlogdatapb.VReplicationWorkflowType_OnlineDDL, sqltypes.EncodeStringSQL(v.workflow)) if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil { return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", tablet.Tablet, query) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/utils.go b/go/vt/vttablet/tabletmanager/vdiff/utils.go index aeaa28972e0..0eef19abe3e 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/utils.go +++ b/go/vt/vttablet/tabletmanager/vdiff/utils.go @@ -19,7 +19,6 @@ package vdiff import ( "context" "fmt" - "strings" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -59,9 +58,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare // Utility functions func encodeString(in string) string { - var buf strings.Builder - sqltypes.NewVarChar(in).EncodeSQL(&buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } func pkColsToGroupByParams(pkCols []int, collationEnv *collations.Environment) []*engine.GroupByParams { diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go index 6a127b084b5..421165f2212 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -53,7 +53,7 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) { protoutil.SortBinlogSourceTables(bls) - fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v, %v)", + fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %v)", ig.prefix, encodeString(workflow), encodeString(bls.String()), @@ -63,7 +63,7 @@ func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSourc encodeString(cell), encodeString(tabletTypes), ig.now, - ig.state, + encodeString(ig.state), encodeString(ig.dbname), workflowType, workflowSubType, diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 8a01cf7c8ed..14dea29f302 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -478,7 +478,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me }) } vr.stats.State.Store(state.String()) - query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id) + query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(binlogplayer.MessageTruncate(message)), vr.id) if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil { return fmt.Errorf("could not set state: %v: %v", query, err) } @@ -492,9 +492,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me } func encodeString(in string) string { - var buf strings.Builder - sqltypes.NewVarChar(in).EncodeSQL(&buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } func (vr *vreplicator) getSettingFKCheck() error { diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 8db202efa13..90636ea60cc 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -17,12 +17,12 @@ limitations under the License. package schema import ( - "bytes" "context" "fmt" "sync" "time" + "vitess.io/vitess/go/bytes2" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -230,10 +230,15 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, } defer conn.Recycle() + // We serialize a blob here, encodeString is for strings only + // and should not be used for binary data. + blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob) + buf := bytes2.Buffer{} + blobVal.EncodeSQLBytes2(&buf) query := sqlparser.BuildParsedQuery("insert into %s.schema_version "+ "(pos, ddl, schemax, time_updated) "+ "values (%s, %s, %s, %d)", sidecar.GetIdentifier(), encodeString(gtid), - encodeString(ddl), encodeString(string(blob)), timestamp).Query + encodeString(ddl), buf.String(), timestamp).Query _, err = conn.Conn.Exec(ctx, query, 1, false) if err != nil { return err @@ -242,9 +247,7 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, } func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } // MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index bf41111bbc8..1987db3817f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -903,9 +903,7 @@ type extColInfo struct { } func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) } func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4d9f66f1809..51f1b025f47 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/bytes2" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" @@ -334,9 +335,15 @@ func TestVersion(t *testing.T) { } blob, _ := dbSchema.MarshalVT() gtid := "MariaDB/0-41983-20" + // We serialize a blob here, encodeString is for strings only + // and should not be used for binary data. + blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob) + buf := bytes2.Buffer{} + blobVal.EncodeSQLBytes2(&buf) + testcases := []testcase{{ input: []string{ - fmt.Sprintf("insert into _vt.schema_version values(1, '%s', 123, 'create table t1', %v)", gtid, encodeString(string(blob))), + fmt.Sprintf("insert into _vt.schema_version values(1, '%s', 123, 'create table t1', %v)", gtid, buf.String()), }, // External table events don't get sent. output: [][]string{{ diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index a5f7d6ae0bf..3f548e19402 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -17,7 +17,6 @@ limitations under the License. package wrangler import ( - "bytes" "context" "fmt" "sync" @@ -124,7 +123,5 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha } func encodeString(in string) string { - buf := bytes.NewBuffer(nil) - sqltypes.NewVarChar(in).EncodeSQL(buf) - return buf.String() + return sqltypes.EncodeStringSQL(in) }