Skip to content

Commit

Permalink
client/ws: loadcandles and loadmarket cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
norwnd committed Mar 10, 2023
1 parent d8f3e90 commit df024e2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 28 deletions.
5 changes: 3 additions & 2 deletions client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (c *candleCache) addCandle(msgCandle *msgjson.Candle) (recent msgjson.Candl
// supplied close() callback.
type bookie struct {
*orderbook.OrderBook
dc *dexConnection
dc *dexConnection
// candleCaches is indexing candle caches by durations [5m,1h,24h].
candleCaches map[string]*candleCache
log dex.Logger

Expand Down Expand Up @@ -178,7 +179,7 @@ func (b *bookie) logEpochReport(note *msgjson.EpochReportNote) error {
if err != nil {
return err
}
if note.Candle.EndStamp == 0 {
if note.Candle.EndStamp == 0 { // should never happen
return fmt.Errorf("epoch report has zero-valued candle end stamp")
}

Expand Down
61 changes: 35 additions & 26 deletions client/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
)

type bookFeed struct {
// core.BookFeed is kept here for its Candles() only, and it will be closed
// once loop terminates.
core.BookFeed
loop *dex.StartStopWaiter
host string
Expand All @@ -55,10 +57,13 @@ func newWSClient(addr string, conn ws.Connection, hndlr func(msg *msgjson.Messag
}

func (cl *wsClient) shutDownFeed() {
cl.feedMtx.Lock()
defer cl.feedMtx.Unlock()

if cl.feed != nil {
cl.feed.loop.Stop()
cl.feed.loop.WaitForShutdown()
cl.feed = nil
cl.feed = nil // allows for reuse of the same ws connection later
}
}

Expand Down Expand Up @@ -159,9 +164,7 @@ func (s *Server) connect(ctx context.Context, conn ws.Connection, addr string) {
s.clientsMtx.Unlock()

defer func() {
cl.feedMtx.Lock()
cl.shutDownFeed()
cl.feedMtx.Unlock()

s.clientsMtx.Lock()
delete(s.clients, cl.cid)
Expand Down Expand Up @@ -279,8 +282,9 @@ out:
m.feed.Close()
}

// wsLoadMarket is the handler for the 'loadmarket' websocket route. Subscribes
// the client to the notification feed and sends the order book.
// wsLoadMarket is the handler for the 'loadmarket' websocket route. Sends current
// order book state to client cl and subscribe him to notification feed that he can
// use to keep order book state in sync with what dex server has in the book.
func wsLoadMarket(s *Server, cl *wsClient, msg *msgjson.Message) *msgjson.Error {
req := new(marketLoad)
err := json.Unmarshal(msg.Payload, req)
Expand All @@ -294,6 +298,16 @@ func wsLoadMarket(s *Server, cl *wsClient, msg *msgjson.Message) *msgjson.Error
}

func loadMarket(s *Server, cl *wsClient, req *marketLoad) (*bookFeed, *msgjson.Error) {
cl.feedMtx.Lock() // ensure we initialize market (sync server order book) just once
defer cl.feedMtx.Unlock()

// If market is initialized already, and client isn't trying to change market here
// in a meaningful way, then all is set already - just return.
if cl.feed != nil &&
(cl.feed.host == req.Host && cl.feed.base == req.Base && cl.feed.quote == req.Quote) {
return cl.feed, nil // already initialized
}

name, err := dex.MarketName(req.Base, req.Quote)
if err != nil {
errMsg := fmt.Sprintf("unknown market: %v", err)
Expand All @@ -308,9 +322,6 @@ func loadMarket(s *Server, cl *wsClient, req *marketLoad) (*bookFeed, *msgjson.E
return nil, msgjson.NewError(msgjson.RPCOrderBookError, errMsg)
}

cl.feedMtx.Lock()
defer cl.feedMtx.Unlock()
cl.shutDownFeed()
cl.feed = &bookFeed{
BookFeed: feed,
loop: newMarketSyncer(cl, feed, s.log.SubLogger(name)),
Expand All @@ -321,6 +332,12 @@ func loadMarket(s *Server, cl *wsClient, req *marketLoad) (*bookFeed, *msgjson.E
return cl.feed, nil
}

// wsLoadCandles is the handler for the 'loadcandles' websocket route. If client cl
// hasn't yet subscribed to order book (via 'loadmarket' websocket route) we'll have
// to subscribe him here (exactly as wsLoadMarket does) because otherwise we won't
// be able to provide him with candles from dex server (that's current implementation
// nuance). Then we can fetch candles from dex server and subscribe to candle update
// feed that will allow client cl to keep cadle state in sync with what server has.
func wsLoadCandles(s *Server, cl *wsClient, msg *msgjson.Message) *msgjson.Error {
req := new(candlesLoad)
err := json.Unmarshal(msg.Payload, req)
Expand All @@ -329,21 +346,16 @@ func wsLoadCandles(s *Server, cl *wsClient, msg *msgjson.Message) *msgjson.Error
s.log.Errorf(errMsg)
return msgjson.NewError(msgjson.RPCInternal, errMsg)
}
cl.feedMtx.RLock()
feed := cl.feed
cl.feedMtx.RUnlock()
// If market hasn't been initialized/chosen yet (client should do it in a separate
// 'loadmarket' request), or if client wants to change currently chosen market (requesting
// candles for market that's different from currently chosen implies that) - we can
// try to load it here.
if feed == nil ||
(feed.host != req.Host || feed.base != req.Base || feed.quote != req.Quote) {
var msgErr *msgjson.Error
feed, msgErr = loadMarket(s, cl, &req.marketLoad)
if msgErr != nil {
return msgErr
}

// Corresponding market might have not been initialized for client cl, we must do
// it here to be able to server candles. It could happen if client doesn't serialize
// 'loadcandles' request to be sent after he received successful reply for 'loadmarket',
// or if he didn't issue the latter at all (and want to rely on lazy initialization).
feed, msgErr := loadMarket(s, cl, &req.marketLoad)
if msgErr != nil {
return msgErr
}

err = feed.Candles(req.Dur)
if err != nil {
return msgjson.NewError(msgjson.RPCInternal, err.Error())
Expand All @@ -354,12 +366,9 @@ func wsLoadCandles(s *Server, cl *wsClient, msg *msgjson.Message) *msgjson.Error
// wsUnmarket is the handler for the 'unmarket' websocket route. This empty
// message is sent when the user leaves the markets page. This closes the feed,
// and potentially unsubscribes from orderbook with the server if there are no
// other consumers
// other consumers.
func wsUnmarket(_ *Server, cl *wsClient, _ *msgjson.Message) *msgjson.Error {
cl.feedMtx.Lock()
cl.shutDownFeed()
cl.feedMtx.Unlock()

return nil
}

Expand Down

0 comments on commit df024e2

Please sign in to comment.