diff --git a/pkg/vm/engine/tae/blockio/pipeline.go b/pkg/vm/engine/tae/blockio/pipeline.go index a3f4f825dcf3e..ae740c05bb6b2 100644 --- a/pkg/vm/engine/tae/blockio/pipeline.go +++ b/pkg/vm/engine/tae/blockio/pipeline.go @@ -30,7 +30,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/util/metric/stats" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - w "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" ) var ( @@ -454,12 +453,18 @@ func (p *IoPipeline) onWait(jobs ...any) { } func (p *IoPipeline) crontask(ctx context.Context) { - hb := w.NewHeartBeaterWithFunc(time.Second*40, func() { - logutil.Info(objectio.BitmapPoolReport()) - }, nil) - hb.Start() + job := tasks.NewCancelableCronJob( + "io-pipeline", + 40*time.Second, + func(ctx context.Context) { + logutil.Info(objectio.BitmapPoolReport()) + }, + false, + 0, + ) + job.Start() <-ctx.Done() - hb.Stop() + job.Stop() } func RunPipelineTest( diff --git a/pkg/vm/engine/tae/db/controller.go b/pkg/vm/engine/tae/db/controller.go index db0580d88aeb6..7ec23db7a7267 100644 --- a/pkg/vm/engine/tae/db/controller.go +++ b/pkg/vm/engine/tae/db/controller.go @@ -226,6 +226,8 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) { // 2.1 remove GC disk cron job. no new GC job will be issued from now on RemoveCronJob(c.db, CronJobs_Name_GCDisk) RemoveCronJob(c.db, CronJobs_Name_GCCheckpoint) + RemoveCronJob(c.db, CronJobs_Name_Compact) + RemoveCronJob(c.db, CronJobs_Name_MergeScheduler) if err = c.db.DiskCleaner.SwitchToReplayMode(ctx); err != nil { // Rollback return @@ -380,6 +382,16 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) { // Rollback return } + if err = AddCronJob( + c.db, CronJobs_Name_Compact, true, + ); err != nil { + return + } + if err = AddCronJob( + c.db, CronJobs_Name_MergeScheduler, true, + ); err != nil { + return + } if err = CheckCronJobs(c.db, DBTxnMode_Write); err != nil { // Rollback return diff --git a/pkg/vm/engine/tae/db/cronjobs.go b/pkg/vm/engine/tae/db/cronjobs.go index 2a37d53f9a84d..b09f0c431a905 100644 --- a/pkg/vm/engine/tae/db/cronjobs.go +++ b/pkg/vm/engine/tae/db/cronjobs.go @@ -34,6 +34,9 @@ const ( CronJobs_Name_GCLogtail = "GC-Logtail" CronJobs_Name_GCLockMerge = "GC-Lock-Merge" + CronJobs_Name_Compact = "Compact" + CronJobs_Name_MergeScheduler = "Merge-Scheduler" + CronJobs_Name_ReportStats = "Report-Stats" CronJobs_Name_Checker = "Checker" @@ -47,6 +50,8 @@ var CronJobs_Open_WriteMode = []string{ CronJobs_Name_GCLogtail, CronJobs_Name_GCLockMerge, CronJobs_Name_ReportStats, + CronJobs_Name_Compact, + CronJobs_Name_MergeScheduler, } var CronJobs_Open_ReplayMode = []string{ @@ -69,6 +74,8 @@ var CronJobs_Spec = map[string][]bool{ CronJobs_Name_GCLockMerge: {true, true, true, false}, CronJobs_Name_ReportStats: {true, true, true, true}, CronJobs_Name_Checker: {true, false, true, false}, + CronJobs_Name_MergeScheduler: {true, true, false, false}, + CronJobs_Name_Compact: {true, true, false, false}, } func CanAddCronJob(name string, isWriteModeDB, skipMode bool) bool { @@ -222,6 +229,22 @@ func AddCronJob(db *DB, name string, skipMode bool) (err error) { 1, ) return + case CronJobs_Name_Compact: + err = db.CronJobs.AddJob( + CronJobs_Name_Compact, + db.Opts.CheckpointCfg.ScanInterval, + func(ctx context.Context) { db.LogtailMgr.TryCompactTable() }, + 1, + ) + return + case CronJobs_Name_MergeScheduler: + err = db.CronJobs.AddJob( + CronJobs_Name_MergeScheduler, + db.Opts.CheckpointCfg.ScanInterval, + func(ctx context.Context) { db.MergeScheduler.Schedule() }, + 0, + ) + return } err = moerr.NewInternalErrorNoCtxf( "unknown cron job name: %s", name, diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index 6492ab11b45be..2efcdd081bc68 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -39,7 +39,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables" - wb "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" ) @@ -103,7 +102,6 @@ type DB struct { CronJobs *tasks.CancelableJobs - BGScanner wb.IHeartbeater BGCheckpointRunner checkpoint.Runner BGFlusher checkpoint.Flusher @@ -369,7 +367,6 @@ func (db *DB) Close() error { db.Closed.Store(ErrClosed) db.Controller.Stop() db.CronJobs.Reset() - db.BGScanner.Stop() db.BGFlusher.Stop() db.BGCheckpointRunner.Stop() db.Runtime.Scheduler.Stop() diff --git a/pkg/vm/engine/tae/db/merge/scheduler.go b/pkg/vm/engine/tae/db/merge/scheduler.go index 1f7c685618251..e3586da513eaf 100644 --- a/pkg/vm/engine/tae/db/merge/scheduler.go +++ b/pkg/vm/engine/tae/db/merge/scheduler.go @@ -15,6 +15,9 @@ package merge import ( + "sync/atomic" + "time" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -25,43 +28,51 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" dto "github.com/prometheus/client_model/go" - "time" ) type Scheduler struct { - *catalog.LoopProcessor tid uint64 + catalog *catalog.Catalog + policies *policyGroup executor *executor skipForTransPageLimit bool rc *resourceController + + stopMerge atomic.Bool } func NewScheduler(rt *dbutils.Runtime, sched *CNMergeScheduler) *Scheduler { - policySlice := []policy{ - newObjOverlapPolicy(), - newObjCompactPolicy(rt.Fs.Service), - newTombstonePolicy(), - } op := &Scheduler{ - LoopProcessor: new(catalog.LoopProcessor), - policies: newPolicyGroup(policySlice...), - executor: newMergeExecutor(rt, sched), - rc: new(resourceController), - } - - op.DatabaseFn = op.onDataBase - op.TableFn = op.onTable - op.ObjectFn = op.onObject - op.TombstoneFn = op.onTombstone - op.PostObjectFn = op.onPostObject - op.PostTableFn = op.onPostTable + policies: newPolicyGroup( + newObjOverlapPolicy(), + newObjCompactPolicy(rt.Fs.Service), + newTombstonePolicy(), + ), + executor: newMergeExecutor(rt, sched), + rc: new(resourceController), + } return op } +func (s *Scheduler) Schedule() { + if s.stopMerge.Load() { + return + } + dbutils.PrintMemStats() + err := s.PreExecute() + if err != nil { + logutil.Errorf("pre execute err %s", err.Error()) + } + if err = s.catalog.RecurLoop(s); err != nil { + logutil.Errorf("DBScanner Execute: %v", err) + } + s.rc.printStats() +} + func (s *Scheduler) ConfigPolicy(tbl *catalog.TableEntry, txn txnif.AsyncTxn, c *BasicPolicyConfig) error { return s.policies.setConfig(tbl, txn, c) } @@ -95,13 +106,8 @@ func (s *Scheduler) PreExecute() error { return nil } -func (s *Scheduler) PostExecute() error { - s.rc.printStats() - return nil -} - -func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) { - if StopMerge.Load() { +func (s *Scheduler) OnDatabase(*catalog.DBEntry) (err error) { + if s.stopMerge.Load() { return moerr.GetOkStopCurrRecur() } if s.rc.availableMem() < 100*common.Const1MBytes { @@ -115,8 +121,8 @@ func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) { return } -func (s *Scheduler) onTable(tableEntry *catalog.TableEntry) (err error) { - if StopMerge.Load() { +func (s *Scheduler) OnTable(tableEntry *catalog.TableEntry) (err error) { + if s.stopMerge.Load() { return moerr.GetOkStopCurrRecur() } @@ -141,7 +147,10 @@ func (s *Scheduler) onTable(tableEntry *catalog.TableEntry) (err error) { return } -func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) { +func (s *Scheduler) OnPostTable(tableEntry *catalog.TableEntry) (err error) { + if s.stopMerge.Load() { + return moerr.GetOkStopCurrRecur() + } // base on the info of tableEntry, we can decide whether to merge or not if s.tid == 0 { return @@ -157,7 +166,10 @@ func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) { return } -func (s *Scheduler) onObject(objectEntry *catalog.ObjectEntry) (err error) { +func (s *Scheduler) OnObject(objectEntry *catalog.ObjectEntry) (err error) { + if s.stopMerge.Load() { + return moerr.GetOkStopCurrRecur() + } if !objectValid(objectEntry) { return moerr.GetOkStopCurrRecur() } @@ -165,10 +177,16 @@ func (s *Scheduler) onObject(objectEntry *catalog.ObjectEntry) (err error) { s.policies.onObject(objectEntry) return } -func (s *Scheduler) onTombstone(objectEntry *catalog.ObjectEntry) (err error) { - return s.onObject(objectEntry) + +func (s *Scheduler) OnTombstone(tombstone *catalog.ObjectEntry) error { + return s.OnObject(tombstone) +} + +func (s *Scheduler) OnPostDatabase(database *catalog.DBEntry) error { + return nil } -func (s *Scheduler) onPostObject(*catalog.ObjectEntry) (err error) { + +func (s *Scheduler) OnPostObject(object *catalog.ObjectEntry) error { return nil } @@ -196,6 +214,14 @@ func (s *Scheduler) StartMerge(tblID uint64, reentrant bool) error { return s.executor.rt.LockMergeService.UnlockFromUser(tblID, reentrant) } +func (s *Scheduler) StopMergeService() { + s.stopMerge.Store(true) +} + +func (s *Scheduler) StartMergeService() { + s.stopMerge.Store(false) +} + func (s *Scheduler) CNActiveObjectsString() string { return s.executor.cnSched.activeObjsString() } diff --git a/pkg/vm/engine/tae/db/merge/scheduler_test.go b/pkg/vm/engine/tae/db/merge/scheduler_test.go index a360b8d93af70..9aa8d18165f05 100644 --- a/pkg/vm/engine/tae/db/merge/scheduler_test.go +++ b/pkg/vm/engine/tae/db/merge/scheduler_test.go @@ -80,8 +80,8 @@ func TestStopStartMerge(t *testing.T) { _, ok := lockService.Indexes()["__mo_index_test"] require.True(t, ok) - require.Error(t, scheduler.onTable(tblEntry1)) - require.Error(t, scheduler.onTable(tblEntry2)) + require.Error(t, scheduler.OnTable(tblEntry1)) + require.Error(t, scheduler.OnTable(tblEntry2)) require.Empty(t, scheduler.CNActiveObjectsString()) @@ -91,3 +91,18 @@ func TestStopStartMerge(t *testing.T) { require.Equal(t, 0, len(lockService.LockedInfos())) require.Equal(t, 0, len(lockService.Indexes())) } + +func TestSchedule(t *testing.T) { + cnScheduler := NewTaskServiceGetter(func() (taskservice.TaskService, bool) { + return taskservice.NewTaskService(runtime.DefaultRuntime(), taskservice.NewMemTaskStorage()), true + }) + + scheduler := Scheduler{ + executor: newMergeExecutor(&dbutils.Runtime{ + LockMergeService: dbutils.NewLockMergeService(), + }, cnScheduler), + } + + scheduler.StopMergeService() + require.Error(t, scheduler.OnTombstone(nil)) +} diff --git a/pkg/vm/engine/tae/db/merge/utils.go b/pkg/vm/engine/tae/db/merge/utils.go index 8327646f93978..6c59f97a1a913 100644 --- a/pkg/vm/engine/tae/db/merge/utils.go +++ b/pkg/vm/engine/tae/db/merge/utils.go @@ -20,7 +20,6 @@ import ( "math" "os" "slices" - "sync/atomic" "time" "github.com/KimMachineGun/automemlimit/memlimit" @@ -37,8 +36,6 @@ import ( "github.com/shirou/gopsutil/v3/process" ) -var StopMerge atomic.Bool - type taskHostKind int const ( diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index bf1e6802486f0..ab36888c47651 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -43,7 +43,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - w "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnimpl" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" @@ -266,20 +265,9 @@ func Open( db.DBLocker, dbLocker = dbLocker, nil - // Init timed scanner - scanner := NewDBScanner(db, nil) - - // w-zr TODO: need to support replay and write mode - db.MergeScheduler = merge.NewScheduler(db.Runtime, merge.NewTaskServiceGetter(opts.TaskServiceGetter)) - scanner.RegisterOp(db.MergeScheduler) db.Wal.Start() db.BGCheckpointRunner.Start() db.BGFlusher.Start() - - db.BGScanner = w.NewHeartBeater( - opts.CheckpointCfg.ScanInterval, - scanner) - db.BGScanner.Start() // TODO: WithGCInterval requires configuration parameters gc2.SetDeleteTimeout(opts.GCCfg.GCDeleteTimeout) gc2.SetDeleteBatchSize(opts.GCCfg.GCDeleteBatchSize) @@ -307,6 +295,8 @@ func Open( db.CronJobs = tasks.NewCancelableJobs() + db.MergeScheduler = merge.NewScheduler(db.Runtime, merge.NewTaskServiceGetter(opts.TaskServiceGetter)) + if err = AddCronJobs(db); err != nil { return } diff --git a/pkg/vm/engine/tae/db/scanner.go b/pkg/vm/engine/tae/db/scanner.go deleted file mode 100644 index 2a1a6e7736e5b..0000000000000 --- a/pkg/vm/engine/tae/db/scanner.go +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package db - -import ( - "github.com/RoaringBitmap/roaring" - "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/logutil" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" -) - -type DBScanner interface { - base.IHBHandle - RegisterOp(ScannerOp) -} - -type ErrHandler interface { - OnObjectErr(entry *catalog.ObjectEntry, err error) error - OnTombstoneErr(entry *catalog.ObjectEntry, err error) error - OnTableErr(entry *catalog.TableEntry, err error) error - OnDatabaseErr(entry *catalog.DBEntry, err error) error -} - -type NoopErrHandler struct{} - -func (h *NoopErrHandler) OnObjectErr(entry *catalog.ObjectEntry, err error) error { return nil } -func (h *NoopErrHandler) OnTombstoneErr(entry *catalog.ObjectEntry, err error) error { return nil } -func (h *NoopErrHandler) OnTableErr(entry *catalog.TableEntry, err error) error { return nil } -func (h *NoopErrHandler) OnDatabaseErr(entry *catalog.DBEntry, err error) error { return nil } - -type dbScanner struct { - *catalog.LoopProcessor - db *DB - ops []ScannerOp - errHandler ErrHandler - dbmask *roaring.Bitmap - tablemask *roaring.Bitmap - objmask *roaring.Bitmap -} - -func (scanner *dbScanner) OnStopped() { - logutil.Infof("DBScanner Stopped") -} - -func (scanner *dbScanner) OnExec() { - scanner.dbmask.Clear() - scanner.tablemask.Clear() - scanner.objmask.Clear() - dbutils.PrintMemStats() - - // compact logtail table - scanner.db.LogtailMgr.TryCompactTable() - - for _, op := range scanner.ops { - err := op.PreExecute() - if err != nil { - panic(err) - } - } - if err := scanner.db.Catalog.RecurLoop(scanner); err != nil { - logutil.Errorf("DBScanner Execute: %v", err) - } - for _, op := range scanner.ops { - err := op.PostExecute() - if err != nil { - panic(err) - } - } -} - -func NewDBScanner(db *DB, errHandler ErrHandler) *dbScanner { - if errHandler == nil { - errHandler = new(NoopErrHandler) - } - scanner := &dbScanner{ - LoopProcessor: new(catalog.LoopProcessor), - db: db, - ops: make([]ScannerOp, 0), - errHandler: errHandler, - dbmask: roaring.New(), - tablemask: roaring.New(), - objmask: roaring.New(), - } - scanner.ObjectFn = scanner.onObject - scanner.TombstoneFn = scanner.onTombstone - scanner.PostObjectFn = scanner.onPostObject - scanner.TableFn = scanner.onTable - scanner.PostTableFn = scanner.onPostTable - scanner.DatabaseFn = scanner.onDatabase - scanner.PostDatabaseFn = scanner.onPostDatabase - return scanner -} - -func (scanner *dbScanner) RegisterOp(op ScannerOp) { - scanner.ops = append(scanner.ops, op) -} - -func (scanner *dbScanner) onPostObject(entry *catalog.ObjectEntry) (err error) { - for _, op := range scanner.ops { - err = op.OnPostObject(entry) - if err = scanner.errHandler.OnObjectErr(entry, err); err != nil { - break - } - } - return -} - -func (scanner *dbScanner) onPostTable(entry *catalog.TableEntry) (err error) { - for _, op := range scanner.ops { - err = op.OnPostTable(entry) - if err = scanner.errHandler.OnTableErr(entry, err); err != nil { - break - } - } - return -} - -func (scanner *dbScanner) onPostDatabase(entry *catalog.DBEntry) (err error) { - for _, op := range scanner.ops { - err = op.OnPostDatabase(entry) - if err = scanner.errHandler.OnDatabaseErr(entry, err); err != nil { - break - } - } - return -} - -func (scanner *dbScanner) onObject(entry *catalog.ObjectEntry) (err error) { - scanner.objmask.Clear() - for i, op := range scanner.ops { - if scanner.tablemask.Contains(uint32(i)) { - scanner.objmask.Add(uint32(i)) - continue - } - err = op.OnObject(entry) - if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) { - scanner.objmask.Add(uint32(i)) - } - if err = scanner.errHandler.OnObjectErr(entry, err); err != nil { - break - } - } - if scanner.objmask.GetCardinality() == uint64(len(scanner.ops)) { - err = moerr.GetOkStopCurrRecur() - } - return -} -func (scanner *dbScanner) onTombstone(entry *catalog.ObjectEntry) (err error) { - scanner.objmask.Clear() - for i, op := range scanner.ops { - if scanner.tablemask.Contains(uint32(i)) { - scanner.objmask.Add(uint32(i)) - continue - } - err = op.OnTombstone(entry) - if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) { - scanner.objmask.Add(uint32(i)) - } - if err = scanner.errHandler.OnObjectErr(entry, err); err != nil { - break - } - } - if scanner.objmask.GetCardinality() == uint64(len(scanner.ops)) { - err = moerr.GetOkStopCurrRecur() - } - return -} - -func (scanner *dbScanner) onTable(entry *catalog.TableEntry) (err error) { - scanner.tablemask.Clear() - for i, op := range scanner.ops { - // If the specified op was masked OnDatabase. skip it - if scanner.dbmask.Contains(uint32(i)) { - scanner.tablemask.Add(uint32(i)) - continue - } - err = op.OnTable(entry) - if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) { - scanner.tablemask.Add(uint32(i)) - } - if err = scanner.errHandler.OnTableErr(entry, err); err != nil { - break - } - } - if scanner.tablemask.GetCardinality() == uint64(len(scanner.ops)) { - err = moerr.GetOkStopCurrRecur() - } - return -} - -func (scanner *dbScanner) onDatabase(entry *catalog.DBEntry) (err error) { - // if entry.IsSystemDB() { - // err = catalog.ErrStopCurrRecur - // return - // } - scanner.dbmask.Clear() - for i, op := range scanner.ops { - err = op.OnDatabase(entry) - if moerr.IsMoErrCode(err, moerr.OkStopCurrRecur) { - scanner.dbmask.Add(uint32(i)) - } - if err = scanner.errHandler.OnDatabaseErr(entry, err); err != nil { - break - } - } - if scanner.dbmask.GetCardinality() == uint64(len(scanner.ops)) { - err = moerr.GetOkStopCurrRecur() - } - return -} diff --git a/pkg/vm/engine/tae/db/scannerop.go b/pkg/vm/engine/tae/db/scannerop.go deleted file mode 100644 index c87b2d21cf3e7..0000000000000 --- a/pkg/vm/engine/tae/db/scannerop.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package db - -import ( - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" -) - -type ScannerOp interface { - catalog.Processor - PreExecute() error - PostExecute() error -} diff --git a/pkg/vm/engine/tae/db/scheduler.go b/pkg/vm/engine/tae/db/scheduler.go index 0b9a773a98906..6cf7169536daf 100644 --- a/pkg/vm/engine/tae/db/scheduler.go +++ b/pkg/vm/engine/tae/db/scheduler.go @@ -18,11 +18,9 @@ import ( "fmt" "github.com/matrixorigin/matrixone/pkg/common/moerr" - "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" ) var ( @@ -51,7 +49,7 @@ func newTaskScheduler(db *DB, asyncWorkers int, ioWorkers int) *taskScheduler { jobHandler.Start() jobDispatcher.RegisterHandler(tasks.DataCompactionTask, jobHandler) // jobDispatcher.RegisterHandler(tasks.GCTask, jobHandler) - gcHandler := tasks.NewSingleWorkerHandler(db.Opts.Ctx, "gc") + gcHandler := tasks.NewBaseEventHandler(db.Opts.Ctx, "gc") gcHandler.Start() jobDispatcher.RegisterHandler(tasks.GCTask, gcHandler) @@ -61,14 +59,14 @@ func newTaskScheduler(db *DB, asyncWorkers int, ioWorkers int) *taskScheduler { ckpDispatcher := tasks.NewBaseScopedDispatcher(tasks.DefaultScopeSharder) for i := 0; i < 4; i++ { - handler := tasks.NewSingleWorkerHandler(db.Opts.Ctx, fmt.Sprintf("[ckpworker-%d]", i)) + handler := tasks.NewBaseEventHandler(db.Opts.Ctx, fmt.Sprintf("[ckpworker-%d]", i)) ckpDispatcher.AddHandle(handler) handler.Start() } ioDispatcher := tasks.NewBaseScopedDispatcher(nil) for i := 0; i < ioWorkers; i++ { - handler := tasks.NewSingleWorkerHandler(db.Opts.Ctx, fmt.Sprintf("[ioworker-%d]", i)) + handler := tasks.NewBaseEventHandler(db.Opts.Ctx, fmt.Sprintf("[ioworker-%d]", i)) ioDispatcher.AddHandle(handler) handler.Start() } @@ -110,7 +108,7 @@ func (s *taskScheduler) ScheduleMultiScopedTxnTaskWithObserver( taskType tasks.TaskType, scopes []common.ID, factory tasks.TxnTaskFactory, - observers ...base.Observer) (task tasks.Task, err error) { + observers ...tasks.Observer) (task tasks.Task, err error) { task = NewScheduledTxnTask(ctx, s.db, taskType, scopes, factory) for _, observer := range observers { task.AddObserver(observer) diff --git a/pkg/vm/engine/tae/db/task.go b/pkg/vm/engine/tae/db/task.go index 038ded614de15..d017518b1cb48 100644 --- a/pkg/vm/engine/tae/db/task.go +++ b/pkg/vm/engine/tae/db/task.go @@ -61,7 +61,7 @@ func (task *ScheduledTxnTask) Execute(ctx context.Context) (err error) { logutil.Warnf("Execute ScheduleTxnTask: %v. Rollbacked", err) return } - err = txnTask.OnExec(task.db.Opts.Ctx) + err = txnTask.OnExec(ctx) if err != nil { logutil.Warnf("Task[%d] exec error: %v", task.ID(), err) err2 := txn.Rollback(ctx) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index efa62d0ea3f93..52529bac1d8ff 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -65,7 +65,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnbase" @@ -681,7 +680,7 @@ func TestAddObjsWithMetaLoc(t *testing.T) { db := testutil.InitTestDB(ctx, ModuleName, t, opts) defer db.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(13, 2) @@ -859,7 +858,7 @@ func TestCompactMemAlter(t *testing.T) { db := testutil.InitTestDB(ctx, ModuleName, t, opts) defer db.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(5, 2) @@ -920,7 +919,7 @@ func TestFlushTableMergeOrder(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -994,7 +993,7 @@ func TestFlushTableMergeOrderPK(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -1070,7 +1069,7 @@ func TestFlushTableNoPk(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(13, -1) @@ -1106,7 +1105,7 @@ func TestFlushTableDroppedEntry(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(3, 1) @@ -1172,7 +1171,7 @@ func TestFlushTableErrorHandle(t *testing.T) { tae := testutil.NewTestEngine(context.Background(), ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(ctx, "xx") + worker := tasks.NewOpWorker(ctx, "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(13, 2) @@ -1223,10 +1222,10 @@ func TestFlushTableErrorHandle2(t *testing.T) { tae := testutil.NewTestEngine(context.Background(), ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(ctx, "xx") + worker := tasks.NewOpWorker(ctx, "xx") worker.Start() defer worker.Stop() - goodworker := ops.NewOpWorker(context.Background(), "goodworker") + goodworker := tasks.NewOpWorker(context.Background(), "goodworker") goodworker.Start() defer goodworker.Stop() schema := catalog.MockSchemaAll(13, 2) @@ -1236,7 +1235,7 @@ func TestFlushTableErrorHandle2(t *testing.T) { bat1, bat2 := bats[0], bats[1] defer bat1.Close() defer bat2.Close() - flushTable := func(worker *ops.OpWorker) { + flushTable := func(worker *tasks.OpWorker) { txn, rel := testutil.GetDefaultRelation(t, tae.DB, schema.Name) blkMetas := testutil.GetAllAppendableMetas(rel, false) tombstoneMetas := testutil.GetAllAppendableMetas(rel, true) @@ -1274,7 +1273,7 @@ func TestFlushTabletail(t *testing.T) { tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() schema := catalog.MockSchemaAll(13, 2) @@ -3582,12 +3581,12 @@ func TestDelete3(t *testing.T) { // this task won't affect logic of TestAppend2, it just prints logs about dirty count forest := logtail.NewDirtyCollector(tae.LogtailMgr, opts.Clock, tae.Catalog, new(catalog.LoopProcessor)) - hb := ops.NewHeartBeaterWithFunc(5*time.Millisecond, func() { + job := tasks.NewCancelableCronJob("TestDelete3", 5*time.Millisecond, func(ctx context.Context) { forest.Run(0) t.Log(forest.String()) - }, nil) - hb.Start() - defer hb.Stop() + }, false, 0) + job.Start() + defer job.Stop() schema := catalog.MockSchemaAll(3, 2) schema.Extra.BlockMaxRows = 10 schema.Extra.ObjectMaxBlocks = 2 @@ -6468,12 +6467,12 @@ func TestAppendAndGC(t *testing.T) { opts := new(options.Options) opts = config.WithQuickScanAndCKPOpts(opts) options.WithDisableGCCheckpoint()(opts) - merge.StopMerge.Store(true) - defer merge.StopMerge.Store(false) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() db := tae.DB + db.MergeScheduler.StopMergeService() + defer db.MergeScheduler.StartMergeService() schema1 := catalog.MockSchemaAll(13, 2) schema1.Extra.BlockMaxRows = 10 @@ -6857,11 +6856,11 @@ func TestSnapshotMeta(t *testing.T) { opts := new(options.Options) opts = config.WithQuickScanAndCKPOpts(opts) options.WithDisableGCCheckpoint()(opts) - merge.StopMerge.Store(true) - defer merge.StopMerge.Store(false) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() db := tae.DB + db.MergeScheduler.StopMergeService() + defer db.MergeScheduler.StartMergeService() snapshotSchema := catalog.MockSnapShotSchema() snapshotSchema.Extra.BlockMaxRows = 2 @@ -10122,25 +10121,31 @@ func TestStartStopTableMerge(t *testing.T) { tbl := rel.GetMeta().(*catalog.TableEntry) require.NoError(t, scheduler.StopMerge(tbl, false)) - require.ErrorIs(t, scheduler.LoopProcessor.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) require.NoError(t, scheduler.StartMerge(tbl.GetID(), false)) - require.NoError(t, scheduler.LoopProcessor.OnTable(tbl)) + require.NoError(t, scheduler.OnTable(tbl)) require.Error(t, scheduler.StartMerge(tbl.GetID(), false)) require.NoError(t, scheduler.StopMerge(tbl, true)) - require.ErrorIs(t, scheduler.LoopProcessor.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) require.NoError(t, scheduler.StopMerge(tbl, true)) - require.ErrorIs(t, scheduler.LoopProcessor.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) require.Error(t, scheduler.StopMerge(tbl, false)) - require.ErrorIs(t, scheduler.LoopProcessor.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) require.NoError(t, scheduler.StartMerge(tbl.GetID(), true)) - require.ErrorIs(t, scheduler.LoopProcessor.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) require.NoError(t, scheduler.StartMerge(tbl.GetID(), true)) - require.NoError(t, scheduler.LoopProcessor.OnTable(tbl)) + require.NoError(t, scheduler.OnTable(tbl)) + require.NoError(t, scheduler.OnPostTable(tbl)) + + scheduler.StopMergeService() + require.ErrorIs(t, scheduler.OnTable(tbl), moerr.GetOkStopCurrRecur()) + require.ErrorIs(t, scheduler.OnPostTable(tbl), moerr.GetOkStopCurrRecur()) + scheduler.StartMergeService() } func TestDeleteByPhyAddrKeys(t *testing.T) { diff --git a/pkg/vm/engine/tae/rpc/inspect.go b/pkg/vm/engine/tae/rpc/inspect.go index bb612c58a6c15..586058ea08322 100644 --- a/pkg/vm/engine/tae/rpc/inspect.go +++ b/pkg/vm/engine/tae/rpc/inspect.go @@ -843,10 +843,10 @@ func (c *mergePolicyArg) Run() error { common.RuntimeMaxObjOsize.Store(maxosize) common.RuntimeMinCNMergeSize.Store(cnsize) if c.maxMergeObjN == 0 && c.minOsizeQualified == 0 { - merge.StopMerge.Store(true) + c.ctx.db.MergeScheduler.StopMergeService() c.ctx.resp.Payload = []byte("auto merge is disabled") } else { - merge.StopMerge.Store(false) + c.ctx.db.MergeScheduler.StartMergeService() c.ctx.resp.Payload = []byte("general setting has been refreshed") } } else { diff --git a/pkg/vm/engine/tae/tables/txnentries/flushTableTail_test.go b/pkg/vm/engine/tae/tables/txnentries/flushTableTail_test.go index cbf37a5ce95cf..b024a92d74222 100644 --- a/pkg/vm/engine/tae/tables/txnentries/flushTableTail_test.go +++ b/pkg/vm/engine/tae/tables/txnentries/flushTableTail_test.go @@ -26,7 +26,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" ) @@ -49,7 +48,7 @@ func (tSched *testScheduler) ScheduleMultiScopedTxnTask(ctx *tasks.Context, task panic("implement me") } -func (tSched *testScheduler) ScheduleMultiScopedTxnTaskWithObserver(ctx *tasks.Context, taskType tasks.TaskType, scopes []common.ID, factory tasks.TxnTaskFactory, observers ...iops.Observer) (tasks.Task, error) { +func (tSched *testScheduler) ScheduleMultiScopedTxnTaskWithObserver(ctx *tasks.Context, taskType tasks.TaskType, scopes []common.ID, factory tasks.TxnTaskFactory, observers ...tasks.Observer) (tasks.Task, error) { //TODO implement me panic("implement me") } diff --git a/pkg/vm/engine/tae/tasks/base.go b/pkg/vm/engine/tae/tasks/base.go index 302f156a85899..418449443e200 100644 --- a/pkg/vm/engine/tae/tasks/base.go +++ b/pkg/vm/engine/tae/tasks/base.go @@ -20,15 +20,13 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" ) var WaitableCtx = &Context{Waitable: true} type Context struct { ID uint64 - DoneCB ops.OpDoneCB + DoneCB OpDoneCB Waitable bool } @@ -37,7 +35,7 @@ type Context struct { // } type BaseTask struct { - ops.Op + Op impl Task id uint64 taskType TaskType @@ -56,7 +54,7 @@ func NewBaseTask(impl Task, taskType TaskType, ctx *Context) *BaseTask { taskType: taskType, impl: impl, } - var doneCB ops.OpDoneCB + var doneCB OpDoneCB if ctx != nil { if ctx.DoneCB == nil && !ctx.Waitable { doneCB = task.onDone @@ -67,20 +65,20 @@ func NewBaseTask(impl Task, taskType TaskType, ctx *Context) *BaseTask { if impl == nil { impl = task } - task.Op = ops.Op{ - Impl: impl.(base.IOpInternal), - DoneCB: doneCB, + task.Op = Op{ + impl: impl.(IOpInternal), + doneCB: doneCB, } if doneCB == nil { - task.Op.ErrorC = make(chan error, 1) + task.Op.errorC = make(chan error, 1) } return task } -func (task *BaseTask) onDone(_ base.IOp) { +func (task *BaseTask) onDone(_ IOp) { logutil.Debug("[Done]", common.OperationField(task.impl.Name()), common.DurationField(time.Duration(task.GetExecutTime())), - common.ErrorField(task.Err)) + common.ErrorField(task.err)) } func (task *BaseTask) Type() TaskType { return task.taskType } func (task *BaseTask) Cancel() (err error) { panic("todo") } diff --git a/pkg/vm/engine/tae/tasks/dispatcher.go b/pkg/vm/engine/tae/tasks/dispatcher.go index c91d530cd6883..a92bc0cb69dca 100644 --- a/pkg/vm/engine/tae/tasks/dispatcher.go +++ b/pkg/vm/engine/tae/tasks/dispatcher.go @@ -32,7 +32,7 @@ type Dispatcher interface { } type TaskHandler interface { - io.Closer + Stop() Start() Enqueue(Task) Execute(Task) @@ -43,10 +43,9 @@ type BaseDispatcher struct { } func NewBaseDispatcher() *BaseDispatcher { - d := &BaseDispatcher{ + return &BaseDispatcher{ handlers: make(map[TaskType]TaskHandler), } - return d } func (d *BaseDispatcher) Dispatch(task Task) { @@ -63,7 +62,7 @@ func (d *BaseDispatcher) RegisterHandler(t TaskType, h TaskHandler) { func (d *BaseDispatcher) Close() error { for _, h := range d.handlers { - h.Close() + h.Stop() } return nil } @@ -107,7 +106,7 @@ func (d *BaseScopedDispatcher) Dispatch(task Task) { func (d *BaseScopedDispatcher) Close() error { for _, h := range d.handlers { - h.Close() + h.Stop() } return nil } diff --git a/pkg/vm/engine/tae/tasks/handler.go b/pkg/vm/engine/tae/tasks/handler.go index 11aec10794dea..719b33825938a 100644 --- a/pkg/vm/engine/tae/tasks/handler.go +++ b/pkg/vm/engine/tae/tasks/handler.go @@ -16,27 +16,29 @@ package tasks import ( "context" + "sync" + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" + "github.com/panjf2000/ants/v2" ) var ( ErrTaskHandleEnqueue = moerr.NewInternalErrorNoCtx("tae: task handle enqueue") ) -type BaseTaskHandler struct { - ops.OpWorker +type baseTaskHandler struct { + OpWorker } -func NewBaseEventHandler(ctx context.Context, name string) *BaseTaskHandler { - h := &BaseTaskHandler{ - OpWorker: *ops.NewOpWorker(ctx, name), +func NewBaseEventHandler(ctx context.Context, name string) *baseTaskHandler { + h := &baseTaskHandler{ + OpWorker: *NewOpWorker(ctx, name), } return h } -func (h *BaseTaskHandler) Enqueue(task Task) { +func (h *baseTaskHandler) Enqueue(task Task) { if !h.SendOp(task) { task.SetError(ErrTaskHandleEnqueue) err := task.Cancel() @@ -46,22 +48,58 @@ func (h *BaseTaskHandler) Enqueue(task Task) { } } -func (h *BaseTaskHandler) Execute(task Task) { - h.ExecFunc(task) +func (h *baseTaskHandler) Execute(task Task) { + h.execFunc(task) } -func (h *BaseTaskHandler) Close() error { - h.Stop() - return nil -} +var ( + poolHandlerName = "PoolHandler" +) -type singleWorkerHandler struct { - BaseTaskHandler +type poolHandler struct { + baseTaskHandler + opExec OpExecFunc + pool *ants.Pool + wg *sync.WaitGroup } -func NewSingleWorkerHandler(ctx context.Context, name string) *singleWorkerHandler { - h := &singleWorkerHandler{ - BaseTaskHandler: *NewBaseEventHandler(ctx, name), +func NewPoolHandler(ctx context.Context, num int) *poolHandler { + pool, err := ants.NewPool(num) + if err != nil { + panic(err) } + h := &poolHandler{ + baseTaskHandler: *NewBaseEventHandler(ctx, poolHandlerName), + pool: pool, + wg: &sync.WaitGroup{}, + } + h.opExec = h.execFunc + h.execFunc = h.doHandle return h } + +func (h *poolHandler) Execute(task Task) { + h.opExec(task) +} + +func (h *poolHandler) doHandle(op IOp) { + closure := func(o IOp, wg *sync.WaitGroup) func() { + return func() { + h.opExec(o) + wg.Done() + } + } + h.wg.Add(1) + err := h.pool.Submit(closure(op, h.wg)) + if err != nil { + logutil.Warnf("%v", err) + op.SetError(err) + h.wg.Done() + } +} + +func (h *poolHandler) Stop() { + h.pool.Release() + h.baseTaskHandler.Stop() + h.wg.Wait() +} diff --git a/pkg/vm/engine/tae/tasks/ops.go b/pkg/vm/engine/tae/tasks/ops.go new file mode 100644 index 0000000000000..3d3e1ca754128 --- /dev/null +++ b/pkg/vm/engine/tae/tasks/ops.go @@ -0,0 +1,94 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" +) + +func (op *Op) GetError() error { + return op.err +} + +func (op *Op) SetError(err error) { + op.endTime = time.Now() + op.err = err + if op.errorC != nil { + op.errorC <- err + } else if op.doneCB != nil { + op.doneCB(op) + } else { + panic("logic error") + } + for _, observer := range op.observers { + observer.OnExecDone(op.impl) + } +} + +func (op *Op) Waitable() bool { + return op.doneCB == nil +} + +func (op *Op) WaitDone(ctx context.Context) error { + if op.waitedOnce.Load() { + return moerr.NewTAEErrorNoCtx("wait done twice") + } + defer op.waitedOnce.Store(true) + + if op.errorC == nil { + return moerr.NewTAEErrorNoCtx("wait done without error channel") + } + select { + case err := <-op.errorC: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (op *Op) Execute() error { + return nil +} + +func (op *Op) OnExec(ctx context.Context) error { + op.startTime = time.Now() + return op.impl.Execute(ctx) +} + +func (op *Op) GetCreateTime() time.Time { + return op.createTime +} + +func (op *Op) GetStartTime() time.Time { + return op.startTime +} + +func (op *Op) GetEndTime() time.Time { + return op.endTime +} + +func (op *Op) GetExecutTime() int64 { + return op.endTime.Sub(op.startTime).Microseconds() +} + +func (op *Op) AddObserver(o Observer) { + if op.observers == nil { + op.observers = make([]Observer, 0) + } + op.observers = append(op.observers, o) +} diff --git a/pkg/vm/engine/tae/tasks/ops/base/types.go b/pkg/vm/engine/tae/tasks/ops/base/types.go deleted file mode 100644 index f88dd6c831f95..0000000000000 --- a/pkg/vm/engine/tae/tasks/ops/base/types.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package base - -import ( - "context" - "time" -) - -type Observer interface { - OnExecDone(any) -} - -type IOpInternal interface { - PreExecute() error - Execute(ctx context.Context) error - PostExecute() error -} - -type IOp interface { - OnExec(ctx context.Context) error - SetError(err error) - GetError() error - WaitDone(ctx context.Context) error - Waitable() bool - GetCreateTime() time.Time - GetStartTime() time.Time - GetEndTime() time.Time - GetExecutTime() int64 - AddObserver(Observer) -} diff --git a/pkg/vm/engine/tae/tasks/ops/ops.go b/pkg/vm/engine/tae/tasks/ops/ops.go deleted file mode 100644 index eaa29f556de8a..0000000000000 --- a/pkg/vm/engine/tae/tasks/ops/ops.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ops - -import ( - "context" - "time" - - "github.com/matrixorigin/matrixone/pkg/common/moerr" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" - iworker "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" -) - -func NewOp(impl iops.IOpInternal, w iworker.IOpWorker) *Op { - op := &Op{ - Impl: impl, - Worker: w, - CreateTime: time.Now(), - } - return op -} - -func (op *Op) Push() error { - r := op.Worker.SendOp(op) - if !r { - return moerr.NewInternalErrorNoCtx("send op error") - } - return nil -} - -func (op *Op) GetError() error { - return op.Err -} - -func (op *Op) SetError(err error) { - op.EndTime = time.Now() - op.Err = err - if op.ErrorC != nil { - op.ErrorC <- err - } else if op.DoneCB != nil { - op.DoneCB(op) - } else { - panic("logic error") - } - if op.Observers != nil { - for _, observer := range op.Observers { - observer.OnExecDone(op.Impl) - } - } -} - -func (op *Op) Waitable() bool { - return op.DoneCB == nil -} - -func (op *Op) WaitDone(ctx context.Context) error { - if op.WaitedOnce.Load() { - return moerr.NewTAEErrorNoCtx("wait done twice") - } - defer op.WaitedOnce.Store(true) - - if op.ErrorC == nil { - return moerr.NewTAEErrorNoCtx("wait done without error channel") - } - select { - case err := <-op.ErrorC: - return err - case <-ctx.Done(): - return ctx.Err() - } -} - -func (op *Op) PreExecute() error { - return nil -} - -func (op *Op) PostExecute() error { - return nil -} - -func (op *Op) Execute() error { - return nil -} - -func (op *Op) OnExec(ctx context.Context) error { - op.StartTime = time.Now() - err := op.Impl.PreExecute() - if err != nil { - return err - } - err = op.Impl.Execute(ctx) - if err != nil { - return err - } - err = op.Impl.PostExecute() - return err -} - -func (op *Op) GetCreateTime() time.Time { - return op.CreateTime -} - -func (op *Op) GetStartTime() time.Time { - return op.StartTime -} - -func (op *Op) GetEndTime() time.Time { - return op.EndTime -} - -func (op *Op) GetExecutTime() int64 { - return op.EndTime.Sub(op.StartTime).Microseconds() -} - -func (op *Op) AddObserver(o iops.Observer) { - if op.Observers == nil { - op.Observers = make([]iops.Observer, 0) - } - op.Observers = append(op.Observers, o) -} diff --git a/pkg/vm/engine/tae/tasks/ops/types.go b/pkg/vm/engine/tae/tasks/ops/types.go deleted file mode 100644 index 68edd265d9521..0000000000000 --- a/pkg/vm/engine/tae/tasks/ops/types.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ops - -import ( - "sync/atomic" - "time" - - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" - iworker "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" -) - -type OpDoneCB = func(iops.IOp) - -type Op struct { - Impl iops.IOpInternal - ErrorC chan error - WaitedOnce atomic.Bool - Worker iworker.IOpWorker - Err error - Result any - CreateTime time.Time - StartTime time.Time - EndTime time.Time - DoneCB OpDoneCB - Observers []iops.Observer -} diff --git a/pkg/vm/engine/tae/tasks/poolhandler.go b/pkg/vm/engine/tae/tasks/poolhandler.go deleted file mode 100644 index 1e46657d9f86b..0000000000000 --- a/pkg/vm/engine/tae/tasks/poolhandler.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tasks - -import ( - "context" - "sync" - - "github.com/matrixorigin/matrixone/pkg/logutil" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" - "github.com/panjf2000/ants/v2" -) - -var ( - poolHandlerName = "PoolHandler" -) - -type poolHandler struct { - BaseTaskHandler - opExec ops.OpExecFunc - pool *ants.Pool - wg *sync.WaitGroup -} - -func NewPoolHandler(ctx context.Context, num int) *poolHandler { - pool, err := ants.NewPool(num) - if err != nil { - panic(err) - } - h := &poolHandler{ - BaseTaskHandler: *NewBaseEventHandler(ctx, poolHandlerName), - pool: pool, - wg: &sync.WaitGroup{}, - } - h.opExec = h.ExecFunc - h.ExecFunc = h.doHandle - return h -} - -func (h *poolHandler) Execute(task Task) { - h.opExec(task) -} - -func (h *poolHandler) doHandle(op iops.IOp) { - closure := func(o iops.IOp, wg *sync.WaitGroup) func() { - return func() { - h.opExec(o) - wg.Done() - } - } - h.wg.Add(1) - err := h.pool.Submit(closure(op, h.wg)) - if err != nil { - logutil.Warnf("%v", err) - op.SetError(err) - h.wg.Done() - } -} - -func (h *poolHandler) Close() error { - h.pool.Release() - h.BaseTaskHandler.Close() - h.wg.Wait() - return nil -} diff --git a/pkg/vm/engine/tae/tasks/scheduler.go b/pkg/vm/engine/tae/tasks/scheduler.go index 00733328b8471..5a5c2790752ed 100644 --- a/pkg/vm/engine/tae/tasks/scheduler.go +++ b/pkg/vm/engine/tae/tasks/scheduler.go @@ -16,13 +16,10 @@ package tasks import ( "context" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" ) var ( @@ -40,7 +37,7 @@ type TaskScheduler interface { Scheduler ScheduleTxnTask(ctx *Context, taskType TaskType, factory TxnTaskFactory) (Task, error) ScheduleMultiScopedTxnTask(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory) (Task, error) - ScheduleMultiScopedTxnTaskWithObserver(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory, observers ...iops.Observer) (Task, error) + ScheduleMultiScopedTxnTaskWithObserver(ctx *Context, taskType TaskType, scopes []common.ID, factory TxnTaskFactory, observers ...Observer) (Task, error) ScheduleMultiScopedFn(ctx *Context, taskType TaskType, scopes []common.ID, fn FuncT) (Task, error) ScheduleFn(ctx *Context, taskType TaskType, fn func() error) (Task, error) ScheduleScopedFn(ctx *Context, taskType TaskType, scope *common.ID, fn func() error) (Task, error) @@ -52,18 +49,18 @@ type TaskScheduler interface { } type BaseScheduler struct { - ops.OpWorker + OpWorker idAlloc *common.IdAllocator Dispatchers map[TaskType]Dispatcher } func NewBaseScheduler(ctx context.Context, name string) *BaseScheduler { scheduler := &BaseScheduler{ - OpWorker: *ops.NewOpWorker(ctx, name), + OpWorker: *NewOpWorker(ctx, name), idAlloc: common.NewIdAllocator(1), Dispatchers: make(map[TaskType]Dispatcher), } - scheduler.ExecFunc = scheduler.doDispatch + scheduler.execFunc = scheduler.doDispatch return scheduler } @@ -79,7 +76,7 @@ func (s *BaseScheduler) Schedule(task Task) error { return nil } -func (s *BaseScheduler) doDispatch(op iops.IOp) { +func (s *BaseScheduler) doDispatch(op IOp) { task := op.(Task) dispatcher := s.Dispatchers[task.Type()] if dispatcher == nil { diff --git a/pkg/vm/engine/tae/tasks/types.go b/pkg/vm/engine/tae/tasks/types.go index 23afd1ecfd620..14f852eefa76e 100644 --- a/pkg/vm/engine/tae/tasks/types.go +++ b/pkg/vm/engine/tae/tasks/types.go @@ -17,12 +17,13 @@ package tasks import ( "context" "hash/fnv" + "sync/atomic" + "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" ) var ( @@ -81,7 +82,7 @@ func NextTaskId() uint64 { } type Task interface { - base.IOp + IOp ID() uint64 Type() TaskType Cancel() error @@ -164,3 +165,47 @@ func NewMultiScopedFnTask(ctx *Context, taskType TaskType, scopes []common.ID, f func (task *MultiScopedFnTask) Scopes() []common.ID { return task.scopes } + +type IOpWorker interface { + Start() + Stop() + SendOp(IOp) bool + StopReceiver() + WaitStop() + StatsString() string +} + +type OpDoneCB func(IOp) + +type Op struct { + impl IOpInternal + errorC chan error + waitedOnce atomic.Bool + err error + createTime time.Time + startTime time.Time + endTime time.Time + doneCB OpDoneCB + observers []Observer +} + +type Observer interface { + OnExecDone(any) +} + +type IOpInternal interface { + Execute(ctx context.Context) error +} + +type IOp interface { + OnExec(ctx context.Context) error + SetError(err error) + GetError() error + WaitDone(ctx context.Context) error + Waitable() bool + GetCreateTime() time.Time + GetStartTime() time.Time + GetEndTime() time.Time + GetExecutTime() int64 + AddObserver(Observer) +} diff --git a/pkg/vm/engine/tae/tasks/worker/worker.go b/pkg/vm/engine/tae/tasks/worker.go similarity index 63% rename from pkg/vm/engine/tae/tasks/worker/worker.go rename to pkg/vm/engine/tae/tasks/worker.go index 60b420d96e485..a0451e1b75bce 100644 --- a/pkg/vm/engine/tae/tasks/worker/worker.go +++ b/pkg/vm/engine/tae/tasks/worker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ops +package tasks import ( "context" @@ -22,8 +22,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" - iw "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" ) type Cmd = uint8 @@ -46,10 +44,10 @@ const ( ) var ( - _ iw.IOpWorker = (*OpWorker)(nil) + _ IOpWorker = (*OpWorker)(nil) ) -type OpExecFunc func(op iops.IOp) +type OpExecFunc func(op IOp) type Stats struct { Processed atomic.Uint64 @@ -87,16 +85,17 @@ func (s *Stats) String() string { } type OpWorker struct { - Ctx context.Context - Name string - OpC chan iops.IOp - CmdC chan Cmd - State atomic.Int32 - Pending atomic.Int64 - ClosedCh chan struct{} - Stats Stats - ExecFunc OpExecFunc - CancelFunc OpExecFunc + ctx context.Context + cancel context.CancelFunc + name string + opC chan IOp + cmdC chan Cmd + state atomic.Int32 + pending atomic.Int64 + closedC chan struct{} + stats Stats + execFunc OpExecFunc + cancelFunc OpExecFunc } func NewOpWorker(ctx context.Context, name string, args ...int) *OpWorker { @@ -114,40 +113,40 @@ func NewOpWorker(ctx context.Context, name string, args ...int) *OpWorker { name = fmt.Sprintf("[worker-%d]", common.NextGlobalSeqNum()) } worker := &OpWorker{ - Ctx: ctx, - Name: name, - OpC: make(chan iops.IOp, l), - CmdC: make(chan Cmd, l), - ClosedCh: make(chan struct{}), + name: name, + opC: make(chan IOp, l), + cmdC: make(chan Cmd, l), + closedC: make(chan struct{}), } - worker.State.Store(CREATED) - worker.ExecFunc = worker.onOp - worker.CancelFunc = worker.opCancelOp + worker.ctx, worker.cancel = context.WithCancel(ctx) + worker.state.Store(CREATED) + worker.execFunc = worker.onOp + worker.cancelFunc = worker.opCancelOp return worker } func (w *OpWorker) Start() { - logutil.Debugf("%s Started", w.Name) - if w.State.Load() != CREATED { - panic(fmt.Sprintf("logic error: %v", w.State.Load())) + logutil.Debugf("%s Started", w.name) + if w.state.Load() != CREATED { + panic(fmt.Sprintf("logic error: %v", w.state.Load())) } - w.State.Store(RUNNING) + w.state.Store(RUNNING) go func() { for { - state := w.State.Load() + state := w.state.Load() if state == STOPPED { break } select { - case op := <-w.OpC: - w.ExecFunc(op) + case op := <-w.opC: + w.execFunc(op) // if state == RUNNING { // w.ExecFunc(op) // } else { // w.CancelFunc(op) // } - w.Pending.Add(-1) - case cmd := <-w.CmdC: + w.pending.Add(-1) + case cmd := <-w.cmdC: w.onCmd(cmd) } } @@ -157,85 +156,85 @@ func (w *OpWorker) Start() { func (w *OpWorker) Stop() { w.StopReceiver() w.WaitStop() - logutil.Debugf("%s Stopped", w.Name) + logutil.Debugf("%s Stopped", w.name) } func (w *OpWorker) StopReceiver() { - state := w.State.Load() + state := w.state.Load() if state >= StoppingReceiver { return } - w.State.CompareAndSwap(state, StoppingReceiver) + w.state.CompareAndSwap(state, StoppingReceiver) } func (w *OpWorker) WaitStop() { - state := w.State.Load() + state := w.state.Load() if state <= RUNNING { panic("logic error") } if state == STOPPED { return } - if w.State.CompareAndSwap(StoppingReceiver, StoppingCMD) { - pending := w.Pending.Load() + if w.state.CompareAndSwap(StoppingReceiver, StoppingCMD) { + pending := w.pending.Load() for { if pending == 0 { break } - pending = w.Pending.Load() + pending = w.pending.Load() } - w.CmdC <- QUIT + w.cmdC <- QUIT } - <-w.ClosedCh + <-w.closedC } -func (w *OpWorker) SendOp(op iops.IOp) bool { - state := w.State.Load() +func (w *OpWorker) SendOp(op IOp) bool { + state := w.state.Load() if state != RUNNING { return false } - w.Pending.Add(1) - if w.State.Load() != RUNNING { - w.Pending.Add(-1) + w.pending.Add(1) + if w.state.Load() != RUNNING { + w.pending.Add(-1) return false } select { - case w.OpC <- op: + case w.opC <- op: return true default: } - w.Pending.Add(-1) + w.pending.Add(-1) return false } -func (w *OpWorker) opCancelOp(op iops.IOp) { +func (w *OpWorker) opCancelOp(op IOp) { op.SetError(moerr.NewInternalErrorNoCtx("op cancelled")) } -func (w *OpWorker) onOp(op iops.IOp) { - err := op.OnExec(w.Ctx) - w.Stats.AddProcessed() +func (w *OpWorker) onOp(op IOp) { + err := op.OnExec(w.ctx) + w.stats.AddProcessed() if err != nil { - w.Stats.AddFailed() + w.stats.AddFailed() } else { - w.Stats.AddSuccessed() + w.stats.AddSuccessed() } op.SetError(err) - w.Stats.RecordTime(op.GetExecutTime()) + w.stats.RecordTime(op.GetExecutTime()) } func (w *OpWorker) onCmd(cmd Cmd) { switch cmd { case QUIT: // log.Infof("Quit OpWorker") - close(w.CmdC) - close(w.OpC) - if !w.State.CompareAndSwap(StoppingCMD, STOPPED) { + close(w.cmdC) + close(w.opC) + if !w.state.CompareAndSwap(StoppingCMD, STOPPED) { panic("logic error") } - w.ClosedCh <- struct{}{} + w.closedC <- struct{}{} case stuck: // For test only - <-w.Ctx.Done() + <-w.ctx.Done() return default: panic(fmt.Sprintf("Unsupported cmd %d", cmd)) @@ -243,6 +242,6 @@ func (w *OpWorker) onCmd(cmd Cmd) { } func (w *OpWorker) StatsString() string { - s := fmt.Sprintf("| Stats | %s | w | %s", w.Stats.String(), w.Name) + s := fmt.Sprintf("| Stats | %s | w | %s", w.stats.String(), w.name) return s } diff --git a/pkg/vm/engine/tae/tasks/worker/base/types.go b/pkg/vm/engine/tae/tasks/worker/base/types.go deleted file mode 100644 index be4646078fc09..0000000000000 --- a/pkg/vm/engine/tae/tasks/worker/base/types.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package base - -import ( - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" -) - -type IOpWorker interface { - Start() - Stop() - SendOp(ops.IOp) bool - StopReceiver() - WaitStop() - StatsString() string -} - -type IHeartbeater interface { - Start() - Stop() -} - -type IHBHandle interface { - OnExec() - OnStopped() -} diff --git a/pkg/vm/engine/tae/tasks/worker/heartbeater.go b/pkg/vm/engine/tae/tasks/worker/heartbeater.go deleted file mode 100644 index 0ad49afc07d57..0000000000000 --- a/pkg/vm/engine/tae/tasks/worker/heartbeater.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2021 Matrix Origin -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ops - -import ( - "context" - "sync" - "time" - - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker/base" -) - -type lamdaHandle struct { - onExec func() - onStop func() -} - -func (h *lamdaHandle) OnExec() { - if h.onExec != nil { - h.onExec() - } -} - -func (h *lamdaHandle) OnStopped() { - if h.onStop != nil { - h.onStop() - } -} - -type heartbeater struct { - handle base.IHBHandle - interval time.Duration - ctx context.Context - cancel context.CancelFunc - wg *sync.WaitGroup -} - -func NewHeartBeaterWithFunc(interval time.Duration, onExec, onStop func()) *heartbeater { - h := &lamdaHandle{onExec: onExec, onStop: onStop} - return NewHeartBeater(interval, h) -} - -func NewHeartBeater(interval time.Duration, handle base.IHBHandle) *heartbeater { - c := &heartbeater{ - interval: interval, - handle: handle, - } - c.ctx, c.cancel = context.WithCancel(context.Background()) - return c -} - -func (c *heartbeater) Start() { - c.wg = &sync.WaitGroup{} - c.wg.Add(1) - go func() { - defer c.wg.Done() - ticker := time.NewTicker(c.interval) - for { - select { - case <-c.ctx.Done(): - ticker.Stop() - return - case <-ticker.C: - c.handle.OnExec() - } - } - }() -} - -func (c *heartbeater) Stop() { - c.cancel() - c.wg.Wait() - c.handle.OnStopped() -} diff --git a/pkg/vm/engine/tae/tasks/worker/worker_test.go b/pkg/vm/engine/tae/tasks/worker_test.go similarity index 89% rename from pkg/vm/engine/tae/tasks/worker/worker_test.go rename to pkg/vm/engine/tae/tasks/worker_test.go index dd302273c7bd4..e3701ee3a61c5 100644 --- a/pkg/vm/engine/tae/tasks/worker/worker_test.go +++ b/pkg/vm/engine/tae/tasks/worker_test.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ops +package tasks import ( "context" - iops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops/base" "github.com/stretchr/testify/require" "testing" "time" @@ -58,15 +57,15 @@ func (t testOp) GetExecutTime() int64 { return 0 } -func (t testOp) AddObserver(iops.Observer) {} +func (t testOp) AddObserver(Observer) {} func TestOpWorker(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() worker := NewOpWorker(ctx, "test", 100) worker.Start() - worker.CmdC <- stuck - for len(worker.CmdC) > 0 { + worker.cmdC <- stuck + for len(worker.cmdC) > 0 { } for i := 0; i < 100; i++ { require.True(t, worker.SendOp(testOp{})) diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index db004aaea5baa..bc15e549a6d39 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -46,7 +46,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" "github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil" @@ -567,7 +566,7 @@ func TestColumnsTransfer(t *testing.T) { catalogDB, _ := txn.GetDatabaseByID(catalog.MO_CATALOG_ID) columnsTbl, _ := catalogDB.GetRelationByID(catalog.MO_COLUMNS_ID) - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -601,7 +600,7 @@ func TestInProgressTransfer(t *testing.T) { p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t) defer p.Close() tae := p.T.GetDB() - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -839,7 +838,7 @@ func TestShowDatabasesInRestoreTxn(t *testing.T) { catalogDB, _ := txn.GetDatabaseByID(catalog.MO_CATALOG_ID) dbTbl, _ := catalogDB.GetRelationByID(catalog.MO_DATABASE_ID) - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -1096,7 +1095,7 @@ func TestApplyDeletesForWorkspaceAndPart(t *testing.T) { udb, _ := txn.GetDatabaseByID(did) utbl, _ := udb.GetRelationByID(tid) - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() @@ -1163,7 +1162,7 @@ func TestApplyDeletesFromTombstoneObjects(t *testing.T) { txn, _ := tae.StartTxn(nil) udb, _ := txn.GetDatabaseByID(catalog.MO_CATALOG_ID) utbl, _ := udb.GetRelationByID(catalog.MO_COLUMNS_ID) - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop() // flusht tombstone ahead of data insert diff --git a/pkg/vm/engine/test/partition_state_test.go b/pkg/vm/engine/test/partition_state_test.go index 35df87a7734b7..965c0c6bee29e 100644 --- a/pkg/vm/engine/test/partition_state_test.go +++ b/pkg/vm/engine/test/partition_state_test.go @@ -40,7 +40,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config" "github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil" ) @@ -671,7 +670,7 @@ func Test_Bug_DupEntryWhenGCInMemTombstones(t *testing.T) { tombstone := it.GetObject().GetMeta().(*catalog.ObjectEntry) require.NoError(t, it.Close()) - worker := ops.NewOpWorker(context.Background(), "xx") + worker := tasks.NewOpWorker(context.Background(), "xx") worker.Start() defer worker.Stop()