Skip to content

Commit

Permalink
handle data race
Browse files Browse the repository at this point in the history
Signed-off-by: Jiangnan Jia <[email protected]>
  • Loading branch information
jnan806 committed Feb 7, 2023
1 parent 6bfdc07 commit 9fca930
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 21 deletions.
7 changes: 3 additions & 4 deletions pkg/client/stream/subscribeconfig_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (s *SubscribeConfigOutsStream) connectOutsStream() error {
subscribeConfigStream, err := s.transportServiceClient.SubscribeConfig(context.Background())
if err != nil {
logging.Error(err, "[OpenSergo SDK] SubscribeConfigStream of OpenSergoClient can not connect.")
s.pbStreamStatus.Store(PbStreamInterrupted)
return err
}
s.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{pbStream: subscribeConfigStream})
Expand All @@ -120,12 +119,12 @@ func (s *SubscribeConfigOutsStream) keepAlive() {
logging.Info("start a keepalive() daemon goroutine to keep SubscribeConfigOutsStream of OpenSergoClient alive")
for {
status := s.StreamStatus()
if status == PbStreamInterrupted {
if status == PbStreamShutdown {
return
} else if status == PbStreamInterrupted {
logging.Info("Try to restart OpenSergoClient...")
// We do not handle error here because error has print in start()
_ = s.Start()
} else if status == PbStreamShutdown {
return
}
time.Sleep(time.Duration(10) * time.Second)
}
Expand Down
17 changes: 0 additions & 17 deletions pkg/client/stream/subscribeconfig_streamobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package stream
import (
"fmt"
"reflect"
"sync/atomic"

"github.com/opensergo/opensergo-go/pkg/common/logging"
"github.com/opensergo/opensergo-go/pkg/configkind"
Expand All @@ -36,22 +35,15 @@ type subscribeConfigPbStreamObserver struct {
outsStream *SubscribeConfigOutsStream

subscribeResponseChan chan *transportPb.SubscribeResponse

scObserverStatus atomic.Value // type of value is bool
}

func newSubscribeConfigPbStreamObserver(outsStream *SubscribeConfigOutsStream) *subscribeConfigPbStreamObserver {
scObserver := &subscribeConfigPbStreamObserver{
outsStream: outsStream,
}
scObserver.scObserverStatus.Store(PbStreamObserverInitial)
return scObserver
}

func (scObserver *subscribeConfigPbStreamObserver) setSubscribeConfigOutsStream(outsStream *SubscribeConfigOutsStream) {
scObserver.outsStream = outsStream
}

func (scObserver *subscribeConfigPbStreamObserver) start(subscribeResponseChan chan *transportPb.SubscribeResponse) {
scObserver.subscribeResponseChan = subscribeResponseChan

Expand All @@ -66,12 +58,7 @@ func (scObserver *subscribeConfigPbStreamObserver) start(subscribeResponseChan c
logging.Info("[OpenSergo SDK] started subscribeConfigPbStreamObserver with goroutines of doObserve() and doHandle().")
}

func (scObserver *subscribeConfigPbStreamObserver) pbStreamObserverStatus() PbStreamObserverStatus {
return scObserver.scObserverStatus.Load().(PbStreamObserverStatus)
}

func (scObserver *subscribeConfigPbStreamObserver) doObserve() {
scObserver.scObserverStatus.Store(PbStreamObserverRunning)
for {
pbStreamWrapper := scObserver.outsStream.subscribeConfigPbStreamWrapper()
if !reflect.DeepEqual(pbStreamWrapper, &subscribeConfigPbStreamWrapper{}) {
Expand Down Expand Up @@ -99,7 +86,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
errRecover := errors.Errorf("%+v", r)
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Error(errRecover, "[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of panic occurring when receive data from opensergo-control-plane.")
resp = nil
err = errRecover
Expand All @@ -117,7 +103,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
logging.Error(errorLocal, "error when receive config-data.")
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Warn("[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of error occurring when receive data from opensergo-control-plane.")
return nil, errorLocal
}
Expand All @@ -129,7 +114,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
//
// Handle the subscribeResponse from the subscribeResponseChan channel in subscribeConfigPbStreamObserver
func (scObserver *subscribeConfigPbStreamObserver) doHandle() {
scObserver.scObserverStatus.Store(PbStreamObserverRunning)
for {
pbStreamWrapper := scObserver.outsStream.subscribeConfigPbStreamWrapper()
if !reflect.DeepEqual(pbStreamWrapper, &subscribeConfigPbStreamWrapper{}) {
Expand Down Expand Up @@ -175,7 +159,6 @@ func (scObserver *subscribeConfigPbStreamObserver) handleReceive(subscribeRespon
logging.Error(errRecover2, fmt.Sprintf("[OpenSergo SDK] [subscribeResponseId:%v] panic occurred when invoking doHandleReceive() for subscribe.", subscribeResponse.ResponseId))
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Warn("[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of error occurring when receive data from opensergo-control-plane.")
err = errRecover2
}
Expand Down

0 comments on commit 9fca930

Please sign in to comment.