Skip to content

Commit

Permalink
Merge pull request #116 from kaleido-io/add_logs
Browse files Browse the repository at this point in the history
fix: race condition when reading and writing to inflight queue
  • Loading branch information
Chengxuan authored Apr 24, 2024
2 parents 5a5eed6 + da86f21 commit af4a223
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
39 changes: 27 additions & 12 deletions pkg/txhandler/simple/policyloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (sth *simpleTransactionHandler) markInflightUpdate() {
}

func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool {
sth.inflightRWMux.Lock()
defer sth.inflightRWMux.Unlock()

oldInflight := sth.inflight
sth.inflight = make([]*pendingState, 0, len(oldInflight))
Expand All @@ -118,6 +120,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool

// If we are not at maximum, then query if there are more candidates now
spaces := sth.maxInFlight - len(sth.inflight)
log.L(sth.ctx).Tracef("Number of spaces left '%v'", spaces)
if spaces > 0 {
var after string
if len(sth.inflight) > 0 {
Expand Down Expand Up @@ -152,7 +155,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool
}
newLen := len(sth.inflight)
if newLen > 0 {
log.L(ctx).Debugf("Inflight set updated len=%d head-seq=%s tail-seq=%s old-tail=%s", len(sth.inflight), sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.SequenceID, after)
log.L(ctx).Debugf("Inflight set updated with %d additional transactions, length is now %d head-id:%s head-seq=%s tail-id:%s tail-seq=%s old-tail=%s", len(additional), len(sth.inflight), sth.inflight[0].mtx.ID, sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.ID, sth.inflight[newLen-1].mtx.SequenceID, after)
}
}
sth.setTransactionInflightQueueMetrics(ctx)
Expand All @@ -161,6 +164,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool
}

func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflightStale bool) {
log.L(ctx).Tracef("policyLoopCycle triggered inflightStatle=%v", inflightStale)

// Process any synchronous commands first - these might not be in our inflight set
sth.processPolicyAPIRequests(ctx)
Expand All @@ -170,9 +174,12 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig
return
}
}
// Go through executing the policy engine against them

sth.inflightRWMux.RLock()
defer sth.inflightRWMux.RUnlock()
// Go through executing the policy engine against them
for _, pending := range sth.inflight {
log.L(ctx).Tracef("Executing policy against tx-id=%v", pending.mtx.ID)
err := sth.execPolicy(ctx, pending, nil)
if err != nil {
log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err)
Expand Down Expand Up @@ -204,13 +211,17 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex

for _, request := range requests {
var pending *pendingState

sth.inflightRWMux.RLock()
// If this transaction is in-flight, we use that record
for _, inflight := range sth.inflight {
if inflight != nil && inflight.mtx != nil && inflight.mtx.ID == request.txID {
pending = inflight
break
}
}
sth.inflightRWMux.RUnlock()
// If this transaction is in-flight, we use that record
if pending == nil {
mtx, err := sth.getTransactionByID(ctx, request.txID)
if err != nil {
Expand Down Expand Up @@ -246,7 +257,9 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex
func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) {

// Take a snapshot of the pending state under the lock
sth.mux.Lock()
pending.mux.Lock()
defer pending.mux.Unlock()

mtx := pending.mtx
ctx = &RunContext{
Context: baseCtx,
Expand All @@ -267,7 +280,6 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
ctx.UpdateType = Update // might change to delete later
ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested
}
sth.mux.Unlock()

// Process any state updates that were queued to us from notifications from the confirmation manager
if receiptNotify != nil {
Expand All @@ -279,11 +291,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
sth.incTransactionOperationCounter(ctx, pending.mtx.Namespace(ctx), "received_receipt")

// Clear the notification (as long as no other came through)
sth.mux.Lock()
if pending.receiptNotify == receiptNotify {
pending.receiptNotify = nil
}
sth.mux.Unlock()
}

if confirmNotify != nil && ctx.Confirmations != nil {
Expand All @@ -297,11 +307,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
}

// Clear the notification (as long as no other came through)
sth.mux.Lock()
if pending.confirmNotify == confirmNotify {
pending.confirmNotify = nil
}
sth.mux.Unlock()
}

return ctx, nil
Expand All @@ -318,6 +326,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending
completed := false
switch {
case ctx.Confirmed && ctx.SyncAction != ActionDelete:
log.L(sth.ctx).Tracef("Transaction '%s' confirmed", ctx.TX.ID)
completed = true
ctx.UpdateType = Update
if ctx.Receipt != nil && ctx.Receipt.Success {
Expand Down Expand Up @@ -481,44 +490,50 @@ func (sth *simpleTransactionHandler) policyEngineAPIRequest(ctx context.Context,

func (sth *simpleTransactionHandler) HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) {
// Will be picked up on the next policy loop cycle
sth.inflightRWMux.RLock()
var pending *pendingState
for _, p := range sth.inflight {
if p != nil && p.mtx != nil && p.mtx.ID == txID {
pending = p
break
}
}
sth.inflightRWMux.RUnlock()
if pending == nil {
err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID)
return
}
sth.mux.Lock()
pending.mux.Lock()
pending.confirmed = notification.Confirmed
pending.confirmNotify = fftypes.Now()
pending.confirmations = notification
pending.mux.Unlock()
log.L(ctx).Infof("Received %d confirmations (resync=%t)", len(notification.Confirmations), notification.NewFork)
sth.mux.Unlock()

sth.markInflightUpdate()
return
}
func (sth *simpleTransactionHandler) HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) {
log.L(ctx).Tracef("Handle transaction receipt received %s", txID)
sth.inflightRWMux.RLock()
var pending *pendingState
for _, p := range sth.inflight {
if p != nil && p.mtx != nil && p.mtx.ID == txID {
pending = p
break
}
}
sth.inflightRWMux.RUnlock()
if pending == nil {
err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID)
return
}
pending.mux.Lock()
// Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed
sth.mux.Lock()
pending.receiptNotify = fftypes.Now()
pending.receipt = receipt
sth.mux.Unlock()
pending.mux.Unlock()
// Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed
sth.markInflightUpdate()
return
}
10 changes: 7 additions & 3 deletions pkg/txhandler/simple/simple_transaction_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -176,7 +176,8 @@ type simpleTransactionHandler struct {
policyLoopDone chan struct{}
inflightStale chan bool
inflightUpdate chan bool
mux sync.Mutex
mux sync.RWMutex
inflightRWMux sync.RWMutex
inflight []*pendingState
policyEngineAPIRequests []*policyEngineAPIRequest
maxInFlight int
Expand All @@ -195,6 +196,9 @@ type pendingState struct {
confirmNotify *fftypes.FFTime
remove bool
subStatus apitypes.TxSubStatus
// This mutex only works in a slice when the slice contains a pointer to this struct
// appends to a slice copy memory but when storing pointers it does not
mux sync.Mutex
}

type simplePolicyInfo struct {
Expand Down Expand Up @@ -344,8 +348,8 @@ func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID s
}

func (sth *simpleTransactionHandler) submitTX(ctx *RunContext) (reason ffcapi.ErrorReason, err error) {

mtx := ctx.TX

mtx.GasPrice, err = sth.getGasPrice(ctx, sth.toolkit.Connector)
if err != nil {
ctx.AddSubStatusAction(apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`))
Expand Down

0 comments on commit af4a223

Please sign in to comment.