Skip to content

Commit

Permalink
Refactor and migration upgrade steps based on new upgrade framework (m…
Browse files Browse the repository at this point in the history
…atrixorigin#14607)

1. 基于新的升级框架设计升级单元逻辑
2. 删除旧的升级框架,并将升级项迁移至新的升级框架
3. 支持手动执行升级语句和自动升级
4. 支持同一个版本的增量升级操作
5. 在Bootstrap时执行系统初始化,不再使用SystemInit同步任务
6. 解决mo系统初始化滞后端口监听问题

使用说明文档:
https://github.com/matrixorigin/docs/blob/main/design/mo/upgrade/upgrade-framework-operation-instructions.md

Approved by: @reusee, @daviszhen, @nnsgmsone, @aptend, @ouyuanning, @xzxiong, @aunjgr, @w-zr, @badboynt1, @iamlinjunhong, @zhangxu19830126, @aressu1985, @triump2020, @fengttt, @sukki37
  • Loading branch information
qingxinhome authored Apr 7, 2024
1 parent b71d6d0 commit f4cab18
Show file tree
Hide file tree
Showing 77 changed files with 14,137 additions and 13,147 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/e2e-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
mkdir -p ${{ github.workspace }}/docker-compose-log
docker compose -f etc/launch-tae-compose/compose.yaml --profile launch-multi-cn up -d --build
sleep 10
# wait for ready
i=1
while [ $(mysql -h 127.0.0.1 -P 6001 -u dump -p111 --execute 'create database if not exists compose_test;use compose_test; create table if not exists compose_test_table(col1 int auto_increment primary key);show tables;' 2>&1 | tee /dev/stderr | grep 'compose_test_table' | wc -l) -lt 1 ]; do
Expand All @@ -50,7 +50,7 @@ jobs:
exit 1;
fi
i=$(($i+1))
sleep 1
sleep 2
done
docker ps
Expand Down Expand Up @@ -137,7 +137,7 @@ jobs:
cat ./etc/launch-tae-compose/config/tn.toml
mkdir -p ${{ github.workspace }}/docker-compose-log
docker compose -f etc/launch-tae-compose/compose.yaml --profile launch-multi-cn up -d --build
sleep 10
# wait for ready
i=1
while [ $(mysql -h 127.0.0.1 -P 6001 -u dump -p111 --execute 'create database if not exists compose_test;use compose_test; create table if not exists compose_test_table(col1 int auto_increment primary key);show tables;' 2>&1 | tee /dev/stderr | grep 'compose_test_table' | wc -l) -lt 1 ]; do
Expand All @@ -148,7 +148,7 @@ jobs:
exit 1;
fi
i=$(($i+1))
sleep 1
sleep 2
done
docker ps
- name: Clone test-tool repository
Expand Down
3 changes: 3 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@
# pkg/sort
/pkg/sort @nnsgmsone

# pkg/bootstrap
/pkg/bootstrap @daviszhen @zhangxu19830126 @qingxinhome

# pkg/sql
/pkg/sql @nnsgmsone @aunjgr
/pkg/sql/colexec @m-schen
Expand Down
3 changes: 3 additions & 0 deletions cmd/mo-service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ func NewConfig() *Config {
},
Observability: *config.NewObservabilityParameters(),
LogService: logservice.DefaultConfig(),
CN: cnservice.Config{
AutomaticUpgrade: true,
},
}
}

Expand Down
8 changes: 3 additions & 5 deletions optools/run_bvt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
set -o nounset

MO_WORKSPACE=$1
SYSTEM_INIT_COMPLETED=$MO_WORKSPACE/mo-data/local/system_init_completed
LAUNCH=$2
PROXY=${3:-}

Expand All @@ -29,12 +28,11 @@ function launch_mo() {
# this will wait mo all system init completed
function wait_system_init() {
for num in {1..300}
do
if [ -f "$SYSTEM_INIT_COMPLETED" ]; then
do
if MYSQL_PWD=111 mysql -h 127.0.0.1 -P 6001 -u dump -e "show databases;"; then
echo "ok, cost $num seconds"
return 0
fi
echo "mo init not completed"
fi
sleep 1
done
return 1
Expand Down
282 changes: 282 additions & 0 deletions pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bootstrap

import (
"context"
"fmt"
"time"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"go.uber.org/zap"
)

func (s *service) UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) (bool, error) {
// Before manually upgrade, check if there are unready upgrade tasks in system upgrade environment
err := s.UpgradePreCheck(ctx)
if err != nil {
return true, err
}

if isALLAccount {
return true, s.BootstrapUpgrade(ctx)
} else {
err = s.CheckAndUpgradeCluster(ctx)
if err != nil {
return true, err
}

tenantID, err := s.CheckUpgradeAccount(ctx, tenantName)
if err != nil {
return true, err
}

err = s.stopper.RunNamedRetryTask("UpgradeTenant", tenantID, retryCount, s.UpgradeOneTenant)
if err != nil {
return true, err
}
//if err = s.UpgradeOneTenant(ctx, tenantID); err != nil {
// return true, err
//}
}
return true, nil
}

// CheckAndUpgradeCluster Before manually upgrade, it is necessary to ensure that the cluster upgrade is completed.
// When performing the cluster upgrade, the tenant upgrade task will be initialized
func (s *service) CheckAndUpgradeCluster(ctx context.Context) error {
s.adjustUpgrade()

if err := retryRun(ctx, "doCheckUpgrade", s.doCheckUpgrade); err != nil {
getUpgradeLogger().Error("check upgrade failed", zap.Error(err))
return err
}
if err := s.stopper.RunTask(s.asyncUpgradeTask); err != nil {
return err
}
return nil
}

// UpgradeOneTenant Perform metadata upgrade for individual tenants
func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
s.mu.RLock()
checked := s.mu.tenants[tenantID]
s.mu.RUnlock()
if checked {
return nil
}

ctx, cancel := context.WithTimeout(ctx, time.Hour*12)
defer cancel()

opts := executor.Options{}.
WithMinCommittedTS(s.now()).
WithWaitCommittedLogApplied().
WithTimeZone(time.Local)
err := s.exec.ExecTxn(
ctx,
func(txn executor.TxnExecutor) error {
txn.Use(catalog.MO_CATALOG)

version, err := versions.GetTenantVersion(tenantID, txn)
if err != nil {
return err
}

// tenant create at current cn, can work correctly
currentCN := s.getFinalVersionHandle().Metadata()
if versions.Compare(currentCN.Version, version) < 0 {
// tenant create at 1.4.0, current tenant version 1.5.0, it must be cannot work
return moerr.NewInvalidInputNoCtx("tenant version %s is greater than current cn version %s",
version, currentCN.Version)
}

// arrive here means tenant version <= current cn version, need upgrade.
latestVersion, err := versions.GetLatestVersion(txn)
if err != nil {
return err
}
if latestVersion.Version != currentCN.Version {
panic("BUG: current cn's version(" +
currentCN.Version +
") must equal cluster latest version(" +
latestVersion.Version +
")")
}

for {
// upgrade completed
if s.upgrade.finalVersionCompleted.Load() {
break
}

upgrades, err := versions.GetUpgradeVersions(latestVersion.Version, latestVersion.VersionOffset, txn, false, true)
if err != nil {
return err
}
// latest cluster is already upgrade completed
if upgrades[len(upgrades)-1].State == versions.StateUpgradingTenant ||
upgrades[len(upgrades)-1].State == versions.StateReady {
break
}

time.Sleep(time.Second)
}

// upgrade in current goroutine immediately
version, err = versions.GetTenantCreateVersionForUpdate(tenantID, txn)
if err != nil {
return err
}
from := version
for _, v := range s.handles {
if versions.Compare(v.Metadata().Version, from) >= 0 &&
v.Metadata().CanDirectUpgrade(from) {
if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
return err
}
from = v.Metadata().Version
}
}
return nil
},
opts)
if err != nil {
return err
}
s.mu.Lock()
s.mu.tenants[tenantID] = true
s.mu.Unlock()
return nil
}

