diff --git a/pkg/txn/client/client.go b/pkg/txn/client/client.go index 749d606d032c6..f8a964899ff8e 100644 --- a/pkg/txn/client/client.go +++ b/pkg/txn/client/client.go @@ -747,6 +747,9 @@ func (client *txnClient) removeFromWaitActiveLocked(txnID []byte) bool { func (client *txnClient) waitMarkAllActiveAbortedLocked() { if client.mu.waitMarkAllActiveAbortedC != nil { - <-client.mu.waitMarkAllActiveAbortedC + c := client.mu.waitMarkAllActiveAbortedC + client.mu.Unlock() + <-c + client.mu.Lock() } } diff --git a/pkg/txn/client/client_test.go b/pkg/txn/client/client_test.go index a061f02ce3a61..5818c64ce0a84 100644 --- a/pkg/txn/client/client_test.go +++ b/pkg/txn/client/client_test.go @@ -251,3 +251,16 @@ func TestOpenTxnWithWaitPausedDisabled(t *testing.T) { require.Error(t, c.openTxn(op)) } + +func TestWaitAbortMarked(t *testing.T) { + c := make(chan struct{}) + tc := &txnClient{} + tc.mu.waitMarkAllActiveAbortedC = c + tc.mu.state = normal + tc.mu.activeTxns = map[string]*txnOperator{} + go func() { + close(c) + }() + op := &txnOperator{} + require.NoError(t, tc.openTxn(op)) +} diff --git a/pkg/txn/client/operator.go b/pkg/txn/client/operator.go index 32a06c1e245d2..bbefe0766781f 100644 --- a/pkg/txn/client/operator.go +++ b/pkg/txn/client/operator.go @@ -24,8 +24,6 @@ import ( "sync/atomic" "time" - "go.uber.org/zap" - "github.com/matrixorigin/matrixone/pkg/common/log" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/runtime" @@ -38,6 +36,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/rpc" "github.com/matrixorigin/matrixone/pkg/txn/util" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" + "go.uber.org/zap" ) var (