Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support insert/delete/update/select for partition table #21242

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ func (s *service) initProcessCodecService() {
s._txnClient,
s.fileService,
s.lockService,
s.partitionService,
s.queryClient,
s._hakeeperClient,
s.udfService,
Expand Down
44 changes: 8 additions & 36 deletions pkg/frontend/test/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions pkg/objectio/block_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ type BlockInfo struct {
MetaLoc ObjectLocation
ObjectFlags int8

//TODO:: remove it.
PartitionNum int16
PartitionIdx int32
}

func (b *BlockInfo) SetFlagByObjStats(stats *ObjectStats) {
Expand Down Expand Up @@ -120,10 +119,10 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) {
}
space++

if _, err := w.Write(types.EncodeInt16(&b.PartitionNum)); err != nil {
if _, err := w.Write(types.EncodeInt32(&b.PartitionIdx)); err != nil {
return 0, err
}
space += 2
space += 4

return space, nil
}
Expand All @@ -138,7 +137,7 @@ func (b *BlockInfo) Unmarshal(buf []byte) error {
b.ObjectFlags = types.DecodeInt8(buf[:1])
buf = buf[1:]

b.PartitionNum = types.DecodeFixed[int16](buf[:2])
b.PartitionIdx = types.DecodeFixed[int32](buf[:4])
return nil
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/objectio/block_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package objectio

import (
"bytes"
"testing"

"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -288,3 +289,18 @@ func BenchmarkObjectStatsRelatedUtils(b *testing.B) {
}
})
}

func TestMarshalAndUnMarshal(t *testing.T) {
b := &BlockInfo{}
b.PartitionIdx = 1
b.ObjectFlags = 1

buf := bytes.NewBuffer(nil)
_, err := b.MarshalWithBuf(buf)
require.NoError(t, err)

b2 := &BlockInfo{}
require.NoError(t, b2.Unmarshal(buf.Bytes()))

require.Equal(t, b, b2)
}
46 changes: 46 additions & 0 deletions pkg/partitionservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/partition"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
Expand All @@ -29,6 +30,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/txn/client"
)

var (
DisabledService = NewService(Config{}, nil)
)

type service struct {
cfg Config
store PartitionStorage
Expand Down Expand Up @@ -155,6 +160,47 @@ func (s *service) getMetadata(

}

func (s *service) Prune(
ctx context.Context,
tableID uint64,
bat *batch.Batch,
txnOp client.TxnOperator,
) (PruneResult, error) {
metadata, err := s.readMetadata(
ctx,
tableID,
txnOp,
)
if err != nil || metadata.IsEmpty() {
return PruneResult{}, err
}

// TODO(fagongzi): partition
return PruneResult{
batches: []*batch.Batch{bat, bat},
partitions: []partition.Partition{metadata.Partitions[0]},
}, nil
}

func (s *service) Filter(
ctx context.Context,
tableID uint64,
filters []*plan.Expr,
txnOp client.TxnOperator,
) ([]int, error) {
metadata, err := s.readMetadata(
ctx,
tableID,
txnOp,
)
if err != nil || metadata.IsEmpty() {
return nil, err
}

// TODO(fagongzi): partition
return []int{0}, nil
}

func (s *service) getMetadataByHashType(
option *tree.PartitionOption,
def *plan.TableDef,
Expand Down
48 changes: 48 additions & 0 deletions pkg/partitionservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/partition"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
Expand Down Expand Up @@ -132,6 +134,52 @@ func TestIs(t *testing.T) {
)
}

func TestPruneNoPartition(t *testing.T) {
runTestPartitionServiceTest(
func(
ctx context.Context,
txnOp client.TxnOperator,
s *service,
store *memStorage,
) {
res, err := s.Prune(ctx, 1, nil, txnOp)
require.NoError(t, err)
require.True(t, res.Empty())
},
)
}

func TestFilterNoPartition(t *testing.T) {
runTestPartitionServiceTest(
func(
ctx context.Context,
txnOp client.TxnOperator,
s *service,
store *memStorage,
) {
res, err := s.Filter(ctx, 1, nil, txnOp)
require.NoError(t, err)
require.Empty(t, res)
},
)
}

func TestIterResult(t *testing.T) {
res := PruneResult{
batches: make([]*batch.Batch, 10),
partitions: make([]partition.Partition, 10),
}

n := 0
res.Iter(
func(partition partition.Partition, bat *batch.Batch) bool {
n++
return false
},
)
require.Equal(t, 1, n)
}

func runTestPartitionServiceTest(
fn func(
ctx context.Context,
Expand Down
36 changes: 36 additions & 0 deletions pkg/partitionservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/partition"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
Expand Down Expand Up @@ -74,6 +75,20 @@ type PartitionService interface {
txnOp client.TxnOperator,
) error

Prune(
ctx context.Context,
tableID uint64,
bat *batch.Batch,
txnOp client.TxnOperator,
) (PruneResult, error)

Filter(
ctx context.Context,
tableID uint64,
filters []*plan.Expr,
txnOp client.TxnOperator,
) ([]int, error)

GetStorage() PartitionStorage
}

Expand Down Expand Up @@ -114,3 +129,24 @@ func GetService(
}
return v.(PartitionService)
}

type PruneResult struct {
batches []*batch.Batch
partitions []partition.Partition
}

func (res PruneResult) Iter(fn func(partition partition.Partition, bat *batch.Batch) bool) {
for i, p := range res.partitions {
if !fn(p, res.batches[i]) {
break
}
}
}

func (res PruneResult) Close() {

}

func (res PruneResult) Empty() bool {
return len(res.partitions) == 0
}
26 changes: 16 additions & 10 deletions pkg/sql/colexec/deletion/deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,25 @@ func (deletion *Deletion) remoteDelete(proc *process.Process) (vm.CallResult, er

func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, error) {
analyzer := deletion.OpAnalyzer
var result vm.CallResult
var err error
if !deletion.delegated {
result, err = vm.ChildrenCall(deletion.GetChildren(0), proc, analyzer)
if err != nil {
return result, err
}
if result.Batch == nil || result.Batch.IsEmpty() {
return result, nil
}

result, err := vm.ChildrenCall(deletion.GetChildren(0), proc, analyzer)
if err != nil {
return result, err
}
if result.Batch == nil || result.Batch.IsEmpty() {
return result, nil
}
if deletion.ctr.resBat == nil {
deletion.ctr.resBat = makeDelBatch(*result.Batch.GetVector(int32(deletion.DeleteCtx.PrimaryKeyIdx)).GetType())
} else {
deletion.ctr.resBat.CleanOnlyData()
}

if deletion.ctr.resBat == nil {
deletion.ctr.resBat = makeDelBatch(*result.Batch.GetVector(int32(deletion.DeleteCtx.PrimaryKeyIdx)).GetType())
} else {
deletion.ctr.resBat.CleanOnlyData()
result = deletion.input
}

bat := result.Batch
Expand Down
Loading
Loading