diff --git a/cmd/nutw/nutw.go b/cmd/nutw/nutw.go index cd8c759..c805622 100644 --- a/cmd/nutw/nutw.go +++ b/cmd/nutw/nutw.go @@ -322,6 +322,7 @@ func requestMint(amount uint64, mintURL string) error { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, os.Kill, syscall.SIGTERM) + done := make(chan struct{}) go func() { select { @@ -332,6 +333,8 @@ func requestMint(amount uint64, mintURL string) error { case <-sigChan: fmt.Println("\nterminating... after paying the invoice you can also redeem the ecash by doing 'nutw mint --invoice [invoice]'") os.Exit(0) + case <-done: + return } }() @@ -357,6 +360,7 @@ func requestMint(amount uint64, mintURL string) error { if err := subMananger.CloseSubscripton(subscription.SubId()); err != nil { return err } + done <- struct{}{} return nil } } diff --git a/wallet/submanager/submanager.go b/wallet/submanager/submanager.go index 993ab29..b444614 100644 --- a/wallet/submanager/submanager.go +++ b/wallet/submanager/submanager.go @@ -1,6 +1,7 @@ package submanager import ( + "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -22,12 +23,17 @@ var ( ) type SubscriptionManager struct { - wsConn *websocket.Conn - mu sync.RWMutex + wsConn *websocket.Conn + mu sync.RWMutex + // aggregate writes through this channel since there can only be one concurrent writer. + send chan json.RawMessage + subs map[string]*Subscription idCounter int supportedMethods []nut17.SupportedMethod - quit chan struct{} + + ctx context.Context + cancel context.CancelFunc } func NewSubscriptionManager(mint string) (*SubscriptionManager, error) { @@ -54,78 +60,92 @@ func NewSubscriptionManager(mint string) (*SubscriptionManager, error) { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) + subManager := &SubscriptionManager{ wsConn: conn, + send: make(chan json.RawMessage), subs: make(map[string]*Subscription), idCounter: 0, supportedMethods: mintInfo.Nuts.Nut17.Supported, - quit: make(chan struct{}), + ctx: ctx, + cancel: cancel, } return subManager, nil } -// this should be run on a separate goroutine -// if an error is sent in the err channel, subscription manager should be closed -func (sm *SubscriptionManager) Run(errChannel chan error) { - if err := sm.handleWsMessages(); err != nil { - errChannel <- err - return - } -} +// Run should be called in a separate goroutine to run in the background. +// It will be stopped when Close is called on the SubscriptionManager +func (sm *SubscriptionManager) Run(errChan chan<- error) { + wsMessages := make(chan []byte) -func (sm *SubscriptionManager) Close() error { - sm.quit <- struct{}{} - if err := sm.wsConn.Close(); err != nil { - return err - } - return nil -} + go func() { + for { + _, msg, err := sm.wsConn.ReadMessage() + if err != nil { + errChan <- err + return + } + wsMessages <- msg + } + }() + + // write messages on the connection in this goroutine + go func() { + for { + select { + case msgToSend := <-sm.send: + sm.wsConn.WriteMessage(websocket.TextMessage, msgToSend) + case <-sm.ctx.Done(): + return + } + } + }() -func (sm *SubscriptionManager) handleWsMessages() error { for { select { - case <-sm.quit: - return nil - default: - _, msg, err := sm.wsConn.ReadMessage() - if err != nil { - return err + case msg, ok := <-wsMessages: + if !ok { + return } - go func() { - var notification nut17.WsNotification - if err := json.Unmarshal(msg, ¬ification); err == nil { - subId := notification.Params.SubId - // if subscription exists, send notification on that channel - sub, ok := sm.subs[subId] - if ok { - sub.notificationChannel <- notification - } + var notification nut17.WsNotification + if err := json.Unmarshal(msg, ¬ification); err == nil { + subId := notification.Params.SubId + // if subscription exists, send notification on that channel + sub, ok := sm.subs[subId] + if ok { + sub.notificationChannel <- notification + continue } + } - // if could not parse as WsNotification, try parsing as WsResponse - var response nut17.WsResponse - if err := json.Unmarshal(msg, &response); err != nil { - // if could not parse as WsResponse, try parsing as WsError - var wsError nut17.WsError - if err := json.Unmarshal(msg, &wsError); err == nil { - // if WsError, check if there is subscription with that id and send on err channel - for _, subscription := range sm.subs { - if subscription.id == wsError.Id { - subscription.errChannel <- wsError - } + // if could not parse as WsNotification, try parsing as WsResponse + var response nut17.WsResponse + if err := json.Unmarshal(msg, &response); err != nil { + // if could not parse as WsResponse, try parsing as WsError + var wsError nut17.WsError + if err := json.Unmarshal(msg, &wsError); err == nil { + // if WsError, check if there is subscription with that id and send on err channel + for _, subscription := range sm.subs { + if subscription.id == wsError.Id { + subscription.errChannel <- wsError + continue } } } + } - // if WsResponse, check if there is subscription with that id and send on response channel - for _, subscription := range sm.subs { - if subscription.id == response.Id { - subscription.responseChannel <- response - } + // if WsResponse, check if there is subscription with that id and send on response channel + for _, subscription := range sm.subs { + if subscription.id == response.Id { + subscription.responseChannel <- response + continue } - }() + } + case <-sm.ctx.Done(): + return } } } @@ -161,9 +181,11 @@ func (sm *SubscriptionManager) Subscribe(kind nut17.SubscriptionKind, filters [] Id: id, } - if err := sm.wsConn.WriteJSON(request); err != nil { - return nil, fmt.Errorf("could not send request for subscription: %v", err) + jsonRequest, err := json.Marshal(request) + if err != nil { + return nil, fmt.Errorf("could not make subscription request: %v", err) } + sm.send <- jsonRequest sub := &Subscription{ id: id, @@ -219,9 +241,11 @@ func (sm *SubscriptionManager) CloseSubscripton(subId string) error { sm.idCounter++ sm.mu.Unlock() - if err := sm.wsConn.WriteJSON(request); err != nil { - return fmt.Errorf("could not send unsubscribe request to mint: %v", err) + jsonRequest, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("could not make unsubscription request: %v", err) } + sm.send <- jsonRequest sm.removeSubscription(subId) return nil @@ -238,6 +262,14 @@ func (sm *SubscriptionManager) IsSubscriptionKindSupported(kind nut17.Subscripti return false } +func (sm *SubscriptionManager) Close() error { + sm.cancel() + if err := sm.wsConn.Close(); err != nil { + return err + } + return nil +} + type Subscription struct { subId string id int