// CheckUpgradeAccount Custom upgrade account Check if the tenant name exists and is legal
func (s *service) CheckUpgradeAccount(ctx context.Context, accountName string) (int32, error) {
var accountId int32

opts := executor.Options{}.
WithDatabase(catalog.MO_CATALOG).
WithMinCommittedTS(s.now()).
WithWaitCommittedLogApplied().
WithTimeZone(time.Local)
err := s.exec.ExecTxn(
ctx,
func(txn executor.TxnExecutor) error {
var err error
accountId, err = GetAccountIdByName(accountName, txn)
if err != nil {
getUpgradeLogger().Error("failed to get accountId by accountName when upgrade account",
zap.Error(err))
return err
}
return nil
},
opts)
return accountId, err
}

// GetAccountIdByName get accountId by accountName when upgrade account
func GetAccountIdByName(accountName string, txn executor.TxnExecutor) (int32, error) {
sql := fmt.Sprintf("select account_id, account_name from %s.%s where account_name = '%s'",
catalog.MO_CATALOG, catalog.MOAccountTable, accountName)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return -1, err
}

// Check if the group account name exists
var accountId int32 = -1
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
accountId = vector.GetFixedAt[int32](cols[0], 0)
return true
})

if accountId == -1 {
return -1, moerr.NewInvalidInputNoCtx("The input account name '%s' is invalid, please check input!", accountName)
}
return accountId, nil
}

// UpgradePreCheck Manual upgrade environment pre check, check if there are any unready upgrade tasks in upgrade environment.
// If there are, the unready upgrade tasks need to be processed first
func (s *service) UpgradePreCheck(ctx context.Context) error {
opts := executor.Options{}.
WithDatabase(catalog.MO_CATALOG).
WithMinCommittedTS(s.now()).
WithWaitCommittedLogApplied().
WithTimeZone(time.Local)
err := s.exec.ExecTxn(
ctx,
func(txn executor.TxnExecutor) error {
created, err := versions.IsFrameworkTablesCreated(txn)
if err != nil {
getUpgradeLogger().Error("failed to check upgrade framework",
zap.Error(err))
return err
}
if !created {
return nil
}

final := s.getFinalVersionHandle().Metadata()
unReady, err := checkUpgradePerVersionUnready(txn, final)
if err != nil {
getUpgradeLogger().Error("failed to check task status in pgrade environment", zap.Error(err))
return err
}

if unReady {
getUpgradeLogger().Info("There are unexecuted tenant upgrade tasks in upgrade environment, start asynchronous supplementary execution")
s.adjustUpgrade()
if err := s.stopper.RunTask(s.asyncUpgradeTask); err != nil {
return err
}
for i := 0; i < s.upgrade.upgradeTenantTasks; i++ {
if err := s.stopper.RunTask(s.asyncUpgradeTenantTask); err != nil {
return err
}
}
return moerr.NewInternalError(ctx, "There is an untrigged upgrade tasks in the system, execution started, Please try again later")
}
return nil
},
opts)
return err
}

// checkUpgradeEnvUnReady Check if the upgrade environment is ready
func checkUpgradePerVersionUnready(txn executor.TxnExecutor, final versions.Version) (bool, error) {
sql := fmt.Sprintf("select id, from_version, to_version, final_version, final_version_offset from %s.%s "+
"where state = 1 and final_version != '%s' and final_version_offset != %d",
catalog.MO_CATALOG, catalog.MOUpgradeTable, final.Version, final.VersionOffset)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return false, err
}

var loaded bool
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
loaded = true
return false
})
return loaded, nil
}
Loading

0 comments on commit f4cab18

Please sign in to comment.