From 130f090910fa6202341581260a71e0b8064f84b3 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 24 Jan 2025 18:46:56 +0530 Subject: [PATCH] feat: mark a tablet not serving in broadcast if the disk is stalled Signed-off-by: Manan Gupta --- .../vttablet/tabletmanager/rpc_replication.go | 2 +- go/vt/vttablet/tabletmanager/tm_init.go | 12 +----- go/vt/vttablet/tabletserver/controller.go | 3 ++ .../disk_health_monitor.go | 39 ++++++++++++++++++- .../disk_health_monitor_test.go | 18 ++++++++- go/vt/vttablet/tabletserver/state_manager.go | 3 +- .../tabletserver/state_manager_test.go | 28 ++++++++++++- go/vt/vttablet/tabletserver/tabletserver.go | 6 +++ go/vt/vttablet/tabletservermock/controller.go | 6 +++ 9 files changed, 102 insertions(+), 15 deletions(-) rename go/vt/vttablet/{tabletmanager => tabletserver}/disk_health_monitor.go (67%) rename go/vt/vttablet/{tabletmanager => tabletserver}/disk_health_monitor_test.go (85%) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index a30db3b4194..dec94ee6f16 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -64,7 +64,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful // Return if the disk is stalled or rejecting writes. // If the disk is stalled, we can't be sure if reads will go through // or not, so we should not run any reads either. - if tm.dhMonitor.IsDiskStalled() { + if tm.QueryServiceControl.IsDiskStalled() { return &replicationdatapb.FullStatus{ DiskStalled: true, }, nil diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index c22ea0a6e51..fbef04de357 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -95,11 +95,8 @@ var ( skipBuildInfoTags = "/.*/" initTags flagutil.StringMapValue - initTimeout = 1 * time.Minute - mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout - stalledDiskWriteDir = "" - stalledDiskWriteTimeout = 30 * time.Second - stalledDiskWriteInterval = 5 * time.Second + initTimeout = 1 * time.Minute + mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout ) func registerInitFlags(fs *pflag.FlagSet) { @@ -112,9 +109,6 @@ func registerInitFlags(fs *pflag.FlagSet) { fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet") fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.") fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.") - fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled") - fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled") - fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled") } var ( @@ -170,7 +164,6 @@ type TabletManager struct { VREngine *vreplication.Engine VDiffEngine *vdiff.Engine Env *vtenv.Environment - dhMonitor DiskHealthMonitor // tmc is used to run an RPC against other vttablets. tmc tmclient.TabletManagerClient @@ -379,7 +372,6 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl tm.tmc = tmclient.NewTabletManagerClient() tm.tmState = newTMState(tm, tablet) tm.actionSema = semaphore.NewWeighted(1) - tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx) tm._waitForGrantsComplete = make(chan struct{}) tm.baseTabletType = tablet.Type diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index c4a4bef99fc..ab2875ae27b 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -122,6 +122,9 @@ type Controller interface { // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. SetDemotePrimaryStalled() + + // IsDiskStalled returns if the disk is stalled. + IsDiskStalled() bool } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor.go b/go/vt/vttablet/tabletserver/disk_health_monitor.go similarity index 67% rename from go/vt/vttablet/tabletmanager/disk_health_monitor.go rename to go/vt/vttablet/tabletserver/disk_health_monitor.go index e35bc662a12..f477f7fd30c 100644 --- a/go/vt/vttablet/tabletmanager/disk_health_monitor.go +++ b/go/vt/vttablet/tabletserver/disk_health_monitor.go @@ -1,4 +1,20 @@ -package tabletmanager +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver import ( "context" @@ -7,8 +23,29 @@ import ( "strconv" "sync" "time" + + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/servenv" ) +var ( + stalledDiskWriteDir = "" + stalledDiskWriteTimeout = 30 * time.Second + stalledDiskWriteInterval = 5 * time.Second +) + +func init() { + servenv.OnParseFor("vtcombo", registerInitFlags) + servenv.OnParseFor("vttablet", registerInitFlags) +} + +func registerInitFlags(fs *pflag.FlagSet) { + fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled") + fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled") + fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled") +} + type DiskHealthMonitor interface { // IsDiskStalled returns true if the disk is stalled or rejecting writes. IsDiskStalled() bool diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go b/go/vt/vttablet/tabletserver/disk_health_monitor_test.go similarity index 85% rename from go/vt/vttablet/tabletmanager/disk_health_monitor_test.go rename to go/vt/vttablet/tabletserver/disk_health_monitor_test.go index 68930f3061d..8b47e40ee79 100644 --- a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go +++ b/go/vt/vttablet/tabletserver/disk_health_monitor_test.go @@ -1,4 +1,20 @@ -package tabletmanager +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver import ( "context" diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 4512b26f177..16a249e0630 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -97,6 +97,7 @@ type stateManager struct { replHealthy bool demotePrimaryStalled bool lameduck bool + dhMonitor DiskHealthMonitor alsoAllow []topodatapb.TabletType reason string transitionErr error @@ -777,7 +778,7 @@ func (sm *stateManager) IsServing() bool { } func (sm *stateManager) isServingLocked() bool { - return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck + return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck && !sm.dhMonitor.IsDiskStalled() } func (sm *stateManager) AppendDetails(details []*kv) []*kv { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index f8059d6edea..8323b2c6763 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -41,7 +41,9 @@ import ( var testNow = time.Now() func TestStateManagerStateByName(t *testing.T) { - sm := &stateManager{} + sm := &stateManager{ + dhMonitor: newNoopDiskHealthMonitor(), + } sm.replHealthy = true sm.wantState = StateServing @@ -147,6 +149,29 @@ func TestStateManagerUnservePrimary(t *testing.T) { assert.Equal(t, StateNotServing, sm.state) } +type testDiskMonitor struct { + isDiskStalled bool +} + +func (t *testDiskMonitor) IsDiskStalled() bool { + return t.isDiskStalled +} + +// TestIsServingLocked tests isServingLocked() functionality. +func TestIsServingLocked(t *testing.T) { + sm := newTestStateManager() + defer sm.StopService() + tdm := &testDiskMonitor{isDiskStalled: false} + sm.dhMonitor = tdm + + err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") + require.NoError(t, err) + require.True(t, sm.isServingLocked()) + + tdm.isDiskStalled = true + require.False(t, sm.isServingLocked()) +} + func TestStateManagerUnserveNonPrimary(t *testing.T) { sm := newTestStateManager() defer sm.StopService() @@ -792,6 +817,7 @@ func newTestStateManager() *stateManager { te: &testTxEngine{}, messager: &testSubcomponent{}, ddle: &testOnlineDDLExecutor{}, + dhMonitor: newNoopDiskHealthMonitor(), throttler: &testLagThrottler{}, tableGC: &testTableGC{}, rw: newRequestsWaiter(), diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 30f73d2d818..be24d1c04a9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -207,6 +207,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c throttler: tsv.lagThrottler, tableGC: tsv.tableGC, rw: newRequestsWaiter(), + dhMonitor: newDiskHealthMonitor(ctx), } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) @@ -767,6 +768,11 @@ func (tsv *TabletServer) SetDemotePrimaryStalled() { tsv.BroadcastHealth() } +// IsDiskStalled returns if the disk is stalled or not. +func (tsv *TabletServer) IsDiskStalled() bool { + return tsv.sm.dhMonitor.IsDiskStalled() +} + // CreateTransaction creates the metadata for a 2PC transaction. func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index a5242751454..21b38755302 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -279,6 +279,12 @@ func (tqsc *Controller) SetDemotePrimaryStalled() { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } +// IsDiskStalled is part of the tabletserver.Controller interface +func (tqsc *Controller) IsDiskStalled() bool { + tqsc.MethodCalled["IsDiskStalled"] = true + return false +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock()