Skip to content

Commit

Permalink
multi-blk aobject (#17713)
Browse files Browse the repository at this point in the history
support multi-blk aobject

Approved by: @triump2020, @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Jul 28, 2024
1 parent da77cde commit 4b7feef
Show file tree
Hide file tree
Showing 38 changed files with 838 additions and 301 deletions.
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,13 @@ func (p *PartitionState) HandleObjectInsert(
break
}
}
iter.Release()

// if there are no rows for the block, delete the block from the dirty
if objEntry.EntryState && scanCnt == blockDeleted && p.dirtyBlocks.Len() > 0 {
p.dirtyBlocks.Delete(*blkID)
}
}
iter.Release()
}
perfcounter.Update(ctx, func(c *perfcounter.CounterSet) {
c.DistTAE.Logtail.ActiveRows.Add(-numDeleted)
Expand Down
31 changes: 27 additions & 4 deletions pkg/vm/engine/tae/catalog/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,33 @@ func (entry *ObjectEntry) IsVisible(txn txnif.TxnReader) bool {
return entry.CreateNode.IsVisible(txn)
}
func (entry *ObjectEntry) BlockCnt() int {
if entry.IsAppendable() {
if entry.IsLocal {
return 1
}
return int(entry.getBlockCntFromStats())
lastNode := entry.GetLatestNode()
if lastNode == nil {
logutil.Warnf("obj %v not found", entry.StringWithLevel(3))
if !entry.ObjectMVCCNode.IsEmpty() {
return int(entry.BlkCnt())
}
return 0
}
if lastNode.objData == nil {
if lastNode.GetTable().db.isSys {
return 1
} else {
panic(fmt.Sprintf("logic err obj %v-%d %v doesn't have data",
lastNode.GetTable().fullName, lastNode.GetTable().ID, lastNode.ID().String()))
}
}
if lastNode.ObjectMVCCNode.IsEmpty() {
if !lastNode.IsAppendable() {
logutil.Warnf("[Metadata] get block count when naobj is creating")
return 0
}
return lastNode.objData.BlockCnt()
}
return int(lastNode.getBlockCntFromStats())
}

func (entry *ObjectEntry) getBlockCntFromStats() (blkCnt uint32) {
Expand Down Expand Up @@ -510,9 +533,9 @@ func (entry *ObjectEntry) PrepareRollback() (err error) {
panic("logic error")
}
switch lastNode.ObjectState {
case ObjectState_Create_Active:
case ObjectState_Create_Active, ObjectState_Create_PrepareCommit:
entry.table.link.Delete(lastNode)
case ObjectState_Delete_Active:
case ObjectState_Delete_Active, ObjectState_Delete_PrepareCommit:
newEntry := entry.Clone()
newEntry.DeleteNode.Reset()
entry.table.link.Update(newEntry, entry)
Expand Down
11 changes: 11 additions & 0 deletions pkg/vm/engine/tae/catalog/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
)

func i82bool(v int8) bool {
Expand Down Expand Up @@ -129,6 +130,7 @@ type Schema struct {
BlockMaxRows uint32
// for aobj, there're at most one blk
ObjectMaxBlocks uint16
AObjectMaxSize int
Extra *apipb.SchemaExtra

// do not write down, reconstruct them when reading
Expand Down Expand Up @@ -857,6 +859,15 @@ func (s *Schema) Finalize(withoutPhyAddr bool) (err error) {
err = moerr.NewConstraintViolationNoCtx("no schema")
return
}
if s.BlockMaxRows == 0 {
s.BlockMaxRows = options.DefaultBlockMaxRows
}
if s.ObjectMaxBlocks == 0 {
s.ObjectMaxBlocks = options.DefaultObjectPerSegment
}
if s.AObjectMaxSize == 0 {
s.AObjectMaxSize = options.DefaultAObjectMaxSize
}
if !withoutPhyAddr {
phyAddrDef := &ColDef{
Name: PhyAddrColumnName,
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/tae/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (id *ID) SetBlockOffset(blkn uint16) {
copy(id.BlockID[types.ObjectBytesSize:], types.EncodeUint16(&blkn))
}

// GetBlockOffset is the read-side counterpart of SetBlockOffset: it copies the
// bytes of id.BlockID starting at types.ObjectBytesSize into the byte view that
// types.EncodeUint16 yields for the result variable, then returns that value.
// NOTE(review): this relies on EncodeUint16 returning a slice that aliases the
// pointed-to uint16 rather than a detached copy — confirm against its definition.
func (id *ID) GetBlockOffset() (blkn uint16) {
	dst := types.EncodeUint16(&blkn)
	src := id.BlockID[types.ObjectBytesSize:]
	copy(dst, src)
	return blkn
}

func (id *ID) AsBlockID() *ID {
return &ID{
DbID: id.DbID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func (r *runner) EstimateTableMemSize(table *catalog.TableEntry, tree *model.Tab
for _, obj := range tree.Objs {
object, err := table.GetObjectByID(obj.ID)
if err != nil {
panic(err)
panic(fmt.Sprintf("finding obj %v, err %v", obj.ID.String(), err))
}
a, d := object.GetObjectData().EstimateMemSize()
asize += a
Expand Down
14 changes: 7 additions & 7 deletions pkg/vm/engine/tae/db/test/compatibility/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func init() {

PrepareCaseRegister(MakePrepareCase(
prepareDelete, "prepare-4", "prepare case delete",
schemaCfg{10, 2, 18, 13}, (10*3+1)*3, longOpt,
schemaCfg{100, 2, 18, 13}, 93, longOpt,
))
TestCaseRegister(
MakeTestCase(testDelete, "prepare-4", "test-4", "prepare-4=>test-4"),
Expand Down Expand Up @@ -270,7 +270,7 @@ func prepareDelete(tc PrepareCase, t *testing.T) {
tae.CreateRelAndAppend(bats[0], true)

schema := tc.GetSchema(t)
for i := 0; i < int(schema.BlockMaxRows+1); i++ {
for i := 0; i < 11; i++ {
txn, rel := tae.GetRelation()
v := testutil.GetSingleSortKeyValue(bats[0], schema, i)
filter := handle.NewEQFilter(v)
Expand All @@ -284,7 +284,7 @@ func prepareDelete(tc PrepareCase, t *testing.T) {

// compact
tae.DoAppend(bats[1])
for i := 0; i < int(schema.BlockMaxRows+1); i++ {
for i := 0; i < 11; i++ {
txn, rel := tae.GetRelation()
v := testutil.GetSingleSortKeyValue(bats[1], schema, i)
filter := handle.NewEQFilter(v)
Expand All @@ -296,7 +296,7 @@ func prepareDelete(tc PrepareCase, t *testing.T) {

// not compact
tae.DoAppend(bats[2])
for i := 0; i < int(schema.BlockMaxRows+1); i++ {
for i := 0; i < 11; i++ {
txn, rel := tae.GetRelation()
v := testutil.GetSingleSortKeyValue(bats[2], schema, i)
filter := handle.NewEQFilter(v)
Expand Down Expand Up @@ -489,14 +489,14 @@ func testDelete(tc TestCase, t *testing.T) {
bats := bat.Split(bat.Length())

schema := pc.GetSchema(t)
totalRows := bat.Length() - int(schema.BlockMaxRows+1)*3
totalRows := bat.Length() - 33
txn, rel := tae.GetRelation()
testutil.CheckAllColRowsByScan(t, rel, totalRows, true)
rows := schema.BlockMaxRows*3 + 1
rows := 31
for i := 0; i < 3; i++ {
for j := 0; j < int(rows); j++ {
err := rel.Append(context.Background(), bats[i*int(rows)+j])
if j < int(schema.BlockMaxRows+1) {
if j < 11 {
assert.NoError(t, err)
} else {
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry))
Expand Down
48 changes: 39 additions & 9 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/nulls"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
Expand Down Expand Up @@ -675,7 +676,7 @@ func TestAddObjsWithMetaLoc(t *testing.T) {
return
})
assert.True(t, cntOfobj == 1)
assert.True(t, cntOfAobj == 2)
assert.True(t, cntOfAobj == 1)
assert.Nil(t, txn.Commit(context.Background()))
}
}
Expand Down Expand Up @@ -2521,7 +2522,7 @@ func TestSegDelLogtail(t *testing.T) {

require.Equal(t, api.Entry_Insert, resp.Commands[1].EntryType)
require.True(t, strings.HasSuffix(resp.Commands[1].TableName, "obj"))
require.Equal(t, uint32(6), resp.Commands[1].Bat.Vecs[0].Len) /* 2 Objects (create) + 4 (update object info) */
require.Equal(t, uint32(4), resp.Commands[1].Bat.Vecs[0].Len) /* 2 Objects (create) + 2 (update object info) */
// start ts should not be empty
startTSVec := resp.Commands[1].Bat.Vecs[9]
cnStartVec, err := vector.ProtoVectorToVector(startTSVec)
Expand Down Expand Up @@ -2555,7 +2556,7 @@ func TestSegDelLogtail(t *testing.T) {
require.Equal(t, uint32(1), ins.Vecs[0].Len) // 1 deltaloc, skip blks without deltaloc
require.Nil(t, del) // 0 del
require.Nil(t, cnins) // 0 del
require.Equal(t, uint32(6), segdel.Vecs[0].Len) // 2 create + 4 update
require.Equal(t, uint32(4), segdel.Vecs[0].Len) // 2 create + 2 update
require.Equal(t, 12, len(segdel.Vecs))
}
check()
Expand Down Expand Up @@ -3385,7 +3386,7 @@ func TestImmutableIndexInAblk(t *testing.T) {
assert.NoError(t, err)

err = meta.GetObjectData().BatchDedup(
context.Background(), txn, bat.Vecs[1], nil, nil, false, objectio.BloomFilter{}, common.DefaultAllocator,
context.Background(), txn, bat.Vecs[1], nil, nil, false, objectio.BloomFilter{}, 0, common.DefaultAllocator,
)
assert.Error(t, err)
}
Expand Down Expand Up @@ -3875,6 +3876,7 @@ func TestLogtailBasic(t *testing.T) {
schema := catalog.MockSchemaAll(2, -1)
schema.Name = "test"
schema.BlockMaxRows = 10
schema.ObjectMaxBlocks = 1
// create 2 db and 2 tables
txn, _ := tae.StartTxn(nil)
todropdb, _ := txn.CreateDatabase("todrop", "", "")
Expand Down Expand Up @@ -3966,7 +3968,7 @@ func TestLogtailBasic(t *testing.T) {
// 10 Objects, every Object has 1 blocks
reader = logMgr.GetReader(firstWriteTs, lastWriteTs)
dirties := reader.GetDirtyByTable(dbID, tableID)
require.Equal(t, 10, len(dirties.Objs))
require.Equal(t, 1, len(dirties.Objs))
tots := func(ts types.TS) *timestamp.Timestamp {
return &timestamp.Timestamp{PhysicalTime: types.DecodeInt64(ts[4:12]), LogicalTime: types.DecodeUint32(ts[:4])}
}
Expand Down Expand Up @@ -6384,13 +6386,17 @@ func TestAlterFakePk(t *testing.T) {
// check non-exist column foreach
newSchema := obj.GetRelation().Schema()
blkdata := obj.GetMeta().(*catalog.ObjectEntry).GetObjectData()
sels := []uint32{1, 3}
sels := &nulls.Nulls{}
sels.Add(1)
sels.Add(3)
rows := make([]int, 0, 4)
blkdata.Foreach(context.Background(), newSchema, 0, 1 /*"add1" column*/, func(v any, isnull bool, row int) error {
view, err := blkdata.GetColumnDataById(ctx, txn, newSchema, 0, 1, common.DefaultAllocator)
view.Vecs[0].Foreach(func(v any, isnull bool, row int) error {
require.True(t, true)
rows = append(rows, row)
return nil
}, sels, common.DefaultAllocator)
}, sels)
view.Close()
require.Equal(t, []int{1, 3}, rows)
require.NoError(t, err)
require.NoError(t, txn.Commit(context.Background()))
Expand Down Expand Up @@ -7997,7 +8003,6 @@ func TestDedupSnapshot3(t *testing.T) {
err := rel.BatchDedup(bats[offset].Vecs[3])
txn.Commit(context.Background())
if err != nil {
logutil.Infof("err is %v", err)
return
}

Expand Down Expand Up @@ -9370,3 +9375,28 @@ func TestClearPersistTransferTable(t *testing.T) {
},
)
}

// TestFlushAndReplay builds a 20-row mock batch, splits it with Split(2),
// appends the two parts to a freshly created relation, compacts the blocks,
// forces an incremental checkpoint and restarts the engine. It checks with
// CheckRowsByScan that 20 rows are reported both before and after the restart,
// and that common.MutMemAllocator.CurrNB() equals its pre-append reading once
// after compaction and once after the restart.
func TestFlushAndReplay(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
defer tae.Close()
schema := catalog.MockSchemaAll(13, 3)
// 10 rows per block and up to 3 blocks per object; presumably chosen so the
// 20 rows occupy more than one block of a single object — TODO confirm.
schema.BlockMaxRows = 10
schema.ObjectMaxBlocks = 3
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, 20)
bats := bat.Split(2)
// Baseline allocator reading taken before any data is appended; the two
// assert.Equal calls below compare later readings against it.
currNB := common.MutMemAllocator.CurrNB()

tae.CreateRelAndAppend(bats[0], true)
// ts1 is captured between the two appends and is the timestamp later handed
// to ForceIncrementalCheckpoint.
ts1 := tae.TxnMgr.Now()
tae.DoAppend(bats[1])
tae.CompactBlocks(true)
assert.Equal(t, currNB, common.MutMemAllocator.CurrNB())
// NOTE(review): any value returned by ForceIncrementalCheckpoint is discarded
// here — confirm whether an error should be asserted.
tae.BGCheckpointRunner.ForceIncrementalCheckpoint(ts1, true)
tae.CheckRowsByScan(20, false)
// Restart the engine and verify the same row count is still visible.
tae.Restart(ctx)
tae.CheckRowsByScan(20, false)
assert.Equal(t, currNB, common.MutMemAllocator.CurrNB())
}
29 changes: 5 additions & 24 deletions pkg/vm/engine/tae/db/test/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,29 +426,10 @@ func TestReplay2(t *testing.T) {
assert.Nil(t, err)
assert.Nil(t, txn.Commit(context.Background()))

txn, err = tae.StartTxn(nil)
assert.Nil(t, err)
e, err = txn.GetDatabase("db")
assert.Nil(t, err)
rel, err = e.GetRelationByName(schema.Name)
assert.Nil(t, err)
blk := testutil.GetOneBlockMeta(rel)
obj, err := rel.GetObject(blk.ID())
assert.Nil(t, err)
err = rel.SoftDeleteObject(obj.GetID())
assert.NoError(t, err)
obj, err = rel.GetObject(obj.GetID())
assert.Nil(t, err)
testutil.MockObjectStats(t, obj)
assert.Nil(t, err)
objMeta := obj.GetMeta().(*catalog.ObjectEntry)
baseNode := objMeta.GetLatestNode().ObjectMVCCNode
err = objectio.SetObjectStatsSize(&baseNode.ObjectStats, 1)
assert.Nil(t, err)
err = objectio.SetObjectStatsRowCnt(&baseNode.ObjectStats, 1)
assert.Nil(t, err)
assert.False(t, baseNode.IsEmpty())
assert.Nil(t, txn.Commit(context.Background()))
txn, rel = testutil.GetRelation(t, 0, tae, "db", schema.Name)
obj := testutil.GetOneObject(rel)
assert.Nil(t, txn.Commit(ctx))
testutil.CompactBlocks(t, 0, tae, "db", schema, true)

t.Log(tae.Catalog.SimplePPString(common.PPL1))
tae.Close()
Expand All @@ -475,7 +456,7 @@ func TestReplay2(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int32(33), val)
_, _, err = rel.GetValue(id, row, 0)
assert.NotNil(t, err)
assert.Nil(t, err)
assert.Nil(t, txn.Commit(context.Background()))

err = tae2.BGCheckpointRunner.ForceFlush(tae2.TxnMgr.Now(), context.Background(), time.Second*10)
Expand Down
Loading

0 comments on commit 4b7feef

Please sign in to comment.