Skip to content

Commit

Permalink
Merge branch 'main' into fix-ut-1
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 17, 2025
2 parents 5b9674e + f80352a commit 88a52d8
Show file tree
Hide file tree
Showing 19 changed files with 1,027 additions and 127 deletions.
34 changes: 34 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,40 @@ func IsMoErrCode(e error, rc uint16) bool {
return me.code == rc
}

func GetMoErrCode(e error) (uint16, bool) {
if e == nil {
return 0, false
}

me, ok := e.(*Error)
if !ok {
return 0, false
}

return me.code, true
}

func IsSameMoErr(a error, b error) bool {
if a == nil || b == nil {
return false
}

var (
ok bool
aa, bb *Error
)

if aa, ok = a.(*Error); !ok {
return false
}

if bb, ok = b.(*Error); !ok {
return false
}

return aa.code == bb.code
}

func DowncastError(e error) *Error {
if err, ok := e.(*Error); ok {
return err
Expand Down
50 changes: 50 additions & 0 deletions pkg/common/moerr/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,53 @@ func TestEncoding(t *testing.T) {
require.Nil(t, err)
require.Equal(t, e, e2)
}

type fakeErr struct {
}

func (f *fakeErr) Error() string {
return "fake error"
}

func TestIsSameMoErr(t *testing.T) {
var a, b error
require.False(t, IsSameMoErr(a, b))

_, ok := GetMoErrCode(a)
require.False(t, ok)

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(a)
require.False(t, ok)

b = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = GetOkExpectedEOB()
require.False(t, IsSameMoErr(a, b))

code, ok := GetMoErrCode(a)
require.True(t, ok)
require.Equal(t, OkExpectedEOB, code)

b = GetOkExpectedDup()
require.False(t, IsSameMoErr(a, b))

code, ok = GetMoErrCode(b)
require.True(t, ok)
require.Equal(t, OkExpectedDup, code)

b = nil
require.False(t, IsSameMoErr(a, b))

b = GetOkExpectedEOB()
require.True(t, IsSameMoErr(a, b))
}
18 changes: 18 additions & 0 deletions pkg/objectio/ioutil/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,21 @@ func IsGCMetaExt(ext string) bool {
func IsMetadataName(name string) bool {
return strings.HasPrefix(name, PrefixMetadata) && strings.HasSuffix(name, SuffixMetadata)
}

const (
TableObjectsAttr_Accout_Idx = 0
TableObjectsAttr_DB_Idx = 1
TableObjectsAttr_Table_Idx = 2
TableObjectsAttr_ObjectType_Idx = 3
TableObjectsAttr_ID_Idx = 4
TableObjectsAttr_CreateTS_Idx = 5
TableObjectsAttr_DeleteTS_Idx = 6
TableObjectsAttr_Cluster_Idx = 7
// TableObjectsAttr_PhysicalAddr_Idx = 8
)

const (
ObjectType_Invalid int8 = iota
ObjectType_Data
ObjectType_Tombstone
)
63 changes: 32 additions & 31 deletions pkg/objectio/ioutil/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,50 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mockSchema(colCnt int, pkIdx int) ([]string, []types.Type, []uint16) {
attrs := make([]string, 0)
typs := make([]types.Type, 0)
seq := make([]uint16, 0)
for i := 0; i < colCnt; i++ {
colName := fmt.Sprintf("mock_%d", i)
attrs = append(attrs, colName)
typs = append(typs, types.T_int32.ToType())
seq = append(seq, uint16(i))
}
return attrs, typs, seq
}

func TestNewSinker(t *testing.T) {
proc := testutil.NewProc()
fs, err := fileservice.Get[fileservice.FileService](
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema := catalog.MockSchema(3, 2)
seqnums := make([]uint16, len(schema.Attrs()))
for i := range schema.Attrs() {
seqnums[i] = schema.GetSeqnum(schema.Attrs()[i])
}
attrs, typs, seqnums := mockSchema(3, 2)

buffer := containers.NewOneSchemaBatchBuffer(mpool.GB, schema.Attrs(), schema.Types())
buffer := containers.NewOneSchemaBatchBuffer(mpool.GB, attrs, typs)

factory := NewFSinkerImplFactory(
seqnums,
schema.GetPrimaryKey().Idx,
2,
true,
false,
schema.Version,
0,
)

sinker := NewSinker(
schema.GetPrimaryKey().Idx,
schema.Attrs(),
schema.Types(),
2,
attrs,
typs,
factory,
proc.Mp(),
fs,
Expand All @@ -67,7 +76,7 @@ func TestNewSinker(t *testing.T) {
WithBuffer(buffer, false),
)

bat := catalog.MockBatch(schema, 1000)
bat := containers.MockBatch(typs, 1000, 2, nil)
err = sinker.Write(context.Background(), containers.ToCNBatch(bat))
assert.Nil(t, err)

Expand Down Expand Up @@ -95,31 +104,23 @@ func TestNewSinker(t *testing.T) {
}

func makeTestSinker(
inSchema *catalog.Schema,
mp *mpool.MPool,
fs fileservice.FileService,
) (outSchema *catalog.Schema, sinker *Sinker) {
outSchema = inSchema
if outSchema == nil {
outSchema = catalog.MockSchema(3, 2)
}
seqnums := make([]uint16, len(outSchema.Attrs()))
for i := range outSchema.Attrs() {
seqnums[i] = outSchema.GetSeqnum(outSchema.Attrs()[i])
}
) (attr []string, typs []types.Type, seq []uint16, sinker *Sinker) {
attr, typs, seq = mockSchema(3, 2)

factory := NewFSinkerImplFactory(
seqnums,
outSchema.GetPrimaryKey().Idx,
seq,
2,
true,
false,
outSchema.Version,
0,
)

sinker = NewSinker(
outSchema.GetPrimaryKey().Idx,
outSchema.Attrs(),
outSchema.Types(),
2,
attr,
typs,
factory,
mp,
fs,
Expand All @@ -133,10 +134,10 @@ func TestNewSinker2(t *testing.T) {
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema, sinker := makeTestSinker(nil, proc.Mp(), fs)
_, typs, _, sinker := makeTestSinker(proc.Mp(), fs)

for i := 0; i < 5; i++ {
bat := catalog.MockBatch(schema, 8192*2)
bat := containers.MockBatch(typs, 8192*2, 2, nil)
err = sinker.Write(context.Background(), containers.ToCNBatch(bat))
assert.Nil(t, err)

Expand Down
18 changes: 3 additions & 15 deletions pkg/vm/engine/ckputil/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,9 @@ import (
// `start_row` is the start rowid of the table in the object
// `end_row` is the end rowid of the table in the object (same object as `start_row`)
// `location` is the location of the object
var MetaSchema_TableRange_Seqnums = []uint16{0, 1, 2, 3, 4}
var MetaSchema_TableRange_Attrs = []string{
TableObjectsAttr_Table,
"object_type",
"start_row",
"end_row",
"location",
}
var MetaSchema_TableRange_Types = []types.Type{
TableObjectsTypes[TableObjectsAttr_Table_Idx],
types.T_int8.ToType(),
objectio.RowidType,
objectio.RowidType,
types.T_char.ToType(),
}
var MetaSchema_TableRange_Seqnums = MetaSeqnums
var MetaSchema_TableRange_Attrs = MetaAttrs
var MetaSchema_TableRange_Types = MetaTypes

func TableRangesString(r []TableRange) string {
var buf bytes.Buffer
Expand Down
50 changes: 28 additions & 22 deletions pkg/vm/engine/ckputil/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package ckputil

import (
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
)
Expand All @@ -31,33 +33,33 @@ const (
TableObjectsAttr_Accout = "account_id"
TableObjectsAttr_DB = "db_id"
TableObjectsAttr_Table = "table_id"
TableObjectsAttr_ObjectType = "object_type"
TableObjectsAttr_ID = "id"
TableObjectsAttr_CreateTS = "create_ts"
TableObjectsAttr_DeleteTS = "delete_ts"
TableObjectsAttr_ObjectType = "object_type"

// TableObjects should be clustered by `table`+`id`
// TableObjects should be clustered by `table`+`object_type`+`id`
TableObjectsAttr_Cluster = "cluster"

// TableObjectsAttr_PhysicalAddr = objectio.PhysicalAddr_Attr
)

const (
TableObjectsAttr_Accout_Idx = 0
TableObjectsAttr_DB_Idx = 1
TableObjectsAttr_Table_Idx = 2
TableObjectsAttr_ObjectType_Idx = 3
TableObjectsAttr_ID_Idx = 4
TableObjectsAttr_CreateTS_Idx = 5
TableObjectsAttr_DeleteTS_Idx = 6
TableObjectsAttr_Cluster_Idx = 7
TableObjectsAttr_Accout_Idx = ioutil.TableObjectsAttr_Accout_Idx
TableObjectsAttr_DB_Idx = ioutil.TableObjectsAttr_DB_Idx
TableObjectsAttr_Table_Idx = ioutil.TableObjectsAttr_Table_Idx
TableObjectsAttr_ObjectType_Idx = ioutil.TableObjectsAttr_ObjectType_Idx
TableObjectsAttr_ID_Idx = ioutil.TableObjectsAttr_ID_Idx
TableObjectsAttr_CreateTS_Idx = ioutil.TableObjectsAttr_CreateTS_Idx
TableObjectsAttr_DeleteTS_Idx = ioutil.TableObjectsAttr_DeleteTS_Idx
TableObjectsAttr_Cluster_Idx = ioutil.TableObjectsAttr_Cluster_Idx
// TableObjectsAttr_PhysicalAddr_Idx = 8
)

const (
ObjectType_Invalid int8 = iota
ObjectType_Data
ObjectType_Tombstone
ObjectType_Invalid = ioutil.ObjectType_Invalid
ObjectType_Data = ioutil.ObjectType_Data
ObjectType_Tombstone = ioutil.ObjectType_Tombstone
)

var TableObjectsAttrs = []string{
Expand Down Expand Up @@ -98,34 +100,38 @@ var TableObjectsSeqnums = []uint16{0, 1, 2, 3, 4, 5, 6, 7}
// }

const (
MetaAttr_Table = "table_id"
MetaAttr_Start = "start_rowid"
MetaAttr_End = "end_rowid"
MetaAttr_Location = "location"
MetaAttr_Table = "table_id"
MetaAttr_ObjectType = "object_type"
MetaAttr_Start = "start_rowid"
MetaAttr_End = "end_rowid"
MetaAttr_Location = "location"
)

const (
MetaAttr_Table_Idx = 0
MetaAttr_Start_Idx = 1
MetaAttr_End_Idx = 2
MetaAttr_Location_Idx = 3
MetaAttr_Table_Idx = 0
MetaAttr_ObjectType_Idx = 1
MetaAttr_Start_Idx = 2
MetaAttr_End_Idx = 3
MetaAttr_Location_Idx = 4
)

var MetaAttrs = []string{
MetaAttr_Table,
MetaAttr_ObjectType,
MetaAttr_Start,
MetaAttr_End,
MetaAttr_Location,
}

var MetaTypes = []types.Type{
types.T_uint64.ToType(),
types.T_int8.ToType(),
types.T_Rowid.ToType(),
types.T_Rowid.ToType(),
types.T_char.ToType(),
}

var MetaSeqnums = []uint16{0, 1, 2, 3}
var MetaSeqnums = []uint16{0, 1, 2, 3, 4}

func NewObjectListBatch() *batch.Batch {
return batch.NewWithSchema(false, TableObjectsAttrs, TableObjectsTypes)
Expand Down
4 changes: 4 additions & 0 deletions pkg/vm/engine/disttae/cache/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ type TableItem struct {
ClusterByIdx int
}

func (item *TableItem) IsDeleted() bool {
return item.deleted
}

func (item *TableItem) String() string {
return fmt.Sprintln(
"item ptr", uintptr(unsafe.Pointer(item)),
Expand Down
Loading

0 comments on commit 88a52d8

Please sign in to comment.