Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and cleanup custodian events cache handling #756

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions tapgarden/custodian.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ func (c *Custodian) watchInboundAssets() {
// If we didn't find a proof, we'll launch a goroutine to use
// the ProofCourier to import the proof into our local DB.
c.Wg.Add(1)
go c.receiveProof(event.Addr.Tap, event.Outpoint)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(event.Addr.Tap, event.Outpoint)
if recErr != nil {
reportErr(recErr)
}
}()
}

// Read all on-chain transactions and make sure they are mapped to an
Expand Down Expand Up @@ -402,7 +409,17 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
// ProofCourier to import the proof into our
// local DB.
c.Wg.Add(1)
go c.receiveProof(event.Addr.Tap, op)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(
event.Addr.Tap, op,
)
if recErr != nil {
log.Errorf("Unable to receive "+
"proof: %v", recErr)
}
}()
}

continue
Expand Down Expand Up @@ -434,19 +451,23 @@ func (c *Custodian) inspectWalletTx(walletTx *lndclient.Transaction) error {
// launch a goroutine to use the ProofCourier to import the
// proof into our local DB.
c.Wg.Add(1)
go c.receiveProof(addr, op)
go func() {
defer c.Wg.Done()

recErr := c.receiveProof(addr, op)
if recErr != nil {
log.Errorf("Unable to receive proof: %v",
recErr)
}
}()
}

return nil
}

// receiveProof attempts to receive a proof for the given address and outpoint
// via the proof courier service.
//
// NOTE: This must be called as a goroutine.
func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
defer c.Wg.Done()

func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) error {
ctx, cancel := c.WithCtxQuitNoTimeout()
defer cancel()

Expand All @@ -466,9 +487,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
&addr.ProofCourierAddr, recipient,
)
if err != nil {
log.Errorf("Unable to initiate proof courier service handle: "+
"%v", err)
return
return fmt.Errorf("unable to initiate proof courier service "+
"handle: %w", err)
}

// Update courier handle events subscribers before attempting to
Expand All @@ -484,7 +504,7 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
select {
case <-time.After(c.cfg.ProofRetrievalDelay):
case <-ctx.Done():
return
return nil
}

// Attempt to receive proof via proof courier service.
Expand All @@ -496,8 +516,8 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
}
addrProof, err := courier.ReceiveProof(ctx, loc)
if err != nil {
log.Errorf("Unable to receive proof using courier: %v", err)
return
return fmt.Errorf("unable to receive proof using courier: %w",
err)
}

log.Debugf("Received proof for: script_key=%x, asset_id=%x",
Expand All @@ -511,9 +531,15 @@ func (c *Custodian) receiveProof(addr *address.Tap, op wire.OutPoint) {
ctx, headerVerifier, c.cfg.GroupVerifier, false, addrProof,
)
if err != nil {
log.Errorf("Unable to import proofs: %v", err)
return
return fmt.Errorf("unable to import proofs: %w", err)
}

// The proof is now verified and in our local archive. We will now
// finalize handling the proof like we would with any other newly
// received proof.
c.proofSubscription.NewItemCreated.ChanIn() <- addrProof.Blob

return nil
}

// mapToTapAddr attempts to match a transaction output to a Taproot Asset
Expand Down