From ea7f7844b9d7716ac3344d6f05c1da36705232f2 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Wed, 15 Jan 2025 17:40:12 +0800 Subject: [PATCH 1/5] add delete insert and select for partition table --- pkg/cnservice/server.go | 1 + pkg/frontend/test/engine_mock.go | 44 +- pkg/objectio/block_info.go | 9 +- pkg/partitionservice/service.go | 42 ++ pkg/partitionservice/types.go | 36 ++ pkg/sql/colexec/deletion/deletion.go | 26 +- .../colexec/deletion/deletion_partition.go | 184 ++++++ pkg/sql/colexec/deletion/types.go | 2 + pkg/sql/colexec/insert/insert.go | 21 +- pkg/sql/colexec/insert/insert_partition.go | 178 ++++++ pkg/sql/colexec/insert/types.go | 2 + pkg/sql/colexec/lockop/lock_op.go | 29 +- pkg/sql/colexec/lockop/lock_op_no_txn.go | 6 +- pkg/sql/colexec/lockop/lock_op_test.go | 13 +- pkg/sql/colexec/lockop/types.go | 4 +- pkg/sql/colexec/multi_update/multi_update.go | 58 +- .../multi_update/multi_update_partition.go | 178 ++++++ pkg/sql/colexec/multi_update/types.go | 2 + pkg/sql/colexec/preinsert/preinsert.go | 3 +- pkg/sql/compile/compile.go | 63 +- pkg/sql/compile/ddl.go | 40 +- pkg/sql/compile/lock_meta.go | 4 +- pkg/sql/compile/operator.go | 85 ++- pkg/tests/issues/issue_test.go | 5 +- pkg/tests/partition/create_hash_test.go | 109 ++++ pkg/tests/testutils/test_utils.go | 16 + .../disttae/local_disttae_datasource_test.go | 5 +- pkg/vm/engine/disttae/txn_database.go | 5 +- pkg/vm/engine/disttae/txn_table.go | 69 ++- ...able_sharding.go => txn_table_delegate.go} | 520 ++++++++++------ pkg/vm/engine/disttae/txn_table_partition.go | 576 ++++++++++++++++++ .../disttae/txn_table_sharding_handle.go | 10 +- pkg/vm/engine/memoryengine/table.go | 5 +- pkg/vm/engine/memoryengine/table_reader.go | 5 - pkg/vm/engine/readutil/datasource_test.go | 1 - pkg/vm/engine/readutil/relation_data.go | 33 +- pkg/vm/engine/readutil/relation_data_test.go | 3 - pkg/vm/engine/test/reader_test.go | 4 +- pkg/vm/engine/test/testutil/disttae_engine.go | 10 +- pkg/vm/engine/types.go | 6 +- pkg/vm/process/process.go | 5 + pkg/vm/process/process2.go | 22 +- pkg/vm/process/process_codec.go | 37 +- pkg/vm/process/types.go | 14 +- pkg/vm/types.go | 3 + 45 files changed, 2078 insertions(+), 415 deletions(-) create mode 100644 pkg/sql/colexec/deletion/deletion_partition.go create mode 100644 pkg/sql/colexec/insert/insert_partition.go create mode 100644 pkg/sql/colexec/multi_update/multi_update_partition.go rename pkg/vm/engine/disttae/{txn_table_sharding.go => txn_table_delegate.go} (86%) create mode 100644 pkg/vm/engine/disttae/txn_table_partition.go diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go index 9b4c52db149af..7a8f3a5a0258b 100644 --- a/pkg/cnservice/server.go +++ b/pkg/cnservice/server.go @@ -1037,6 +1037,7 @@ func (s *service) initProcessCodecService() { s._txnClient, s.fileService, s.lockService, + s.partitionService, s.queryClient, s._hakeeperClient, s.udfService, diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index 5d3c00d20ccc2..e6e2568ad94b8 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -592,20 +592,6 @@ func (mr *MockRelDataMockRecorder) GetType() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetType", reflect.TypeOf((*MockRelData)(nil).GetType)) } -// GroupByPartitionNum mocks base method. -func (m *MockRelData) GroupByPartitionNum() map[int16]engine.RelData { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GroupByPartitionNum") - ret0, _ := ret[0].(map[int16]engine.RelData) - return ret0 -} - -// GroupByPartitionNum indicates an expected call of GroupByPartitionNum. -func (mr *MockRelDataMockRecorder) GroupByPartitionNum() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GroupByPartitionNum", reflect.TypeOf((*MockRelData)(nil).GroupByPartitionNum)) -} - // MarshalBinary mocks base method. func (m *MockRelData) MarshalBinary() ([]byte, error) { m.ctrl.T.Helper() @@ -645,20 +631,6 @@ func (mr *MockRelDataMockRecorder) SetShardID(i, id interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetShardID", reflect.TypeOf((*MockRelData)(nil).SetShardID), i, id) } -// Split mocks base method. -func (m *MockRelData) Split(i int) []engine.RelData { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Split", i) - ret0, _ := ret[0].([]engine.RelData) - return ret0 -} - -// Split indicates an expected call of Split. -func (mr *MockRelDataMockRecorder) Split(i interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Split", reflect.TypeOf((*MockRelData)(nil).Split), i) -} - // String mocks base method. func (m *MockRelData) String() string { m.ctrl.T.Helper() @@ -1334,33 +1306,33 @@ func (mr *MockRelationMockRecorder) MergeObjects(ctx, objstats, targetObjSize in } // PrimaryKeysMayBeModified mocks base method. -func (m *MockRelation) PrimaryKeysMayBeModified(ctx context.Context, from, to types.TS, keyVector *vector.Vector) (bool, error) { +func (m *MockRelation) PrimaryKeysMayBeModified(ctx context.Context, from, to types.TS, batch *batch.Batch, pkIndex int32) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrimaryKeysMayBeModified", ctx, from, to, keyVector) + ret := m.ctrl.Call(m, "PrimaryKeysMayBeModified", ctx, from, to, batch, pkIndex) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // PrimaryKeysMayBeModified indicates an expected call of PrimaryKeysMayBeModified. -func (mr *MockRelationMockRecorder) PrimaryKeysMayBeModified(ctx, from, to, keyVector interface{}) *gomock.Call { +func (mr *MockRelationMockRecorder) PrimaryKeysMayBeModified(ctx, from, to, batch, pkIndex interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrimaryKeysMayBeModified", reflect.TypeOf((*MockRelation)(nil).PrimaryKeysMayBeModified), ctx, from, to, keyVector) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrimaryKeysMayBeModified", reflect.TypeOf((*MockRelation)(nil).PrimaryKeysMayBeModified), ctx, from, to, batch, pkIndex) } // PrimaryKeysMayBeUpserted mocks base method. -func (m *MockRelation) PrimaryKeysMayBeUpserted(ctx context.Context, from, to types.TS, keyVector *vector.Vector) (bool, error) { +func (m *MockRelation) PrimaryKeysMayBeUpserted(ctx context.Context, from, to types.TS, batch *batch.Batch, pkIndex int32) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrimaryKeysMayBeUpserted", ctx, from, to, keyVector) + ret := m.ctrl.Call(m, "PrimaryKeysMayBeUpserted", ctx, from, to, batch, pkIndex) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // PrimaryKeysMayBeUpserted indicates an expected call of PrimaryKeysMayBeUpserted. -func (mr *MockRelationMockRecorder) PrimaryKeysMayBeUpserted(ctx, from, to, keyVector interface{}) *gomock.Call { +func (mr *MockRelationMockRecorder) PrimaryKeysMayBeUpserted(ctx, from, to, batch, pkIndex interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrimaryKeysMayBeUpserted", reflect.TypeOf((*MockRelation)(nil).PrimaryKeysMayBeUpserted), ctx, from, to, keyVector) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrimaryKeysMayBeUpserted", reflect.TypeOf((*MockRelation)(nil).PrimaryKeysMayBeUpserted), ctx, from, to, batch, pkIndex) } // Ranges mocks base method. diff --git a/pkg/objectio/block_info.go b/pkg/objectio/block_info.go index 4c70a0565b84a..1e5f5aee20816 100644 --- a/pkg/objectio/block_info.go +++ b/pkg/objectio/block_info.go @@ -59,8 +59,7 @@ type BlockInfo struct { MetaLoc ObjectLocation ObjectFlags int8 - //TODO:: remove it. - PartitionNum int16 + PartitionIdx int32 } func (b *BlockInfo) SetFlagByObjStats(stats *ObjectStats) { @@ -120,10 +119,10 @@ func (b *BlockInfo) MarshalWithBuf(w *bytes.Buffer) (uint32, error) { } space++ - if _, err := w.Write(types.EncodeInt16(&b.PartitionNum)); err != nil { + if _, err := w.Write(types.EncodeInt32(&b.PartitionIdx)); err != nil { return 0, err } - space += 2 + space += 4 return space, nil } @@ -138,7 +137,7 @@ func (b *BlockInfo) Unmarshal(buf []byte) error { b.ObjectFlags = types.DecodeInt8(buf[:1]) buf = buf[1:] - b.PartitionNum = types.DecodeFixed[int16](buf[:2]) + b.PartitionIdx = types.DecodeFixed[int32](buf[:4]) return nil } diff --git a/pkg/partitionservice/service.go b/pkg/partitionservice/service.go index ba36969421de3..13fbd7ba2ce96 100644 --- a/pkg/partitionservice/service.go +++ b/pkg/partitionservice/service.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/partition" "github.com/matrixorigin/matrixone/pkg/pb/plan" @@ -155,6 +156,47 @@ func (s *service) getMetadata( } +func (s *service) Prune( + ctx context.Context, + tableID uint64, + bat *batch.Batch, + txnOp client.TxnOperator, +) (PruneResult, error) { + metadata, err := s.readMetadata( + ctx, + tableID, + txnOp, + ) + if err != nil || metadata.IsEmpty() { + return PruneResult{}, err + } + + // TODO(fagongzi): partition + return PruneResult{ + batches: []*batch.Batch{bat, bat}, + partitions: []partition.Partition{metadata.Partitions[0]}, + }, nil +} + +func (s *service) Filter( + ctx context.Context, + tableID uint64, + filters []*plan.Expr, + txnOp client.TxnOperator, +) ([]int, error) { + metadata, err := s.readMetadata( + ctx, + tableID, + txnOp, + ) + if err != nil || metadata.IsEmpty() { + return nil, err + } + + // TODO(fagongzi): partition + return []int{0}, nil +} + func (s *service) getMetadataByHashType( option *tree.PartitionOption, def *plan.TableDef, diff --git a/pkg/partitionservice/types.go b/pkg/partitionservice/types.go index 454d37fb7a5dd..574b4dd676b53 100644 --- a/pkg/partitionservice/types.go +++ b/pkg/partitionservice/types.go @@ -20,6 +20,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/pb/partition" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" @@ -74,6 +75,20 @@ type PartitionService interface { txnOp client.TxnOperator, ) error + Prune( + ctx context.Context, + tableID uint64, + bat *batch.Batch, + txnOp client.TxnOperator, + ) (PruneResult, error) + + Filter( + ctx context.Context, + tableID uint64, + filters []*plan.Expr, + txnOp client.TxnOperator, + ) ([]int, error) + GetStorage() PartitionStorage } @@ -114,3 +129,24 @@ func GetService( } return v.(PartitionService) } + +type PruneResult struct { + batches []*batch.Batch + partitions []partition.Partition +} + +func (res PruneResult) Iter(fn func(partition partition.Partition, bat *batch.Batch) bool) { + for i, p := range res.partitions { + if !fn(p, res.batches[i]) { + break + } + } +} + +func (res PruneResult) Close() { + +} + +func (res PruneResult) Empty() bool { + return len(res.partitions) == 0 +} diff --git a/pkg/sql/colexec/deletion/deletion.go b/pkg/sql/colexec/deletion/deletion.go index f0a18dc40c5fc..dd5ad771c1134 100644 --- a/pkg/sql/colexec/deletion/deletion.go +++ b/pkg/sql/colexec/deletion/deletion.go @@ -214,19 +214,25 @@ func (deletion *Deletion) remoteDelete(proc *process.Process) (vm.CallResult, er func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, error) { analyzer := deletion.OpAnalyzer + var result vm.CallResult + var err error + if !deletion.delegated { + result, err = vm.ChildrenCall(deletion.GetChildren(0), proc, analyzer) + if err != nil { + return result, err + } + if result.Batch == nil || result.Batch.IsEmpty() { + return result, nil + } - result, err := vm.ChildrenCall(deletion.GetChildren(0), proc, analyzer) - if err != nil { - return result, err - } - if result.Batch == nil || result.Batch.IsEmpty() { - return result, nil - } + if deletion.ctr.resBat == nil { + deletion.ctr.resBat = makeDelBatch(*result.Batch.GetVector(int32(deletion.DeleteCtx.PrimaryKeyIdx)).GetType()) + } else { + deletion.ctr.resBat.CleanOnlyData() + } - if deletion.ctr.resBat == nil { - deletion.ctr.resBat = makeDelBatch(*result.Batch.GetVector(int32(deletion.DeleteCtx.PrimaryKeyIdx)).GetType()) } else { - deletion.ctr.resBat.CleanOnlyData() + result = deletion.input } bat := result.Batch diff --git a/pkg/sql/colexec/deletion/deletion_partition.go b/pkg/sql/colexec/deletion/deletion_partition.go new file mode 100644 index 0000000000000..bb166d4a42677 --- /dev/null +++ b/pkg/sql/colexec/deletion/deletion_partition.go @@ -0,0 +1,184 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deletion + +import ( + "bytes" + + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/pb/partition" + "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/vm" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/process" +) + +type PartitionDelete struct { + vm.OperatorBase + + raw *Deletion + tableID uint64 +} + +func NewPartitionDelete( + raw *Deletion, + tableID uint64, +) vm.Operator { + return &PartitionDelete{ + raw: raw, + tableID: tableID, + } +} + +func NewPartitionDeleteFrom( + ps *PartitionDelete, +) vm.Operator { + raw := NewArgument() + raw.DeleteCtx = ps.raw.DeleteCtx + return NewPartitionDelete(raw, ps.tableID) +} + +func (op *PartitionDelete) String(buf *bytes.Buffer) { + buf.WriteString(opName) + buf.WriteString(": partition_delete") +} + +func (op *PartitionDelete) OpType() vm.OpType { + return vm.PartitionDelete +} + +func (op *PartitionDelete) Prepare( + proc *process.Process, +) error { + if op.OpAnalyzer == nil { + op.OpAnalyzer = process.NewAnalyzer(op.GetIdx(), op.IsFirst, op.IsLast, "partition_delete") + } else { + op.OpAnalyzer.Reset() + } + + op.raw.OperatorBase = op.OperatorBase + return op.raw.Prepare(proc) +} + +func (op *PartitionDelete) Call( + proc *process.Process, +) (vm.CallResult, error) { + analyzer := op.raw.OpAnalyzer + input, err := vm.ChildrenCall(op.GetChildren(0), proc, analyzer) + if err != nil { + return input, err + } + if input.Batch == nil || input.Batch.IsEmpty() { + return input, nil + } + + if op.raw.ctr.resBat == nil { + op.raw.ctr.resBat = makeDelBatch(*input.Batch.GetVector(int32(op.raw.DeleteCtx.PrimaryKeyIdx)).GetType()) + } else { + op.raw.ctr.resBat.CleanOnlyData() + } + + op.raw.delegated = true + op.raw.input = input + + ps := proc.GetPartitionService() + + res, err := ps.Prune( + proc.Ctx, + op.tableID, + input.Batch, + proc.GetTxnOperator(), + ) + if err != nil { + return vm.CallResult{}, err + } + defer res.Close() + if res.Empty() { + panic("Prune result is empty") + } + + ref := op.raw.DeleteCtx.Ref + eng := op.raw.DeleteCtx.Engine + oldName := ref.ObjName + defer func() { + ref.ObjName = oldName + }() + + var rel engine.Relation + res.Iter( + func( + partition partition.Partition, + bat *batch.Batch, + ) bool { + ref.ObjName = partition.PartitionTableName + rel, err = colexec.GetRelAndPartitionRelsByObjRef( + proc.Ctx, + proc, + eng, + ref, + ) + if err != nil { + return false + } + op.raw.ctr.source = rel + _, e := op.raw.Call(proc) + if e != nil { + err = e + return false + } + return true + }, + ) + if err != nil { + return vm.CallResult{}, err + } + return input, nil +} + +func (op *PartitionDelete) ExecProjection( + proc *process.Process, + input *batch.Batch, +) (*batch.Batch, error) { + return input, nil +} + +func (op *PartitionDelete) Free( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Free(proc, pipelineFailed, err) + *op = PartitionDelete{} +} + +func (op *PartitionDelete) Release() { + op.raw.Release() +} + +func (op *PartitionDelete) Reset( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Reset(proc, pipelineFailed, err) +} + +func (op *PartitionDelete) GetOperatorBase() *vm.OperatorBase { + return &op.OperatorBase +} + +func (op *PartitionDelete) GetDelete() *Deletion { + return op.raw +} diff --git a/pkg/sql/colexec/deletion/types.go b/pkg/sql/colexec/deletion/types.go index 1a5e584862640..1261f45800058 100644 --- a/pkg/sql/colexec/deletion/types.go +++ b/pkg/sql/colexec/deletion/types.go @@ -82,6 +82,8 @@ type container struct { affectedRows uint64 } type Deletion struct { + delegated bool + input vm.CallResult ctr container DeleteCtx *DeleteCtx diff --git a/pkg/sql/colexec/insert/insert.go b/pkg/sql/colexec/insert/insert.go index 56c00cedbbead..cfa7c5f65a985 100644 --- a/pkg/sql/colexec/insert/insert.go +++ b/pkg/sql/colexec/insert/insert.go @@ -150,22 +150,27 @@ func (insert *Insert) insert_s3(proc *process.Process, analyzer process.Analyzer } func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analyzer) (vm.CallResult, error) { - input, err := vm.ChildrenCall(insert.GetChildren(0), proc, analyzer) - if err != nil { - return input, err - } + if !insert.delegated { + input, err := vm.ChildrenCall(insert.GetChildren(0), proc, analyzer) + if err != nil { + return input, err + } + + if input.Batch == nil || input.Batch.IsEmpty() { + return input, nil + } - if input.Batch == nil || input.Batch.IsEmpty() { - return input, nil + insert.input = input } + input := insert.input affectedRows := uint64(input.Batch.RowCount()) insert.ctr.buf.CleanOnlyData() for i := range insert.ctr.buf.Attrs { if insert.ctr.buf.Vecs[i] == nil { insert.ctr.buf.Vecs[i] = vector.NewVec(*input.Batch.Vecs[i].GetType()) } - if err = insert.ctr.buf.Vecs[i].UnionBatch(input.Batch.Vecs[i], 0, input.Batch.Vecs[i].Length(), nil, proc.GetMPool()); err != nil { + if err := insert.ctr.buf.Vecs[i].UnionBatch(input.Batch.Vecs[i], 0, input.Batch.Vecs[i].Length(), nil, proc.GetMPool()); err != nil { return input, err } } @@ -175,7 +180,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // insert into table, insertBat will be deeply copied into txn's workspace. - err = insert.ctr.source.Write(newCtx, insert.ctr.buf) + err := insert.ctr.source.Write(newCtx, insert.ctr.buf) if err != nil { return input, err } diff --git a/pkg/sql/colexec/insert/insert_partition.go b/pkg/sql/colexec/insert/insert_partition.go new file mode 100644 index 0000000000000..f33a2a877643e --- /dev/null +++ b/pkg/sql/colexec/insert/insert_partition.go @@ -0,0 +1,178 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insert + +import ( + "bytes" + + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/pb/partition" + "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/vm" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/process" +) + +type PartitionInsert struct { + vm.OperatorBase + + raw *Insert + tableID uint64 +} + +func NewPartitionInsert( + raw *Insert, + tableID uint64, +) vm.Operator { + return &PartitionInsert{ + raw: raw, + tableID: tableID, + } +} + +func NewPartitionInsertFrom( + ps *PartitionInsert, +) vm.Operator { + raw := NewArgument() + raw.InsertCtx = ps.raw.InsertCtx + raw.ToWriteS3 = ps.raw.ToWriteS3 + return NewPartitionInsert(raw, ps.tableID) +} + +func (op *PartitionInsert) String(buf *bytes.Buffer) { + buf.WriteString(opName) + buf.WriteString(": partition_insert") +} + +func (op *PartitionInsert) OpType() vm.OpType { + return vm.PartitionInsert +} + +func (op *PartitionInsert) Prepare( + proc *process.Process, +) error { + if op.OpAnalyzer == nil { + op.OpAnalyzer = process.NewAnalyzer(op.GetIdx(), op.IsFirst, op.IsLast, "partition_insert") + } else { + op.OpAnalyzer.Reset() + } + + op.raw.OperatorBase = op.OperatorBase + return op.raw.Prepare(proc) +} + +func (op *PartitionInsert) Call( + proc *process.Process, +) (vm.CallResult, error) { + input, err := vm.ChildrenCall( + op.GetChildren(0), + proc, + op.raw.OpAnalyzer, + ) + if err != nil { + return input, err + } + if input.Batch == nil || input.Batch.IsEmpty() { + return input, nil + } + + op.raw.delegated = true + op.raw.input = input + + ps := proc.GetPartitionService() + + res, err := ps.Prune( + proc.Ctx, + op.tableID, + input.Batch, + proc.GetTxnOperator(), + ) + if err != nil { + return vm.CallResult{}, err + } + defer res.Close() + if res.Empty() { + panic("Prune result is empty") + } + + ref := op.raw.InsertCtx.Ref + eng := op.raw.InsertCtx.Engine + oldName := ref.ObjName + defer func() { + ref.ObjName = oldName + }() + + var rel engine.Relation + res.Iter( + func( + partition partition.Partition, + bat *batch.Batch, + ) bool { + ref.ObjName = partition.PartitionTableName + rel, err = colexec.GetRelAndPartitionRelsByObjRef( + proc.Ctx, + proc, + eng, + ref, + ) + if err != nil { + return false + } + op.raw.ctr.source = rel + _, e := op.raw.Call(proc) + if e != nil { + err = e + return false + } + return true + }, + ) + if err != nil { + return vm.CallResult{}, err + } + return input, nil +} + +func (op *PartitionInsert) ExecProjection( + proc *process.Process, + input *batch.Batch, +) (*batch.Batch, error) { + return input, nil +} + +func (op *PartitionInsert) Free( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Free(proc, pipelineFailed, err) + *op = PartitionInsert{} +} + +func (op *PartitionInsert) Release() { + op.raw.Release() +} + +func (op *PartitionInsert) Reset( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Reset(proc, pipelineFailed, err) +} + +func (op *PartitionInsert) GetOperatorBase() *vm.OperatorBase { + return &op.OperatorBase +} diff --git a/pkg/sql/colexec/insert/types.go b/pkg/sql/colexec/insert/types.go index f7666393c08df..6a2a2fdf7ce2a 100644 --- a/pkg/sql/colexec/insert/types.go +++ b/pkg/sql/colexec/insert/types.go @@ -45,6 +45,8 @@ type container struct { } type Insert struct { + delegated bool + input vm.CallResult ctr container ToWriteS3 bool // mark if this insert's target is S3 or not. InsertCtx *InsertCtx diff --git a/pkg/sql/colexec/lockop/lock_op.go b/pkg/sql/colexec/lockop/lock_op.go index 5cadc294f1b75..fd8b7749e505e 100644 --- a/pkg/sql/colexec/lockop/lock_op.go +++ b/pkg/sql/colexec/lockop/lock_op.go @@ -173,7 +173,6 @@ func performLock( zap.Int32("filter-col", target.filterColIndexInBatch), zap.Int32("primary-index", target.primaryColumnIndexInBatch)) var filterCols []int32 - priVec := bat.GetVector(target.primaryColumnIndexInBatch) // For partitioned tables, filter is not nil if target.filter != nil { filterCols = vector.MustFixedColWithTypeCheck[int32](bat.GetVector(target.filterColIndexInBatch)) @@ -191,7 +190,8 @@ func performLock( lockOp.ctr.relations[idx], target.tableID, proc, - priVec, + bat, + target.primaryColumnIndexInBatch, target.primaryColumnType, DefaultLockOptions(lockOp.ctr.parker). WithLockMode(lock.LockMode_Exclusive). @@ -219,6 +219,7 @@ func performLock( // refreshTS is last commit ts + 1, because we need see the committed data. if proc.Base.TxnClient.RefreshExpressionEnabled() && target.refreshTimestampIndexInBatch != -1 { + priVec := bat.GetVector(target.primaryColumnIndexInBatch) vec := bat.GetVector(target.refreshTimestampIndexInBatch) ts := types.BuildTS(refreshTS.PhysicalTime, refreshTS.LogicalTime) n := priVec.Length() @@ -291,6 +292,7 @@ func LockTable( tableID, proc, nil, + 0, pkType, opts) if err != nil { @@ -313,7 +315,8 @@ func LockRows( proc *process.Process, rel engine.Relation, tableID uint64, - vec *vector.Vector, + bat *batch.Batch, + idx int32, pkType types.Type, lockMode lock.LockMode, sharding lock.Sharding, @@ -355,7 +358,8 @@ func LockRows( rel, tableID, proc, - vec, + bat, + idx, pkType, opts) if err != nil { @@ -382,7 +386,8 @@ func doLock( rel engine.Relation, tableID uint64, proc *process.Process, - vec *vector.Vector, + bat *batch.Batch, + idx int32, pkType types.Type, opts LockOptions, ) (bool, bool, timestamp.Timestamp, error) { @@ -415,6 +420,11 @@ func doLock( // update t1 set b = b+1 where a = 1; // here MO will use 't1 left join hidden_tbl' to fetch the PK in hidden table to lock, // but the result will be ConstNull vector + var vec *vector.Vector + if bat != nil { + vec = bat.GetVector(idx) + } + if vec != nil && vec.AllNull() { return false, false, timestamp.Timestamp{}, nil } @@ -552,7 +562,7 @@ func doLock( } // if [snapshotTS, newSnapshotTS] has been modified, need retry at new snapshot ts - changed, err := fn(proc, rel, analyzer, tableID, eng, vec, snapshotTS, newSnapshotTS) + changed, err := fn(proc, rel, analyzer, tableID, eng, bat, idx, snapshotTS, newSnapshotTS) if err != nil { return false, false, timestamp.Timestamp{}, err } @@ -978,10 +988,11 @@ func hasNewVersionInRange( analyzer process.Analyzer, tableID uint64, eng engine.Engine, - vec *vector.Vector, + bat *batch.Batch, + idx int32, from, to timestamp.Timestamp, ) (bool, error) { - if vec == nil { + if bat == nil { return false, nil } @@ -1009,7 +1020,7 @@ func hasNewVersionInRange( fromTS := types.BuildTS(from.PhysicalTime, from.LogicalTime) toTS := types.BuildTS(to.PhysicalTime, to.LogicalTime) - return rel.PrimaryKeysMayBeModified(newCtx, fromTS, toTS, vec) + return rel.PrimaryKeysMayBeModified(newCtx, fromTS, toTS, bat, idx) } func analyzeLockWaitTime(analyzer process.Analyzer, start time.Time) { diff --git a/pkg/sql/colexec/lockop/lock_op_no_txn.go b/pkg/sql/colexec/lockop/lock_op_no_txn.go index 3b3f56a10d8c4..c0edc77ea29e3 100644 --- a/pkg/sql/colexec/lockop/lock_op_no_txn.go +++ b/pkg/sql/colexec/lockop/lock_op_no_txn.go @@ -20,8 +20,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/pb/lock" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" @@ -71,7 +71,8 @@ func LockTableWithUniqueID( analyzer process.Analyzer, tableID uint64, eng engine.Engine, - vec *vector.Vector, + vec *batch.Batch, + idx int32, from, to timestamp.Timestamp) (bool, error) { return false, nil }). @@ -84,6 +85,7 @@ func LockTableWithUniqueID( tableID, proc, nil, + 0, pkType, opts) return err diff --git a/pkg/sql/colexec/lockop/lock_op_test.go b/pkg/sql/colexec/lockop/lock_op_test.go index de15c47c3675b..71c851364fc39 100644 --- a/pkg/sql/colexec/lockop/lock_op_test.go +++ b/pkg/sql/colexec/lockop/lock_op_test.go @@ -21,10 +21,6 @@ import ( "time" "github.com/lni/goutils/leaktest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/common/runtime" @@ -41,6 +37,9 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" ) var testFunc = func( @@ -49,7 +48,8 @@ var testFunc = func( analyzer process.Analyzer, tableID uint64, eng engine.Engine, - vec *vector.Vector, + bat *batch.Batch, + idx int32, from, to timestamp.Timestamp) (bool, error) { return false, nil } @@ -340,7 +340,8 @@ func TestLockWithHasNewVersionInLockedTS(t *testing.T) { analyzer process.Analyzer, tableID uint64, eng engine.Engine, - vec *vector.Vector, + bat *batch.Batch, + idx int32, from, to timestamp.Timestamp) (bool, error) { return true, nil } diff --git a/pkg/sql/colexec/lockop/types.go b/pkg/sql/colexec/lockop/types.go index dd8883d435ea9..8d36fb340145e 100644 --- a/pkg/sql/colexec/lockop/types.go +++ b/pkg/sql/colexec/lockop/types.go @@ -17,6 +17,7 @@ package lockop import ( "github.com/matrixorigin/matrixone/pkg/common/log" "github.com/matrixorigin/matrixone/pkg/common/reuse" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/lock" @@ -126,7 +127,8 @@ type hasNewVersionInRangeFunc func( analyzer process.Analyzer, tableID uint64, eng engine.Engine, - vec *vector.Vector, + bat *batch.Batch, + idx int32, from, to timestamp.Timestamp) (bool, error) type state struct { diff --git a/pkg/sql/colexec/multi_update/multi_update.go b/pkg/sql/colexec/multi_update/multi_update.go index cba2789208f12..c6aebfe04c25c 100644 --- a/pkg/sql/colexec/multi_update/multi_update.go +++ b/pkg/sql/colexec/multi_update/multi_update.go @@ -196,16 +196,21 @@ func (update *MultiUpdate) update_s3(proc *process.Process, analyzer process.Ana } func (update *MultiUpdate) update(proc *process.Process, analyzer process.Analyzer) (vm.CallResult, error) { - input, err := vm.ChildrenCall(update.GetChildren(0), proc, analyzer) - if err != nil { - return input, err - } + if !update.delegated { + input, err := vm.ChildrenCall(update.GetChildren(0), proc, analyzer) + if err != nil { + return input, err + } - if input.Batch == nil || input.Batch.IsEmpty() { - return input, nil + if input.Batch == nil || input.Batch.IsEmpty() { + return input, nil + } + + update.input = input } - err = update.updateOneBatch(proc, analyzer, input.Batch) + input := update.input + err := update.updateOneBatch(proc, analyzer, input.Batch) if err != nil { return vm.CancelResult, err } @@ -344,3 +349,42 @@ func (update *MultiUpdate) updateOneBatch(proc *process.Process, analyzer proces return nil } + +func (update *MultiUpdate) resetMultiUpdateCtxs() { + for k := range update.ctr.updateCtxInfos { + delete(update.ctr.updateCtxInfos, k) + } + + for _, updateCtx := range update.MultiUpdateCtx { + info := new(updateCtxInfo) + for _, col := range updateCtx.TableDef.Cols { + if col.Name != catalog.Row_ID { + info.insertAttrs = append(info.insertAttrs, col.Name) + } + } + + tableType := UpdateMainTable + if strings.HasPrefix(updateCtx.TableDef.Name, catalog.UniqueIndexTableNamePrefix) { + tableType = UpdateUniqueIndexTable + } else if strings.HasPrefix(updateCtx.TableDef.Name, catalog.SecondaryIndexTableNamePrefix) { + tableType = UpdateSecondaryIndexTable + } + info.tableType = tableType + update.ctr.updateCtxInfos[updateCtx.TableDef.Name] = info + } +} + +func (update *MultiUpdate) resetMultiSources(proc *process.Process) error { + for _, updateCtx := range update.MultiUpdateCtx { + info := update.ctr.updateCtxInfos[updateCtx.TableDef.Name] + info.Sources = nil + if update.Action != UpdateWriteS3 { + rel, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, update.Engine, updateCtx.ObjRef) + if err != nil { + return err + } + info.Sources = append(info.Sources, rel) + } + } + return nil +} diff --git a/pkg/sql/colexec/multi_update/multi_update_partition.go b/pkg/sql/colexec/multi_update/multi_update_partition.go new file mode 100644 index 0000000000000..fbd813d84468e --- /dev/null +++ b/pkg/sql/colexec/multi_update/multi_update_partition.go @@ -0,0 +1,178 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multi_update + +import ( + "bytes" + + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/pb/partition" + "github.com/matrixorigin/matrixone/pkg/sql/colexec" + "github.com/matrixorigin/matrixone/pkg/vm" + "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/process" +) + +type PartitionMultiUpdate struct { + vm.OperatorBase + + raw *MultiUpdate + tableID uint64 +} + +func NewPartitionMultiUpdate( + raw *MultiUpdate, + tableID uint64, +) vm.Operator { + return &PartitionMultiUpdate{ + raw: raw, + tableID: tableID, + } +} + +func NewPartitionMultiUpdateFrom( + from *PartitionMultiUpdate, +) vm.Operator { + op := NewArgument() + op.MultiUpdateCtx = from.raw.MultiUpdateCtx + op.Action = from.raw.Action + op.IsOnduplicateKeyUpdate = from.raw.IsOnduplicateKeyUpdate + op.Engine = from.raw.Engine + return op +} + +func (op *PartitionMultiUpdate) String(buf *bytes.Buffer) { + buf.WriteString(opName) + buf.WriteString(": partition_multi_update") +} + +func (op *PartitionMultiUpdate) OpType() vm.OpType { + return vm.PartitionMultiUpdate +} + +func (op *PartitionMultiUpdate) Prepare( + proc *process.Process, +) error { + if op.OpAnalyzer == nil { + op.OpAnalyzer = process.NewAnalyzer(op.GetIdx(), op.IsFirst, op.IsLast, "partition_multi_update") + } else { + op.OpAnalyzer.Reset() + } + + op.raw.OperatorBase = op.OperatorBase + return op.raw.Prepare(proc) +} + +func (op *PartitionMultiUpdate) Call( + proc *process.Process, +) (vm.CallResult, error) { + input, err := vm.ChildrenCall( + op.GetChildren(0), + proc, + op.raw.OpAnalyzer, + ) + if err != nil { + return input, err + } + if input.Batch == nil || input.Batch.IsEmpty() { + return input, nil + } + + op.raw.delegated = true + op.raw.input = input + + ps := proc.GetPartitionService() + + res, err := ps.Prune( + proc.Ctx, + op.tableID, + input.Batch, + proc.GetTxnOperator(), + ) + if err != nil { + return vm.CallResult{}, err + } + defer res.Close() + if res.Empty() { + panic("Prune result is empty") + } + + var rel engine.Relation + res.Iter( + func( + partition partition.Partition, + bat *batch.Batch, + ) bool { + for _, c := range op.raw.MultiUpdateCtx { + c.ObjRef.ObjName = partition.PartitionTableName + rel, err = colexec.GetRelAndPartitionRelsByObjRef( + proc.Ctx, + proc, + op.raw.Engine, + c.ObjRef, + ) + if err != nil { + return false + } + + c.ObjRef.ObjName = partition.PartitionTableName + c.TableDef = rel.GetTableDef(proc.Ctx) + } + op.raw.resetMultiUpdateCtxs() + if err = op.raw.resetMultiSources(proc); err != nil { + return false + } + + _, err = op.raw.Call(proc) + return err == nil + }, + ) + if err != nil { + return vm.CallResult{}, err + } + return input, nil +} + +func (op *PartitionMultiUpdate) ExecProjection( + proc *process.Process, + input *batch.Batch, +) (*batch.Batch, error) { + return input, nil +} + +func (op *PartitionMultiUpdate) Free( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Free(proc, pipelineFailed, err) + *op = PartitionMultiUpdate{} +} + +func (op *PartitionMultiUpdate) Release() { + op.raw.Release() +} + +func (op *PartitionMultiUpdate) Reset( + proc *process.Process, + pipelineFailed bool, + err error, +) { + op.raw.Reset(proc, pipelineFailed, err) +} + +func (op *PartitionMultiUpdate) GetOperatorBase() *vm.OperatorBase { + return &op.OperatorBase +} diff --git a/pkg/sql/colexec/multi_update/types.go b/pkg/sql/colexec/multi_update/types.go index 0c498bb83c6f2..4c45c0d09c171 100644 --- a/pkg/sql/colexec/multi_update/types.go +++ b/pkg/sql/colexec/multi_update/types.go @@ -65,6 +65,8 @@ func init() { } type MultiUpdate struct { + delegated bool + input vm.CallResult ctr container MultiUpdateCtx []*MultiUpdateCtx diff --git a/pkg/sql/colexec/preinsert/preinsert.go b/pkg/sql/colexec/preinsert/preinsert.go index 99542b3c0da11..b86f251b232a4 100644 --- a/pkg/sql/colexec/preinsert/preinsert.go +++ b/pkg/sql/colexec/preinsert/preinsert.go @@ -286,14 +286,13 @@ func genAutoIncrCol(bat *batch.Batch, proc *proc, preInsert *PreInsert) error { } for col, idx := range needReCheck { - vec := bat.GetVector(int32(idx)) from, err := proc.GetIncrService().GetLastAllocateTS(proc.Ctx, tableID, col) if err != nil { return err } fromTs := types.TimestampToTS(from) toTs := types.TimestampToTS(proc.Base.TxnOperator.SnapshotTS()) - if mayChanged, err := rel.PrimaryKeysMayBeUpserted(proc.Ctx, fromTs, toTs, vec); err == nil { + if mayChanged, err := rel.PrimaryKeysMayBeUpserted(proc.Ctx, fromTs, toTs, bat, int32(idx)); err == nil { if mayChanged { logutil.Debugf("user may have manually specified the value to be inserted into the auto pk col before this transaction.") return moerr.NewTxnNeedRetry(proc.Ctx) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index a6b17e51a9fc1..ec9cf59c8ce1b 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3136,8 +3136,12 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* currentFirstFlag := c.anal.isFirst // Not write S3 for i := range ss { - insertArg := constructInsert(n, c.e) - insertArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + insertArg, err := constructInsert(c.proc, n, c.e, false) + if err != nil { + return nil, err + } + + insertArg.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) ss[i].setRootOperator(insertArg) } c.anal.isFirst = false @@ -3184,9 +3188,12 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* } dataScope.IsEnd = true for i := range scopes { - insertArg := constructInsert(n, c.e) - insertArg.ToWriteS3 = true - insertArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + insertArg, err := constructInsert(c.proc, n, c.e, true) + if err != nil { + return nil, err + } + + insertArg.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) scopes[i].setRootOperator(insertArg) } currentFirstFlag = false @@ -3204,9 +3211,12 @@ func (c *Compile) compileInsert(ns []*plan.Node, n *plan.Node, ss []*Scope) ([]* currentFirstFlag := c.anal.isFirst c.anal.isFirst = false for i := range ss { - insertArg := constructInsert(n, c.e) - insertArg.ToWriteS3 = true - insertArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + insertArg, err := constructInsert(c.proc, n, c.e, true) + if err != nil { + return nil, err + } + + insertArg.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) ss[i].setRootOperator(insertArg) } currentFirstFlag = false @@ -3250,9 +3260,12 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) } for i := range ss { - multiUpdateArg := constructMultiUpdate(n, c.e) - multiUpdateArg.Action = multi_update.UpdateWriteS3 - multiUpdateArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + multiUpdateArg, err := constructMultiUpdate(n, c.e, c.proc, multi_update.UpdateWriteS3) + if err != nil { + return nil, err + } + + multiUpdateArg.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) ss[i].setRootOperator(multiUpdateArg) } @@ -3261,8 +3274,11 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) rs = c.newMergeScope(ss) } - multiUpdateArg := constructMultiUpdate(n, c.e) - multiUpdateArg.Action = multi_update.UpdateFlushS3Info + multiUpdateArg, err := constructMultiUpdate(n, c.e, c.proc, multi_update.UpdateFlushS3Info) + if err != nil { + return nil, err + } + rs.setRootOperator(multiUpdateArg) ss = []*Scope{rs} } else { @@ -3270,9 +3286,11 @@ func (c *Compile) compileMultiUpdate(_ []*plan.Node, n *plan.Node, ss []*Scope) rs := c.newMergeScope(ss) ss = []*Scope{rs} } - multiUpdateArg := constructMultiUpdate(n, c.e) - multiUpdateArg.Action = multi_update.UpdateWriteTable - multiUpdateArg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + multiUpdateArg, err := constructMultiUpdate(n, c.e, c.proc, multi_update.UpdateWriteTable) + if err != nil { + return nil, err + } + multiUpdateArg.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) ss[0].setRootOperator(multiUpdateArg) } c.anal.isFirst = false @@ -3302,15 +3320,22 @@ func (c *Compile) compilePreInsertSK(n *plan.Node, ss []*Scope) []*Scope { } func (c *Compile) compileDelete(n *plan.Node, ss []*Scope) ([]*Scope, error) { - var arg *deletion.Deletion currentFirstFlag := c.anal.isFirst - arg, err := constructDeletion(n, c.e) + op, err := constructDeletion(n, c.e, c.proc) if err != nil { return nil, err } - arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + + op.GetOperatorBase().SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) c.anal.isFirst = false + var arg *deletion.Deletion + if _, ok := op.(*deletion.Deletion); ok { + arg = op.(*deletion.Deletion) + } else { + arg = op.(*deletion.PartitionDelete).GetDelete() + } + if n.Stats.GetOutcnt()*float64(SingleLineSizeEstimate) > float64(DistributedThreshold) && !arg.DeleteCtx.CanTruncate { rs := c.newDeleteMergeScope(arg, ss, n) rs.Magic = MergeDelete diff --git a/pkg/sql/compile/ddl.go b/pkg/sql/compile/ddl.go index 34d5f4a96d30a..3385a6ccd24b1 100644 --- a/pkg/sql/compile/ddl.go +++ b/pkg/sql/compile/ddl.go @@ -3492,27 +3492,34 @@ func lockRows( eng engine.Engine, proc *process.Process, rel engine.Relation, - vec *vector.Vector, + bat *batch.Batch, + idx int32, lockMode lock.LockMode, sharding lock.Sharding, - group uint32) error { + group uint32, +) error { + var vec *vector.Vector + if bat != nil { + vec = bat.GetVector(idx) + } if vec == nil || vec.Length() == 0 { panic("lock rows is empty") } id := rel.GetTableID(proc.Ctx) - err := lockop.LockRows( + return lockop.LockRows( eng, proc, rel, id, - vec, + bat, + idx, *vec.GetType(), lockMode, sharding, - group) - return err + group, + ) } var maybeCreateAutoIncrement = func( @@ -3634,7 +3641,7 @@ func getRelFromMoCatalog(c *Compile, tblName string) (engine.Relation, error) { return rel, nil } -func getLockVector(proc *process.Process, accountId uint32, names []string) (*vector.Vector, error) { +func getLockBatch(proc *process.Process, accountId uint32, names []string) (*batch.Batch, error) { vecs := make([]*vector.Vector, len(names)+1) defer func() { for _, v := range vecs { @@ -3665,7 +3672,9 @@ func getLockVector(proc *process.Process, accountId uint32, names []string) (*ve if err != nil { return nil, err } - return vec, nil + bat := batch.NewWithSize(1) + bat.SetVector(0, vec) + return bat, nil } var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) error { @@ -3674,12 +3683,12 @@ var lockMoDatabase = func(c *Compile, dbName string, lockMode lock.LockMode) err return err } accountID := c.proc.GetSessionInfo().AccountId - vec, err := getLockVector(c.proc, accountID, []string{dbName}) + bat, err := getLockBatch(c.proc, accountID, []string{dbName}) if err != nil { return err } - defer vec.Free(c.proc.Mp()) - if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil { + defer bat.GetVector(0).Free(c.proc.Mp()) + if err := lockRows(c.e, c.proc, dbRel, bat, 0, lockMode, lock.Sharding_None, accountID); err != nil { return err } return nil @@ -3689,19 +3698,20 @@ var lockMoTable = func( c *Compile, dbName string, tblName string, - lockMode lock.LockMode) error { + lockMode lock.LockMode, +) error { dbRel, err := getRelFromMoCatalog(c, catalog.MO_TABLES) if err != nil { return err } accountID := c.proc.GetSessionInfo().AccountId - vec, err := getLockVector(c.proc, accountID, []string{dbName, tblName}) + bat, err := getLockBatch(c.proc, accountID, []string{dbName, tblName}) if err != nil { return err } - defer vec.Free(c.proc.Mp()) + defer bat.GetVector(0).Free(c.proc.Mp()) - if err := lockRows(c.e, c.proc, dbRel, vec, lockMode, lock.Sharding_None, accountID); err != nil { + if err := lockRows(c.e, c.proc, dbRel, bat, 0, lockMode, lock.Sharding_None, accountID); err != nil { return err } return nil diff --git a/pkg/sql/compile/lock_meta.go b/pkg/sql/compile/lock_meta.go index 08293c3b15c68..a5e23fa1b6761 100644 --- a/pkg/sql/compile/lock_meta.go +++ b/pkg/sql/compile/lock_meta.go @@ -172,7 +172,9 @@ func (l *LockMeta) lockMetaRows(e engine.Engine, proc *process.Process, executor if err != nil { return err } - if err := lockop.LockRows(e, proc, nil, tableId, lockVec, *lockVec.GetType(), lock.LockMode_Shared, lock.Sharding_None, accountId); err != nil { + b := batch.NewWithSize(1) + b.SetVector(0, lockVec) + if err := lockop.LockRows(e, proc, nil, tableId, b, 0, *lockVec.GetType(), lock.LockMode_Shared, lock.Sharding_None, accountId); err != nil { // if get error in locking mocatalog.mo_tables by it's dbName & tblName // that means the origin table's schema was changed. then return NeedRetryWithDefChanged err if moerr.IsMoErrCode(err, moerr.ErrTxnNeedRetry) || diff --git a/pkg/sql/compile/operator.go b/pkg/sql/compile/operator.go index 526b4db2930aa..2dc83261f1f6b 100644 --- a/pkg/sql/compile/operator.go +++ b/pkg/sql/compile/operator.go @@ -527,6 +527,16 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator { op.ToWriteS3 = t.ToWriteS3 op.SetInfo(&info) return op + case vm.PartitionInsert: + t := sourceOp.(*insert.PartitionInsert) + op := insert.NewPartitionInsertFrom(t) + op.SetInfo(&info) + return op + case vm.PartitionDelete: + t := sourceOp.(*deletion.PartitionDelete) + op := deletion.NewPartitionDeleteFrom(t) + op.SetInfo(&info) + return op case vm.PreInsert: t := sourceOp.(*preinsert.PreInsert) op := preinsert.NewArgument() @@ -646,7 +656,7 @@ func constructRestrict(n *plan.Node, filterExpr *plan.Expr) *filter.Filter { return op } -func constructDeletion(n *plan.Node, eg engine.Engine) (*deletion.Deletion, error) { +func constructDeletion(n *plan.Node, eg engine.Engine, proc *process.Process) (vm.Operator, error) { oldCtx := n.DeleteCtx delCtx := &deletion.DeleteCtx{ Ref: oldCtx.Ref, @@ -659,7 +669,22 @@ func constructDeletion(n *plan.Node, eg engine.Engine) (*deletion.Deletion, erro op := deletion.NewArgument() op.DeleteCtx = delCtx - return op, nil + + ps := proc.GetPartitionService() + ok, _, err := ps.Is( + proc.Ctx, + oldCtx.TableDef.TblId, + proc.GetTxnOperator(), + ) + if err != nil { + return nil, err + } + + if !ok { + return op, nil + } + + return deletion.NewPartitionDelete(op, oldCtx.TableDef.TblId), nil } func constructOnduplicateKey(n *plan.Node, _ engine.Engine) *onduplicatekey.OnDuplicatekey { @@ -812,7 +837,12 @@ func constructLockOp(n *plan.Node, eng engine.Engine) (*lockop.LockOp, error) { return arg, nil } -func constructMultiUpdate(n *plan.Node, eg engine.Engine) *multi_update.MultiUpdate { +func constructMultiUpdate( + n *plan.Node, + eg engine.Engine, + proc *process.Process, + action multi_update.UpdateAction, +) (vm.Operator, error) { arg := multi_update.NewArgument() arg.Engine = eg @@ -835,11 +865,34 @@ func constructMultiUpdate(n *plan.Node, eg engine.Engine) *multi_update.MultiUpd DeleteCols: deleteCols, } } + arg.Action = action - return arg + ps := proc.GetPartitionService() + ok, _, err := ps.Is( + proc.Ctx, + n.UpdateCtxList[0].TableDef.TblId, + proc.GetTxnOperator(), + ) + if err != nil { + return nil, err + } + + if !ok { + return arg, nil + } + + return multi_update.NewPartitionMultiUpdate( + arg, + n.UpdateCtxList[0].TableDef.TblId, + ), nil } -func constructInsert(n *plan.Node, eg engine.Engine) *insert.Insert { +func constructInsert( + proc *process.Process, + n *plan.Node, + eg engine.Engine, + toS3 bool, +) (vm.Operator, error) { oldCtx := n.InsertCtx var attrs []string for _, col := range oldCtx.TableDef.Cols { @@ -856,7 +909,27 @@ func constructInsert(n *plan.Node, eg engine.Engine) *insert.Insert { } arg := insert.NewArgument() arg.InsertCtx = newCtx - return arg + arg.ToWriteS3 = toS3 + + ps := proc.GetPartitionService() + if ps == nil { + return arg, nil + } + + ok, _, err := ps.Is( + proc.Ctx, + oldCtx.TableDef.TblId, + proc.GetTxnOperator(), + ) + if err != nil { + return nil, err + } + + if !ok { + return arg, nil + } + + return insert.NewPartitionInsert(arg, oldCtx.TableDef.TblId), nil } func constructProjection(n *plan.Node) *projection.Projection { diff --git a/pkg/tests/issues/issue_test.go b/pkg/tests/issues/issue_test.go index 057c77ee19811..67b92ca716eac 100644 --- a/pkg/tests/issues/issue_test.go +++ b/pkg/tests/issues/issue_test.go @@ -28,6 +28,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/cnservice" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/defines" @@ -295,7 +296,9 @@ func TestBinarySearchBlkDataOnUnSortedFakePKCol(t *testing.T) { vector.AppendFixed[int64](vec, keys[i], false, proc.GetMPool()) } - rel.PrimaryKeysMayBeModified(ctx, types.TS{}, types.MaxTs(), vec) + bat := batch.NewWithSize(1) + bat.SetVector(0, vec) + rel.PrimaryKeysMayBeModified(ctx, types.TS{}, types.MaxTs(), bat, 0) vec.Free(proc.GetMPool()) } diff --git a/pkg/tests/partition/create_hash_test.go b/pkg/tests/partition/create_hash_test.go index 46551b4152b88..5da5efbac6b63 100644 --- a/pkg/tests/partition/create_hash_test.go +++ b/pkg/tests/partition/create_hash_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/cnservice" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/embed" "github.com/matrixorigin/matrixone/pkg/partitionservice" @@ -92,6 +93,114 @@ func TestCreateAndDeleteHashBased(t *testing.T) { ) } +func TestInsertAndDeleteHashBased(t *testing.T) { + creates := []string{ + "create table %s (c int) partition by hash(c) partitions 2", + "create table %s (c int, b vecf32(2)) partition by hash(c) partitions 2", + } + inserts := []string{ + "insert into %s values(1)", + "insert into %s values(1, '[1.1, 2.2]')", + } + deletes := []string{ + "delete from %s where c = 1", + "delete from %s where c = 1", + } + + runPartitionClusterTest( + t, + func(c embed.Cluster) { + cn, err := c.GetCNService(0) + require.NoError(t, err) + + db := testutils.GetDatabaseName(t) + testutils.CreateTestDatabase(t, db, cn) + + for idx := range creates { + table := fmt.Sprintf("%s_%d", t.Name(), idx) + create := fmt.Sprintf(creates[idx], table) + insert := fmt.Sprintf(inserts[idx], table) + delete := fmt.Sprintf(deletes[idx], table) + + testutils.ExecSQL( + t, + db, + cn, + create, + ) + + fn := func() int64 { + n := int64(0) + for i := 0; i < 2; i++ { + testutils.ExecSQLWithReadResult( + t, + db, + cn, + func(i int, s string, r executor.Result) { + r.ReadRows( + func(rows int, cols []*vector.Vector) bool { + n += executor.GetFixedRows[int64](cols[0])[0] + return true + }, + ) + }, + fmt.Sprintf("select count(1) from %s_p%d", table, i), + ) + } + return n + } + + testutils.ExecSQL( + t, + db, + cn, + insert, + ) + require.Equal(t, int64(1), fn()) + + testutils.ExecSQLWithReadResult( + t, + db, + cn, + func(i int, s string, r executor.Result) { + r.ReadRows( + func(rows int, cols []*vector.Vector) bool { + require.Equal(t, int64(1), executor.GetFixedRows[int64](cols[0])[0]) + return true + }, + ) + }, + fmt.Sprintf("select count(1) from %s", table), + ) + + testutils.ExecSQL( + t, + db, + cn, + delete, + ) + require.Equal(t, int64(0), fn()) + + testutils.ExecSQLWithReadResult( + t, + db, + cn, + func(i int, s string, r executor.Result) { + r.ReadRows( + func(rows int, cols []*vector.Vector) bool { + require.Equal(t, int64(0), executor.GetFixedRows[int64](cols[0])[0]) + return true + }, + ) + }, + fmt.Sprintf("select count(1) from %s", table), + ) + } + }, + ) + +} + func getMetadata( t *testing.T, accountID uint32, diff --git a/pkg/tests/testutils/test_utils.go b/pkg/tests/testutils/test_utils.go index 4736a6ab17599..339a9bf7eb5f7 100644 --- a/pkg/tests/testutils/test_utils.go +++ b/pkg/tests/testutils/test_utils.go @@ -94,6 +94,21 @@ func WaitTableCreated( } } +func WaitLogtailApplied( + t *testing.T, + min timestamp.Timestamp, + cn embed.ServiceOperator, +) { + ctx, cancel := context.WithTimeout( + context.Background(), + 10*time.Second, + ) + defer cancel() + txn := cn.RawService().(cnservice.Service).GetTxnClient() + _, err := txn.WaitLogTailAppliedAt(ctx, min) + require.NoError(t, err) +} + func WaitDatabaseCreated( t *testing.T, name string, @@ -159,6 +174,7 @@ func ExecSQLWithReadResult( ) require.NoError(t, moerr.AttachCause(ctx, err), sql) + // WaitLogtailApplied(t, txnOp.Txn().CommitTS, cn) return txnOp.Txn().CommitTS } diff --git a/pkg/vm/engine/disttae/local_disttae_datasource_test.go b/pkg/vm/engine/disttae/local_disttae_datasource_test.go index cefd1deacb453..7d4d81af0d243 100644 --- a/pkg/vm/engine/disttae/local_disttae_datasource_test.go +++ b/pkg/vm/engine/disttae/local_disttae_datasource_test.go @@ -47,9 +47,8 @@ func TestRelationDataV2_MarshalAndUnMarshal(t *testing.T) { for i := 0; i < blkNum; i++ { blkID := types.NewBlockidWithObjectID(&objID, uint16(blkNum)) blkInfo := objectio.BlockInfo{ - BlockID: *blkID, - MetaLoc: metaLoc, - PartitionNum: int16(i), + BlockID: *blkID, + MetaLoc: metaLoc, } blkInfo.ObjectFlags |= objectio.ObjectFlag_Appendable relData.AppendBlockInfo(&blkInfo) diff --git a/pkg/vm/engine/disttae/txn_database.go b/pkg/vm/engine/disttae/txn_database.go index 015a9d9243772..fa956c0bd4940 100644 --- a/pkg/vm/engine/disttae/txn_database.go +++ b/pkg/vm/engine/disttae/txn_database.go @@ -33,7 +33,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" txn2 "github.com/matrixorigin/matrixone/pkg/pb/txn" - "github.com/matrixorigin/matrixone/pkg/shardservice" "github.com/matrixorigin/matrixone/pkg/util/executor" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" @@ -150,11 +149,9 @@ func (db *txnDatabase) relation(ctx context.Context, name string, proc any) (eng } tbl, err := newTxnTable( + ctx, db, *item, - p, - shardservice.GetService(p.GetService()), - txn.engine, ) if err != nil { return nil, err diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 2bc709b15c1fa..c5691e72c02cc 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -40,9 +40,11 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" + "github.com/matrixorigin/matrixone/pkg/shardservice" "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" @@ -52,6 +54,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/util/errutil" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/logtailreplay" "github.com/matrixorigin/matrixone/pkg/vm/engine/readutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio" @@ -70,6 +73,63 @@ var traceFilterExprInterval2 atomic.Uint64 var _ engine.Relation = new(txnTable) +func newTxnTable( + ctx context.Context, + db *txnDatabase, + item cache.TableItem, +) (engine.Relation, error) { + txn := db.getTxn() + process := txn.proc + eng := txn.engine + + tbl := &txnTableDelegate{ + origin: newTxnTableWithItem( + db, + item, + process, + eng, + ), + } + tbl.isLocal = tbl.isLocalFunc + + if db.databaseId != catalog.MO_CATALOG_ID { + ps := partitionservice.GetService(process.GetService()) + is, metadata, err := ps.Is(ctx, item.Id, txn.op) + if err != nil { + return nil, err + } + if is { + p, err := newPartitionTxnTable( + tbl.origin, + metadata, + ps, + ) + if err != nil { + return nil, err + } + tbl.partition.tbl = p + tbl.partition.is = true + tbl.partition.service = ps + } + + tbl.shard.service = shardservice.GetService(process.GetService()) + tbl.shard.is = false + + if tbl.shard.service.Config().Enable { + tableID, policy, is, err := tbl.shard.service.GetShardInfo(item.Id) + if err != nil { + return nil, err + } + + tbl.shard.is = is + tbl.shard.policy = policy + tbl.shard.tableID = tableID + } + } + + return tbl, nil +} + func (tbl *txnTable) getEngine() engine.Engine { return tbl.eng } @@ -2008,7 +2068,6 @@ func (tbl *txnTable) PKPersistedBetween( blk.SetFlagByObjStats(&obj.ObjectStats) - blk.PartitionNum = -1 candidateBlks[blk.BlockID] = &blk return true }, obj.ObjectStats) @@ -2090,8 +2149,10 @@ func (tbl *txnTable) PrimaryKeysMayBeUpserted( ctx context.Context, from types.TS, to types.TS, - keysVector *vector.Vector, + batch *batch.Batch, + pkIndex int32, ) (bool, error) { + keysVector := batch.GetVector(pkIndex) return tbl.primaryKeysMayBeChanged(ctx, from, to, keysVector, false) } @@ -2099,8 +2160,10 @@ func (tbl *txnTable) PrimaryKeysMayBeModified( ctx context.Context, from types.TS, to types.TS, - keysVector *vector.Vector, + batch *batch.Batch, + pkIndex int32, ) (bool, error) { + keysVector := batch.GetVector(pkIndex) return tbl.primaryKeysMayBeChanged(ctx, from, to, keysVector, true) } diff --git a/pkg/vm/engine/disttae/txn_table_sharding.go b/pkg/vm/engine/disttae/txn_table_delegate.go similarity index 86% rename from pkg/vm/engine/disttae/txn_table_sharding.go rename to pkg/vm/engine/disttae/txn_table_delegate.go index 312568b827552..3df0c74a06c10 100644 --- a/pkg/vm/engine/disttae/txn_table_sharding.go +++ b/pkg/vm/engine/disttae/txn_table_delegate.go @@ -19,21 +19,21 @@ import ( "time" "github.com/fagongzi/goetty/v2/buf" - "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/shard" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/shardservice" + plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/matrixorigin/matrixone/pkg/vm/engine/readutil" @@ -68,8 +68,6 @@ func newTxnTableWithItem( relKind: item.Kind, viewdef: item.ViewDef, comment: item.Comment, - partitioned: item.Partitioned, - partition: item.Partition, createSql: item.CreateSql, constraint: item.Constraint, extraInfo: item.ExtraInfo, @@ -90,6 +88,15 @@ type txnTableDelegate struct { tableID uint64 is bool } + + // partition info + partition struct { + tbl *partitionTxnTable + service partitionservice.PartitionService + is bool + tableID uint64 + } + isMock bool isLocal func() (bool, error) } @@ -123,42 +130,11 @@ func MockTableDelegate( return tbl, nil } -func newTxnTable( - db *txnDatabase, - item cache.TableItem, - process *process.Process, - service shardservice.ShardService, - eng engine.Engine, -) (engine.Relation, error) { - tbl := &txnTableDelegate{ - origin: newTxnTableWithItem( - db, - item, - process, - eng, - ), - } - - tbl.shard.service = service - tbl.shard.is = false - tbl.isLocal = tbl.isLocalFunc - - if service.Config().Enable && - db.databaseId != catalog.MO_CATALOG_ID { - tableID, policy, is, err := service.GetShardInfo(item.Id) - if err != nil { - return nil, err - } - - tbl.shard.is = is - tbl.shard.policy = policy - tbl.shard.tableID = tableID +func (tbl *txnTableDelegate) CollectChanges(ctx context.Context, from, to types.TS, mp *mpool.MPool) (engine.ChangesHandle, error) { + if tbl.partition.is { + return tbl.partition.tbl.CollectChanges(ctx, from, to, mp) } - return tbl, nil -} - -func (tbl *txnTableDelegate) CollectChanges(ctx context.Context, from, to types.TS, mp *mpool.MPool) (engine.ChangesHandle, error) { return tbl.origin.CollectChanges(ctx, from, to, mp) } @@ -166,6 +142,10 @@ func (tbl *txnTableDelegate) Stats( ctx context.Context, sync bool, ) (*pb.StatsInfo, error) { + if tbl.partition.is { + return tbl.partition.tbl.Stats(ctx, sync) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -210,6 +190,10 @@ func (tbl *txnTableDelegate) Stats( func (tbl *txnTableDelegate) Rows( ctx context.Context, ) (uint64, error) { + if tbl.partition.is { + return tbl.partition.tbl.Rows(ctx) + } + is, err := tbl.isLocal() if err != nil { return 0, err @@ -240,6 +224,10 @@ func (tbl *txnTableDelegate) Size( ctx context.Context, columnName string, ) (uint64, error) { + if tbl.partition.is { + return tbl.partition.tbl.Size(ctx, columnName) + } + is, err := tbl.isLocal() if err != nil { return 0, err @@ -269,6 +257,10 @@ func (tbl *txnTableDelegate) Size( } func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.RangesParam) (engine.RelData, error) { + if tbl.partition.is { + return tbl.partition.tbl.Ranges(ctx, rangesParam) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -279,7 +271,7 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang var blocks objectio.BlockInfoSlice var uncommitted []objectio.ObjectStats - if rangesParam.Policy&engine.Policy_CollectUncommittedPersistedData != 0 { + if rangesParam.Policy != engine.Policy_CheckCommittedOnly { uncommitted, _ = tbl.origin.collectUnCommittedDataObjs(rangesParam.TxnOffset) } err = tbl.origin.rangesOnePart( @@ -302,7 +294,7 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang func(param *shard.ReadParam) { param.RangesParam.Exprs = rangesParam.BlockFilters param.RangesParam.PreAllocSize = 2 - param.RangesParam.DataCollectPolicy = engine.Policy_CollectCommittedPersistedData + param.RangesParam.DataCollectPolicy = engine.Policy_CollectCommittedData param.RangesParam.TxnOffset = 0 }, @@ -335,7 +327,12 @@ func (tbl *txnTableDelegate) Ranges(ctx context.Context, rangesParam engine.Rang func (tbl *txnTableDelegate) CollectTombstones( ctx context.Context, txnOffset int, - policy engine.TombstoneCollectPolicy) (engine.Tombstoner, error) { + policy engine.TombstoneCollectPolicy, +) (engine.Tombstoner, error) { + if tbl.partition.is { + return tbl.partition.tbl.CollectTombstones(ctx, txnOffset, policy) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -382,6 +379,10 @@ func (tbl *txnTableDelegate) GetColumMetadataScanInfo( ctx context.Context, name string, ) ([]*plan.MetadataScanInfo, error) { + if tbl.partition.is { + return tbl.partition.tbl.GetColumMetadataScanInfo(ctx, name) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -417,6 +418,10 @@ func (tbl *txnTableDelegate) GetColumMetadataScanInfo( func (tbl *txnTableDelegate) ApproxObjectsNum( ctx context.Context, ) int { + if tbl.partition.is { + return tbl.partition.tbl.ApproxObjectsNum(ctx) + } + is, err := tbl.isLocal() if err != nil { logutil.Infof("approx objects num err: %v", err) @@ -455,7 +460,22 @@ func (tbl *txnTableDelegate) BuildReaders( txnOffset int, orderBy bool, policy engine.TombstoneApplyPolicy, - filterHint engine.FilterHint) ([]engine.Reader, error) { + filterHint engine.FilterHint, +) ([]engine.Reader, error) { + if tbl.partition.is { + return tbl.partition.tbl.BuildReaders( + ctx, + proc, + expr, + relData, + num, + txnOffset, + orderBy, + policy, + filterHint, + ) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -485,148 +505,6 @@ func (tbl *txnTableDelegate) BuildReaders( ) } -type shardingLocalReader struct { - iteratePhase ReaderPhase - closed bool - lrd engine.Reader - tblDelegate *txnTableDelegate - streamID types.Uuid - //relation data to distribute to remote CN which holds shard's partition state. - remoteRelData engine.RelData - remoteTombApplyPolicy engine.TombstoneApplyPolicy -} - -// TODO:: -func MockShardingLocalReader() engine.Reader { - return &shardingLocalReader{} -} - -func (r *shardingLocalReader) Read( - ctx context.Context, - cols []string, - expr *plan.Expr, - mp *mpool.MPool, - bat *batch.Batch, -) (isEnd bool, err error) { - defer func() { - if err != nil || isEnd { - r.close() - } - }() - - for { - - switch r.iteratePhase { - case InLocal: - if r.lrd != nil { - isEnd, err = r.lrd.Read(ctx, cols, expr, mp, bat) - if err != nil { - return - } - if !isEnd { - return - } - } - if r.remoteRelData == nil || r.remoteRelData.DataCnt() == 0 { - r.iteratePhase = InEnd - return - } - relData, err := r.remoteRelData.MarshalBinary() - if err != nil { - return false, err - } - err = r.tblDelegate.forwardRead( - ctx, - shardservice.ReadBuildReader, - func(param *shard.ReadParam) { - param.ReaderBuildParam.RelData = relData - param.ReaderBuildParam.Expr = expr - param.ReaderBuildParam.TombstoneApplyPolicy = int32(r.remoteTombApplyPolicy) - }, - func(resp []byte) { - r.streamID = types.DecodeUuid(resp) - }, - ) - if err != nil { - return false, err - } - r.iteratePhase = InRemote - case InRemote: - err = r.tblDelegate.forwardRead( - ctx, - shardservice.ReadNext, - func(param *shard.ReadParam) { - param.ReadNextParam.Uuid = types.EncodeUuid(&r.streamID) - param.ReadNextParam.Columns = cols - }, - func(resp []byte) { - isEnd = types.DecodeBool(resp) - if isEnd { - return - } - resp = resp[1:] - l := types.DecodeUint32(resp) - resp = resp[4:] - if err := bat.UnmarshalBinary(resp[:l]); err != nil { - panic(err) - } - }, - ) - if err != nil { - return false, err - } - if isEnd { - r.iteratePhase = InEnd - } - return - case InEnd: - return true, nil - } - - } - -} - -func (r *shardingLocalReader) Close() error { - return r.close() -} - -func (r *shardingLocalReader) close() error { - if !r.closed { - if r.lrd != nil { - r.lrd.Close() - } - if r.remoteRelData != nil { - ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*10, moerr.CauseShardingLocalReader) - defer cancel() - - err := r.tblDelegate.forwardRead( - ctx, - shardservice.ReadClose, - func(param *shard.ReadParam) { - param.ReadCloseParam.Uuid = types.EncodeUuid(&r.streamID) - }, - func(resp []byte) { - }, - ) - if err != nil { - return moerr.AttachCause(ctx, err) - } - } - } - return nil -} - -func (r *shardingLocalReader) SetOrderBy(orderby []*plan.OrderBySpec) { -} - -func (r *shardingLocalReader) GetOrderBy() []*plan.OrderBySpec { - return nil -} - -func (r *shardingLocalReader) SetFilterZM(zm objectio.ZoneMap) { -} - func (tbl *txnTableDelegate) BuildShardingReaders( ctx context.Context, p any, @@ -637,9 +515,26 @@ func (tbl *txnTableDelegate) BuildShardingReaders( orderBy bool, policy engine.TombstoneApplyPolicy, ) ([]engine.Reader, error) { + if tbl.partition.is { + return tbl.partition.tbl.BuildShardingReaders( + ctx, + p, + expr, + relData, + num, + txnOffset, + orderBy, + policy, + ) + } + var rds []engine.Reader proc := p.(*process.Process) + if plan2.IsFalseExpr(expr) { + return []engine.Reader{new(readutil.EmptyReader)}, nil + } + if orderBy && num != 1 { return nil, moerr.NewInternalErrorNoCtx("orderBy only support one reader") } @@ -747,8 +642,19 @@ func (tbl *txnTableDelegate) PrimaryKeysMayBeModified( ctx context.Context, from types.TS, to types.TS, - keyVector *vector.Vector, + bat *batch.Batch, + pkIndex int32, ) (bool, error) { + if tbl.partition.is { + return tbl.partition.tbl.PrimaryKeysMayBeModified( + ctx, + from, + to, + bat, + pkIndex, + ) + } + is, err := tbl.isLocal() if err != nil { return false, err @@ -758,11 +664,13 @@ func (tbl *txnTableDelegate) PrimaryKeysMayBeModified( ctx, from, to, - keyVector, + bat, + pkIndex, ) } modify := false + keyVector := bat.GetVector(pkIndex) err = tbl.forwardRead( ctx, shardservice.ReadPrimaryKeysMayBeModified, @@ -799,8 +707,19 @@ func (tbl *txnTableDelegate) PrimaryKeysMayBeUpserted( ctx context.Context, from types.TS, to types.TS, - keyVector *vector.Vector, + bat *batch.Batch, + pkIndex int32, ) (bool, error) { + if tbl.partition.is { + return tbl.partition.tbl.PrimaryKeysMayBeUpserted( + ctx, + from, + to, + bat, + pkIndex, + ) + } + is, err := tbl.isLocal() if err != nil { return false, err @@ -810,11 +729,13 @@ func (tbl *txnTableDelegate) PrimaryKeysMayBeUpserted( ctx, from, to, - keyVector, + bat, + pkIndex, ) } modify := false + keyVector := bat.GetVector(pkIndex) err = tbl.forwardRead( ctx, shardservice.ReadPrimaryKeysMayBeUpserted, @@ -847,7 +768,19 @@ func (tbl *txnTableDelegate) PrimaryKeysMayBeUpserted( return modify, nil } -func (tbl *txnTableDelegate) MergeObjects(ctx context.Context, objstats []objectio.ObjectStats, targetObjSize uint32) (*api.MergeCommitEntry, error) { +func (tbl *txnTableDelegate) MergeObjects( + ctx context.Context, + objstats []objectio.ObjectStats, + targetObjSize uint32, +) (*api.MergeCommitEntry, error) { + if tbl.partition.is { + return tbl.partition.tbl.MergeObjects( + ctx, + objstats, + targetObjSize, + ) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -883,6 +816,10 @@ func (tbl *txnTableDelegate) MergeObjects(ctx context.Context, objstats []object } func (tbl *txnTableDelegate) GetNonAppendableObjectStats(ctx context.Context) ([]objectio.ObjectStats, error) { + if tbl.partition.is { + return tbl.partition.tbl.GetNonAppendableObjectStats(ctx) + } + is, err := tbl.isLocal() if err != nil { return nil, err @@ -918,30 +855,45 @@ func (tbl *txnTableDelegate) GetNonAppendableObjectStats(ctx context.Context) ([ func (tbl *txnTableDelegate) TableDefs( ctx context.Context, ) ([]engine.TableDef, error) { + if tbl.partition.is { + return tbl.partition.tbl.TableDefs(ctx) + } return tbl.origin.TableDefs(ctx) } func (tbl *txnTableDelegate) GetTableDef( ctx context.Context, ) *plan.TableDef { + if tbl.partition.is { + return tbl.partition.tbl.GetTableDef(ctx) + } return tbl.origin.GetTableDef(ctx) } func (tbl *txnTableDelegate) CopyTableDef( ctx context.Context, ) *plan.TableDef { + if tbl.partition.is { + return tbl.partition.tbl.CopyTableDef(ctx) + } return tbl.origin.CopyTableDef(ctx) } func (tbl *txnTableDelegate) GetPrimaryKeys( ctx context.Context, ) ([]*engine.Attribute, error) { + if tbl.partition.is { + return tbl.partition.tbl.GetPrimaryKeys(ctx) + } return tbl.origin.GetPrimaryKeys(ctx) } func (tbl *txnTableDelegate) GetHideKeys( ctx context.Context, ) ([]*engine.Attribute, error) { + if tbl.partition.is { + return tbl.partition.tbl.GetHideKeys(ctx) + } return tbl.origin.GetHideKeys(ctx) } @@ -949,6 +901,9 @@ func (tbl *txnTableDelegate) Write( ctx context.Context, bat *batch.Batch, ) error { + if tbl.partition.is { + return tbl.partition.tbl.Write(ctx, bat) + } return tbl.origin.Write(ctx, bat) } @@ -956,6 +911,9 @@ func (tbl *txnTableDelegate) Update( ctx context.Context, bat *batch.Batch, ) error { + if tbl.partition.is { + return tbl.partition.tbl.Update(ctx, bat) + } return tbl.origin.Update(ctx, bat) } @@ -964,6 +922,9 @@ func (tbl *txnTableDelegate) Delete( bat *batch.Batch, name string, ) error { + if tbl.partition.is { + return tbl.partition.tbl.Delete(ctx, bat, name) + } return tbl.origin.Delete(ctx, bat, name) } @@ -971,6 +932,9 @@ func (tbl *txnTableDelegate) AddTableDef( ctx context.Context, def engine.TableDef, ) error { + if tbl.partition.is { + return tbl.partition.tbl.AddTableDef(ctx, def) + } return tbl.origin.AddTableDef(ctx, def) } @@ -978,6 +942,9 @@ func (tbl *txnTableDelegate) DelTableDef( ctx context.Context, def engine.TableDef, ) error { + if tbl.partition.is { + return tbl.partition.tbl.DelTableDef(ctx, def) + } return tbl.origin.DelTableDef(ctx, def) } @@ -986,6 +953,9 @@ func (tbl *txnTableDelegate) AlterTable( c *engine.ConstraintDef, reqs []*api.AlterTableReq, ) error { + if tbl.partition.is { + return tbl.partition.tbl.AlterTable(ctx, c, reqs) + } return tbl.origin.AlterTable(ctx, c, reqs) } @@ -993,6 +963,9 @@ func (tbl *txnTableDelegate) UpdateConstraint( ctx context.Context, c *engine.ConstraintDef, ) error { + if tbl.partition.is { + return tbl.partition.tbl.UpdateConstraint(ctx, c) + } return tbl.origin.UpdateConstraint(ctx, c) } @@ -1000,47 +973,72 @@ func (tbl *txnTableDelegate) TableRenameInTxn( ctx context.Context, constraint [][]byte, ) error { + if tbl.partition.is { + return tbl.partition.tbl.TableRenameInTxn(ctx, constraint) + } return tbl.origin.TableRenameInTxn(ctx, constraint) } func (tbl *txnTableDelegate) GetTableID( ctx context.Context, ) uint64 { + if tbl.partition.is { + return tbl.partition.tbl.GetTableID(ctx) + } return tbl.origin.GetTableID(ctx) } func (tbl *txnTableDelegate) GetTableName() string { + if tbl.partition.is { + return tbl.partition.tbl.GetTableName() + } return tbl.origin.GetTableName() } func (tbl *txnTableDelegate) GetDBID( ctx context.Context, ) uint64 { + if tbl.partition.is { + return tbl.partition.tbl.GetDBID(ctx) + } return tbl.origin.GetDBID(ctx) } func (tbl *txnTableDelegate) TableColumns( ctx context.Context, ) ([]*engine.Attribute, error) { + if tbl.partition.is { + return tbl.partition.tbl.TableColumns(ctx) + } return tbl.origin.TableColumns(ctx) } func (tbl *txnTableDelegate) MaxAndMinValues( ctx context.Context, ) ([][2]any, []uint8, error) { + if tbl.partition.is { + return tbl.partition.tbl.MaxAndMinValues(ctx) + } return tbl.origin.MaxAndMinValues(ctx) } func (tbl *txnTableDelegate) GetEngineType() engine.EngineType { + if tbl.partition.is { + return tbl.partition.tbl.GetEngineType() + } return tbl.origin.GetEngineType() } func (tbl *txnTableDelegate) GetProcess() any { + if tbl.partition.is { + return tbl.partition.tbl.GetProcess() + } return tbl.origin.GetProcess() } func (tbl *txnTableDelegate) isLocalFunc() (bool, error) { - if !tbl.shard.service.Config().Enable || // sharding not enabled + if !tbl.shard.is || // is not sharding table + !tbl.shard.service.Config().Enable || // sharding not enabled !tbl.shard.is || // sharding not enabled (tbl.shard.policy == shard.Policy_Partition && tbl.origin.tableId == tbl.shard.tableID) { // partition table self. return true, nil @@ -1177,3 +1175,145 @@ func (tbl *txnTableDelegate) forwardRead( return nil } + +type shardingLocalReader struct { + iteratePhase ReaderPhase + closed bool + lrd engine.Reader + tblDelegate *txnTableDelegate + streamID types.Uuid + //relation data to distribute to remote CN which holds shard's partition state. + remoteRelData engine.RelData + remoteTombApplyPolicy engine.TombstoneApplyPolicy +} + +// TODO:: +func MockShardingLocalReader() engine.Reader { + return &shardingLocalReader{} +} + +func (r *shardingLocalReader) Read( + ctx context.Context, + cols []string, + expr *plan.Expr, + mp *mpool.MPool, + bat *batch.Batch, +) (isEnd bool, err error) { + defer func() { + if err != nil || isEnd { + r.close() + } + }() + + for { + + switch r.iteratePhase { + case InLocal: + if r.lrd != nil { + isEnd, err = r.lrd.Read(ctx, cols, expr, mp, bat) + if err != nil { + return + } + if !isEnd { + return + } + } + if r.remoteRelData == nil || r.remoteRelData.DataCnt() == 0 { + r.iteratePhase = InEnd + return + } + relData, err := r.remoteRelData.MarshalBinary() + if err != nil { + return false, err + } + err = r.tblDelegate.forwardRead( + ctx, + shardservice.ReadBuildReader, + func(param *shard.ReadParam) { + param.ReaderBuildParam.RelData = relData + param.ReaderBuildParam.Expr = expr + param.ReaderBuildParam.TombstoneApplyPolicy = int32(r.remoteTombApplyPolicy) + }, + func(resp []byte) { + r.streamID = types.DecodeUuid(resp) + }, + ) + if err != nil { + return false, err + } + r.iteratePhase = InRemote + case InRemote: + err = r.tblDelegate.forwardRead( + ctx, + shardservice.ReadNext, + func(param *shard.ReadParam) { + param.ReadNextParam.Uuid = types.EncodeUuid(&r.streamID) + param.ReadNextParam.Columns = cols + }, + func(resp []byte) { + isEnd = types.DecodeBool(resp) + if isEnd { + return + } + resp = resp[1:] + l := types.DecodeUint32(resp) + resp = resp[4:] + if err := bat.UnmarshalBinary(resp[:l]); err != nil { + panic(err) + } + }, + ) + if err != nil { + return false, err + } + if isEnd { + r.iteratePhase = InEnd + } + return + case InEnd: + return true, nil + } + + } + +} + +func (r *shardingLocalReader) Close() error { + return r.close() +} + +func (r *shardingLocalReader) close() error { + if !r.closed { + if r.lrd != nil { + r.lrd.Close() + } + if r.remoteRelData != nil { + ctx, cancel := context.WithTimeoutCause(context.Background(), time.Second*10, moerr.CauseShardingLocalReader) + defer cancel() + + err := r.tblDelegate.forwardRead( + ctx, + shardservice.ReadClose, + func(param *shard.ReadParam) { + param.ReadCloseParam.Uuid = types.EncodeUuid(&r.streamID) + }, + func(resp []byte) { + }, + ) + if err != nil { + return moerr.AttachCause(ctx, err) + } + } + } + return nil +} + +func (r *shardingLocalReader) SetOrderBy(orderby []*plan.OrderBySpec) { +} + +func (r *shardingLocalReader) GetOrderBy() []*plan.OrderBySpec { + return nil +} + +func (r *shardingLocalReader) SetFilterZM(zm objectio.ZoneMap) { +} diff --git a/pkg/vm/engine/disttae/txn_table_partition.go b/pkg/vm/engine/disttae/txn_table_partition.go new file mode 100644 index 0000000000000..136fefddecfec --- /dev/null +++ b/pkg/vm/engine/disttae/txn_table_partition.go @@ -0,0 +1,576 @@ +// Copyright 2021-2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package disttae + +import ( + "context" + + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/partitionservice" + "github.com/matrixorigin/matrixone/pkg/pb/api" + "github.com/matrixorigin/matrixone/pkg/pb/partition" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" + splan "github.com/matrixorigin/matrixone/pkg/sql/plan" + "github.com/matrixorigin/matrixone/pkg/vm/engine" +) + +var _ engine.Relation = (*partitionTxnTable)(nil) + +type partitionTxnTable struct { + primary *txnTable + ps partitionservice.PartitionService + metadata partition.PartitionMetadata +} + +func newPartitionTxnTable( + primary *txnTable, + metadata partition.PartitionMetadata, + ps partitionservice.PartitionService, +) (*partitionTxnTable, error) { + tbl := &partitionTxnTable{ + primary: primary, + metadata: metadata, + ps: ps, + } + return tbl, nil +} + +func (t *partitionTxnTable) getRelation( + ctx context.Context, + idx int, +) (engine.Relation, error) { + return t.primary.db.relation( + ctx, + t.metadata.Partitions[idx].PartitionTableName, + t.primary.proc.Load(), + ) +} + +func (t *partitionTxnTable) Ranges( + ctx context.Context, + param engine.RangesParam, +) (engine.RelData, error) { + targets, err := t.ps.Filter( + ctx, + t.metadata.TableID, + param.BlockFilters, + t.primary.proc.Load().GetTxnOperator(), + ) + if err != nil { + return nil, err + } + + if len(targets) == 1 { + rel, err := t.getRelation(ctx, targets[0]) + if err != nil { + return nil, err + } + return rel.Ranges(ctx, param) + } + + pd := newPartitionedRelData() + for _, idx := range targets { + rel, err := t.getRelation(ctx, targets[0]) + if err != nil { + return nil, err + } + if err := pd.addPartition(ctx, rel, param, idx); err != nil { + return nil, err + } + } + return pd, nil +} + +func (t *partitionTxnTable) BuildReaders( + ctx context.Context, + proc any, + expr *plan.Expr, + relData engine.RelData, + num int, + txnOffset int, + orderBy bool, + policy engine.TombstoneApplyPolicy, + filterHint engine.FilterHint, +) ([]engine.Reader, error) { + var readers []engine.Reader + m := make(map[int]engine.RelData, 2) + slice := relData.GetBlockInfoSlice() + n := slice.Len() + for i := 0; i < n; i++ { + value := slice.Get(i) + data, ok := m[int(value.PartitionIdx)] + if !ok { + data = relData.BuildEmptyRelData(n) + data.AttachTombstones(data.GetTombstones()) + m[int(value.PartitionIdx)] = data + } + data.AppendBlockInfo(value) + } + + for idx, data := range m { + rel, err := t.getRelation(ctx, idx) + if err != nil { + return nil, err + } + r, err := rel.BuildReaders( + ctx, + proc, + expr, + data, + num, + txnOffset, + orderBy, + policy, + filterHint, + ) + if err != nil { + return nil, err + } + readers = append(readers, r...) + } + return readers, nil +} + +func (t *partitionTxnTable) BuildShardingReaders( + ctx context.Context, + proc any, + expr *plan.Expr, + relData engine.RelData, + num int, + txnOffset int, + orderBy bool, + policy engine.TombstoneApplyPolicy, +) ([]engine.Reader, error) { + panic("Not Support") +} + +func (t *partitionTxnTable) Rows( + ctx context.Context, +) (uint64, error) { + rows := uint64(0) + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return 0, nil + } + + v, err := p.Rows(ctx) + if err != nil { + return 0, err + } + + rows += v + } + return rows, nil +} + +func (t *partitionTxnTable) Stats( + ctx context.Context, + sync bool, +) (*statsinfo.StatsInfo, error) { + value := splan.NewStatsInfo() + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return nil, nil + } + + v, err := p.Stats(ctx, sync) + if err != nil { + return nil, err + } + + value.Merge(v) + } + return value, nil +} + +func (t *partitionTxnTable) Size( + ctx context.Context, + columnName string, +) (uint64, error) { + value := uint64(0) + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return 0, nil + } + + v, err := p.Size(ctx, columnName) + if err != nil { + return 0, err + } + + value += v + } + return value, nil +} + +func (t *partitionTxnTable) CollectTombstones( + ctx context.Context, + txnOffset int, + policy engine.TombstoneCollectPolicy, +) (engine.Tombstoner, error) { + var tombstone engine.Tombstoner + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return nil, err + } + + t, err := p.CollectTombstones(ctx, txnOffset, policy) + if err != nil { + return nil, err + } + if tombstone == nil { + tombstone = t + continue + } + if err := tombstone.Merge(t); err != nil { + return nil, err + } + } + return tombstone, nil +} + +func (t *partitionTxnTable) CollectChanges( + ctx context.Context, + from, to types.TS, + mp *mpool.MPool, +) (engine.ChangesHandle, error) { + panic("not implemented") +} + +func (t *partitionTxnTable) ApproxObjectsNum(ctx context.Context) int { + num := 0 + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + // TODO: fix , return error + return 0 + } + num += p.ApproxObjectsNum(ctx) + } + return num +} + +func (t *partitionTxnTable) MergeObjects( + ctx context.Context, + objstats []objectio.ObjectStats, + targetObjSize uint32, +) (*api.MergeCommitEntry, error) { + panic("not implemented") +} + +func (t *partitionTxnTable) GetNonAppendableObjectStats(ctx context.Context) ([]objectio.ObjectStats, error) { + var stats []objectio.ObjectStats + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return nil, err + } + values, err := p.GetNonAppendableObjectStats(ctx) + if err != nil { + return nil, err + } + stats = append(stats, values...) + } + return stats, nil +} + +func (t *partitionTxnTable) GetColumMetadataScanInfo( + ctx context.Context, + name string, +) ([]*plan.MetadataScanInfo, error) { + var values []*plan.MetadataScanInfo + for idx := range t.metadata.Partitions { + p, err := t.getRelation(ctx, idx) + if err != nil { + return nil, err + } + v, err := p.GetColumMetadataScanInfo(ctx, name) + if err != nil { + return nil, err + } + values = append(values, v...) + } + return values, nil +} + +func (t *partitionTxnTable) UpdateConstraint(context.Context, *engine.ConstraintDef) error { + panic("not implemented") +} + +func (t *partitionTxnTable) AlterTable(context.Context, *engine.ConstraintDef, []*api.AlterTableReq) error { + panic("not implemented") +} + +func (t *partitionTxnTable) TableRenameInTxn(ctx context.Context, constraint [][]byte) error { + panic("not implemented") +} + +func (t *partitionTxnTable) MaxAndMinValues(ctx context.Context) ([][2]any, []uint8, error) { + panic("not implemented") +} + +func (t *partitionTxnTable) TableDefs(ctx context.Context) ([]engine.TableDef, error) { + return t.primary.TableDefs(ctx) +} + +func (t *partitionTxnTable) GetTableDef(ctx context.Context) *plan.TableDef { + return t.primary.GetTableDef(ctx) +} + +func (t *partitionTxnTable) CopyTableDef(ctx context.Context) *plan.TableDef { + return t.primary.CopyTableDef(ctx) +} + +func (t *partitionTxnTable) GetPrimaryKeys(ctx context.Context) ([]*engine.Attribute, error) { + return t.primary.GetPrimaryKeys(ctx) +} + +func (t *partitionTxnTable) GetHideKeys(ctx context.Context) ([]*engine.Attribute, error) { + return t.primary.GetHideKeys(ctx) +} + +func (t *partitionTxnTable) AddTableDef(context.Context, engine.TableDef) error { + return nil +} + +func (t *partitionTxnTable) DelTableDef(context.Context, engine.TableDef) error { + return nil +} + +func (t *partitionTxnTable) GetTableID(ctx context.Context) uint64 { + return t.primary.GetTableID(ctx) +} + +func (t *partitionTxnTable) GetTableName() string { + return t.primary.GetTableName() +} + +func (t *partitionTxnTable) GetDBID(ctx context.Context) uint64 { + return t.primary.GetDBID(ctx) +} + +func (t *partitionTxnTable) TableColumns(ctx context.Context) ([]*engine.Attribute, error) { + return t.primary.TableColumns(ctx) +} + +func (t *partitionTxnTable) GetEngineType() engine.EngineType { + return t.primary.GetEngineType() +} + +func (t *partitionTxnTable) GetProcess() any { + return t.primary.GetProcess() +} + +func (t *partitionTxnTable) PrimaryKeysMayBeModified( + ctx context.Context, + from types.TS, + to types.TS, + bat *batch.Batch, + pkIndex int32, +) (bool, error) { + res, err := t.ps.Prune( + ctx, + t.metadata.TableID, + bat, + nil, + ) + if err != nil { + return false, err + } + defer res.Close() + + changed := false + res.Iter( + func(p partition.Partition, bat *batch.Batch) bool { + v, e := t.primary.db.relation( + ctx, + p.PartitionTableName, + t.primary.proc.Load(), + ) + if e != nil { + err = e + return false + } + changed, err = v.PrimaryKeysMayBeModified( + ctx, + from, + to, + bat, + pkIndex, + ) + if err != nil || changed { + return false + } + return true + }, + ) + return changed, err +} + +func (t *partitionTxnTable) Write(context.Context, *batch.Batch) error { + panic("BUG: cannot write data to partition primary table") +} + +func (t *partitionTxnTable) Update(context.Context, *batch.Batch) error { + panic("BUG: cannot update data to partition primary table") +} + +func (t *partitionTxnTable) Delete(context.Context, *batch.Batch, string) error { + panic("BUG: cannot delete data to partition primary table") +} + +func (t *partitionTxnTable) PrimaryKeysMayBeUpserted( + ctx context.Context, + from types.TS, + to types.TS, + bat *batch.Batch, + pkIndex int32, +) (bool, error) { + panic("BUG: cannot upsert primary keys in partition primary table") +} + +type partitionedRelData struct { + cnt int + blocks objectio.BlockInfoSlice + partitions map[uint64]engine.RelData + tables map[uint64]engine.Relation +} + +func newPartitionedRelData() *partitionedRelData { + return &partitionedRelData{ + partitions: make(map[uint64]engine.RelData), + tables: make(map[uint64]engine.Relation), + } +} + +func (r *partitionedRelData) addPartition( + ctx context.Context, + table engine.Relation, + param engine.RangesParam, + idx int, +) error { + data, err := table.Ranges( + ctx, + param, + ) + if err != nil { + return err + } + + blocks := data.GetBlockInfoSlice() + n := blocks.Len() + for i := 0; i < n; i++ { + blocks.Get(i).PartitionIdx = int32(idx) + } + + id := table.GetTableID(ctx) + r.tables[id] = table + r.partitions[id] = data + r.cnt += data.DataCnt() + r.blocks = append(r.blocks, data.GetBlockInfoSlice()...) + return nil +} + +func (r *partitionedRelData) AttachTombstones(tombstones engine.Tombstoner) error { + for _, p := range r.partitions { + if err := p.AttachTombstones(tombstones); err != nil { + return err + } + } + return nil +} + +func (r *partitionedRelData) BuildEmptyRelData(preAllocSize int) engine.RelData { + for _, p := range r.partitions { + return p.BuildEmptyRelData(preAllocSize) + } + panic("BUG: no partitions") +} + +func (r *partitionedRelData) DataCnt() int { + return r.cnt +} + +func (r *partitionedRelData) GetBlockInfoSlice() objectio.BlockInfoSlice { + return r.blocks +} + +func (r *partitionedRelData) GetType() engine.RelDataType { + panic("not implemented") +} + +func (r *partitionedRelData) String() string { + return "partitionedRelData" +} + +func (r *partitionedRelData) MarshalBinary() ([]byte, error) { + panic("not implemented") +} + +func (r *partitionedRelData) UnmarshalBinary(buf []byte) error { + panic("not implemented") +} + +func (r *partitionedRelData) GetTombstones() engine.Tombstoner { + panic("not implemented") +} + +func (r *partitionedRelData) DataSlice(begin, end int) engine.RelData { + panic("not implemented") +} + +func (r *partitionedRelData) GetShardIDList() []uint64 { + panic("not implemented") +} + +func (r *partitionedRelData) GetShardID(i int) uint64 { + panic("not implemented") +} + +func (r *partitionedRelData) SetShardID(i int, id uint64) { + panic("not implemented") +} + +func (r *partitionedRelData) AppendShardID(id uint64) { + panic("not implemented") +} + +func (r *partitionedRelData) SetBlockInfo(i int, blk *objectio.BlockInfo) { + panic("not implemented") +} + +func (r *partitionedRelData) GetBlockInfo(i int) objectio.BlockInfo { + panic("not implemented") +} + +func (r *partitionedRelData) AppendBlockInfo(blk *objectio.BlockInfo) { + panic("not implemented") +} + +func (r *partitionedRelData) AppendBlockInfoSlice(objectio.BlockInfoSlice) { + panic("not implemented") +} + +func (r *partitionedRelData) Split(i int) []engine.RelData { + panic("not implemented") +} diff --git a/pkg/vm/engine/disttae/txn_table_sharding_handle.go b/pkg/vm/engine/disttae/txn_table_sharding_handle.go index 600379ea3ac4f..dd8bc3251ca30 100644 --- a/pkg/vm/engine/disttae/txn_table_sharding_handle.go +++ b/pkg/vm/engine/disttae/txn_table_sharding_handle.go @@ -511,11 +511,14 @@ func HandleShardingReadPrimaryKeysMayBeModified( return nil, err } + batch := batch.NewWithSize(1) + batch.SetVector(0, keyVector) modify, err := tbl.PrimaryKeysMayBeModified( ctx, from, to, - keyVector, + batch, + 0, ) if err != nil { return nil, err @@ -561,11 +564,14 @@ func HandleShardingReadPrimaryKeysMayBeUpserted( return nil, err } + batch := batch.NewWithSize(1) + batch.SetVector(0, keyVector) modify, err := tbl.PrimaryKeysMayBeUpserted( ctx, from, to, - keyVector, + batch, + 0, ) if err != nil { return nil, err diff --git a/pkg/vm/engine/memoryengine/table.go b/pkg/vm/engine/memoryengine/table.go index ca04c112da9c2..5d2f19ebd5648 100644 --- a/pkg/vm/engine/memoryengine/table.go +++ b/pkg/vm/engine/memoryengine/table.go @@ -22,7 +22,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" @@ -535,11 +534,11 @@ func (t *Table) GetColumMetadataScanInfo(ctx context.Context, name string) ([]*p return nil, nil } -func (t *Table) PrimaryKeysMayBeModified(ctx context.Context, from types.TS, to types.TS, keyVector *vector.Vector) (bool, error) { +func (t *Table) PrimaryKeysMayBeModified(ctx context.Context, from types.TS, to types.TS, bat *batch.Batch, idx int32) (bool, error) { return true, nil } -func (t *Table) PrimaryKeysMayBeUpserted(ctx context.Context, from types.TS, to types.TS, keyVector *vector.Vector) (bool, error) { +func (t *Table) PrimaryKeysMayBeUpserted(ctx context.Context, from types.TS, to types.TS, bat *batch.Batch, idx int32) (bool, error) { return true, nil } diff --git a/pkg/vm/engine/memoryengine/table_reader.go b/pkg/vm/engine/memoryengine/table_reader.go index be63af9c49bad..6d424da1e2ba7 100644 --- a/pkg/vm/engine/memoryengine/table_reader.go +++ b/pkg/vm/engine/memoryengine/table_reader.go @@ -348,11 +348,6 @@ func (rd *MemRelationData) DataSlice(i, j int) engine.RelData { panic("Not Support") } -// GroupByPartitionNum TODO::remove it after refactor of partition table. -func (rd *MemRelationData) GroupByPartitionNum() map[int16]engine.RelData { - panic("Not Support") -} - func (rd *MemRelationData) BuildEmptyRelData(i int) engine.RelData { return &MemRelationData{} } diff --git a/pkg/vm/engine/readutil/datasource_test.go b/pkg/vm/engine/readutil/datasource_test.go index 218c630b2365b..87680a6bbd886 100644 --- a/pkg/vm/engine/readutil/datasource_test.go +++ b/pkg/vm/engine/readutil/datasource_test.go @@ -120,7 +120,6 @@ func TestObjListRelData(t *testing.T) { // for test coverage buf, err := objlistRelData.MarshalBinary() require.NoError(t, err) objlistRelData.UnmarshalBinary(buf) - objlistRelData.GroupByPartitionNum() } func TestObjListRelData1(t *testing.T) { diff --git a/pkg/vm/engine/readutil/relation_data.go b/pkg/vm/engine/readutil/relation_data.go index 4edc8b57680b7..8736e55323728 100644 --- a/pkg/vm/engine/readutil/relation_data.go +++ b/pkg/vm/engine/readutil/relation_data.go @@ -17,6 +17,7 @@ package readutil import ( "bytes" "fmt" + "github.com/matrixorigin/matrixone/pkg/pb/plan" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" @@ -178,10 +179,6 @@ func (rd *EmptyRelationData) DataSlice(begin, end int) engine.RelData { panic("Not Supported") } -func (rd *EmptyRelationData) GroupByPartitionNum() map[int16]engine.RelData { - panic("Not Supported") -} - func (rd *EmptyRelationData) AppendDataBlk(blk any) { panic("Not Supported") } @@ -365,11 +362,6 @@ func (or *ObjListRelData) DataSlice(i, j int) engine.RelData { return or.blocklistRelData.DataSlice(i, j) } -func (or *ObjListRelData) GroupByPartitionNum() map[int16]engine.RelData { - or.expand() - return or.blocklistRelData.GroupByPartitionNum() -} - func (or *ObjListRelData) DataCnt() int { return int(or.TotalBlocks) } @@ -547,29 +539,6 @@ func (relData *BlockListRelData) DataSlice(i, j int) engine.RelData { } } -func (relData *BlockListRelData) GroupByPartitionNum() map[int16]engine.RelData { - ret := make(map[int16]engine.RelData) - - blks := relData.GetBlockInfoSlice() - blksLen := blks.Len() - for idx := range blksLen { - blkInfo := blks.Get(idx) - if blkInfo.IsMemBlk() { - continue - } - partitionNum := blkInfo.PartitionNum - if _, ok := ret[partitionNum]; !ok { - ret[partitionNum] = &BlockListRelData{ - tombstones: relData.tombstones, - } - ret[partitionNum].AppendBlockInfo(&objectio.EmptyBlockInfo) - } - ret[partitionNum].AppendBlockInfo(blkInfo) - } - - return ret -} - func (relData *BlockListRelData) DataCnt() int { return relData.blklist.Len() } diff --git a/pkg/vm/engine/readutil/relation_data_test.go b/pkg/vm/engine/readutil/relation_data_test.go index 1e7093dd16949..d3eb88fbd519e 100644 --- a/pkg/vm/engine/readutil/relation_data_test.go +++ b/pkg/vm/engine/readutil/relation_data_test.go @@ -54,9 +54,6 @@ func TestEmptyRelationData(t *testing.T) { require.Panics(t, func() { relData.DataSlice(0, 0) }) - require.Panics(t, func() { - relData.GroupByPartitionNum() - }) require.Equal(t, 0, relData.DataCnt()) } diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index 37eeebf8b18b5..dfd7620e13f46 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -1073,7 +1073,9 @@ func Test_ShardingTableDelegate(t *testing.T) { require.NoError(t, err) require.True(t, tomb.HasAnyInMemoryTombstone()) - _, err = delegate.PrimaryKeysMayBeUpserted(ctx, types.TS{}, types.MaxTs(), vector.NewVec(types.T_int64.ToType())) + bat := batch.NewWithSize(1) + bat.SetVector(0, vector.NewVec(types.T_int64.ToType())) + _, err = delegate.PrimaryKeysMayBeUpserted(ctx, types.TS{}, types.MaxTs(), bat, 0) require.NoError(t, err) _, err = delegate.BuildReaders( diff --git a/pkg/vm/engine/test/testutil/disttae_engine.go b/pkg/vm/engine/test/testutil/disttae_engine.go index 145dad3cdf354..e3a4e94e9fa40 100644 --- a/pkg/vm/engine/test/testutil/disttae_engine.go +++ b/pkg/vm/engine/test/testutil/disttae_engine.go @@ -16,16 +16,17 @@ package testutil import ( "context" - "github.com/matrixorigin/matrixone/pkg/config" - "github.com/matrixorigin/matrixone/pkg/frontend" - ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" - "github.com/matrixorigin/matrixone/pkg/util/toml" "os" "strings" "sync" "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/config" + "github.com/matrixorigin/matrixone/pkg/frontend" + ie "github.com/matrixorigin/matrixone/pkg/util/internalExecutor" + "github.com/matrixorigin/matrixone/pkg/util/toml" + "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -203,6 +204,7 @@ func NewTestDisttaeEngine( de.txnClient, fs, new(mockLockService), + nil, qc, hakeeper, nil, diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index 4ce4ab44ceaf9..d13a623c6e7f1 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -689,8 +689,6 @@ type RelData interface { GetTombstones() Tombstoner DataSlice(begin, end int) RelData - // GroupByPartitionNum TODO::remove it after refactor of partition table. - GroupByPartitionNum() map[int16]RelData BuildEmptyRelData(preAllocSize int) RelData DataCnt() int @@ -940,9 +938,9 @@ type Relation interface { // PrimaryKeysMayBeModified reports whether any rows with any primary keys in keyVector was modified during `from` to `to` // If not sure, returns true // Initially added for implementing locking rows by primary keys - PrimaryKeysMayBeModified(ctx context.Context, from types.TS, to types.TS, keyVector *vector.Vector) (bool, error) + PrimaryKeysMayBeModified(ctx context.Context, from types.TS, to types.TS, batch *batch.Batch, pkIndex int32) (bool, error) - PrimaryKeysMayBeUpserted(ctx context.Context, from types.TS, to types.TS, keyVector *vector.Vector) (bool, error) + PrimaryKeysMayBeUpserted(ctx context.Context, from types.TS, to types.TS, batch *batch.Batch, pkIndex int32) (bool, error) ApproxObjectsNum(ctx context.Context) int MergeObjects(ctx context.Context, objstats []objectio.ObjectStats, targetObjSize uint32) (*api.MergeCommitEntry, error) diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go index 9eed88db29c33..a67253fba2447 100644 --- a/pkg/vm/process/process.go +++ b/pkg/vm/process/process.go @@ -32,6 +32,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/logservice" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/pb/lock" qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client" "github.com/matrixorigin/matrixone/pkg/util/trace" @@ -108,6 +109,10 @@ func (proc *Process) GetLockService() lockservice.LockService { return proc.Base.LockService } +func (proc *Process) GetPartitionService() partitionservice.PartitionService { + return proc.Base.PartitionService +} + func (proc *Process) GetWaitPolicy() lock.WaitPolicy { return proc.Base.WaitPolicy } diff --git a/pkg/vm/process/process2.go b/pkg/vm/process/process2.go index bdd8755bc02d8..95752c2228864 100644 --- a/pkg/vm/process/process2.go +++ b/pkg/vm/process/process2.go @@ -27,6 +27,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/incrservice" "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/logservice" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/perfcounter" qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client" "github.com/matrixorigin/matrixone/pkg/stage" @@ -68,16 +69,17 @@ func NewTopProcess( }, // 1. fields from outer - mp: mp, - TxnClient: txnClient, - TxnOperator: txnOperator, - FileService: fileService, - IncrService: incrservice.GetAutoIncrementService(sid), - LockService: lockService, - Aicm: autoIncrease, - QueryClient: queryClient, - Hakeeper: HAKeeper, - UdfService: udfService, + mp: mp, + TxnClient: txnClient, + TxnOperator: txnOperator, + FileService: fileService, + IncrService: incrservice.GetAutoIncrementService(sid), + LockService: lockService, + PartitionService: partitionservice.GetService(sid), + Aicm: autoIncrease, + QueryClient: queryClient, + Hakeeper: HAKeeper, + UdfService: udfService, // 2. fields from make. LastInsertID: new(uint64), diff --git a/pkg/vm/process/process_codec.go b/pkg/vm/process/process_codec.go index df76660938045..acf63d8ff74cc 100644 --- a/pkg/vm/process/process_codec.go +++ b/pkg/vm/process/process_codec.go @@ -29,6 +29,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/logservice" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/pb/pipeline" qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client" "github.com/matrixorigin/matrixone/pkg/txn/client" @@ -128,6 +129,7 @@ func NewCodecService( txnClient client.TxnClient, fileService fileservice.FileService, lockService lockservice.LockService, + partitionService partitionservice.PartitionService, queryClient qclient.QueryClient, hakeeper logservice.CNHAKeeperClient, udfService udf.Service, @@ -138,26 +140,28 @@ func NewCodecService( panic(err) } return &codecService{ - txnClient: txnClient, - fileService: fileService, - lockService: lockService, - queryClient: queryClient, - hakeeper: hakeeper, - udfService: udfService, - engine: engine, - mp: mp, + txnClient: txnClient, + fileService: fileService, + lockService: lockService, + partitionService: partitionService, + queryClient: queryClient, + hakeeper: hakeeper, + udfService: udfService, + engine: engine, + mp: mp, } } type codecService struct { - txnClient client.TxnClient - fileService fileservice.FileService - lockService lockservice.LockService - queryClient qclient.QueryClient - hakeeper logservice.CNHAKeeperClient - udfService udf.Service - mp *mpool.MPool - engine engine.Engine + txnClient client.TxnClient + fileService fileservice.FileService + lockService lockservice.LockService + partitionService partitionservice.PartitionService + queryClient qclient.QueryClient + hakeeper logservice.CNHAKeeperClient + udfService udf.Service + mp *mpool.MPool + engine engine.Engine } func GetCodecService(service string) ProcessCodecService { @@ -206,6 +210,7 @@ func (c *codecService) Decode( nil, ) proc.Base.LockService = c.lockService + proc.Base.PartitionService = c.partitionService proc.Base.UnixTime = value.UnixTime proc.Base.Id = value.Id proc.Base.Lim = ConvertToProcessLimitation(value.Lim) diff --git a/pkg/vm/process/types.go b/pkg/vm/process/types.go index f4ce0c0b6d638..3e4a133dbd11c 100644 --- a/pkg/vm/process/types.go +++ b/pkg/vm/process/types.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/stage" "github.com/matrixorigin/matrixone/pkg/vm/message" @@ -274,12 +275,13 @@ type BaseProcess struct { Lim Limitation mp *mpool.MPool // unix timestamp - UnixTime int64 - TxnClient client.TxnClient - SessionInfo SessionInfo - FileService fileservice.FileService - LockService lockservice.LockService - IncrService incrservice.AutoIncrementService + UnixTime int64 + TxnClient client.TxnClient + SessionInfo SessionInfo + FileService fileservice.FileService + LockService lockservice.LockService + PartitionService partitionservice.PartitionService + IncrService incrservice.AutoIncrementService LastInsertID *uint64 LoadLocalReader *io.PipeReader diff --git a/pkg/vm/types.go b/pkg/vm/types.go index 24eaca628479c..37b9f820ac114 100644 --- a/pkg/vm/types.go +++ b/pkg/vm/types.go @@ -69,6 +69,9 @@ const ( External Source MultiUpdate + PartitionInsert + PartitionDelete + PartitionMultiUpdate Minus Intersect From a350abe779024cd58a240523db59ca1c904ed80f Mon Sep 17 00:00:00 2001 From: zhangxu Date: Wed, 15 Jan 2025 19:03:54 +0800 Subject: [PATCH 2/5] fix --- pkg/sql/compile/compile.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index ec9cf59c8ce1b..e7ac1f1c48490 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3361,7 +3361,7 @@ func (c *Compile) compileDelete(n *plan.Node, ss []*Scope) ([]*Scope, error) { rs = c.newMergeScope(ss) } - rs.setRootOperator(arg) + rs.setRootOperator(op) ss = []*Scope{rs} return ss, nil } From f5c1605014980341331f66e52d75276527d8905f Mon Sep 17 00:00:00 2001 From: zhangxu Date: Wed, 15 Jan 2025 20:16:44 +0800 Subject: [PATCH 3/5] fix ut --- pkg/tests/partition/partition_test.go | 43 +++++++++++---------------- pkg/vm/engine/disttae/txn_table.go | 6 ++-- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/pkg/tests/partition/partition_test.go b/pkg/tests/partition/partition_test.go index a3ec05207da8b..4732fb3a54c0e 100644 --- a/pkg/tests/partition/partition_test.go +++ b/pkg/tests/partition/partition_test.go @@ -27,9 +27,9 @@ import ( ) var ( - once sync.Once - shardingCluster embed.Cluster - mu sync.Mutex + once sync.Once + shareCluster embed.Cluster + mu sync.Mutex ) func runPartitionClusterTest( @@ -51,11 +51,9 @@ func runPartitionClusterTestWithReuse( mu.Lock() defer mu.Unlock() - var err error - var cluster embed.Cluster var c embed.Cluster - createFunc := func() { - c, err = embed.NewCluster( + createFunc := func() embed.Cluster { + new, err := embed.NewCluster( embed.WithCNCount(3), embed.WithTesting(), embed.WithPreStart( @@ -70,28 +68,21 @@ func runPartitionClusterTestWithReuse( }, ), ) - if err != nil { - return - } - err = c.Start() - if err != nil { - return - } - cluster = c - if reuse { - shardingCluster = cluster - } - } - - if err != nil { - return err + require.NoError(t, err) + require.NoError(t, new.Start()) + return new } if reuse { - once.Do(createFunc) - cluster = shardingCluster + once.Do( + func() { + c = createFunc() + shareCluster = c + }, + ) + c = shareCluster } else { - createFunc() + c = createFunc() } cn, err := c.GetCNService(0) @@ -104,6 +95,6 @@ func runPartitionClusterTestWithReuse( partitionservice.InitSQLs..., ) } - fn(cluster) + fn(c) return nil } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index c5691e72c02cc..eeb3bbdba238f 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -40,7 +40,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" - "github.com/matrixorigin/matrixone/pkg/partitionservice" "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/plan" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" @@ -92,8 +91,9 @@ func newTxnTable( } tbl.isLocal = tbl.isLocalFunc - if db.databaseId != catalog.MO_CATALOG_ID { - ps := partitionservice.GetService(process.GetService()) + ps := process.GetPartitionService() + if ps != nil && db.databaseId != catalog.MO_CATALOG_ID { + is, metadata, err := ps.Is(ctx, item.Id, txn.op) if err != nil { return nil, err From 6641e33700f1b40c125e9078fee86e99c23fd352 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Thu, 16 Jan 2025 10:45:08 +0800 Subject: [PATCH 4/5] fix ut --- pkg/partitionservice/service.go | 4 ++++ pkg/vm/process/process.go | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/partitionservice/service.go b/pkg/partitionservice/service.go index 13fbd7ba2ce96..0e6aa7ba6261e 100644 --- a/pkg/partitionservice/service.go +++ b/pkg/partitionservice/service.go @@ -30,6 +30,10 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/client" ) +var ( + DisabledService = NewService(Config{}, nil) +) + type service struct { cfg Config store PartitionStorage diff --git a/pkg/vm/process/process.go b/pkg/vm/process/process.go index a67253fba2447..9fb281b87a1e9 100644 --- a/pkg/vm/process/process.go +++ b/pkg/vm/process/process.go @@ -110,7 +110,11 @@ func (proc *Process) GetLockService() lockservice.LockService { } func (proc *Process) GetPartitionService() partitionservice.PartitionService { - return proc.Base.PartitionService + ps := proc.Base.PartitionService + if ps == nil { + return partitionservice.DisabledService + } + return ps } func (proc *Process) GetWaitPolicy() lock.WaitPolicy { From 03fc8735c0b9bc6e6614c6438e9b0123b99da109 Mon Sep 17 00:00:00 2001 From: zhangxu Date: Fri, 17 Jan 2025 11:21:42 +0800 Subject: [PATCH 5/5] fix ut --- pkg/objectio/block_info_test.go | 16 +++++++ pkg/partitionservice/service_test.go | 48 +++++++++++++++++++ pkg/sql/colexec/insert/insert_partition.go | 2 +- .../colexec/insert/insert_partition_test.go | 40 ++++++++++++++++ .../multi_update/multi_update_partition.go | 4 +- .../multi_update_partition_test.go | 42 ++++++++++++++++ pkg/sql/compile/operator_test.go | 42 ++++++++++++++++ pkg/tests/testutils/test_utils.go | 2 +- 8 files changed, 192 insertions(+), 4 deletions(-) create mode 100644 pkg/sql/colexec/insert/insert_partition_test.go create mode 100644 pkg/sql/colexec/multi_update/multi_update_partition_test.go create mode 100644 pkg/sql/compile/operator_test.go diff --git a/pkg/objectio/block_info_test.go b/pkg/objectio/block_info_test.go index 1e48862dbed0e..acecf2f189331 100644 --- a/pkg/objectio/block_info_test.go +++ b/pkg/objectio/block_info_test.go @@ -15,6 +15,7 @@ package objectio import ( + "bytes" "testing" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -288,3 +289,18 @@ func BenchmarkObjectStatsRelatedUtils(b *testing.B) { } }) } + +func TestMarshalAndUnMarshal(t *testing.T) { + b := &BlockInfo{} + b.PartitionIdx = 1 + b.ObjectFlags = 1 + + buf := bytes.NewBuffer(nil) + _, err := b.MarshalWithBuf(buf) + require.NoError(t, err) + + b2 := &BlockInfo{} + require.NoError(t, b2.Unmarshal(buf.Bytes())) + + require.Equal(t, b, b2) +} diff --git a/pkg/partitionservice/service_test.go b/pkg/partitionservice/service_test.go index 99d5019b03763..e48065be9a427 100644 --- a/pkg/partitionservice/service_test.go +++ b/pkg/partitionservice/service_test.go @@ -20,7 +20,9 @@ import ( "testing" "time" + "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/pb/partition" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/parsers" "github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect" @@ -132,6 +134,52 @@ func TestIs(t *testing.T) { ) } +func TestPruneNoPartition(t *testing.T) { + runTestPartitionServiceTest( + func( + ctx context.Context, + txnOp client.TxnOperator, + s *service, + store *memStorage, + ) { + res, err := s.Prune(ctx, 1, nil, txnOp) + require.NoError(t, err) + require.True(t, res.Empty()) + }, + ) +} + +func TestFilterNoPartition(t *testing.T) { + runTestPartitionServiceTest( + func( + ctx context.Context, + txnOp client.TxnOperator, + s *service, + store *memStorage, + ) { + res, err := s.Filter(ctx, 1, nil, txnOp) + require.NoError(t, err) + require.Empty(t, res) + }, + ) +} + +func TestIterResult(t *testing.T) { + res := PruneResult{ + batches: make([]*batch.Batch, 10), + partitions: make([]partition.Partition, 10), + } + + n := 0 + res.Iter( + func(partition partition.Partition, bat *batch.Batch) bool { + n++ + return false + }, + ) + require.Equal(t, 1, n) +} + func runTestPartitionServiceTest( fn func( ctx context.Context, diff --git a/pkg/sql/colexec/insert/insert_partition.go b/pkg/sql/colexec/insert/insert_partition.go index f33a2a877643e..55704df34f7c2 100644 --- a/pkg/sql/colexec/insert/insert_partition.go +++ b/pkg/sql/colexec/insert/insert_partition.go @@ -1,4 +1,4 @@ -// Copyright 2021 Matrix Origin +// Copyright 2021-2024 Matrix Origin // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/sql/colexec/insert/insert_partition_test.go b/pkg/sql/colexec/insert/insert_partition_test.go new file mode 100644 index 0000000000000..e7b312bcf1191 --- /dev/null +++ b/pkg/sql/colexec/insert/insert_partition_test.go @@ -0,0 +1,40 @@ +// Copyright 2021-2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insert + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPartitionInsertString(t *testing.T) { + op := &PartitionInsert{} + buf := new(bytes.Buffer) + op.String(buf) + require.Equal(t, "insert: partition_insert", buf.String()) +} + +func TestNewPartitionInsertFrom(t *testing.T) { + ps := &PartitionInsert{ + raw: &Insert{}, + tableID: 1, + } + op := NewPartitionInsertFrom(ps) + require.Equal(t, ps.raw.InsertCtx, op.(*PartitionInsert).raw.InsertCtx) + require.Equal(t, ps.raw.ToWriteS3, op.(*PartitionInsert).raw.ToWriteS3) + require.Equal(t, ps.tableID, op.(*PartitionInsert).tableID) +} diff --git a/pkg/sql/colexec/multi_update/multi_update_partition.go b/pkg/sql/colexec/multi_update/multi_update_partition.go index fbd813d84468e..4914d8a207682 100644 --- a/pkg/sql/colexec/multi_update/multi_update_partition.go +++ b/pkg/sql/colexec/multi_update/multi_update_partition.go @@ -1,4 +1,4 @@ -// Copyright 2021 Matrix Origin +// Copyright 2021-2024 Matrix Origin // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ func NewPartitionMultiUpdateFrom( op.Action = from.raw.Action op.IsOnduplicateKeyUpdate = from.raw.IsOnduplicateKeyUpdate op.Engine = from.raw.Engine - return op + return NewPartitionMultiUpdate(op, from.tableID) } func (op *PartitionMultiUpdate) String(buf *bytes.Buffer) { diff --git a/pkg/sql/colexec/multi_update/multi_update_partition_test.go b/pkg/sql/colexec/multi_update/multi_update_partition_test.go new file mode 100644 index 0000000000000..5fd9edf58a681 --- /dev/null +++ b/pkg/sql/colexec/multi_update/multi_update_partition_test.go @@ -0,0 +1,42 @@ +// Copyright 2021-2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multi_update + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPartitionMultiUpdateString(t *testing.T) { + op := &PartitionMultiUpdate{} + buf := new(bytes.Buffer) + op.String(buf) + require.Equal(t, "MultiUpdate: partition_multi_update", buf.String()) +} + +func TestNewPartitionMultiUpdateFrom(t *testing.T) { + ps := &PartitionMultiUpdate{ + raw: &MultiUpdate{}, + tableID: 1, + } + op := NewPartitionMultiUpdateFrom(ps) + require.Equal(t, ps.raw.MultiUpdateCtx, op.(*PartitionMultiUpdate).raw.MultiUpdateCtx) + require.Equal(t, ps.raw.Action, op.(*PartitionMultiUpdate).raw.Action) + require.Equal(t, ps.raw.IsOnduplicateKeyUpdate, op.(*PartitionMultiUpdate).raw.IsOnduplicateKeyUpdate) + require.Equal(t, ps.raw.Engine, op.(*PartitionMultiUpdate).raw.Engine) + require.Equal(t, ps.tableID, op.(*PartitionMultiUpdate).tableID) +} diff --git a/pkg/sql/compile/operator_test.go b/pkg/sql/compile/operator_test.go new file mode 100644 index 0000000000000..b2e7f4b07e479 --- /dev/null +++ b/pkg/sql/compile/operator_test.go @@ -0,0 +1,42 @@ +// Copyright 2021-2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compile + +import ( + "testing" + + "github.com/matrixorigin/matrixone/pkg/sql/colexec/deletion" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/insert" +) + +func TestDupOperator(t *testing.T) { + dupOperator( + insert.NewPartitionInsert( + &insert.Insert{}, + 1, + ), + 0, + 0, + ) + + dupOperator( + deletion.NewPartitionDelete( + &deletion.Deletion{}, + 1, + ), + 0, + 0, + ) +} diff --git a/pkg/tests/testutils/test_utils.go b/pkg/tests/testutils/test_utils.go index 339a9bf7eb5f7..bb7648a040a3c 100644 --- a/pkg/tests/testutils/test_utils.go +++ b/pkg/tests/testutils/test_utils.go @@ -174,7 +174,7 @@ func ExecSQLWithReadResult( ) require.NoError(t, moerr.AttachCause(ctx, err), sql) - // WaitLogtailApplied(t, txnOp.Txn().CommitTS, cn) + WaitLogtailApplied(t, txnOp.Txn().CommitTS, cn) return txnOp.Txn().CommitTS }