Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor tn task framework #20698

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
17 changes: 11 additions & 6 deletions pkg/vm/engine/tae/blockio/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/metric/stats"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
w "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker"
)

var (
Expand Down Expand Up @@ -454,12 +453,18 @@ func (p *IoPipeline) onWait(jobs ...any) {
}

func (p *IoPipeline) crontask(ctx context.Context) {
hb := w.NewHeartBeaterWithFunc(time.Second*40, func() {
logutil.Info(objectio.BitmapPoolReport())
}, nil)
hb.Start()
job := tasks.NewCancelableCronJob(
"io-pipeline",
40*time.Second,
func(ctx context.Context) {
logutil.Info(objectio.BitmapPoolReport())
},
false,
0,
)
job.Start()
<-ctx.Done()
hb.Stop()
job.Stop()
}

func RunPipelineTest(
Expand Down
12 changes: 12 additions & 0 deletions pkg/vm/engine/tae/db/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
// 2.1 remove GC disk cron job. no new GC job will be issued from now on
RemoveCronJob(c.db, CronJobs_Name_GCDisk)
RemoveCronJob(c.db, CronJobs_Name_GCCheckpoint)
RemoveCronJob(c.db, CronJobs_Name_Compact)
RemoveCronJob(c.db, CronJobs_Name_MergeScheduler)
if err = c.db.DiskCleaner.SwitchToReplayMode(ctx); err != nil {
// Rollback
return
Expand Down Expand Up @@ -380,6 +382,16 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
// Rollback
return
}
if err = AddCronJob(
c.db, CronJobs_Name_Compact, true,
); err != nil {
return
}
if err = AddCronJob(
c.db, CronJobs_Name_MergeScheduler, true,
); err != nil {
return
}
if err = CheckCronJobs(c.db, DBTxnMode_Write); err != nil {
// Rollback
return
Expand Down
23 changes: 23 additions & 0 deletions pkg/vm/engine/tae/db/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
CronJobs_Name_GCLogtail = "GC-Logtail"
CronJobs_Name_GCLockMerge = "GC-Lock-Merge"

CronJobs_Name_Compact = "Compact"
CronJobs_Name_MergeScheduler = "Merge-Scheduler"

CronJobs_Name_ReportStats = "Report-Stats"

CronJobs_Name_Checker = "Checker"
Expand All @@ -47,6 +50,8 @@ var CronJobs_Open_WriteMode = []string{
CronJobs_Name_GCLogtail,
CronJobs_Name_GCLockMerge,
CronJobs_Name_ReportStats,
CronJobs_Name_Compact,
CronJobs_Name_MergeScheduler,
}

var CronJobs_Open_ReplayMode = []string{
Expand All @@ -69,6 +74,8 @@ var CronJobs_Spec = map[string][]bool{
CronJobs_Name_GCLockMerge: {true, true, true, false},
CronJobs_Name_ReportStats: {true, true, true, true},
CronJobs_Name_Checker: {true, false, true, false},
CronJobs_Name_MergeScheduler: {true, true, false, false},
CronJobs_Name_Compact: {true, true, false, false},
}

func CanAddCronJob(name string, isWriteModeDB, skipMode bool) bool {
Expand Down Expand Up @@ -222,6 +229,22 @@ func AddCronJob(db *DB, name string, skipMode bool) (err error) {
1,
)
return
case CronJobs_Name_Compact:
err = db.CronJobs.AddJob(
CronJobs_Name_Compact,
db.Opts.CheckpointCfg.ScanInterval,
func(ctx context.Context) { db.LogtailMgr.TryCompactTable() },
1,
)
return
case CronJobs_Name_MergeScheduler:
err = db.CronJobs.AddJob(
CronJobs_Name_MergeScheduler,
db.Opts.CheckpointCfg.ScanInterval,
func(ctx context.Context) { db.MergeScheduler.Schedule() },
0,
)
return
}
err = moerr.NewInternalErrorNoCtxf(
"unknown cron job name: %s", name,
Expand Down
3 changes: 0 additions & 3 deletions pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
wb "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
)
Expand Down Expand Up @@ -103,7 +102,6 @@ type DB struct {

CronJobs *tasks.CancelableJobs

BGScanner wb.IHeartbeater
BGCheckpointRunner checkpoint.Runner
BGFlusher checkpoint.Flusher

Expand Down Expand Up @@ -369,7 +367,6 @@ func (db *DB) Close() error {
db.Closed.Store(ErrClosed)
db.Controller.Stop()
db.CronJobs.Reset()
db.BGScanner.Stop()
db.BGFlusher.Stop()
db.BGCheckpointRunner.Stop()
db.Runtime.Scheduler.Stop()
Expand Down
92 changes: 59 additions & 33 deletions pkg/vm/engine/tae/db/merge/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package merge

import (
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
Expand All @@ -25,43 +28,51 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
dto "github.com/prometheus/client_model/go"
"time"
)

type Scheduler struct {
*catalog.LoopProcessor
tid uint64

catalog *catalog.Catalog

policies *policyGroup
executor *executor

skipForTransPageLimit bool

rc *resourceController

stopMerge atomic.Bool
}

func NewScheduler(rt *dbutils.Runtime, sched *CNMergeScheduler) *Scheduler {
policySlice := []policy{
newObjOverlapPolicy(),
newObjCompactPolicy(rt.Fs.Service),
newTombstonePolicy(),
}
op := &Scheduler{
LoopProcessor: new(catalog.LoopProcessor),
policies: newPolicyGroup(policySlice...),
executor: newMergeExecutor(rt, sched),
rc: new(resourceController),
}

op.DatabaseFn = op.onDataBase
op.TableFn = op.onTable
op.ObjectFn = op.onObject
op.TombstoneFn = op.onTombstone
op.PostObjectFn = op.onPostObject
op.PostTableFn = op.onPostTable
policies: newPolicyGroup(
newObjOverlapPolicy(),
newObjCompactPolicy(rt.Fs.Service),
newTombstonePolicy(),
),
executor: newMergeExecutor(rt, sched),
rc: new(resourceController),
}
return op
}

func (s *Scheduler) Schedule() {
if s.stopMerge.Load() {
return
}
dbutils.PrintMemStats()
err := s.PreExecute()
if err != nil {
logutil.Errorf("pre execute err %s", err.Error())
}
if err = s.catalog.RecurLoop(s); err != nil {
logutil.Errorf("DBScanner Execute: %v", err)
}
s.rc.printStats()
}

func (s *Scheduler) ConfigPolicy(tbl *catalog.TableEntry, txn txnif.AsyncTxn, c *BasicPolicyConfig) error {
return s.policies.setConfig(tbl, txn, c)
}
Expand Down Expand Up @@ -95,13 +106,8 @@ func (s *Scheduler) PreExecute() error {
return nil
}

func (s *Scheduler) PostExecute() error {
s.rc.printStats()
return nil
}

func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) {
if StopMerge.Load() {
func (s *Scheduler) OnDatabase(*catalog.DBEntry) (err error) {
if s.stopMerge.Load() {
return moerr.GetOkStopCurrRecur()
}
if s.rc.availableMem() < 100*common.Const1MBytes {
Expand All @@ -115,8 +121,8 @@ func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) {
return
}

func (s *Scheduler) onTable(tableEntry *catalog.TableEntry) (err error) {
if StopMerge.Load() {
func (s *Scheduler) OnTable(tableEntry *catalog.TableEntry) (err error) {
if s.stopMerge.Load() {
return moerr.GetOkStopCurrRecur()
}

Expand All @@ -141,7 +147,10 @@ func (s *Scheduler) onTable(tableEntry *catalog.TableEntry) (err error) {
return
}

func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) {
func (s *Scheduler) OnPostTable(tableEntry *catalog.TableEntry) (err error) {
if s.stopMerge.Load() {
return moerr.GetOkStopCurrRecur()
}
// base on the info of tableEntry, we can decide whether to merge or not
if s.tid == 0 {
return
Expand All @@ -157,18 +166,27 @@ func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) {
return
}

func (s *Scheduler) onObject(objectEntry *catalog.ObjectEntry) (err error) {
func (s *Scheduler) OnObject(objectEntry *catalog.ObjectEntry) (err error) {
if s.stopMerge.Load() {
return moerr.GetOkStopCurrRecur()
}
if !objectValid(objectEntry) {
return moerr.GetOkStopCurrRecur()
}

s.policies.onObject(objectEntry)
return
}
func (s *Scheduler) onTombstone(objectEntry *catalog.ObjectEntry) (err error) {
return s.onObject(objectEntry)

func (s *Scheduler) OnTombstone(tombstone *catalog.ObjectEntry) error {
return s.OnObject(tombstone)
}

func (s *Scheduler) OnPostDatabase(database *catalog.DBEntry) error {
return nil
}
func (s *Scheduler) onPostObject(*catalog.ObjectEntry) (err error) {

func (s *Scheduler) OnPostObject(object *catalog.ObjectEntry) error {
return nil
}

Expand Down Expand Up @@ -196,6 +214,14 @@ func (s *Scheduler) StartMerge(tblID uint64, reentrant bool) error {
return s.executor.rt.LockMergeService.UnlockFromUser(tblID, reentrant)
}

func (s *Scheduler) StopMergeService() {
s.stopMerge.Store(true)
}

func (s *Scheduler) StartMergeService() {
s.stopMerge.Store(false)
}

func (s *Scheduler) CNActiveObjectsString() string {
return s.executor.cnSched.activeObjsString()
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/vm/engine/tae/db/merge/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestStopStartMerge(t *testing.T) {
_, ok := lockService.Indexes()["__mo_index_test"]
require.True(t, ok)

require.Error(t, scheduler.onTable(tblEntry1))
require.Error(t, scheduler.onTable(tblEntry2))
require.Error(t, scheduler.OnTable(tblEntry1))
require.Error(t, scheduler.OnTable(tblEntry2))

require.Empty(t, scheduler.CNActiveObjectsString())

Expand All @@ -91,3 +91,18 @@ func TestStopStartMerge(t *testing.T) {
require.Equal(t, 0, len(lockService.LockedInfos()))
require.Equal(t, 0, len(lockService.Indexes()))
}

func TestSchedule(t *testing.T) {
cnScheduler := NewTaskServiceGetter(func() (taskservice.TaskService, bool) {
return taskservice.NewTaskService(runtime.DefaultRuntime(), taskservice.NewMemTaskStorage()), true
})

scheduler := Scheduler{
executor: newMergeExecutor(&dbutils.Runtime{
LockMergeService: dbutils.NewLockMergeService(),
}, cnScheduler),
}

scheduler.StopMergeService()
require.Error(t, scheduler.OnTombstone(nil))
}
3 changes: 0 additions & 3 deletions pkg/vm/engine/tae/db/merge/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"math"
"os"
"slices"
"sync/atomic"
"time"

"github.com/KimMachineGun/automemlimit/memlimit"
Expand All @@ -37,8 +36,6 @@ import (
"github.com/shirou/gopsutil/v3/process"
)

var StopMerge atomic.Bool

type taskHostKind int

const (
Expand Down
14 changes: 2 additions & 12 deletions pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
w "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnimpl"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
Expand Down Expand Up @@ -266,20 +265,9 @@ func Open(

db.DBLocker, dbLocker = dbLocker, nil

// Init timed scanner
scanner := NewDBScanner(db, nil)

// w-zr TODO: need to support replay and write mode
db.MergeScheduler = merge.NewScheduler(db.Runtime, merge.NewTaskServiceGetter(opts.TaskServiceGetter))
scanner.RegisterOp(db.MergeScheduler)
db.Wal.Start()
db.BGCheckpointRunner.Start()
db.BGFlusher.Start()

db.BGScanner = w.NewHeartBeater(
opts.CheckpointCfg.ScanInterval,
scanner)
db.BGScanner.Start()
// TODO: WithGCInterval requires configuration parameters
gc2.SetDeleteTimeout(opts.GCCfg.GCDeleteTimeout)
gc2.SetDeleteBatchSize(opts.GCCfg.GCDeleteBatchSize)
Expand Down Expand Up @@ -307,6 +295,8 @@ func Open(

db.CronJobs = tasks.NewCancelableJobs()

db.MergeScheduler = merge.NewScheduler(db.Runtime, merge.NewTaskServiceGetter(opts.TaskServiceGetter))

if err = AddCronJobs(db); err != nil {
return
}
Expand Down
Loading