fix some bats

max-hoffman committed Jun 20, 2024
1 parent a362bdc commit 279044b
Showing 7 changed files with 357 additions and 29 deletions.
96 changes: 96 additions & 0 deletions go/libraries/doltcore/sqle/rowexec/builder.go
@@ -0,0 +1,96 @@
package rowexec

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/plan"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/store/prolly"
)

type Builder struct {
def sql.NodeExecBuilder
}

var _ sql.NodeExecBuilder = (*Builder)(nil)

func NewBuilder(def sql.NodeExecBuilder) *Builder {
return &Builder{def: def}
}

func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, error) {
switch n := n.(type) {
case *plan.JoinNode:
if n.Op.IsLookup() {
// left must currently be a resolved Dolt table; anything else (e.g. a nested
// join on the left) falls back to the default builder
rt, ok := n.Left().(*plan.ResolvedTable)
if !ok {
break
}
if _, ok := rt.Table.(*sqle.DoltTable); !ok {
break
}
source, sourceSch, err := getKvIter(ctx, n.Left())
if err != nil {
return nil, err
}

// right has to be an ITA over a Dolt table
ita, ok := n.Right().(*plan.IndexedTableAccess)
if !ok {
break
}
keyExprs := ita.Expressions()

idt, ok := ita.Table.(*sqle.IndexedDoltTable)
if !ok {
break
}
target, _, err := getMap(ctx, idt.DoltTable)
if err != nil {
return nil, err
}

iter := newPrimaryLookupKvIter(ctx, source, target, keyExprs, sourceSch)
return newToSqlKvIter(iter), nil
}
default:
}
return b.def.Build(ctx, n, r)
}
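
// Example plan shape the fast path above targets (hypothetical query): for
//
//	SELECT * FROM t1 JOIN t2 ON t1.a = t2.pk
//
// the analyzer can plan LookupJoin(ResolvedTable(t1), IndexedTableAccess(t2, pk)).
// t1 is scanned as the KV source and each source row becomes a key probe against
// t2's primary index map; any other shape falls through to b.def.Build.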

func getMap(ctx *sql.Context, dt *sqle.DoltTable) (prolly.Map, schema.Schema, error) {
table, err := dt.DoltTable(ctx)
if err != nil {
return prolly.Map{}, nil, err
}

priIndex, err := table.GetRowData(ctx)
if err != nil {
return prolly.Map{}, nil, err
}

sch, err := table.GetSchema(ctx)
if err != nil {
return prolly.Map{}, nil, err
}

return durable.ProllyMapFromIndex(priIndex), sch, nil
}

func getKvIter(ctx *sql.Context, n sql.Node) (prolly.MapIter, schema.Schema, error) {
// |n| is a ResolvedTable over a DoltTable; Build verifies this before calling
rt := n.(*plan.ResolvedTable)
dt := rt.Table.(*sqle.DoltTable)
table, err := dt.DoltTable(ctx)
if err != nil {
return nil, nil, err
}
priIndex, err := table.GetRowData(ctx)
if err != nil {
return nil, nil, err
}

m := durable.ProllyMapFromIndex(priIndex)
iter, err := m.IterAll(ctx)
if err != nil {
return nil, nil, err
}

sch, err := table.GetSchema(ctx)
if err != nil {
return nil, nil, err
}

return iter, sch, nil
}
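
// Hypothetical wiring sketch (not shown in this commit): the builder is intended
// to wrap GMS's stock row executor so that unhandled nodes fall back to the default
// implementation. Assuming gmsexec refers to github.com/dolthub/go-mysql-server/sql/rowexec:
//
//	var execBuilder sql.NodeExecBuilder = NewBuilder(gmsexec.DefaultBuilder)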
215 changes: 215 additions & 0 deletions go/libraries/doltcore/sqle/rowexec/lookup_join.go
@@ -0,0 +1,215 @@
package rowexec

import (
"context"

"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"

"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/val"
)

// Internal iterators are strictly |prolly.MapIter|.

// TODO: read a batch of keys from the source map and sort them before performing lookups.

// TODO: a batched version would run a fixed set of workers over kv ranges in parallel;
// filters and projections need to be pushed into this iterator for that to be worthwhile.

type lookupJoinKvIter struct {
// source is left relation
source prolly.MapIter

// target is right relation
target prolly.Map

litKd val.TupleDesc
srcKd val.TupleDesc
srcVd val.TupleDesc
targetKb *val.TupleBuilder

// injective means each lookup key matches at most one target row, so we can use target.Get
injective bool
// keyRangeSafe means we can convert a key lookup into a key range
keyRangeSafe bool
// failing the above two, fall back to a slow range iterator

// Ordinal mappings from the source KV tuples into the target index key. Key
// mappings and value mappings are kept separate, and literals coming from the
// GMS expression tree also need to be converted into storage values.
split int
srcMapping val.OrdinalMapping

// litTuple holds the statically provided literal values from the key expressions
litTuple val.Tuple
litMappings val.OrdinalMapping
}
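
// Worked example (hypothetical source schema, not from this commit): for a source
// table (a INT PRIMARY KEY, b INT, c INT), the storage key tuple is (a) and the
// value tuple is (b, c), so split = 1. With keyExprs = [gf(b), Literal(7), gf(a)]
// the constructor produces
//
//	srcMapping  = [1, -1, 0]   // b is value index 0 -> split+0; literal -> -1; a is key index 0
//	litMappings = [1]          // keyExprs[1] is the literal
//	litTuple    = (7)
//
// and Next reads positions < split from the source key, positions >= split from the
// source value at offset (from - split), and -1 positions from litTuple.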

var _ prolly.TupleIter = (*lookupJoinKvIter)(nil)

// TODO: lookup into primary can do a |map.Get|, but secondary has to do |map.PrefixGet|
func newPrimaryLookupKvIter(ctx context.Context, source prolly.MapIter, target prolly.Map, keyExprs []sql.Expression, sourceSch schema.Schema) *lookupJoinKvIter {
// If the original source is not covering, an extra lookup is needed.
//
// The mappings below translate a source row into a lookup key on the target's
// primary index:
//   keyMappings: source key field -> lookup key position
//   litMappings: literal value -> lookup key position
//   valMappings: source value field -> lookup key position
// e.g. keyExprs = [gf(), 1, gf()]; each GetField resolves through its schema
// index to either the source key or the source value tuple.

keyless := schema.IsKeyless(sourceSch)
var split int
if keyless {
// the only key is the hash of the values
split = 1
} else {
split = sourceSch.GetPKCols().Size()
}

// schMappings tell us where to look for key fields. A field will either
// be in the source key tuple (< split), source value tuple (>=split),
// or in the literal tuple (-1).
srcMapping := make(val.OrdinalMapping, target.KeyDesc().Count())
var litMappings val.OrdinalMapping
var litTypes []val.Type
kd, _ := target.Descriptors()
for i, e := range keyExprs {
switch e := e.(type) {
case *expression.GetField:
// map the schema order index to the physical storage index
j := e.Index()
col := sourceSch.GetAllCols().GetColumns()[j]
if col.IsPartOfPK {
srcMapping[i] = sourceSch.GetPKCols().TagToIdx[col.Tag]
} else if keyless {
// Skip cardinality column
srcMapping[i] = split + 1 + sourceSch.GetNonPKCols().TagToIdx[col.Tag]
} else {
srcMapping[i] = split + sourceSch.GetNonPKCols().TagToIdx[col.Tag]
}
case *expression.Literal:
srcMapping[i] = -1
litMappings = append(litMappings, i)
litTypes = append(litTypes, kd.Types[i])
}
}
litDesc := val.NewTupleDescriptor(litTypes...)
litTb := val.NewTupleBuilder(litDesc)
for i, j := range litMappings {
tree.PutField(ctx, target.NodeStore(), litTb, i, keyExprs[j].(*expression.Literal).Value())
}

litTuple := litTb.Build(target.Pool())

return &lookupJoinKvIter{
source: source,
target: target,
split: split,
srcMapping: srcMapping,
litMappings: litMappings,
litTuple: litTuple,
litKd: litDesc,
targetKb: val.NewTupleBuilder(kd),
// TODO: srcKd and srcVd still need to be wired up from the source table's tuple descriptors
}
}
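
// Keyless example (assumes Dolt's keyless layout of row-hash key -> (cardinality,
// fields...)): for a keyless source with columns (b INT, c INT) and
// keyExprs = [gf(c), gf(b)], split = 1 and
//
//	srcMapping = [3, 2]   // split + 1 + non-PK index, skipping the cardinality field
//
// so every lookup field is read from the value tuple, one position past cardinality.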

func (l lookupJoinKvIter) Next(ctx context.Context) (k, v val.Tuple) {
// get row from |src|
k, v, err := l.source.Next(ctx)
if err != nil {
return nil, nil
}

// do mapping to |dst| key
var litIdx int
for to := range l.srcMapping {
from := l.srcMapping.MapOrdinal(to)
var tup val.Tuple
var desc val.TupleDesc
if from == -1 {
tup = l.litTuple
desc = l.litKd
// literal offsets increment sequentially
from = litIdx
litIdx++
} else if from < l.split {
desc = l.srcKd
tup = k
} else {
// value tuple, adjust offset
tup = v
desc = l.srcVd
from = from - l.split
}

value, err := tree.GetField(ctx, desc, from, tup, l.target.NodeStore())
if err != nil {
return nil, nil
}

err = tree.PutField(ctx, l.target.NodeStore(), l.targetKb, to, value)
if err != nil {
return nil, nil
}
}

l.targetKb.Build(l.target.Pool())

// TODO: probe the target map with the built key (map.Get) and return the
// concatenated source/target result
return
}

type toSqlKvIter struct {
source prolly.MapIter

ns tree.NodeStore
keyDesc val.TupleDesc
valDesc val.TupleDesc

// map to sql row at the end for return to SQL layer
// see |prollyRowIter.Next|
keyProj []int
valProj []int
// ordProj is a concatenated list of output ordinals for |keyProj| and |valProj|
ordProj []int
}
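
// Projection example (hypothetical table, assuming no column pruning): for a table
// (a INT PRIMARY KEY, b INT, c INT) stored as key = (a), value = (b, c),
//
//	keyProj = [0]       // key field 0 (a)
//	valProj = [0, 1]    // value fields 0 (b) and 1 (c)
//	ordProj = [0, 1, 2] // a -> row[0], b -> row[1], c -> row[2]
//
// mirroring how |prollyRowIter.Next| assembles sql rows.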

func (it *toSqlKvIter) Next(ctx *sql.Context) (sql.Row, error) {
key, value, err := it.source.Next(ctx)
if err != nil {
return nil, err
}

row := make(sql.Row, len(it.ordProj))
for i, idx := range it.keyProj {
outputIdx := it.ordProj[i]
row[outputIdx], err = tree.GetField(ctx, it.keyDesc, idx, key, it.ns)
if err != nil {
return nil, err
}
}
for i, idx := range it.valProj {
outputIdx := it.ordProj[len(it.keyProj)+i]
row[outputIdx], err = tree.GetField(ctx, it.valDesc, idx, value, it.ns)
if err != nil {
return nil, err
}
}
return row, nil
}

func (it *toSqlKvIter) Close(c *sql.Context) error {
//TODO implement me
panic("implement me")
}

var _ sql.RowIter = (*toSqlKvIter)(nil)

func newToSqlKvIter(iter prolly.TupleIter) sql.RowIter {
// TODO: wire |iter| into the returned iterator and populate the key/value/ordinal
// projections (mappings from source->target)
panic("implement me")
}
5 changes: 2 additions & 3 deletions go/libraries/doltcore/sqle/statsnoms/iter.go
@@ -21,7 +21,6 @@ import (

"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/planbuilder"
"github.com/dolthub/go-mysql-server/sql/stats"
"gopkg.in/errgo.v2/errors"

"github.com/dolthub/dolt/go/libraries/doltcore/schema"
@@ -114,15 +113,15 @@ func (s *statsIter) Next(ctx *sql.Context) (sql.Row, error) {
upperBoundCnt := row[schema.StatsUpperBoundCntTag].(int64)
createdAt := row[schema.StatsCreatedAtTag].(time.Time)

-typs := strings.Split(typesStr, ",")
+typs := strings.Split(typesStr, "\n")
for i, t := range typs {
typs[i] = strings.TrimSpace(t)
}

qual := sql.NewStatQualifier(dbName, tableName, indexName)
if curQual := qual.String(); !strings.EqualFold(curQual, s.currentQual) {
s.currentQual = curQual
-s.currentTypes, err = stats.ParseTypeStrings(typs)
+s.currentTypes, err = parseTypeStrings(typs)
if err != nil {
return nil, err
}