Skip to content

Commit

Permalink
mint - handle proof state subscriptions in websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
elnosh committed Jan 28, 2025
1 parent f7ed5be commit bfdf2b4
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 18 deletions.
31 changes: 29 additions & 2 deletions mint/mint.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func (m *Mint) Swap(proofs cashu.Proofs, blindedMessages cashu.BlindedMessages)
errmsg := fmt.Sprintf("error invalidating proofs. Could not save proofs to db: %v", err)
return nil, cashu.BuildCashuError(errmsg, cashu.DBErrCode)
}
m.publishProofsStateChanges(proofs, nut07.Spent)

return blindedSignatures, nil
}
Expand Down Expand Up @@ -649,6 +650,7 @@ func (m *Mint) GetMeltQuoteState(ctx context.Context, quoteId string) (storage.M
errmsg := fmt.Sprintf("error updating melt quote state: %v", err)
return storage.MeltQuote{}, cashu.BuildCashuError(errmsg, cashu.DBErrCode)
}
m.publishProofsStateChanges(proofs, nut07.Spent)

case lightning.Failed:
m.logInfof("payment %v failed with error: %v. Setting melt quote '%v' to unpaid and removing proofs from pending",
Expand All @@ -660,7 +662,6 @@ func (m *Mint) GetMeltQuoteState(ctx context.Context, quoteId string) (storage.M
errmsg := fmt.Sprintf("error updating melt quote state: %v", err)
return storage.MeltQuote{}, cashu.BuildCashuError(errmsg, cashu.DBErrCode)
}

_, err = m.removePendingProofsForQuote(meltQuote.Id)
if err != nil {
errmsg := fmt.Sprintf("error removing pending proofs for quote: %v", err)
Expand Down Expand Up @@ -778,6 +779,7 @@ func (m *Mint) MeltTokens(ctx context.Context, meltTokensRequest nut05.PostMeltB
errmsg := fmt.Sprintf("error invalidating proofs. Could not save proofs to db: %v", err)
return storage.MeltQuote{}, cashu.BuildCashuError(errmsg, cashu.DBErrCode)
}
m.publishProofsStateChanges(proofs, nut07.Spent)
} else {
m.logInfof("attempting to pay invoice: %v", meltQuote.InvoiceRequest)
// if quote can't be settled internally, ask backend to make payment
Expand Down Expand Up @@ -924,6 +926,7 @@ func (m *Mint) settleProofs(Ys []string, proofs cashu.Proofs) error {
errmsg := fmt.Sprintf("error invalidating proofs. Could not save proofs to db: %v", err)
return cashu.BuildCashuError(errmsg, cashu.DBErrCode)
}
m.publishProofsStateChanges(proofs, nut07.Spent)

return nil
}
Expand Down Expand Up @@ -1448,7 +1451,14 @@ func (m *Mint) SetMintInfo(mintInfo MintInfo) {
Nut14: nut06.Supported{Supported: true},
Nut17: nut17.InfoSetting{
Supported: []nut17.SupportedMethod{
{Method: cashu.BOLT11_METHOD, Unit: cashu.Sat.String(), Commands: []string{nut17.Bolt11MintQuote.String()}},
{
Method: cashu.BOLT11_METHOD,
Unit: cashu.Sat.String(),
Commands: []string{
nut17.Bolt11MintQuote.String(),
nut17.ProofState.String(),
},
},
},
},
}
Expand Down Expand Up @@ -1509,3 +1519,20 @@ func (m Mint) RetrieveMintInfo() (nut06.MintInfo, error) {

return m.mintInfo, nil
}

func (m *Mint) publishProofsStateChanges(proofs cashu.Proofs, state nut07.State) {
proofStates := make([]nut07.ProofState, len(proofs))

for i, proof := range proofs {
Y, _ := crypto.HashToCurve([]byte(proof.Secret))
Yhex := hex.EncodeToString(Y.SerializeCompressed())
proofStates[i] = nut07.ProofState{Y: Yhex, State: state, Witness: proof.Witness}
}

stateResponse := nut07.PostCheckStateResponse{
States: proofStates,
}

proofStatesJson, _ := json.Marshal(&stateResponse)
m.publisher.Publish(PROOF_STATE_TOPIC, proofStatesJson)
}
188 changes: 172 additions & 16 deletions mint/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/elnosh/gonuts/cashu/nuts/nut04"
"github.com/elnosh/gonuts/cashu/nuts/nut07"
"github.com/elnosh/gonuts/cashu/nuts/nut17"
"github.com/elnosh/gonuts/mint/pubsub"
"github.com/elnosh/gonuts/mint/storage"
Expand Down Expand Up @@ -133,13 +134,17 @@ func (c *Client) readMessages() {
c.manager.mint.logErrorf("Got invalid websocket request. Sending error message: %v", wsErr)
jsonErrMsg, _ := json.Marshal(wsErr)
c.send <- jsonErrMsg
continue
}

c.manager.mint.logDebugf("received websocket message: %s", msg)

wsResponse, wsError := c.processRequest(wsRequest)
if wsError != nil {
jsonErrMsg, _ := json.Marshal(wsError)
c.manager.mint.logErrorf("Error processing websocket request. Sending error message: %v", wsError)
c.send <- jsonErrMsg
continue
}

// if successful request, send WsNotification with initial state
Expand Down Expand Up @@ -219,6 +224,7 @@ func (c *Client) subscriptionRequest(req nut17.WsRequest) (*nut17.WsResponse, *n
return nil, &wsErr
}

// check all quotes are valid before accepting subscription
quotes := make([]storage.MintQuote, len(quoteIds))
for i, quoteId := range quoteIds {
quote, err := c.manager.mint.db.GetMintQuote(quoteId)
Expand All @@ -230,11 +236,10 @@ func (c *Client) subscriptionRequest(req nut17.WsRequest) (*nut17.WsResponse, *n
}

mintQuotesClient := NewMintQuotesSubClient(req.Params.SubId, quotes, c.manager.mint.publisher)
c.manager.mint.logDebugf("adding new subscription of kind '%s' with sub id '%v'", req.Params.Kind, req.Params.SubId)
c.addSubscriptionClient(req.Params.SubId, mintQuotesClient)

// send initial quote state
go func() {
// send initial quote state
for _, quote := range quotes {
firstQuoteState := nut04.PostMintQuoteBolt11Response{
Quote: quote.Id,
Expand All @@ -251,20 +256,62 @@ func (c *Client) subscriptionRequest(req nut17.WsRequest) (*nut17.WsResponse, *n
Payload: jsonPayload,
},
}
jsonNotification, _ := json.Marshal(wsNotif)
jsonNotification, _ := json.Marshal(&wsNotif)
c.send <- jsonNotification
}
}()

go listenForSubscriptionUpdates(mintQuotesClient, c.send)

case nut17.Bolt11MeltQuote:
case nut17.ProofState:
Ys := req.Params.Filters
if len(Ys) > 100 {
wsErr := nut17.NewWsError(1000, "too many filters", req.Id)
return nil, &wsErr
}

// if any of the proofs are already spent, return errors since there can't be any
// other updates
usedProofs, _ := c.manager.mint.db.GetProofsUsed(Ys)
if len(usedProofs) > 0 {
wsErr := nut17.NewWsError(1000, "proofs in request are already spent", req.Id)
return nil, &wsErr
}
proofStatesClient := NewProofStatesSubClient(req.Params.SubId, Ys, c.manager.mint.publisher)
c.addSubscriptionClient(req.Params.SubId, proofStatesClient)

// send initial proof state
go func() {
proofStates := make([]nut07.ProofState, len(Ys))
for i, y := range Ys {
proofStates[i] = nut07.ProofState{Y: y, State: nut07.Unspent}
}
proofStateResponse := nut07.PostCheckStateResponse{
States: proofStates,
}

jsonPayload, _ := json.Marshal(&proofStateResponse)
wsNotif := nut17.WsNotification{
JsonRPC: nut17.JSONRPC_2,
Method: nut17.SUBSCRIBE,
Params: nut17.NotificationParams{
SubId: req.Params.SubId,
Payload: jsonPayload,
},
}
jsonNotification, _ := json.Marshal(&wsNotif)
c.send <- jsonNotification
}()

go listenForSubscriptionUpdates(proofStatesClient, c.send)

case nut17.Bolt11MeltQuote:
default:
wsErr := nut17.NewWsError(1000, "invalid request method", req.Id)
return nil, &wsErr
}

c.manager.mint.logDebugf("adding new subscription of kind '%s' with sub id '%v'", req.Params.Kind, req.Params.SubId)
return &nut17.WsResponse{
JsonRPC: nut17.JSONRPC_2,
Result: nut17.Result{
Expand Down Expand Up @@ -320,6 +367,9 @@ func (c *Client) close() error {
return nil
}

// listenForSubscriptionUpdates should be called in a goroutine to run in the background.
// It will listen on the notification channel for any updates on the subscription
// and send those to be written on the websocket connection
func listenForSubscriptionUpdates(subClient SubscriptionClient, send chan json.RawMessage) {
notifChan := subClient.Read()
for {
Expand All @@ -333,7 +383,12 @@ func listenForSubscriptionUpdates(subClient SubscriptionClient, send chan json.R
}
}

// SubscriptionClient interface for the different subscription kinds:
// - mint quotes
// - melt quotes
// - proof states
type SubscriptionClient interface {
// returns a channel to receive notifications for this subscription
Read() <-chan nut17.WsNotification
Context() context.Context
Close()
Expand Down Expand Up @@ -368,12 +423,16 @@ func NewMintQuotesSubClient(subId string, mintQuotes []storage.MintQuote, pubsub
}
}

func (quotesClient *MintQuotesSubClient) Read() <-chan nut17.WsNotification {
func (subClient *MintQuotesSubClient) Read() <-chan nut17.WsNotification {
notifChan := make(chan nut17.WsNotification)

// channel on which to receive db udpate events
messagesChan := quotesClient.subscriber.GetMessages()
messagesChan := subClient.subscriber.GetMessages()

// goroutine to listen for mint quote updates
// check if the update is related to a mint quote id this subscription is
// interested in and if it the state is different from the previous one recorded.
// if it is, it will send a notification on the channel
go func() {
for {
select {
Expand All @@ -385,11 +444,11 @@ func (quotesClient *MintQuotesSubClient) Read() <-chan nut17.WsNotification {
var mintQuote storage.MintQuote
json.Unmarshal(msg.Payload(), &mintQuote)

previousState, ok := quotesClient.quotes[mintQuote.Id]
previousState, ok := subClient.quotes[mintQuote.Id]
if ok {
// send notification if there was a state change
if previousState != mintQuote.State {
quotesClient.quotes[mintQuote.Id] = mintQuote.State
subClient.quotes[mintQuote.Id] = mintQuote.State

newQuoteState := nut04.PostMintQuoteBolt11Response{
Quote: mintQuote.Id,
Expand All @@ -403,15 +462,112 @@ func (quotesClient *MintQuotesSubClient) Read() <-chan nut17.WsNotification {
JsonRPC: nut17.JSONRPC_2,
Method: nut17.SUBSCRIBE,
Params: nut17.NotificationParams{
SubId: quotesClient.subId,
SubId: subClient.subId,
Payload: notificationPayload,
},
}
notifChan <- wsNotif
}
}

case <-quotesClient.ctx.Done():
case <-subClient.ctx.Done():
return
}
}
}()

return notifChan
}

func (subClient *MintQuotesSubClient) Context() context.Context {
return subClient.ctx
}

func (subClient *MintQuotesSubClient) Close() {
subClient.pubsub.Unsubscribe(subClient.subscriber, BOLT11_MINT_QUOTE_TOPIC)
subClient.subscriber.Close()
subClient.cancel()
}

type ProofStatesSubClient struct {
subId string
ctx context.Context
cancel context.CancelFunc

pubsub *pubsub.PubSub
subscriber *pubsub.Subscriber

proofs map[string]nut07.State
}

func NewProofStatesSubClient(subId string, Ys []string, pubsub *pubsub.PubSub) *ProofStatesSubClient {
ctx, cancel := context.WithCancel(context.Background())
subscriber := pubsub.Subscribe(PROOF_STATE_TOPIC)

proofs := make(map[string]nut07.State)
for _, y := range Ys {
proofs[y] = nut07.Unspent
}

return &ProofStatesSubClient{
pubsub: pubsub,
subId: subId,
ctx: ctx,
cancel: cancel,
proofs: proofs,
subscriber: subscriber,
}
}

func (subClient *ProofStatesSubClient) Read() <-chan nut17.WsNotification {
notifChan := make(chan nut17.WsNotification)

// channel on which to receive db udpate events
messagesChan := subClient.subscriber.GetMessages()

// check for updates on proofs related to this
// subscription
go func() {
for {
select {
case msg, ok := <-messagesChan:
if !ok {
return
}

var proofStates nut07.PostCheckStateResponse
json.Unmarshal(msg.Payload(), &proofStates)

newProofStates := make([]nut07.ProofState, 0, len(subClient.proofs))
for _, proofState := range proofStates.States {
previousState, ok := subClient.proofs[proofState.Y]
if ok {
if previousState != proofState.State {
subClient.proofs[proofState.Y] = proofState.State
newProofStates = append(newProofStates, proofState)
}
}
}

// send notification if there was a state change
if len(newProofStates) > 0 {
proofStatesResponse := nut07.PostCheckStateResponse{
States: newProofStates,
}

notificationPayload, _ := json.Marshal(&proofStatesResponse)
wsNotif := nut17.WsNotification{
JsonRPC: nut17.JSONRPC_2,
Method: nut17.SUBSCRIBE,
Params: nut17.NotificationParams{
SubId: subClient.subId,
Payload: notificationPayload,
},
}
notifChan <- wsNotif
}

case <-subClient.ctx.Done():
return
}
}
Expand All @@ -420,12 +576,12 @@ func (quotesClient *MintQuotesSubClient) Read() <-chan nut17.WsNotification {
return notifChan
}

func (quotesClient *MintQuotesSubClient) Context() context.Context {
return quotesClient.ctx
func (subClient *ProofStatesSubClient) Context() context.Context {
return subClient.ctx
}

func (quotesClient *MintQuotesSubClient) Close() {
quotesClient.pubsub.Unsubscribe(quotesClient.subscriber, BOLT11_MINT_QUOTE_TOPIC)
quotesClient.subscriber.Close()
quotesClient.cancel()
func (subClient *ProofStatesSubClient) Close() {
subClient.pubsub.Unsubscribe(subClient.subscriber, BOLT11_MELT_QUOTE_TOPIC)
subClient.subscriber.Close()
subClient.cancel()
}

0 comments on commit bfdf2b4

Please sign in to comment.