Skip to content

Commit

Permalink
fix nodes panic during synchronization
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Dec 18, 2024
1 parent 0501b0d commit 8431891
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 18 deletions.
9 changes: 8 additions & 1 deletion orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package smartbft

import (
"context"
"encoding/base64"
"fmt"
"reflect"
Expand Down Expand Up @@ -72,6 +73,7 @@ type BFTChain struct {
Logger *flogging.FabricLogger
WALDir string
consensus *smartbft.Consensus
syncCancel context.CancelFunc
support consensus.ConsenterSupport
ClusterService *cluster.ClusterService
verifier *Verifier
Expand Down Expand Up @@ -158,7 +160,9 @@ func NewChain(
c.RuntimeConfig.Store(rtc)

c.verifier = buildVerifier(cv, c.RuntimeConfig, support, requestInspector, policyManager)
c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory)
ctx, cancel := context.WithCancel(context.Background())
c.syncCancel = cancel
c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory, ctx)

// Setup communication with list of remotes notes for the new channel
c.Comm.Configure(c.support.ChannelID(), rtc.RemoteNodes)
Expand All @@ -177,6 +181,7 @@ func bftSmartConsensusBuild(
requestInspector *RequestInspector,
egressCommFactory EgressCommFactory,
synchronizerFactory SynchronizerFactory,
syncCtx context.Context,
) *smartbft.Consensus {
var err error

Expand Down Expand Up @@ -213,6 +218,7 @@ func bftSmartConsensusBuild(
c.support,
c.bccsp,
c.clusterDialer,
syncCtx,
)

channelDecorator := zap.String("channel", c.support.ChannelID())
Expand Down Expand Up @@ -462,6 +468,7 @@ func (c *BFTChain) Start() {
// Halt frees the resources which were allocated for this Chain.
func (c *BFTChain) Halt() {
c.Logger.Infof("Shutting down chain")
c.syncCancel()
c.consensus.Stop()
}

Expand Down
25 changes: 14 additions & 11 deletions orderer/consensus/smartbft/mocks/synchronizer_factory.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions orderer/consensus/smartbft/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package smartbft

import (
"context"
"sort"
"time"

"github.com/hyperledger-labs/SmartBFT/pkg/types"
"github.com/hyperledger-labs/SmartBFT/smartbftprotos"
Expand All @@ -34,6 +36,7 @@ type Synchronizer struct {
LocalConfigCluster localconfig.Cluster
BlockPullerFactory BlockPullerFactory
Logger *flogging.FabricLogger
Ctx context.Context
}

// Sync synchronizes blocks and returns the response
Expand Down Expand Up @@ -141,9 +144,22 @@ func (s *Synchronizer) synchronize() (*types.Decision, error) {
return nil, errors.Errorf("failed pulling block %d", seq)
}

startSeq := startHeight
// wait for the right height
ticker := time.NewTicker(50 * time.Millisecond)
loop:
for {
select {
case <-s.Ctx.Done():
break loop
case <-ticker.C:
if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() {
break loop
}
}
}

s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]",
blocksFetched, startSeq, lastPulledBlock.Header.Number)
blocksFetched, startHeight, lastPulledBlock.Header.Number)

viewMetadata, lastConfigSqn := s.getViewMetadataLastConfigSqnFromBlock(lastPulledBlock)

Expand Down
19 changes: 17 additions & 2 deletions orderer/consensus/smartbft/synchronizer_bft.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package smartbft

import (
"context"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -39,6 +40,7 @@ type BFTSynchronizer struct {
VerifierFactory VerifierFactory
BFTDelivererFactory BFTDelivererFactory
Logger *flogging.FabricLogger
Ctx context.Context

mutex sync.Mutex
syncBuff *SyncBuffer
Expand Down Expand Up @@ -277,9 +279,22 @@ func (s *BFTSynchronizer) getBlocksFromSyncBuffer(startHeight, targetHeight uint
return nil, errors.Errorf("failed pulling block %d", seq)
}

startSeq := startHeight
// wait for the right height
ticker := time.NewTicker(50 * time.Millisecond)
loop:
for {
select {
case <-s.Ctx.Done():
break loop
case <-ticker.C:
if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() {
break loop
}
}
}

s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]",
blocksFetched, startSeq, lastPulledBlock.Header.Number)
blocksFetched, startHeight, lastPulledBlock.Header.Number)

return lastPulledBlock, nil
}
7 changes: 7 additions & 0 deletions orderer/consensus/smartbft/synchronizer_bft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package smartbft_test

