Skip to content

Commit

Permalink
delete some unnecessary code
Browse files Browse the repository at this point in the history
Signed-off-by: Jiangnan Jia <[email protected]>
  • Loading branch information
jnan806 committed Dec 6, 2022
1 parent 9e12d3e commit 669aedf
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/opensergo/opensergo-go
go 1.14

require (
github.com/avast/retry-go/v4 v4.3.1 // indirect
github.com/avast/retry-go/v4 v4.3.1
github.com/envoyproxy/protoc-gen-validate v0.1.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/pkg/errors v0.9.1
Expand Down
9 changes: 0 additions & 9 deletions pkg/client/stream/stream_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ const (
PbStreamShutdown
)

// PbStreamObserverStatus is the stream status of OpenSergo Universal Transport Service
type PbStreamObserverStatus uint8

const (
PbStreamObserverInitial PbStreamObserverStatus = iota
PbStreamObserverRunning
PbStreamObserverStopped
)

type outsStream interface {
StreamStatus() PbStreamStatus
}
24 changes: 15 additions & 9 deletions pkg/client/stream/subscribeconfig_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (s *SubscribeConfigOutsStream) connectOutsStream() error {
subscribeConfigStream, err := s.transportServiceClient.SubscribeConfig(context.Background())
if err != nil {
logging.Error(err, "[OpenSergo SDK] SubscribeConfigStream of OpenSergoClient can not connect.")
s.pbStreamStatus.Store(PbStreamInterrupted)
return err
}
s.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{pbStream: subscribeConfigStream})
Expand All @@ -120,12 +119,12 @@ func (s *SubscribeConfigOutsStream) keepAlive() {
logging.Info("start a keepalive() daemon goroutine to keep SubscribeConfigOutsStream of OpenSergoClient alive")
for {
status := s.StreamStatus()
if status == PbStreamInterrupted {
if status == PbStreamShutdown {
return
} else if status == PbStreamInterrupted {
logging.Info("Try to restart OpenSergoClient...")
// We do not handle error here because error has print in start()
_ = s.Start()
} else if status == PbStreamShutdown {
return
}
time.Sleep(time.Duration(10) * time.Second)
}
Expand All @@ -143,14 +142,21 @@ func (s *SubscribeConfigOutsStream) Start() error {
// keepalive OpensergoClient
go s.keepAlive()

var subscribeResponseChan chan *transportPb.SubscribeResponse
//var subscribeResponseChan chan *transportPb.SubscribeResponse
//if s.subscribeConfigPbStreamObserver() == nil {
// subscribeResponseChan = make(chan *transportPb.SubscribeResponse, 1024)
//} else {
// subscribeResponseChan = s.subscribeConfigPbStreamObserver().subscribeResponseChan
//}
//s.newAndStoreSubscribeConfigPbStreamObserver()
//s.subscribeConfigPbStreamObserver().start(subscribeResponseChan)
if s.subscribeConfigPbStreamObserver() == nil {
subscribeResponseChan = make(chan *transportPb.SubscribeResponse, 1024)
s.newAndStoreSubscribeConfigPbStreamObserver()
s.subscribeConfigPbStreamObserver().start()
} else {
subscribeResponseChan = s.subscribeConfigPbStreamObserver().subscribeResponseChan
s.subscribeConfigPbStreamObserver().setSubscribeConfigOutsStream(s)
s.subscribeConfigPbStreamObserver().start()
}
s.newAndStoreSubscribeConfigPbStreamObserver()
s.subscribeConfigPbStreamObserver().start(subscribeResponseChan)

logging.Info("[OpenSergo SDK] begin to subscribe config-data...")
// TODO: handle error inside the ForEach here.
Expand Down
20 changes: 4 additions & 16 deletions pkg/client/stream/subscribeconfig_streamobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package stream
import (
"fmt"
"reflect"
"sync/atomic"

"github.com/opensergo/opensergo-go/pkg/common/logging"
"github.com/opensergo/opensergo-go/pkg/configkind"
Expand All @@ -36,24 +35,22 @@ type subscribeConfigPbStreamObserver struct {
outsStream *SubscribeConfigOutsStream

subscribeResponseChan chan *transportPb.SubscribeResponse

scObserverStatus atomic.Value // type of value is bool
}

func newSubscribeConfigPbStreamObserver(outsStream *SubscribeConfigOutsStream) *subscribeConfigPbStreamObserver {
scObserver := &subscribeConfigPbStreamObserver{
outsStream: outsStream,
subscribeResponseChan: make(chan *transportPb.SubscribeResponse, 1024),
outsStream: outsStream,
}
scObserver.scObserverStatus.Store(PbStreamObserverInitial)
return scObserver
}

func (scObserver *subscribeConfigPbStreamObserver) setSubscribeConfigOutsStream(outsStream *SubscribeConfigOutsStream) {
scObserver.outsStream = outsStream
}

func (scObserver *subscribeConfigPbStreamObserver) start(subscribeResponseChan chan *transportPb.SubscribeResponse) {
scObserver.subscribeResponseChan = subscribeResponseChan
func (scObserver *subscribeConfigPbStreamObserver) start() {
//scObserver.subscribeResponseChan = subscribeResponseChan

logging.Info("[OpenSergo SDK] starting subscribeConfigPbStreamObserver.")

Expand All @@ -66,12 +63,7 @@ func (scObserver *subscribeConfigPbStreamObserver) start(subscribeResponseChan c
logging.Info("[OpenSergo SDK] started subscribeConfigPbStreamObserver with goroutines of doObserve() and doHandle().")
}

func (scObserver *subscribeConfigPbStreamObserver) pbStreamObserverStatus() PbStreamObserverStatus {
return scObserver.scObserverStatus.Load().(PbStreamObserverStatus)
}

func (scObserver *subscribeConfigPbStreamObserver) doObserve() {
scObserver.scObserverStatus.Store(PbStreamObserverRunning)
for {
pbStreamWrapper := scObserver.outsStream.subscribeConfigPbStreamWrapper()
if !reflect.DeepEqual(pbStreamWrapper, &subscribeConfigPbStreamWrapper{}) {
Expand Down Expand Up @@ -99,7 +91,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
errRecover := errors.Errorf("%+v", r)
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Error(errRecover, "[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of panic occurring when receive data from opensergo-control-plane.")
resp = nil
err = errRecover
Expand All @@ -117,7 +108,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
logging.Error(errorLocal, "error when receive config-data.")
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Warn("[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of error occurring when receive data from opensergo-control-plane.")
return nil, errorLocal
}
Expand All @@ -129,7 +119,6 @@ func (scObserver *subscribeConfigPbStreamObserver) observeReceive() (resp *trans
//
// Handle the subscribeResponse from the subscribeResponseChan channel in subscribeConfigPbStreamObserver
func (scObserver *subscribeConfigPbStreamObserver) doHandle() {
scObserver.scObserverStatus.Store(PbStreamObserverRunning)
for {
pbStreamWrapper := scObserver.outsStream.subscribeConfigPbStreamWrapper()
if !reflect.DeepEqual(pbStreamWrapper, &subscribeConfigPbStreamWrapper{}) {
Expand Down Expand Up @@ -175,7 +164,6 @@ func (scObserver *subscribeConfigPbStreamObserver) handleReceive(subscribeRespon
logging.Error(errRecover2, fmt.Sprintf("[OpenSergo SDK] [subscribeResponseId:%v] panic occurred when invoking doHandleReceive() for subscribe.", subscribeResponse.ResponseId))
scObserver.outsStream.setSubscribeConfigPbStreamWrapper(&subscribeConfigPbStreamWrapper{})
scObserver.outsStream.pbStreamStatus.Store(PbStreamInterrupted)
scObserver.scObserverStatus.Store(PbStreamObserverStopped)
logging.Warn("[OpenSergo SDK] interrupted gRpc Stream (SubscribeConfigStream) because of error occurring when receive data from opensergo-control-plane.")
err = errRecover2
}
Expand Down

0 comments on commit 669aedf

Please sign in to comment.