From ab995b50669bd43edace8fd712b08a6d1113b241 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 1 Feb 2025 11:20:27 -0500 Subject: [PATCH] Minor tweaks Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 6 +++--- .../tabletserver/vstreamer/planbuilder.go | 18 ++++++++++-------- .../tabletserver/vstreamer/rowstreamer.go | 6 +++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index b9afc876e74..75b6aaf64c9 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -1093,8 +1093,8 @@ func TestVStreamHeartbeats(t *testing.T) { // TestVStreamPushdownFilters confirms that pushdown filters are applied correctly // when they are specified in the VStream API via the rule.Filter. -// It also confirms that we use the proper collations for the vstream filter when -// using varchar fields. +// It also confirms that we use the proper collation for the VStream filter when +// using VARCHAR fields. func TestVStreamPushdownFilters(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -1118,7 +1118,7 @@ func TestVStreamPushdownFilters(t *testing.T) { defer vtgateConn.Close() verifyClusterHealth(t, vc) - // Make sure that we get at least one paul event in the copy phase. + // Make sure that we get at least one paul row event in the copy phase. _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.customer (name) values ('PAUĹ')", ks), 1, false) require.NoError(t, err) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 1b7d9d6b69b..cdecf0925b3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -58,12 +58,14 @@ type Plan struct { // of the table. Filters []Filter - // Predicates in the Filter query that we can push down to - // MySQL to reduce the returned rows we need to filter. - // This will contain any valid expressions in the Filter's - // WHERE clause with the exception of the in_keyrange() - // function which is a filter that must be applied by the - // vstreamer (it's not a valid MySQL function). + // Predicates in the Filter query that we can push down to MySQL + // to reduce the returned rows we need to filter in the VStreamer + // during the copy phase. This will contain any valid expressions + // in the Filter's WHERE clause with the exception of the + // in_keyrange() function which is a filter that must be applied + // by the VStreamer (it's not a valid MySQL function). Note that + // the Filter cannot contain any MySQL functions because the + // VStreamer cannot filter binlog events using them. whereExprsToPushDown []sqlparser.Expr // Convert any integer values seen in the binlog events for ENUM or SET @@ -631,8 +633,8 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr)) plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr) case *sqlparser.FuncExpr: - // We cannot filter binlog events in vstreamer using MySQL functions so - // we only allow the in_keyrange() function, which is vstreamer specific. + // We cannot filter binlog events in VStreamer using MySQL functions so + // we only allow the in_keyrange() function, which is VStreamer specific. if !expr.Name.EqualString("in_keyrange") { return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index bc22bfa482d..014571d05b3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -289,14 +289,14 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error indexHint = fmt.Sprintf(" force index (%s)", escapedPKIndexName) } buf.Myprintf(" from %v%s", sqlparser.NewIdentifierCS(rs.plan.Table.Name), indexHint) - if len(rs.lastpk) != 0 { + if len(rs.lastpk) != 0 { // We're in the copy phase and need to resume if len(rs.lastpk) != len(rs.pkColumns) { return "", fmt.Errorf("cannot build a row streamer plan for the %s table as a lastpk value was provided and the number of primary key values within it (%v) does not match the number of primary key columns in the table (%d)", st.Name, rs.lastpk, rs.pkColumns) } buf.WriteString(" where ") - addPushdownExpressions() // First we add any predicates that should be pushed down. + addPushdownExpressions() if len(rs.plan.whereExprsToPushDown) > 0 { buf.Myprintf(" and ") } @@ -318,7 +318,7 @@ func (rs *rowStreamer) buildSelect(st *binlogdatapb.MinimalTable) (string, error rs.lastpk[lastcol].EncodeSQL(buf) buf.Myprintf(")") } - } else if len(rs.plan.whereExprsToPushDown) > 0 { + } else if len(rs.plan.whereExprsToPushDown) > 0 { // We're in the running/replicating phase buf.Myprintf(" where ") addPushdownExpressions() }