import (
"context"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestBFTSynchronizer(t *testing.T) {
LocalConfigCluster: localconfig.Cluster{},
BlockPullerFactory: bpf,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}

require.NotNil(t, bftSynchronizer)
Expand Down Expand Up @@ -155,6 +157,7 @@ func TestBFTSynchronizer(t *testing.T) {
LocalConfigCluster: localconfig.Cluster{},
BlockPullerFactory: bpf,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}

require.NotNil(t, bftSynchronizer)
Expand Down Expand Up @@ -201,6 +204,7 @@ func TestBFTSynchronizer(t *testing.T) {
LocalConfigCluster: localconfig.Cluster{},
BlockPullerFactory: bpf,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}

require.NotNil(t, bftSynchronizer)
Expand Down Expand Up @@ -261,6 +265,7 @@ func TestBFTSynchronizer(t *testing.T) {
LocalConfigCluster: localconfig.Cluster{},
BlockPullerFactory: bpf,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}

require.NotNil(t, bftSynchronizer)
Expand Down Expand Up @@ -370,6 +375,7 @@ func TestBFTSynchronizer(t *testing.T) {
VerifierFactory: fakeVerifierFactory,
BFTDelivererFactory: fakeBFTDelivererFactory,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}

require.NotNil(t, bftSynchronizer)
Expand Down Expand Up @@ -506,6 +512,7 @@ func TestBFTSynchronizer(t *testing.T) {
VerifierFactory: fakeVerifierFactory,
BFTDelivererFactory: fakeBFTDelivererFactory,
Logger: flogging.MustGetLogger("test.smartbft"),
Ctx: context.Background(),
}
require.NotNil(t, bftSynchronizer)

Expand Down
9 changes: 8 additions & 1 deletion orderer/consensus/smartbft/synchronizer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package smartbft

import (
"context"

"github.com/hyperledger-labs/SmartBFT/pkg/api"
"github.com/hyperledger-labs/SmartBFT/pkg/types"
"github.com/hyperledger/fabric-lib-go/bccsp"
Expand Down Expand Up @@ -35,6 +37,7 @@ type SynchronizerFactory interface {
support consensus.ConsenterSupport,
bccsp bccsp.BCCSP,
clusterDialer *cluster.PredicateDialer,
syncCtx context.Context,
) api.Synchronizer
}

Expand All @@ -50,8 +53,9 @@ func (*synchronizerCreator) CreateSynchronizer(
support consensus.ConsenterSupport,
bccsp bccsp.BCCSP,
clusterDialer *cluster.PredicateDialer,
syncCtx context.Context,
) api.Synchronizer {
return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer)
return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer, syncCtx)
}

// newSynchronizer creates a new synchronizer
Expand All @@ -65,6 +69,7 @@ func newSynchronizer(
support consensus.ConsenterSupport,
bccsp bccsp.BCCSP,
clusterDialer *cluster.PredicateDialer,
syncCtx context.Context,
) api.Synchronizer {
switch localConfigCluster.ReplicationPolicy {
case "consensus":
Expand All @@ -87,6 +92,7 @@ func newSynchronizer(
VerifierFactory: &verifierCreator{},
BFTDelivererFactory: &bftDelivererCreator{},
Logger: logger,
Ctx: syncCtx,
}
case "simple":
logger.Debug("Creating simple Synchronizer")
Expand All @@ -106,6 +112,7 @@ func newSynchronizer(
LatestConfig: func() (types.Configuration, []uint64) {
return rtc.BFTConfig, rtc.Nodes
},
Ctx: syncCtx,
}
default:
logger.Panicf("Unsupported Cluster.ReplicationPolicy: %s", localConfigCluster.ReplicationPolicy)
Expand Down
6 changes: 6 additions & 0 deletions orderer/consensus/smartbft/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package smartbft_test

import (
"context"
"os"
"testing"

Expand Down Expand Up @@ -80,6 +81,7 @@ func TestSynchronizerSync(t *testing.T) {
BlockPullerFactory: bpf,
Support: fakeCS,
OnCommit: noopUpdateLastHash,
Ctx: context.Background(),
}

d := syn.Sync()
Expand Down Expand Up @@ -143,6 +145,7 @@ func TestSynchronizerSync(t *testing.T) {
BlockPullerFactory: bpf,
Support: fakeCS,
OnCommit: noopUpdateLastHash,
Ctx: context.Background(),
}

d := syn.Sync()
Expand Down Expand Up @@ -205,6 +208,7 @@ func TestSynchronizerSync(t *testing.T) {
BlockPullerFactory: bpf,
Support: fakeCS,
OnCommit: noopUpdateLastHash,
Ctx: context.Background(),
}

d := syn.Sync()
Expand Down Expand Up @@ -266,6 +270,7 @@ func TestSynchronizerSync(t *testing.T) {
BlockPullerFactory: bpf,
Support: fakeCS,
OnCommit: noopUpdateLastHash,
Ctx: context.Background(),
}

d := syn.Sync()
Expand Down Expand Up @@ -329,6 +334,7 @@ func TestSynchronizerSync(t *testing.T) {
BlockPullerFactory: bpf,
Support: fakeCS,
OnCommit: noopUpdateLastHash,
Ctx: context.Background(),
}

d := syn.Sync()
Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/smartbft/util_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func createBFTChainUsingMocks(t *testing.T, node *Node, configInfo *ConfigInfo)
}
}).Maybe()
synchronizerFactory := smartBFTMocks.NewSynchronizerFactory(t)
synchronizerFactory.EXPECT().CreateSynchronizer(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(synchronizerMock)
synchronizerFactory.EXPECT().CreateSynchronizer(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(synchronizerMock)

localConfigCluster := localconfig.Cluster{ReplicationPolicy: "consensus"}
clusterDialer := &cluster.PredicateDialer{}
Expand Down

0 comments on commit 8431891

Please sign in to comment.