Skip to content

Commit

Permalink
Use MultiEqual instead of Values OpCode
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Jan 23, 2025
1 parent 9dda778 commit b265101
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 92 deletions.
114 changes: 27 additions & 87 deletions go/vt/vtgate/engine/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"golang.org/x/exp/maps"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -77,8 +76,6 @@ const (
// Is used when the query explicitly sets a target destination:
// in the clause e.g: UPDATE `keyspace[-]`.x1 SET foo=1
ByDestination
// Values // TODO
Values
)

var opName = map[Opcode]string{
Expand All @@ -95,7 +92,6 @@ var opName = map[Opcode]string{
None: "None",
ByDestination: "ByDestination",
SubShard: "SubShard",
Values: "Values",
}

// MarshalJSON serializes the Opcode as a JSON string.
Expand Down Expand Up @@ -182,90 +178,12 @@ func (rp *RoutingParameters) findRoute(ctx context.Context, vcursor VCursor, bin
default:
return rp.multiEqual(ctx, vcursor, bindVars)
}
case Values:
switch rp.Vindex.(type) {
case vindexes.MultiColumn:
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported multi column vindex for values")
default:
return rp.values(ctx, vcursor, bindVars)
}
default:
// Unreachable.
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unsupported opcode: %v", rp.Opcode)
}
}

// values is used by the "Values" OpCode. It takes a tuple of tuple in the bindVars (from a VALUES JOIN), and
// will split all the rows from the tuple to their own shards. Minimizing the amount of bindVars we send to each shard.
// rp.Values has to be formatted a certain way by the planner: The first index has to be the expression that returns a
// tuple of tuples. The second index has to be the offset where the vindex values can be found in every row of the outer tuple.
func (rp *RoutingParameters) values(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
if len(rp.Values) < 2 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "values slice must at least be of length two for a values")
}
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
value, err := env.Evaluate(rp.Values[0])
if err != nil {
return nil, nil, err
}

rval, ok := rp.Values[0].(*evalengine.BindVariable)
if !ok {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "cannot transform evalengine expr to bind variable for values")
}

tuple := value.TupleValues()

type rssValue struct {
rss *srvtopo.ResolvedShard
vals []sqltypes.Value
}
r := map[string]rssValue{}
for _, row := range tuple {
env.Row = nil
err = row.ForEachValue(func(bv sqltypes.Value) {
env.Row = append(env.Row, bv)
})
if err != nil {
return nil, nil, err
}
val, err := env.Evaluate(rp.Values[1])
if err != nil {
return nil, nil, err
}

rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, []sqltypes.Value{val.Value(vcursor.ConnCollation())})
if err != nil {
return nil, nil, err
}
if len(rss) > 1 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "andres is confused")
}
r[rss[0].Target.String()] = rssValue{
rss: rss[0],
vals: append(r[rss[0].Target.String()].vals, val.Value(collations.Unknown)),
}
}
var resultRss []*srvtopo.ResolvedShard
var resultBvs []map[string]*querypb.BindVariable
for _, rssVals := range r {
resultRss = append(resultRss, rssVals.rss)

clonedBindVars := maps.Clone(bindVars)

newBv := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
}
for _, s := range rssVals.vals {
newBv.Values = append(newBv.Values, sqltypes.ValueToProto(s))
}

clonedBindVars[rval.Key] = newBv
resultBvs = append(resultBvs, clonedBindVars)
}
return resultRss, resultBvs, nil
}

func (rp *RoutingParameters) systemQuery(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
destinations, err := rp.routeInfoSchemaQuery(ctx, vcursor, bindVars)
if err != nil {
Expand Down Expand Up @@ -511,15 +429,37 @@ func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bi
if err != nil {
return nil, nil, err
}
rss, _, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues())
rss, bvs, err := resolveShards(ctx, vcursor, rp.Vindex.(vindexes.SingleColumn), rp.Keyspace, value.TupleValues())
if err != nil {
return nil, nil, err
}
multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i := range multiBindVars {
multiBindVars[i] = bindVars

tbv, ok := rp.Values[0].(*evalengine.TupleBindVariable)
if !ok {
multiBindVars := make([]map[string]*querypb.BindVariable, len(rss))
for i := range multiBindVars {
multiBindVars[i] = bindVars
}
return rss, multiBindVars, nil
}
return rss, multiBindVars, nil

var resultRss []*srvtopo.ResolvedShard
var resultBvs []map[string]*querypb.BindVariable
for i, rssVals := range rss {
resultRss = append(resultRss, rssVals)

clonedBindVars := maps.Clone(bindVars)

newBv := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
Values: bvs[i],
}

clonedBindVars[tbv.Key] = newBv

resultBvs = append(resultBvs, clonedBindVars)
}
return resultRss, resultBvs, nil
}

func (rp *RoutingParameters) multiEqualMultiCol(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) ([]*srvtopo.ResolvedShard, []map[string]*querypb.BindVariable, error) {
Expand Down
5 changes: 2 additions & 3 deletions go/vt/vtgate/engine/routing_parameter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestFindRouteValuesJoin(t *testing.T) {

const valueBvName = "v"
rp := &RoutingParameters{
Opcode: Values,
Opcode: MultiEqual,

Keyspace: &vindexes.Keyspace{
Name: "ks",
Expand All @@ -29,8 +29,7 @@ func TestFindRouteValuesJoin(t *testing.T) {
Vindex: vindex,

Values: []evalengine.Expr{
evalengine.NewBindVar(valueBvName, evalengine.NewType(sqltypes.Tuple, collations.Unknown)),
evalengine.NewColumn(0, evalengine.NewType(sqltypes.Int64, collations.Unknown), nil),
&evalengine.TupleBindVariable{Key: valueBvName, Index: 0, Collation: collations.Unknown},
},
}

Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/evalengine/expr_tuple_bvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type (
Key string

Index int
Type sqltypes.Type
Collation collations.ID
}
)
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/planbuilder/operators/sharded_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ func (tr *ShardedRouting) planCompositeInOpArg(
Index: idx,
}
if typ, found := ctx.TypeForExpr(col); found {
value.Type = typ.Type()
value.Collation = typ.Collation()
}

Expand Down

0 comments on commit b265101

Please sign in to comment.