Skip to content

Commit

Permalink
remove mvvcnode pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
aptend committed Jan 17, 2025
1 parent 507bc2a commit 0c2dbc7
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 66 deletions.
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/catalog/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type BaseEntry interface {

type BaseEntryImpl[T BaseNode[T]] struct {
//chain of MetadataMVCCNode
*txnbase.MVCCChain[*MVCCNode[T]]
txnbase.MVCCChain[*MVCCNode[T]]
}

func NewBaseEntry[T BaseNode[T]](factory func() T) *BaseEntryImpl[T] {
Expand All @@ -63,7 +63,7 @@ func (be *BaseEntryImpl[T]) PPStringLocked(level common.PPLevel, depth int, pref

func (be *BaseEntryImpl[T]) CreateWithTSLocked(ts types.TS, baseNode T) {
node := &MVCCNode[T]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: ts,
},
TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTS(ts),
Expand All @@ -77,7 +77,7 @@ func (be *BaseEntryImpl[T]) CreateWithTxnLocked(txn txnif.AsyncTxn, baseNode T)
logutil.Warnf("unexpected txn is nil: %+v", stack.Callers(0))
}
node := &MVCCNode[T]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: txnif.UncommitTS,
},
TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTxn(txn),
Expand Down
33 changes: 10 additions & 23 deletions pkg/vm/engine/tae/catalog/basemvccnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,8 @@ func (un EntryMVCCNode) IsCreating() bool {
return un.CreatedAt.Equal(&txnif.UncommitTS)
}

func (un EntryMVCCNode) IsDropping() bool {
return un.DeletedAt.Equal(&txnif.UncommitTS)
}

func (un EntryMVCCNode) Clone() *EntryMVCCNode {
return &EntryMVCCNode{
CreatedAt: un.CreatedAt,
DeletedAt: un.DeletedAt,
}
}

func (un EntryMVCCNode) CloneData() *EntryMVCCNode {
return &EntryMVCCNode{
func (un EntryMVCCNode) Clone() EntryMVCCNode {
return EntryMVCCNode{
CreatedAt: un.CreatedAt,
DeletedAt: un.DeletedAt,
}
Expand Down Expand Up @@ -201,24 +190,22 @@ type BaseNode[T any] interface {
}

type MVCCNode[T BaseNode[T]] struct {
*EntryMVCCNode
*txnbase.TxnMVCCNode
EntryMVCCNode
txnbase.TxnMVCCNode
BaseNode T
CommitSideEffect func(commitTs types.TS) // used for object replay, no need to persist
CommitSideEffect func(id string, commitTs types.TS) // used for object replay, no need to persist
}

func NewEmptyMVCCNodeFactory[T BaseNode[T]](factory func() T) func() *MVCCNode[T] {
return func() *MVCCNode[T] {
return &MVCCNode[T]{
EntryMVCCNode: &EntryMVCCNode{},
TxnMVCCNode: &txnbase.TxnMVCCNode{},
BaseNode: factory(),
BaseNode: factory(),
}
}
}

func CompareBaseNode[T BaseNode[T]](e, o *MVCCNode[T]) int {
return e.Compare(o.TxnMVCCNode)
return e.Compare(&o.TxnMVCCNode)
}

func (e *MVCCNode[T]) CloneAll() *MVCCNode[T] {
Expand All @@ -235,8 +222,8 @@ func (e *MVCCNode[T]) CloneAll() *MVCCNode[T] {

func (e *MVCCNode[T]) CloneData() *MVCCNode[T] {
return &MVCCNode[T]{
EntryMVCCNode: e.EntryMVCCNode.CloneData(),
TxnMVCCNode: &txnbase.TxnMVCCNode{},
EntryMVCCNode: e.EntryMVCCNode.Clone(),
TxnMVCCNode: txnbase.TxnMVCCNode{},
BaseNode: e.BaseNode.CloneData(),
}
}
Expand Down Expand Up @@ -266,7 +253,7 @@ func (e *MVCCNode[T]) ApplyCommit(id string) (err error) {
}
err = e.EntryMVCCNode.ApplyCommit(commitTS)
if e.CommitSideEffect != nil {
e.CommitSideEffect(commitTS)
e.CommitSideEffect(id, commitTS)
}
return err
}
Expand Down
38 changes: 19 additions & 19 deletions pkg/vm/engine/tae/catalog/catalogreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ func (catalog *Catalog) onReplayUpdateObject(
obj.table = rel
obj.ObjectNode = *cmd.node
obj.SortHint = catalog.NextObject()
obj.EntryMVCCNode = *cmd.mvccNode.EntryMVCCNode
obj.CreateNode = *cmd.mvccNode.TxnMVCCNode
cmd.mvccNode.TxnMVCCNode = &obj.CreateNode
cmd.mvccNode.EntryMVCCNode = &obj.EntryMVCCNode
cmd.mvccNode.CommitSideEffect = func(ts types.TS) {
obj.EntryMVCCNode = cmd.mvccNode.EntryMVCCNode
obj.CreateNode = cmd.mvccNode.TxnMVCCNode
cmd.mvccNode.CommitSideEffect = func(id string, ts types.TS) {
obj.CreateNode.ApplyCommit(id)
obj.EntryMVCCNode.ApplyCommit(ts)
rel.UpdateReplayEntryTs(obj, ts)
}
obj.ObjectMVCCNode = *cmd.mvccNode.BaseNode
Expand All @@ -199,12 +199,12 @@ func (catalog *Catalog) onReplayUpdateObject(
obj = cobj.Clone()
obj.prevVersion = cobj
cobj.nextVersion = obj
obj.EntryMVCCNode = *cmd.mvccNode.EntryMVCCNode
obj.DeleteNode = *cmd.mvccNode.TxnMVCCNode
obj.EntryMVCCNode = cmd.mvccNode.EntryMVCCNode
obj.DeleteNode = cmd.mvccNode.TxnMVCCNode
obj.ObjectMVCCNode = *cmd.mvccNode.BaseNode
cmd.mvccNode.TxnMVCCNode = &obj.DeleteNode
cmd.mvccNode.EntryMVCCNode = &obj.EntryMVCCNode
cmd.mvccNode.CommitSideEffect = func(ts types.TS) {
cmd.mvccNode.CommitSideEffect = func(id string, ts types.TS) {
obj.DeleteNode.ApplyCommit(id)
obj.EntryMVCCNode.ApplyCommit(ts)
rel.UpdateReplayEntryTs(obj, ts)
}
obj.ObjectState = ObjectState_Delete_ApplyCommit
Expand Down Expand Up @@ -346,10 +346,10 @@ func (catalog *Catalog) onReplayCreateDB(
}
_ = catalog.AddEntryLocked(db, nil, true)
un := &MVCCNode[*EmptyMVCCNode]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: txnNode.End,
},
TxnMVCCNode: txnNode,
TxnMVCCNode: *txnNode,
}
db.InsertLocked(un)
}
Expand Down Expand Up @@ -429,10 +429,10 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx
}
// alter table
un := &MVCCNode[*TableMVCCNode]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: tblCreatedAt,
},
TxnMVCCNode: txnNode,
TxnMVCCNode: *txnNode,
BaseNode: &TableMVCCNode{
Schema: schema,
TombstoneSchema: GetTombstoneSchema(schema),
Expand Down Expand Up @@ -462,10 +462,10 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx
tbl.tableData = dataFactory.MakeTableFactory()(tbl)
_ = db.AddEntryLocked(tbl, nil, true)
un := &MVCCNode[*TableMVCCNode]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: txnNode.End,
},
TxnMVCCNode: txnNode,
TxnMVCCNode: *txnNode,
BaseNode: &TableMVCCNode{
Schema: schema,
TombstoneSchema: GetTombstoneSchema(schema),
Expand Down Expand Up @@ -583,7 +583,7 @@ func (catalog *Catalog) onReplayCheckpointObject(
rel.AddEntryLocked(obj)
_, sarg, _ := fault.TriggerFault("back up UT")
if sarg == "" {
obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt)
obj.CreateNode = txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt)
}
logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v",
objid.String(), rel.fullName, rel.ID, createTS.ToString(),
Expand All @@ -598,8 +598,8 @@ func (catalog *Catalog) onReplayCheckpointObject(
obj = newObject()
rel.AddEntryLocked(obj)
}
obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(createTS)
obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(deleteTS)
obj.CreateNode = txnbase.NewTxnMVCCNodeWithTS(createTS)
obj.DeleteNode = txnbase.NewTxnMVCCNodeWithTS(deleteTS)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/vm/engine/tae/catalog/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func (entry *ObjectEntry) Clone() *ObjectEntry {
}
func (entry *ObjectEntry) GetCommandMVCCNode() *MVCCNode[*ObjectMVCCNode] {
return &MVCCNode[*ObjectMVCCNode]{
TxnMVCCNode: entry.GetLastMVCCNode(),
TxnMVCCNode: *entry.GetLastMVCCNode(),
BaseNode: &entry.ObjectMVCCNode,
EntryMVCCNode: &entry.EntryMVCCNode,
EntryMVCCNode: entry.EntryMVCCNode,
}
}
func (entry *ObjectEntry) GetDropEntry(
Expand All @@ -122,7 +122,7 @@ func (entry *ObjectEntry) GetDropEntry(
dropped = entry.Clone()
dropped.ObjectState = ObjectState_Delete_Active
dropped.DeletedAt = txnif.UncommitTS
dropped.DeleteNode = *txnbase.NewTxnMVCCNodeWithTxn(txn)
dropped.DeleteNode = txnbase.NewTxnMVCCNodeWithTxn(txn)
dropped.GetObjectData().UpdateMeta(dropped)
updatedCEntry = entry.Clone()
updatedCEntry.nextVersion = dropped
Expand Down Expand Up @@ -154,7 +154,7 @@ func (entry *ObjectEntry) GetUpdateEntry(
return
}
isNewNode = true
dropped.DeleteNode = *txnbase.NewTxnMVCCNodeWithTxn(txn)
dropped.DeleteNode = txnbase.NewTxnMVCCNodeWithTxn(txn)
return
}
func (entry *ObjectEntry) VisibleByTS(ts types.TS) bool {
Expand Down Expand Up @@ -313,7 +313,7 @@ func NewObjectEntry(
EntryMVCCNode: EntryMVCCNode{
CreatedAt: txnif.UncommitTS,
},
CreateNode: *txnbase.NewTxnMVCCNodeWithTxn(txn),
CreateNode: txnbase.NewTxnMVCCNodeWithTxn(txn),
ObjectState: ObjectState_Create_Active,
ObjectMVCCNode: ObjectMVCCNode{
ObjectStats: stats,
Expand Down Expand Up @@ -341,7 +341,7 @@ func NewStandaloneObject(table *TableEntry, ts types.TS, isTombstone bool) *Obje
EntryMVCCNode: EntryMVCCNode{
CreatedAt: ts,
},
CreateNode: *txnbase.NewTxnMVCCNodeWithTS(ts),
CreateNode: txnbase.NewTxnMVCCNodeWithTS(ts),
ObjectState: ObjectState_Create_ApplyCommit,
ObjectMVCCNode: ObjectMVCCNode{
ObjectStats: *stats,
Expand Down Expand Up @@ -620,7 +620,7 @@ func MockObjEntryWithTbl(tbl *TableEntry, size uint64, isTombstone bool) *Object
CreatedAt: ts,
},
ObjectMVCCNode: ObjectMVCCNode{*stats},
CreateNode: *txnbase.NewTxnMVCCNodeWithTS(ts),
CreateNode: txnbase.NewTxnMVCCNodeWithTS(ts),
ObjectState: ObjectState_Create_ApplyCommit,
}
return e
Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/catalog/object_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Committed Object ( LastestNode.Txn == nil ) && ( 0 < CreatedAt < MaxU64, 0 <= De

type ObjectList struct {
isTombstone bool
*sync.RWMutex
sync.RWMutex
maxTs_objectID map[objectio.ObjectId]types.TS
tree atomic.Pointer[btree.BTreeG[*ObjectEntry]]
}
Expand All @@ -94,7 +94,6 @@ func NewObjectList(isTombstone bool) *ObjectList {
}
tree := btree.NewBTreeGOptions((*ObjectEntry).Less, opts)
list := &ObjectList{
RWMutex: &sync.RWMutex{},
maxTs_objectID: make(map[types.Objectid]types.TS),
isTombstone: isTombstone,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (entry *TableEntry) AlterTable(ctx context.Context, txn txnif.TxnReader, re

func (entry *TableEntry) CreateWithTxnAndSchema(txn txnif.AsyncTxn, schema *Schema) {
node := &MVCCNode[*TableMVCCNode]{
EntryMVCCNode: &EntryMVCCNode{
EntryMVCCNode: EntryMVCCNode{
CreatedAt: txnif.UncommitTS,
},
TxnMVCCNode: txnbase.NewTxnMVCCNodeWithTxn(txn),
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/catalog/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestObjectList(t *testing.T) {
CreatedAt: types.BuildTS(1, 0),
},
ObjectMVCCNode: ObjectMVCCNode{ObjectStats: *objectio.NewObjectStatsWithObjectID(objectio.NewObjectid(), true, false, false)},
CreateNode: *txnbase.NewTxnMVCCNodeWithTS(types.BuildTS(1, 0)),
CreateNode: txnbase.NewTxnMVCCNodeWithTS(types.BuildTS(1, 0)),
ObjectState: ObjectState_Create_ApplyCommit,
}
entry2 := entry1.Clone()
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/txn/txnbase/mvccchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type MVCCChain[T txnif.MVCCNode[T]] struct {
zero T
}

func NewMVCCChain[T txnif.MVCCNode[T]](comparefn func(T, T) int, newnodefn func() T, rwlocker *sync.RWMutex) *MVCCChain[T] {
func NewMVCCChain[T txnif.MVCCNode[T]](comparefn func(T, T) int, newnodefn func() T, rwlocker *sync.RWMutex) MVCCChain[T] {
if rwlocker == nil {
rwlocker = new(sync.RWMutex)
}
return &MVCCChain[T]{
return MVCCChain[T]{
MVCC: common.NewGenericSortedDList(comparefn),
RWMutex: rwlocker,
comparefn: comparefn,
Expand Down
16 changes: 8 additions & 8 deletions pkg/vm/engine/tae/txn/txnbase/mvccnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@ var (
SnapshotAttr_LogIndex_Size = "log_index_size"
)

func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) *TxnMVCCNode {
func NewTxnMVCCNodeWithTxn(txn txnif.TxnReader) TxnMVCCNode {
var ts types.TS
if txn != nil {
ts = txn.GetStartTS()
}
return &TxnMVCCNode{
return TxnMVCCNode{
Start: ts,
Prepare: txnif.UncommitTS,
End: txnif.UncommitTS,
Txn: txn,
}
}
func NewTxnMVCCNodeWithTS(ts types.TS) *TxnMVCCNode {
return &TxnMVCCNode{
func NewTxnMVCCNodeWithTS(ts types.TS) TxnMVCCNode {
return TxnMVCCNode{
Start: ts,
Prepare: ts,
End: ts,
}
}
func NewTxnMVCCNodeWithStartEnd(start, end types.TS) *TxnMVCCNode {
return &TxnMVCCNode{
func NewTxnMVCCNodeWithStartEnd(start, end types.TS) TxnMVCCNode {
return TxnMVCCNode{
Start: start,
Prepare: end,
End: end,
Expand Down Expand Up @@ -390,8 +390,8 @@ func (un *TxnMVCCNode) Update(o *TxnMVCCNode) {
}
}

func (un *TxnMVCCNode) CloneAll() *TxnMVCCNode {
n := &TxnMVCCNode{}
func (un *TxnMVCCNode) CloneAll() TxnMVCCNode {
n := TxnMVCCNode{}
n.Start = un.Start
n.Prepare = un.Prepare
n.End = un.End
Expand Down

0 comments on commit 0c2dbc7

Please sign in to comment.