Skip to content

Commit

Permalink
skip to subscribe to the deleted table when calculating the mo table …
Browse files Browse the repository at this point in the history
…stats. (#21252)

skip to subscribe to the deleted table when calculating the mo table stats.

for the deleted tables:
1. should not subscribe
2. even subscribed, should failed

However, in the current implementation, the mo table stats task may subscribe to a deleted table and succeed, which may cause some other bugs. So, it tries to skip to subscribe to a deleted table.

Approved by: @XuPeng-SH, @zhangxu19830126, @heni02
  • Loading branch information
gouhongshen authored Jan 17, 2025
1 parent 11446b9 commit eb06245
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 38 deletions.
34 changes: 34 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,40 @@ func IsMoErrCode(e error, rc uint16) bool {
return me.code == rc
}

func GetMoErrCode(e error) (uint16, bool) {
if e == nil {
return 0, false
}

me, ok := e.(*Error)
if !ok {
return 0, false
}

return me.code, true
}

func IsSameMoErr(a error, b error) bool {
if a == nil || b == nil {
return false
}

var (
ok bool
aa, bb *Error
)

if aa, ok = a.(*Error); !ok {
return false
}

if bb, ok = b.(*Error); !ok {
return false
}

return aa.code == bb.code
}

func DowncastError(e error) *Error {
if err, ok := e.(*Error); ok {
return err
Expand Down
50 changes: 50 additions & 0 deletions pkg/common/moerr/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,53 @@ func TestEncoding(t *testing.T) {
require.Nil(t, err)
require.Equal(t, e, e2)
}

type fakeErr struct {
}

func (f *fakeErr) Error() string {
return "fake error"
}

func TestIsSameMoErr(t *testing.T) {
var a, b error
require.False(t, IsSameMoErr(a, b))

_, ok := GetMoErrCode(a)
require.False(t, ok)

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(a)
require.False(t, ok)

b = &fakeErr{}
require.False(t, IsSameMoErr(a, b))

_, ok = GetMoErrCode(b)
require.False(t, ok)

a = GetOkExpectedEOB()
require.False(t, IsSameMoErr(a, b))

code, ok := GetMoErrCode(a)
require.True(t, ok)
require.Equal(t, OkExpectedEOB, code)

b = GetOkExpectedDup()
require.False(t, IsSameMoErr(a, b))

code, ok = GetMoErrCode(b)
require.True(t, ok)
require.Equal(t, OkExpectedDup, code)

b = nil
require.False(t, IsSameMoErr(a, b))

b = GetOkExpectedEOB()
require.True(t, IsSameMoErr(a, b))
}
4 changes: 4 additions & 0 deletions pkg/vm/engine/disttae/cache/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ type TableItem struct {
ClusterByIdx int
}

func (item *TableItem) IsDeleted() bool {
return item.deleted
}

func (item *TableItem) String() string {
return fmt.Sprintln(
"item ptr", uintptr(unsafe.Pointer(item)),
Expand Down
114 changes: 96 additions & 18 deletions pkg/vm/engine/disttae/mo_table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"unsafe"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -350,6 +351,10 @@ func initMoTableStatsConfig(

eng.dynamicCtx.executorPool = sync.Pool{
New: func() interface{} {
// only test will into this logic
if eng.config.ieFactory == nil {
return nil
}
return eng.config.ieFactory()
},
}
Expand Down Expand Up @@ -411,7 +416,7 @@ func initMoTableStatsConfig(
ants.WithNonblocking(false),
ants.WithPanicHandler(func(e interface{}) {
logutil.Error(logHeader,
zap.String("source", "beta task panic"),
zap.String("source", "gama task panic"),
zap.Any("error", e))
})); err != nil {
return
Expand Down Expand Up @@ -495,6 +500,13 @@ func initMoTableStatsConfig(
return err
}

func (d *dynamicCtx) isBetaRun() bool {
d.Lock()
defer d.Unlock()

return d.beta.running
}

func (d *dynamicCtx) initCronTask(
ctx context.Context,
) bool {
Expand Down Expand Up @@ -548,6 +560,16 @@ type taskState struct {
running bool
executor func(context.Context, string, engine.Engine)
launchTimes int

// for test
forbidden bool
}

func (ts taskState) String() string {
return fmt.Sprintf("running(%v)-launchTimes(%v)-forbidden(%v)",
ts.running,
ts.launchTimes,
ts.forbidden)
}

type dynamicCtx struct {
Expand Down Expand Up @@ -950,6 +972,10 @@ func joinAccountDatabaseTable(

func (d *dynamicCtx) executeSQL(ctx context.Context, sql string, hint string) ie.InternalExecResult {
exec := d.executorPool.Get()
if exec == nil {
return nil
}

defer d.executorPool.Put(exec)

ctx = turn2SysCtx(ctx)
Expand Down Expand Up @@ -1054,7 +1080,7 @@ func (d *dynamicCtx) forceUpdateQuery(
}

if err = d.callAlphaWithRetry(
ctx, d.de.service, pairs, caller, 2); err != nil {
ctx, d.de.service, pairs, caller, 1); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1413,7 +1439,7 @@ func buildTablePairFromCache(
) (tbl tablePair, ok bool) {

item := eng.(*Engine).GetLatestCatalogCache().GetTableById(uint32(accId), dbId, tblId)
if item == nil {
if item == nil || item.IsDeleted() {
// account, db, tbl may delete already
// the `update_time` not change anymore
return
Expand All @@ -1438,8 +1464,14 @@ func buildTablePairFromCache(
}

func (tp *tablePair) String() string {
return fmt.Sprintf("%d-%s(%d)-%s(%d)",
tp.acc, tp.dbName, tp.db, tp.tblName, tp.tbl)
return fmt.Sprintf(
"%d-%s(%d)-%s(%d)-valid(%v)-kind(%v)-alphaOrder(%d)-onlyTS(%v)",
tp.acc, tp.dbName, tp.db,
tp.tblName, tp.tbl,
tp.valid,
tp.relKind,
tp.alphaOrder,
tp.onlyUpdateTS)
}

func (tp *tablePair) Done(err error) {
Expand Down Expand Up @@ -1732,6 +1764,7 @@ func (d *dynamicCtx) alphaTask(
zap.Duration("takes", dur),
zap.Duration("wait err", waitErrDur),
zap.String("caller", caller),
zap.Bool("isBetaRunning", d.isBetaRun()),
zap.String("timeout tbl", timeoutBuf.String()))

v2.AlphaTaskDurationHistogram.Observe(dur.Seconds())
Expand All @@ -1752,19 +1785,23 @@ func (d *dynamicCtx) alphaTask(
errQueue := make(chan alphaError, len(tbls)*2)
ticker = time.NewTicker(time.Millisecond * 10)

var lastError error

for {
select {
case <-ctx.Done():
err = ctx.Err()
return

case ae := <-errQueue:
if ae.err != nil {
if ae.err != nil && !moerr.IsSameMoErr(ae.err, lastError) {
logutil.Error(logHeader,
zap.String("source", "alpha task received err"),
zap.Error(ae.err))
}

lastError = ae.err

tblBackup[ae.order].valid = false

errWaitToReceive--
Expand All @@ -1786,28 +1823,41 @@ func (d *dynamicCtx) alphaTask(
enterWait = true
}

if waitErrDur >= time.Minute*5 {
betaRunning := d.isBetaRun()

if !betaRunning || waitErrDur >= time.Minute*5 {
timeoutCnt := 0
// waiting reached the limit
for i := range tblBackup {
// cannot pin this pState
tblBackup[i].pState = nil
if !tblBackup[i].valid {
continue
}

timeoutCnt++
timeoutBuf.WriteString(fmt.Sprintf("%d-%d(%v)-%d(%v); ",
tblBackup[i].acc, tblBackup[i].db, tblBackup[i].dbName,
tblBackup[i].tbl, tblBackup[i].tblName))
}

if timeoutCnt > 20 {
timeoutBuf.Reset()
timeoutBuf.WriteString(fmt.Sprintf("timeout count: %d", timeoutCnt))
}

return true, nil
}

if waitErrDur > 0 && waitErrDur%(time.Second*30) == 0 {
if d.beta.forbidden || (waitErrDur > 0 && waitErrDur%(time.Second*30) == 0) {

logutil.Warn(logHeader,
zap.String("source", "alpha task"),
zap.String("event", "waited err another 30s"),
zap.String("caller", caller),
zap.Int("total", processed),
zap.Int("left", errWaitToReceive))
zap.Int("left", errWaitToReceive),
zap.Bool("is beta running", betaRunning))
}

continue
Expand Down Expand Up @@ -1879,6 +1929,11 @@ func (d *dynamicCtx) betaTask(
zap.Error(err))
}()

if d.beta.forbidden {
err = moerr.NewInternalErrorNoCtx("forbidden")
return
}

var (
de = eng.(*Engine)
slBat sync.Map
Expand Down Expand Up @@ -1908,11 +1963,15 @@ func (d *dynamicCtx) betaTask(
continue
}

// 1. view
// 2. no update
// 3. deleted table
if tbl.pState == nil {
// 1. view
// 2. no update
//err = updateTableOnlyTS(ctx, service, tbl)
onlyTSBat = append(onlyTSBat, tbl)
if tbl.onlyUpdateTS {
onlyTSBat = append(onlyTSBat, tbl)
}

tbl.Done(nil)
continue
}

Expand Down Expand Up @@ -2459,6 +2518,10 @@ func (d *dynamicCtx) statsCalculateOp(
pState *logtailreplay.PartitionState,
) (sl statsList, err error) {

if pState == nil {
return
}

bcs := betaCycleStash{
born: time.Now(),
snapshot: snapshot,
Expand Down Expand Up @@ -2776,7 +2839,11 @@ func getChangedTableList(
}

defer func() {
err = txnOperator.Commit(newCtx)
if err != nil {
txnOperator.Rollback(newCtx)
} else {
err = txnOperator.Commit(newCtx)
}
}()

proc := process.NewTopProcess(
Expand Down Expand Up @@ -2864,10 +2931,12 @@ func subscribeTable(
databaseName: tbl.dbName,
}

txnTbl.fake = true
txnTbl.eng = eng
txnTbl.relKind = tbl.relKind
txnTbl.primarySeqnum = tbl.pkSequence

if pState, err = eng.(*Engine).PushClient().toSubscribeTable(ctx, &txnTbl); err != nil {
if pState, err = txnTbl.tryToSubscribe(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -3036,10 +3105,10 @@ func applyTombstones(
//
// here, we only collect the deletes that have not paired inserts
// according to its LESS function, the deletes come first
var lastInsert *logtailreplay.RowEntry
var lastInsert logtailreplay.RowEntry
err = pState.ScanRows(true, func(entry *logtailreplay.RowEntry) (bool, error) {
if !entry.Deleted {
lastInsert = entry
lastInsert = *entry
return true, nil
}

Expand Down Expand Up @@ -3085,6 +3154,10 @@ func (d *dynamicCtx) bulkUpdateTableStatsList(
tbl := key.(tablePair)
sl := value.(statsList)

if sl.stats == nil {
return true
}

tbls = append(tbls, tbl)

if val, err = json.Marshal(sl.stats); err != nil {
Expand Down Expand Up @@ -3150,7 +3223,12 @@ func (d *dynamicCtx) bulkUpdateTableOnlyTS(

ret := d.executeSQL(ctx, sql, "bulk update only ts")

var err error
if ret != nil {
err = ret.Error()
}

for i := range tbls {
tbls[i].Done(ret.Error())
tbls[i].Done(err)
}
}
Loading

0 comments on commit eb06245

Please sign in to comment.