diff --git a/pkg/vm/engine/tae/catalog/base.go b/pkg/vm/engine/tae/catalog/base.go index d0e89470a6a63..8ab28765d4223 100644 --- a/pkg/vm/engine/tae/catalog/base.go +++ b/pkg/vm/engine/tae/catalog/base.go @@ -37,7 +37,7 @@ type BaseEntry interface { type BaseEntryImpl[T BaseNode[T]] struct { //chain of MetadataMVCCNode - *txnbase.MVCCChain[*MVCCNode[T]] + txnbase.MVCCChain[*MVCCNode[T]] } func NewBaseEntry[T BaseNode[T]](factory func() T) *BaseEntryImpl[T] { @@ -63,7 +63,7 @@ func (be *BaseEntryImpl[T]) PPStringLocked(level common.PPLevel, depth int, pref func (be *BaseEntryImpl[T]) CreateWithTSLocked(ts types.TS, baseNode T) { node := &MVCCNode[T]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: ts, }, TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTS(ts), @@ -77,7 +77,7 @@ func (be *BaseEntryImpl[T]) CreateWithTxnLocked(txn txnif.AsyncTxn, baseNode T) logutil.Warnf("unexpected txn is nil: %+v", stack.Callers(0)) } node := &MVCCNode[T]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: txnif.UncommitTS, }, TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTxn(txn), diff --git a/pkg/vm/engine/tae/catalog/basemvccnode.go b/pkg/vm/engine/tae/catalog/basemvccnode.go index 00d9ccd63fa6b..a60e98882e0a3 100644 --- a/pkg/vm/engine/tae/catalog/basemvccnode.go +++ b/pkg/vm/engine/tae/catalog/basemvccnode.go @@ -64,19 +64,8 @@ func (un EntryMVCCNode) IsCreating() bool { return un.CreatedAt.Equal(&txnif.UncommitTS) } -func (un EntryMVCCNode) IsDropping() bool { - return un.DeletedAt.Equal(&txnif.UncommitTS) -} - -func (un EntryMVCCNode) Clone() *EntryMVCCNode { - return &EntryMVCCNode{ - CreatedAt: un.CreatedAt, - DeletedAt: un.DeletedAt, - } -} - -func (un EntryMVCCNode) CloneData() *EntryMVCCNode { - return &EntryMVCCNode{ +func (un EntryMVCCNode) Clone() EntryMVCCNode { + return EntryMVCCNode{ CreatedAt: un.CreatedAt, DeletedAt: un.DeletedAt, } @@ -201,24 +190,22 @@ type BaseNode[T any] interface { } type MVCCNode[T BaseNode[T]] struct { - *EntryMVCCNode - *txnbase.TxnMVCCNode + EntryMVCCNode + txnbase.TxnMVCCNode BaseNode T - CommitSideEffect func(commitTs types.TS) // used for object replay, no need to persist + CommitSideEffect func(id string, commitTs types.TS) // used for object replay, no need to persist } func NewEmptyMVCCNodeFactory[T BaseNode[T]](factory func() T) func() *MVCCNode[T] { return func() *MVCCNode[T] { return &MVCCNode[T]{ - EntryMVCCNode: &EntryMVCCNode{}, - TxnMVCCNode: &txnbase.TxnMVCCNode{}, - BaseNode: factory(), + BaseNode: factory(), } } } func CompareBaseNode[T BaseNode[T]](e, o *MVCCNode[T]) int { - return e.Compare(o.TxnMVCCNode) + return e.Compare(&o.TxnMVCCNode) } func (e *MVCCNode[T]) CloneAll() *MVCCNode[T] { @@ -235,8 +222,8 @@ func (e *MVCCNode[T]) CloneAll() *MVCCNode[T] { func (e *MVCCNode[T]) CloneData() *MVCCNode[T] { return &MVCCNode[T]{ - EntryMVCCNode: e.EntryMVCCNode.CloneData(), - TxnMVCCNode: &txnbase.TxnMVCCNode{}, + EntryMVCCNode: e.EntryMVCCNode.Clone(), + TxnMVCCNode: txnbase.TxnMVCCNode{}, BaseNode: e.BaseNode.CloneData(), } } @@ -266,7 +253,7 @@ func (e *MVCCNode[T]) ApplyCommit(id string) (err error) { } err = e.EntryMVCCNode.ApplyCommit(commitTS) if e.CommitSideEffect != nil { - e.CommitSideEffect(commitTS) + e.CommitSideEffect(id, commitTS) } return err } diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index abad9f5faace4..7f6c91d09d6e3 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -180,11 +180,11 @@ func (catalog *Catalog) onReplayUpdateObject( obj.table = rel obj.ObjectNode = *cmd.node obj.SortHint = catalog.NextObject() - obj.EntryMVCCNode = *cmd.mvccNode.EntryMVCCNode - obj.CreateNode = *cmd.mvccNode.TxnMVCCNode - cmd.mvccNode.TxnMVCCNode = &obj.CreateNode - cmd.mvccNode.EntryMVCCNode = &obj.EntryMVCCNode - cmd.mvccNode.CommitSideEffect = func(ts types.TS) { + obj.EntryMVCCNode = cmd.mvccNode.EntryMVCCNode + obj.CreateNode = cmd.mvccNode.TxnMVCCNode + cmd.mvccNode.CommitSideEffect = func(id string, ts types.TS) { + obj.CreateNode.ApplyCommit(id) + obj.EntryMVCCNode.ApplyCommit(ts) rel.UpdateReplayEntryTs(obj, ts) } obj.ObjectMVCCNode = *cmd.mvccNode.BaseNode @@ -199,12 +199,12 @@ func (catalog *Catalog) onReplayUpdateObject( obj = cobj.Clone() obj.prevVersion = cobj cobj.nextVersion = obj - obj.EntryMVCCNode = *cmd.mvccNode.EntryMVCCNode - obj.DeleteNode = *cmd.mvccNode.TxnMVCCNode + obj.EntryMVCCNode = cmd.mvccNode.EntryMVCCNode + obj.DeleteNode = cmd.mvccNode.TxnMVCCNode obj.ObjectMVCCNode = *cmd.mvccNode.BaseNode - cmd.mvccNode.TxnMVCCNode = &obj.DeleteNode - cmd.mvccNode.EntryMVCCNode = &obj.EntryMVCCNode - cmd.mvccNode.CommitSideEffect = func(ts types.TS) { + cmd.mvccNode.CommitSideEffect = func(id string, ts types.TS) { + obj.DeleteNode.ApplyCommit(id) + obj.EntryMVCCNode.ApplyCommit(ts) rel.UpdateReplayEntryTs(obj, ts) } obj.ObjectState = ObjectState_Delete_ApplyCommit @@ -346,10 +346,10 @@ func (catalog *Catalog) onReplayCreateDB( } _ = catalog.AddEntryLocked(db, nil, true) un := &MVCCNode[*EmptyMVCCNode]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: txnNode.End, }, - TxnMVCCNode: txnNode, + TxnMVCCNode: *txnNode, } db.InsertLocked(un) } @@ -429,10 +429,10 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx } // alter table un := &MVCCNode[*TableMVCCNode]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: tblCreatedAt, }, - TxnMVCCNode: txnNode, + TxnMVCCNode: *txnNode, BaseNode: &TableMVCCNode{ Schema: schema, TombstoneSchema: GetTombstoneSchema(schema), @@ -462,10 +462,10 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx tbl.tableData = dataFactory.MakeTableFactory()(tbl) _ = db.AddEntryLocked(tbl, nil, true) un := &MVCCNode[*TableMVCCNode]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: txnNode.End, }, - TxnMVCCNode: txnNode, + TxnMVCCNode: *txnNode, BaseNode: &TableMVCCNode{ Schema: schema, TombstoneSchema: GetTombstoneSchema(schema), @@ -583,7 +583,7 @@ func (catalog *Catalog) onReplayCheckpointObject( rel.AddEntryLocked(obj) _, sarg, _ := fault.TriggerFault("back up UT") if sarg == "" { - obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) + obj.CreateNode = txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) } logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, createTS.ToString(), @@ -598,8 +598,8 @@ func (catalog *Catalog) onReplayCheckpointObject( obj = newObject() rel.AddEntryLocked(obj) } - obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(createTS) - obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(deleteTS) + obj.CreateNode = txnbase.NewTxnMVCCNodeWithTS(createTS) + obj.DeleteNode = txnbase.NewTxnMVCCNodeWithTS(deleteTS) } } } diff --git a/pkg/vm/engine/tae/catalog/object.go b/pkg/vm/engine/tae/catalog/object.go index cd5af0d2d04f1..7007344fab6f6 100644 --- a/pkg/vm/engine/tae/catalog/object.go +++ b/pkg/vm/engine/tae/catalog/object.go @@ -111,9 +111,9 @@ func (entry *ObjectEntry) Clone() *ObjectEntry { } func (entry *ObjectEntry) GetCommandMVCCNode() *MVCCNode[*ObjectMVCCNode] { return &MVCCNode[*ObjectMVCCNode]{ - TxnMVCCNode: entry.GetLastMVCCNode(), + TxnMVCCNode: *entry.GetLastMVCCNode(), BaseNode: &entry.ObjectMVCCNode, - EntryMVCCNode: &entry.EntryMVCCNode, + EntryMVCCNode: entry.EntryMVCCNode, } } func (entry *ObjectEntry) GetDropEntry( @@ -122,7 +122,7 @@ func (entry *ObjectEntry) GetDropEntry( dropped = entry.Clone() dropped.ObjectState = ObjectState_Delete_Active dropped.DeletedAt = txnif.UncommitTS - dropped.DeleteNode = *txnbase.NewTxnMVCCNodeWithTxn(txn) + dropped.DeleteNode = txnbase.NewTxnMVCCNodeWithTxn(txn) dropped.GetObjectData().UpdateMeta(dropped) updatedCEntry = entry.Clone() updatedCEntry.nextVersion = dropped @@ -154,7 +154,7 @@ func (entry *ObjectEntry) GetUpdateEntry( return } isNewNode = true - dropped.DeleteNode = *txnbase.NewTxnMVCCNodeWithTxn(txn) + dropped.DeleteNode = txnbase.NewTxnMVCCNodeWithTxn(txn) return } func (entry *ObjectEntry) VisibleByTS(ts types.TS) bool { @@ -313,7 +313,7 @@ func NewObjectEntry( EntryMVCCNode: EntryMVCCNode{ CreatedAt: txnif.UncommitTS, }, - CreateNode: *txnbase.NewTxnMVCCNodeWithTxn(txn), + CreateNode: txnbase.NewTxnMVCCNodeWithTxn(txn), ObjectState: ObjectState_Create_Active, ObjectMVCCNode: ObjectMVCCNode{ ObjectStats: stats, @@ -341,7 +341,7 @@ func NewStandaloneObject(table *TableEntry, ts types.TS, isTombstone bool) *Obje EntryMVCCNode: EntryMVCCNode{ CreatedAt: ts, }, - CreateNode: *txnbase.NewTxnMVCCNodeWithTS(ts), + CreateNode: txnbase.NewTxnMVCCNodeWithTS(ts), ObjectState: ObjectState_Create_ApplyCommit, ObjectMVCCNode: ObjectMVCCNode{ ObjectStats: *stats, @@ -620,7 +620,7 @@ func MockObjEntryWithTbl(tbl *TableEntry, size uint64, isTombstone bool) *Object CreatedAt: ts, }, ObjectMVCCNode: ObjectMVCCNode{*stats}, - CreateNode: *txnbase.NewTxnMVCCNodeWithTS(ts), + CreateNode: txnbase.NewTxnMVCCNodeWithTS(ts), ObjectState: ObjectState_Create_ApplyCommit, } return e diff --git a/pkg/vm/engine/tae/catalog/object_list.go b/pkg/vm/engine/tae/catalog/object_list.go index ad77e4b5122a9..f3657d947c9a2 100644 --- a/pkg/vm/engine/tae/catalog/object_list.go +++ b/pkg/vm/engine/tae/catalog/object_list.go @@ -82,7 +82,7 @@ Committed Object ( LastestNode.Txn == nil ) && ( 0 < CreatedAt < MaxU64, 0 <= De type ObjectList struct { isTombstone bool - *sync.RWMutex + sync.RWMutex maxTs_objectID map[objectio.ObjectId]types.TS tree atomic.Pointer[btree.BTreeG[*ObjectEntry]] } @@ -94,7 +94,6 @@ func NewObjectList(isTombstone bool) *ObjectList { } tree := btree.NewBTreeGOptions((*ObjectEntry).Less, opts) list := &ObjectList{ - RWMutex: &sync.RWMutex{}, maxTs_objectID: make(map[types.Objectid]types.TS), isTombstone: isTombstone, } diff --git a/pkg/vm/engine/tae/catalog/table.go b/pkg/vm/engine/tae/catalog/table.go index 9f5df0730314b..868a06d96ac92 100644 --- a/pkg/vm/engine/tae/catalog/table.go +++ b/pkg/vm/engine/tae/catalog/table.go @@ -762,7 +762,7 @@ func (entry *TableEntry) AlterTable(ctx context.Context, txn txnif.TxnReader, re func (entry *TableEntry) CreateWithTxnAndSchema(txn txnif.AsyncTxn, schema *Schema) { node := &MVCCNode[*TableMVCCNode]{ - EntryMVCCNode: &EntryMVCCNode{ + EntryMVCCNode: EntryMVCCNode{ CreatedAt: txnif.UncommitTS, }, TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTxn(txn), diff --git a/pkg/vm/engine/tae/catalog/table_test.go b/pkg/vm/engine/tae/catalog/table_test.go index d854b7b22d8fb..7c18712b2d695 100644 --- a/pkg/vm/engine/tae/catalog/table_test.go +++ b/pkg/vm/engine/tae/catalog/table_test.go @@ -48,7 +48,7 @@ func TestObjectList(t *testing.T) { CreatedAt: types.BuildTS(1, 0), }, ObjectMVCCNode: ObjectMVCCNode{ObjectStats: *objectio.NewObjectStatsWithObjectID(objectio.NewObjectid(), true, false, false)}, - CreateNode: *txnbase.NewTxnMVCCNodeWithTS(types.BuildTS(1, 0)), + CreateNode: txnbase.NewTxnMVCCNodeWithTS(types.BuildTS(1, 0)), ObjectState: ObjectState_Create_ApplyCommit, } entry2 := entry1.Clone() diff --git a/pkg/vm/engine/tae/txn/txnbase/mvccchain.go b/pkg/vm/engine/tae/txn/txnbase/mvccchain.go index 99f8a2f6eeea9..e92cde25beafa 100644 --- a/pkg/vm/engine/tae/txn/txnbase/mvccchain.go +++ b/pkg/vm/engine/tae/txn/txnbase/mvccchain.go @@ -31,11 +31,11 @@ type MVCCChain[T txnif.MVCCNode[T]] struct { zero T } -func NewMVCCChain[T txnif.MVCCNode[T]](comparefn func(T, T) int, newnodefn func() T, rwlocker *sync.RWMutex) *MVCCChain[T] { +func NewMVCCChain[T txnif.MVCCNode[T]](comparefn func(T, T) int, newnodefn func() T, rwlocker *sync.RWMutex) MVCCChain[T] { if rwlocker == nil { rwlocker = new(sync.RWMutex) } - return &MVCCChain[T]{ + return MVCCChain[T]{ MVCC: common.NewGenericSortedDList(comparefn), RWMutex: rwlocker, comparefn: comparefn, diff --git a/pkg/vm/engine/tae/txn/txnbase/mvccnode.go b/pkg/vm/engine/tae/txn/txnbase/mvccnode.go index 5ab40bac85560..a7deb54def27e 100644 --- a/pkg/vm/engine/tae/txn/txnbase/mvccnode.go +++ b/pkg/vm/engine/tae/txn/txnbase/mvccnode.go @@ -40,27 +40,27 @@ var ( SnapshotAttr_LogIndex_Size = "log_index_size" ) -func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) *TxnMVCCNode { +func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) TxnMVCCNode { var ts types.TS if txn != nil { ts = txn.GetStartTS() } - return &TxnMVCCNode{ + return TxnMVCCNode{ Start: ts, Prepare: txnif.UncommitTS, End: txnif.UncommitTS, Txn: txn, } } -func NewTxnMVCCNodeWithTS(ts types.TS) *TxnMVCCNode { - return &TxnMVCCNode{ +func NewTxnMVCCNodeWithTS(ts types.TS) TxnMVCCNode { + return TxnMVCCNode{ Start: ts, Prepare: ts, End: ts, } } -func NewTxnMVCCNodeWithStartEnd(start, end types.TS) *TxnMVCCNode { - return &TxnMVCCNode{ +func NewTxnMVCCNodeWithStartEnd(start, end types.TS) TxnMVCCNode { + return TxnMVCCNode{ Start: start, Prepare: end, End: end, @@ -390,8 +390,8 @@ func (un *TxnMVCCNode) Update(o *TxnMVCCNode) { } } -func (un *TxnMVCCNode) CloneAll() *TxnMVCCNode { - n := &TxnMVCCNode{} +func (un *TxnMVCCNode) CloneAll() TxnMVCCNode { + n := TxnMVCCNode{} n.Start = un.Start n.Prepare = un.Prepare n.End = un.End