Skip to content

Commit

Permalink
Push down everything but in_keyrange
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 31, 2025
1 parent 1b597ec commit 0a17436
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 71 deletions.
24 changes: 16 additions & 8 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Plan struct {
// Predicates in the Filter query that we can push down to
// MySQL to reduce the returned rows we need to filter.
// This will contain any valid expressions in the Filter's
// WHERE clause with the exception of the in_keyspace()
// WHERE clause with the exception of the in_keyrange()
// function which is a filter that must be applied by the
// vstreamer (it's not a valid MySQL function).
whereExprsToPushDown []sqlparser.Expr
Expand Down Expand Up @@ -613,9 +613,13 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
// Add it to the expressions that get pushed down to mysqld.
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
// StrVal is varbinary, we do not support varchar since we would have to implement all collation types
if val.Type != sqlparser.IntVal && val.Type != sqlparser.StrVal {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
// StrVal is varbinary, we do not support varchar filtering in
// vstreamer since we would have to do so using the correct
// collation. So we ONLY push it down to mysqld.
// TODO: now that we have MySQL compatible collation support
// we could add this support to vstreamer as well.
continue
}
pv, err := evalengine.Translate(val, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Expand All @@ -635,11 +639,14 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Value: resolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
case *sqlparser.FuncExpr:
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
if expr.Name.EqualString("in_keyrange") {
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
}
} else {
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
}
case *sqlparser.IsExpr: // Needed for CreateLookupVindex with ignore_nulls
if expr.Right != sqlparser.IsNotNullOp {
Expand All @@ -660,6 +667,7 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
Opcode: IsNotNull,
ColNum: colnum,
})
log.Errorf("DEBUG: adding to list of pushdown expressions: %v", sqlparser.String(expr))
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
Expand Down
115 changes: 52 additions & 63 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func TestMustSendDDL(t *testing.T) {
}

func TestPlanBuilder(t *testing.T) {
unicodeCollationID := collations.MySQL8().DefaultConnectionCharset()
t1 := &Table{
Name: "t1",
Fields: []*querypb.Field{{
Expand All @@ -182,9 +183,8 @@ func TestPlanBuilder(t *testing.T) {
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
}, {
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: collations.CollationUtf8mb4ID,
}},
}
// t1alt has no id column
Expand All @@ -193,8 +193,7 @@ func TestPlanBuilder(t *testing.T) {
Fields: []*querypb.Field{{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Charset: uint32(unicodeCollationID),
}},
}
t2 := &Table{
Expand Down Expand Up @@ -252,9 +251,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}},
env: vtenv.NewTestEnv(),
Expand All @@ -275,9 +273,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}},
Filters: []Filter{{
Expand Down Expand Up @@ -306,9 +303,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}},
env: vtenv.NewTestEnv(),
Expand All @@ -329,9 +325,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}},
env: vtenv.NewTestEnv(),
Expand All @@ -344,9 +339,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Expand All @@ -367,9 +361,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Expand Down Expand Up @@ -398,9 +391,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Expand Down Expand Up @@ -429,9 +421,8 @@ func TestPlanBuilder(t *testing.T) {
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Expand All @@ -457,15 +448,14 @@ func TestPlanBuilder(t *testing.T) {
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id > 1 and id < 10"},
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where val > 'hey' and val < 'there'"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Expand All @@ -479,27 +469,50 @@ func TestPlanBuilder(t *testing.T) {
Filters: []Filter{
{
Opcode: GreaterThan,
ColNum: 0,
Value: sqltypes.NewInt64(1),
ColNum: 1,
Value: sqltypes.NewVarChar("hey"),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
},
{
Opcode: LessThan,
ColNum: 0,
Value: sqltypes.NewInt64(10),
ColNum: 1,
Value: sqltypes.NewVarChar("there"),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
},
},
whereExprsToPushDown: []sqlparser.Expr{
sqlparser.NewComparisonExpr(sqlparser.GreaterThanOp, sqlparser.Expr(sqlparser.NewColName("id")), sqlparser.Expr(sqlparser.NewIntLiteral("1")), nil),
sqlparser.NewComparisonExpr(sqlparser.LessThanOp, sqlparser.Expr(sqlparser.NewColName("id")), sqlparser.Expr(sqlparser.NewIntLiteral("10")), nil),
sqlparser.NewComparisonExpr(sqlparser.GreaterThanOp, sqlparser.Expr(sqlparser.NewColName("val")), sqlparser.Expr(sqlparser.NewStrLiteral("hey")), nil),
sqlparser.NewComparisonExpr(sqlparser.LessThanOp, sqlparser.Expr(sqlparser.NewColName("val")), sqlparser.Expr(sqlparser.NewStrLiteral("there")), nil),
},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select convert(val using utf8mb4) as val2, id as id from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarChar,
Charset: uint32(unicodeCollationID),
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
convertUsingUTF8Columns: map[string]bool{"val": true},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t2,
inRule: &binlogdatapb.Rule{Match: "/t1/"},
Expand Down Expand Up @@ -534,30 +547,6 @@ func TestPlanBuilder(t *testing.T) {
}},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select convert(val using utf8mb4) as val2, id as id from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarBinary,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG),
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
convertUsingUTF8Columns: map[string]bool{"val": true},
env: vtenv.NewTestEnv(),
},
}, {
inTable: regional,
inRule: &binlogdatapb.Rule{Match: "regional", Filter: "select id, keyspace_id() from regional"},
Expand Down

0 comments on commit 0a17436

Please sign in to comment.