From 2e5bbffcb32e4b53c0e57e8205e8dee5457c03d8 Mon Sep 17 00:00:00 2001 From: Chengyu Liu Date: Fri, 2 Feb 2024 15:00:51 +0800 Subject: [PATCH] feat: recover it when the goroutine caused some panic (#2349) Co-authored-by: liuchengyu --- codis/pkg/topom/topom.go | 25 +++++---- codis/pkg/utils/runtime/goroutine.go | 84 ++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 12 deletions(-) create mode 100644 codis/pkg/utils/runtime/goroutine.go diff --git a/codis/pkg/topom/topom.go b/codis/pkg/topom/topom.go index 582452aa1d..67bcd7daca 100644 --- a/codis/pkg/topom/topom.go +++ b/codis/pkg/topom/topom.go @@ -22,6 +22,7 @@ import ( "pika/codis/v2/pkg/utils/math2" "pika/codis/v2/pkg/utils/redis" "pika/codis/v2/pkg/utils/rpc" + gxruntime "pika/codis/v2/pkg/utils/runtime" "pika/codis/v2/pkg/utils/sync2/atomic2" ) @@ -197,7 +198,7 @@ func (s *Topom) Start(routines bool) error { } // Check the status of all masters and slaves every 5 seconds - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { w, _ := s.CheckMastersAndSlavesState(10 * time.Second) @@ -207,11 +208,11 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(s.Config().SentinelCheckServerStateInterval.Duration()) } - }() + }, nil, true, 0) // Check the status of the pre-offline master every 1 second // to determine whether to automatically switch master and slave - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { w, _ := s.CheckPreOffineMastersState(5 * time.Second) @@ -221,9 +222,9 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(s.Config().SentinelCheckMasterFailoverInterval.Duration()) } - }() + }, nil, true, 0) - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { w, _ := s.RefreshRedisStats(time.Second) @@ -233,9 +234,9 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(time.Second) } - }() + }, nil, true, 0) - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { w, _ := s.RefreshProxyStats(time.Second) @@ -245,9 +246,9 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(time.Second) } - }() + }, nil, true, 0) - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { if err := s.ProcessSlotAction(); err != nil { @@ -257,9 +258,9 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(time.Second) } - }() + }, nil, true, 0) - go func() { + gxruntime.GoUnterminated(func() { for !s.IsClosed() { if s.IsOnline() { if err := s.ProcessSyncAction(); err != nil { @@ -269,7 +270,7 @@ func (s *Topom) Start(routines bool) error { } time.Sleep(time.Second) } - }() + }, nil, true, 0) return nil } diff --git a/codis/pkg/utils/runtime/goroutine.go b/codis/pkg/utils/runtime/goroutine.go new file mode 100644 index 0000000000..8de8a07bb8 --- /dev/null +++ b/codis/pkg/utils/runtime/goroutine.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gxruntime + +import ( + "fmt" + "os" + "runtime/debug" + "sync" + "time" +) + +// GoSafely wraps a `go func()` with recover() +func GoSafely(wg *sync.WaitGroup, ignoreRecover bool, handler func(), catchFunc func(r interface{})) { + if wg != nil { + wg.Add(1) + } + go func() { + defer func() { + if r := recover(); r != nil { + if !ignoreRecover { + fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n", + time.Now(), r, string(debug.Stack())) + } + if catchFunc != nil { + if wg != nil { + wg.Add(1) + } + go func() { + defer func() { + if p := recover(); p != nil { + if !ignoreRecover { + fmt.Fprintf(os.Stderr, "recover goroutine panic:%v\n%s\n", + p, string(debug.Stack())) + } + } + + if wg != nil { + wg.Done() + } + }() + catchFunc(r) + }() + } + } + if wg != nil { + wg.Done() + } + }() + handler() + }() +} + +// GoUnterminated is used for which goroutine wanna long live as its process. +// @period: sleep time duration after panic to defeat @handle panic so frequently. if it is not positive, +// +// the @handle will be invoked asap after panic. +func GoUnterminated(handle func(), wg *sync.WaitGroup, ignoreRecover bool, period time.Duration) { + GoSafely(wg, + ignoreRecover, + handle, + func(r interface{}) { + if period > 0 { + time.Sleep(period) + } + GoUnterminated(handle, wg, ignoreRecover, period) + }, + ) +}