Skip to content

Commit

Permalink
Cherry-pick 10d36cb with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Jan 30, 2025
1 parent c1374d1 commit f65ec1f
Show file tree
Hide file tree
Showing 15 changed files with 2,200 additions and 45 deletions.
4 changes: 1 addition & 3 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,7 @@ func getVDiffInfo(json string) *vdiffInfo {
}

func encodeString(in string) string {
var buf strings.Builder
sqltypes.NewVarChar(in).EncodeSQL(&buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

// generateMoreCustomers creates additional test data for better tests
Expand Down
30 changes: 22 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func (blp *BinlogPlayer) setVReplicationState(state binlogdatapb.VReplicationWor
})
}
blp.blplStats.State.Store(state.String())
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state.String(), encodeString(MessageTruncate(message)), blp.uid)
query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(MessageTruncate(message)), blp.uid)
if _, err := blp.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
Expand Down Expand Up @@ -607,21 +607,37 @@ func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) {
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string {
return fmt.Sprintf("insert into _vt.vreplication "+
<<<<<<< HEAD

Check failure on line 610 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected expression

Check failure on line 610 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected expression
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
timeUpdated, binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(dbName), workflowType, workflowSubType, deferSecondaryKeys)

Check failure on line 614 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ) at end of statement

Check failure on line 614 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ) at end of statement
=======

Check failure on line 615 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ==, expected }

Check failure on line 615 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ==, expected }
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys, options) "+
"values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %s)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
timeUpdated, encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(dbName), workflowType,
workflowSubType, deferSecondaryKeys, encodeString("{}"))
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))

Check failure on line 621 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'

Check failure on line 621 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'
}

// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string {
return fmt.Sprintf("insert into _vt.vreplication "+
<<<<<<< HEAD

Check failure on line 628 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected expression

Check failure on line 628 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected expression
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state.String(), encodeString(dbName),
workflowType, workflowSubType)

Check failure on line 633 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ) at end of statement

Check failure on line 633 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ) at end of statement
=======

Check failure on line 634 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ==, expected }

Check failure on line 634 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected ==, expected }
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, options) "+
"values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %s)",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
throttler.ReplicationLagModuleDisabled, time.Now().Unix(), encodeString(state.String()), encodeString(dbName),
workflowType, workflowSubType, encodeString("{}"))
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))

Check failure on line 640 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'

Check failure on line 640 in go/vt/binlog/binlogplayer/binlog_player.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'
}

// GenerateUpdatePos returns a statement to record the latest processed gtid in the _vt.vreplication table.
Expand Down Expand Up @@ -663,15 +679,15 @@ func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentTh
// StartVReplicationUntil returns a statement to start the replication with a stop position.
func StartVReplicationUntil(uid int32, pos string) string {
return fmt.Sprintf(
"update _vt.vreplication set state='%v', stop_pos=%v where id=%v",
binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(pos), uid)
"update _vt.vreplication set state=%v, stop_pos=%v where id=%v",
encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(pos), uid)
}

// StopVReplication returns a statement to stop the replication.
func StopVReplication(uid int32, message string) string {
return fmt.Sprintf(
"update _vt.vreplication set state='%v', message=%v where id=%v",
binlogdatapb.VReplicationWorkflowState_Stopped.String(), encodeString(MessageTruncate(message)), uid)
"update _vt.vreplication set state=%v, message=%v where id=%v",
encodeString(binlogdatapb.VReplicationWorkflowState_Stopped.String()), encodeString(MessageTruncate(message)), uid)
}

// DeleteVReplication returns a statement to delete the replication.
Expand All @@ -686,9 +702,7 @@ func MessageTruncate(msg string) string {
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

// ReadVReplicationPos returns a statement to query the gtid for a
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/vdiff_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func newTestVDiffEnv(t testing.TB, ctx context.Context, sourceShards, targetShar
// But this is one statement per stream.
env.tmc.setVRResults(
primary.tablet,
fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", vdiffSourceGtid, j+1),
fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=%s, message='synchronizing for vdiff' where id=%d", sqltypes.EncodeStringSQL(vdiffSourceGtid), j+1),
&sqltypes.Result{},
)
}
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,17 @@ func (rs *resharder) createStreams(ctx context.Context) error {

ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())

<<<<<<< HEAD
// copy excludeRules to prevent data race.
copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...)
=======
// Clone excludeRules to prevent data races.
copyExcludeRules := slices.Clone(excludeRules)
optionsJSON, err := getOptionsJSON(rs.workflowOptions)
if err != nil {
return err
}
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))
for _, source := range rs.sourceShards {
if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) {
continue
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,13 @@ func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string,
}

if ts.optCells != "" || ts.optTabletTypes != "" {
<<<<<<< HEAD
query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s' where workflow = '%s' and db_name = '%s'",
ts.optCells, ts.optTabletTypes, ts.ReverseWorkflowName(), dbname)
=======
query := fmt.Sprintf("update _vt.vreplication set cell = %s, tablet_types = %s, options = %s where workflow = %s and db_name = %s",
sqltypes.EncodeStringSQL(ts.optCells), sqltypes.EncodeStringSQL(ts.optTabletTypes), sqltypes.EncodeStringSQL(options), sqltypes.EncodeStringSQL(ts.ReverseWorkflowName()), sqltypes.EncodeStringSQL(dbname))
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))
return query
}
return ""
Expand Down Expand Up @@ -874,8 +879,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
// For non-reference tables we return an error if there's no primary
// vindex as it's not clear what to do.
if len(vtable.ColumnVindexes) > 0 && len(vtable.ColumnVindexes[0].Columns) > 0 {
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange))
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', %s)", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, encodeString(key.KeyRangeString(source.GetShard().KeyRange)))
} else {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary vindex found for the %s table in the %s keyspace",
vtable.Name.String(), ts.SourceKeyspaceName())
Expand Down Expand Up @@ -1058,7 +1063,7 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
// re-invoked after a freeze, it will skip all the previous steps
err := ts.ForAllTargets(func(target *MigrationTarget) error {
ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName())
query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen,
query := fmt.Sprintf("update _vt.vreplication set message = %s where db_name=%s and workflow=%s", encodeString(Frozen),
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
return err
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package workflow

import (
"bytes"
"context"
"fmt"
"hash/fnv"
Expand Down Expand Up @@ -578,9 +577,7 @@ func ReverseWorkflowName(workflow string) string {
// this public, but it doesn't belong in package workflow. Maybe package sqltypes,
// or maybe package sqlescape?
func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

func getRenameFileName(tableName string) string {
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package endtoend

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -461,9 +460,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

func validateSchemaInserted(client *framework.QueryClient, ddl string) bool {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1642,8 +1642,8 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem

{
// temporary hack. todo: this should be done when inserting any _vt.vreplication record across all workflow types
query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = '%s'",
binlogdatapb.VReplicationWorkflowType_OnlineDDL, v.workflow)
query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = %s",
binlogdatapb.VReplicationWorkflowType_OnlineDDL, sqltypes.EncodeStringSQL(v.workflow))
if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", tablet.Tablet, query)
}
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vttablet/tabletmanager/vdiff/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package vdiff
import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -58,9 +57,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare
// Utility functions

func encodeString(in string) string {
var buf strings.Builder
sqltypes.NewVarChar(in).EncodeSQL(&buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

func pkColsToGroupByParams(pkCols []int, collationEnv *collations.Environment) []*engine.GroupByParams {
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str

// AddRow adds a row to the insert statement.
func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string,
<<<<<<< HEAD
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) {
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
=======
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool, options string) {
if options == "" {
options = "{}"
}
protoutil.SortBinlogSourceTables(bls)
fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v, %v)",
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))
ig.prefix,
encodeString(workflow),
encodeString(bls.String()),
Expand All @@ -60,11 +69,15 @@ func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSourc
encodeString(cell),
encodeString(tabletTypes),
ig.now,
ig.state,
encodeString(ig.state),
encodeString(ig.dbname),
workflowType,
workflowSubType,
deferSecondaryKeys,
<<<<<<< HEAD
=======
encodeString(options),
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))
)
ig.prefix = ", "
}
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,21 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
})
}
vr.stats.State.Store(state.String())
<<<<<<< HEAD
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
=======
query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(binlogplayer.MessageTruncate(message)), vr.id)
// If we're batching a transaction, then include the state update
// in the current transaction batch.
if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 {
vr.dbClient.AddQueryToTrxBatch(query)
} else { // Otherwise, send it down the wire
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
>>>>>>> 10d36cbdbc (Always make sure to escape all strings (#17649))
}
if state == vr.state {
return nil
Expand All @@ -498,9 +510,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
}

func encodeString(in string) string {
var buf strings.Builder
sqltypes.NewVarChar(in).EncodeSQL(&buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

func (vr *vreplicator) getSettingFKCheck() error {
Expand Down
13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ limitations under the License.
package schema

import (
"bytes"
"context"
"fmt"
"sync"
"time"

"vitess.io/vitess/go/bytes2"
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -230,10 +230,15 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
}
defer conn.Recycle()

// We serialize a blob here, encodeString is for strings only
// and should not be used for binary data.
blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob)
buf := bytes2.Buffer{}
blobVal.EncodeSQLBytes2(&buf)
query := sqlparser.BuildParsedQuery("insert into %s.schema_version "+
"(pos, ddl, schemax, time_updated) "+
"values (%s, %s, %s, %d)", sidecar.GetIdentifier(), encodeString(gtid),
encodeString(ddl), encodeString(string(blob)), timestamp).Query
encodeString(ddl), buf.String(), timestamp).Query
_, err = conn.Conn.Exec(ctx, query, 1, false)
if err != nil {
return err
Expand All @@ -242,9 +247,7 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

// MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,9 +856,7 @@ type extColInfo struct {
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
return sqltypes.EncodeStringSQL(in)
}

func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) {
Expand Down
Loading

0 comments on commit f65ec1f

Please sign in to comment.