From c10a1ceb32d626f39cba74267478fca37f0fe98d Mon Sep 17 00:00:00 2001 From: ttt733 Date: Mon, 12 Aug 2019 17:40:24 -0500 Subject: [PATCH 1/2] Implement re-subscription on websocket reconnection --- alpaca/stream.go | 12 +++++++++++- polygon/stream.go | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/alpaca/stream.go b/alpaca/stream.go index e043955..883020a 100644 --- a/alpaca/stream.go +++ b/alpaca/stream.go @@ -53,6 +53,7 @@ func (s *Stream) Subscribe(channel string, handler func(msg interface{})) (err e s.handlers.Store(channel, handler) if err = s.sub(channel); err != nil { + s.handlers.Delete(channel) return } default: @@ -83,6 +84,15 @@ func (s *Stream) Close() error { return s.conn.Close() } +func (s *Stream) reconnect() { + s.conn = openSocket() + s.handlers.Range(func(key, value interface{}) bool { + // there should be no errors if we've previously successfully connected + s.sub(key.(string)) + return true + }) +} + func (s *Stream) start() { for { msg := ServerMsg{} @@ -111,7 +121,7 @@ func (s *Stream) start() { log.Printf("alpaca stream read error (%v)", err) } - s.conn = openSocket() + s.reconnect() } } } diff --git a/polygon/stream.go b/polygon/stream.go index b707550..fb240dd 100644 --- a/polygon/stream.go +++ b/polygon/stream.go @@ -58,6 +58,7 @@ func (s *Stream) Subscribe(channel string, handler func(msg interface{})) (err e s.handlers.Store(topic, handler) if err = s.sub(channel); err != nil { + s.handlers.Delete(topic) return } @@ -86,6 +87,15 @@ func (s *Stream) Close() error { return s.conn.Close() } +func (s *Stream) reconnect() { + s.conn = openSocket() + s.handlers.Range(func(key, value interface{}) bool { + // there should be no errors if we've previously successfully connected + s.sub(key.(string)) + return true + }) +} + func (s *Stream) handleError(err error) { if websocket.IsCloseError(err) { // if this was a graceful closure, don't reconnect @@ -96,7 +106,7 @@ func (s *Stream) handleError(err error) { log.Printf("polygon stream read error (%v)", err) } - s.conn = openSocket() + s.reconnect() } func (s *Stream) start() { From d430452475dcf17ac3689189be70d8c96377b5af Mon Sep 17 00:00:00 2001 From: ttt733 Date: Tue, 13 Aug 2019 09:34:24 -0500 Subject: [PATCH 2/2] Fix authentication issues and race condition in websocket streams --- alpaca/stream.go | 6 ++++++ polygon/stream.go | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/alpaca/stream.go b/alpaca/stream.go index 883020a..ac832d3 100644 --- a/alpaca/stream.go +++ b/alpaca/stream.go @@ -85,7 +85,11 @@ func (s *Stream) Close() error { } func (s *Stream) reconnect() { + s.authenticated.Store(false) s.conn = openSocket() + if err := s.auth(); err != nil { + return + } s.handlers.Range(func(key, value interface{}) bool { // there should be no errors if we've previously successfully connected s.sub(key.(string)) @@ -186,6 +190,8 @@ func (s *Stream) auth() (err error) { return fmt.Errorf("failed to authorize alpaca stream") } + s.authenticated.Store(true) + return } diff --git a/polygon/stream.go b/polygon/stream.go index fb240dd..a34910c 100644 --- a/polygon/stream.go +++ b/polygon/stream.go @@ -40,12 +40,6 @@ func (s *Stream) Subscribe(channel string, handler func(msg interface{})) (err e s.conn = openSocket() } - // read connection message - msg := []PolgyonServerMsg{} - if err = s.conn.ReadJSON(&msg); err != nil { - return - } - if err = s.auth(); err != nil { return } @@ -54,11 +48,10 @@ func (s *Stream) Subscribe(channel string, handler func(msg interface{})) (err e go s.start() }) - topic := channel[:strings.IndexByte(channel, '.')] - s.handlers.Store(topic, handler) + s.handlers.Store(channel, handler) if err = s.sub(channel); err != nil { - s.handlers.Delete(topic) + s.handlers.Delete(channel) return } @@ -88,7 +81,11 @@ func (s *Stream) Close() error { } func (s *Stream) reconnect() { + s.authenticated.Store(false) s.conn = openSocket() + if err := s.auth(); err != nil { + return + } s.handlers.Range(func(key, value interface{}) bool { // there should be no errors if we've previously successfully connected s.sub(key.(string)) @@ -116,7 +113,13 @@ func (s *Stream) start() { if err := json.Unmarshal(arrayBytes, &msgArray); err == nil { for _, msg := range msgArray { msgMap := msg.(map[string]interface{}) - if v, ok := s.handlers.Load(msgMap["ev"]); ok { + channel := fmt.Sprintf("%s.%s", msgMap["ev"], msgMap["sym"]) + handler, ok := s.handlers.Load(channel) + if !ok { + // see if an "all symbols" handler was registered + handler, ok = s.handlers.Load(fmt.Sprintf("%s.*", msgMap["ev"])) + } + if ok { msgBytes, _ := json.Marshal(msg) switch msgMap["ev"] { case SecondAggs: @@ -124,7 +127,7 @@ func (s *Stream) start() { case MinuteAggs: var minuteAgg StreamAggregate if err := json.Unmarshal(msgBytes, &minuteAgg); err == nil { - h := v.(func(msg interface{})) + h := handler.(func(msg interface{})) h(minuteAgg) } else { s.handleError(err) @@ -132,7 +135,7 @@ func (s *Stream) start() { case Quotes: var quoteUpdate StreamQuote if err := json.Unmarshal(msgBytes, "eUpdate); err == nil { - h := v.(func(msg interface{})) + h := handler.(func(msg interface{})) h(quoteUpdate) } else { s.handleError(err) @@ -140,12 +143,14 @@ func (s *Stream) start() { case Trades: var tradeUpdate StreamTrade if err := json.Unmarshal(msgBytes, &tradeUpdate); err == nil { - h := v.(func(msg interface{})) + h := handler.(func(msg interface{})) h(tradeUpdate) } else { s.handleError(err) } } + } else { + } } } else { @@ -205,9 +210,11 @@ func (s *Stream) auth() (err error) { } if !strings.EqualFold(msg[0].Status, "auth_success") { - return fmt.Errorf("failed to authorize alpaca stream") + return fmt.Errorf("failed to authorize Polygon stream") } + s.authenticated.Store(true) + return } @@ -235,5 +242,10 @@ func openSocket() *websocket.Conn { if err != nil { panic(err) } + // read connection message + msg := []PolgyonServerMsg{} + if err = c.ReadJSON(&msg); err != nil { + panic(err) + } return c }