diff --git a/orderer/consensus/smartbft/chain.go b/orderer/consensus/smartbft/chain.go index e61ff292195..8d95b5c7d87 100644 --- a/orderer/consensus/smartbft/chain.go +++ b/orderer/consensus/smartbft/chain.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "context" "encoding/base64" "fmt" "reflect" @@ -72,6 +73,7 @@ type BFTChain struct { Logger *flogging.FabricLogger WALDir string consensus *smartbft.Consensus + syncCancel context.CancelFunc support consensus.ConsenterSupport ClusterService *cluster.ClusterService verifier *Verifier @@ -158,7 +160,9 @@ func NewChain( c.RuntimeConfig.Store(rtc) c.verifier = buildVerifier(cv, c.RuntimeConfig, support, requestInspector, policyManager) - c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory) + ctx, cancel := context.WithCancel(context.Background()) + c.syncCancel = cancel + c.consensus = bftSmartConsensusBuild(c, requestInspector, egressCommFactory, synchronizerFactory, ctx) // Setup communication with list of remotes notes for the new channel c.Comm.Configure(c.support.ChannelID(), rtc.RemoteNodes) @@ -177,6 +181,7 @@ func bftSmartConsensusBuild( requestInspector *RequestInspector, egressCommFactory EgressCommFactory, synchronizerFactory SynchronizerFactory, + syncCtx context.Context, ) *smartbft.Consensus { var err error @@ -213,6 +218,7 @@ func bftSmartConsensusBuild( c.support, c.bccsp, c.clusterDialer, + syncCtx, ) channelDecorator := zap.String("channel", c.support.ChannelID()) @@ -462,6 +468,7 @@ func (c *BFTChain) Start() { // Halt frees the resources which were allocated for this Chain. func (c *BFTChain) Halt() { c.Logger.Infof("Shutting down chain") + c.syncCancel() c.consensus.Stop() } diff --git a/orderer/consensus/smartbft/mocks/synchronizer_factory.go b/orderer/consensus/smartbft/mocks/synchronizer_factory.go index 1953a5a5947..bdab7b10f1f 100644 --- a/orderer/consensus/smartbft/mocks/synchronizer_factory.go +++ b/orderer/consensus/smartbft/mocks/synchronizer_factory.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package mocks @@ -12,6 +12,8 @@ import ( consensus "github.com/hyperledger/fabric/orderer/consensus" + context "context" + flogging "github.com/hyperledger/fabric-lib-go/common/flogging" localconfig "github.com/hyperledger/fabric/orderer/common/localconfig" @@ -36,17 +38,17 @@ func (_m *SynchronizerFactory) EXPECT() *SynchronizerFactory_Expecter { return &SynchronizerFactory_Expecter{mock: &_m.Mock} } -// CreateSynchronizer provides a mock function with given fields: logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer -func (_m *SynchronizerFactory) CreateSynchronizer(logger *flogging.FabricLogger, localConfigCluster localconfig.Cluster, rtc smartbft.RuntimeConfig, blockToDecision func(*common.Block) *types.Decision, pruneCommittedRequests func(*common.Block), updateRuntimeConfig func(*common.Block) types.Reconfig, support consensus.ConsenterSupport, _a7 bccsp.BCCSP, clusterDialer *cluster.PredicateDialer) api.Synchronizer { - ret := _m.Called(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer) +// CreateSynchronizer provides a mock function with given fields: logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer, syncCtx +func (_m *SynchronizerFactory) CreateSynchronizer(logger *flogging.FabricLogger, localConfigCluster localconfig.Cluster, rtc smartbft.RuntimeConfig, blockToDecision func(*common.Block) *types.Decision, pruneCommittedRequests func(*common.Block), updateRuntimeConfig func(*common.Block) types.Reconfig, support consensus.ConsenterSupport, _a7 bccsp.BCCSP, clusterDialer *cluster.PredicateDialer, syncCtx context.Context) api.Synchronizer { + ret := _m.Called(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer, syncCtx) if len(ret) == 0 { panic("no return value specified for CreateSynchronizer") } var r0 api.Synchronizer - if rf, ok := ret.Get(0).(func(*flogging.FabricLogger, localconfig.Cluster, smartbft.RuntimeConfig, func(*common.Block) *types.Decision, func(*common.Block), func(*common.Block) types.Reconfig, consensus.ConsenterSupport, bccsp.BCCSP, *cluster.PredicateDialer) api.Synchronizer); ok { - r0 = rf(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer) + if rf, ok := ret.Get(0).(func(*flogging.FabricLogger, localconfig.Cluster, smartbft.RuntimeConfig, func(*common.Block) *types.Decision, func(*common.Block), func(*common.Block) types.Reconfig, consensus.ConsenterSupport, bccsp.BCCSP, *cluster.PredicateDialer, context.Context) api.Synchronizer); ok { + r0 = rf(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer, syncCtx) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(api.Synchronizer) @@ -71,13 +73,14 @@ type SynchronizerFactory_CreateSynchronizer_Call struct { // - support consensus.ConsenterSupport // - _a7 bccsp.BCCSP // - clusterDialer *cluster.PredicateDialer -func (_e *SynchronizerFactory_Expecter) CreateSynchronizer(logger interface{}, localConfigCluster interface{}, rtc interface{}, blockToDecision interface{}, pruneCommittedRequests interface{}, updateRuntimeConfig interface{}, support interface{}, _a7 interface{}, clusterDialer interface{}) *SynchronizerFactory_CreateSynchronizer_Call { - return &SynchronizerFactory_CreateSynchronizer_Call{Call: _e.mock.On("CreateSynchronizer", logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer)} +// - syncCtx context.Context +func (_e *SynchronizerFactory_Expecter) CreateSynchronizer(logger interface{}, localConfigCluster interface{}, rtc interface{}, blockToDecision interface{}, pruneCommittedRequests interface{}, updateRuntimeConfig interface{}, support interface{}, _a7 interface{}, clusterDialer interface{}, syncCtx interface{}) *SynchronizerFactory_CreateSynchronizer_Call { + return &SynchronizerFactory_CreateSynchronizer_Call{Call: _e.mock.On("CreateSynchronizer", logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, _a7, clusterDialer, syncCtx)} } -func (_c *SynchronizerFactory_CreateSynchronizer_Call) Run(run func(logger *flogging.FabricLogger, localConfigCluster localconfig.Cluster, rtc smartbft.RuntimeConfig, blockToDecision func(*common.Block) *types.Decision, pruneCommittedRequests func(*common.Block), updateRuntimeConfig func(*common.Block) types.Reconfig, support consensus.ConsenterSupport, _a7 bccsp.BCCSP, clusterDialer *cluster.PredicateDialer)) *SynchronizerFactory_CreateSynchronizer_Call { +func (_c *SynchronizerFactory_CreateSynchronizer_Call) Run(run func(logger *flogging.FabricLogger, localConfigCluster localconfig.Cluster, rtc smartbft.RuntimeConfig, blockToDecision func(*common.Block) *types.Decision, pruneCommittedRequests func(*common.Block), updateRuntimeConfig func(*common.Block) types.Reconfig, support consensus.ConsenterSupport, _a7 bccsp.BCCSP, clusterDialer *cluster.PredicateDialer, syncCtx context.Context)) *SynchronizerFactory_CreateSynchronizer_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*flogging.FabricLogger), args[1].(localconfig.Cluster), args[2].(smartbft.RuntimeConfig), args[3].(func(*common.Block) *types.Decision), args[4].(func(*common.Block)), args[5].(func(*common.Block) types.Reconfig), args[6].(consensus.ConsenterSupport), args[7].(bccsp.BCCSP), args[8].(*cluster.PredicateDialer)) + run(args[0].(*flogging.FabricLogger), args[1].(localconfig.Cluster), args[2].(smartbft.RuntimeConfig), args[3].(func(*common.Block) *types.Decision), args[4].(func(*common.Block)), args[5].(func(*common.Block) types.Reconfig), args[6].(consensus.ConsenterSupport), args[7].(bccsp.BCCSP), args[8].(*cluster.PredicateDialer), args[9].(context.Context)) }) return _c } @@ -87,7 +90,7 @@ func (_c *SynchronizerFactory_CreateSynchronizer_Call) Return(_a0 api.Synchroniz return _c } -func (_c *SynchronizerFactory_CreateSynchronizer_Call) RunAndReturn(run func(*flogging.FabricLogger, localconfig.Cluster, smartbft.RuntimeConfig, func(*common.Block) *types.Decision, func(*common.Block), func(*common.Block) types.Reconfig, consensus.ConsenterSupport, bccsp.BCCSP, *cluster.PredicateDialer) api.Synchronizer) *SynchronizerFactory_CreateSynchronizer_Call { +func (_c *SynchronizerFactory_CreateSynchronizer_Call) RunAndReturn(run func(*flogging.FabricLogger, localconfig.Cluster, smartbft.RuntimeConfig, func(*common.Block) *types.Decision, func(*common.Block), func(*common.Block) types.Reconfig, consensus.ConsenterSupport, bccsp.BCCSP, *cluster.PredicateDialer, context.Context) api.Synchronizer) *SynchronizerFactory_CreateSynchronizer_Call { _c.Call.Return(run) return _c } diff --git a/orderer/consensus/smartbft/synchronizer.go b/orderer/consensus/smartbft/synchronizer.go index 4775be13928..d06ff15712a 100644 --- a/orderer/consensus/smartbft/synchronizer.go +++ b/orderer/consensus/smartbft/synchronizer.go @@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "context" "sort" + "time" "github.com/hyperledger-labs/SmartBFT/pkg/types" "github.com/hyperledger-labs/SmartBFT/smartbftprotos" @@ -34,6 +36,7 @@ type Synchronizer struct { LocalConfigCluster localconfig.Cluster BlockPullerFactory BlockPullerFactory Logger *flogging.FabricLogger + Ctx context.Context } // Sync synchronizes blocks and returns the response @@ -141,9 +144,22 @@ func (s *Synchronizer) synchronize() (*types.Decision, error) { return nil, errors.Errorf("failed pulling block %d", seq) } - startSeq := startHeight + // wait for the right height + ticker := time.NewTicker(50 * time.Millisecond) +loop: + for { + select { + case <-s.Ctx.Done(): + break loop + case <-ticker.C: + if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() { + break loop + } + } + } + s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]", - blocksFetched, startSeq, lastPulledBlock.Header.Number) + blocksFetched, startHeight, lastPulledBlock.Header.Number) viewMetadata, lastConfigSqn := s.getViewMetadataLastConfigSqnFromBlock(lastPulledBlock) diff --git a/orderer/consensus/smartbft/synchronizer_bft.go b/orderer/consensus/smartbft/synchronizer_bft.go index b0476e20d9c..a367e382926 100644 --- a/orderer/consensus/smartbft/synchronizer_bft.go +++ b/orderer/consensus/smartbft/synchronizer_bft.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "context" "sort" "sync" "time" @@ -39,6 +40,7 @@ type BFTSynchronizer struct { VerifierFactory VerifierFactory BFTDelivererFactory BFTDelivererFactory Logger *flogging.FabricLogger + Ctx context.Context mutex sync.Mutex syncBuff *SyncBuffer @@ -277,9 +279,22 @@ func (s *BFTSynchronizer) getBlocksFromSyncBuffer(startHeight, targetHeight uint return nil, errors.Errorf("failed pulling block %d", seq) } - startSeq := startHeight + // wait for the right height + ticker := time.NewTicker(50 * time.Millisecond) +loop: + for { + select { + case <-s.Ctx.Done(): + break loop + case <-ticker.C: + if s.Support.Height() > lastPulledBlock.GetHeader().GetNumber() { + break loop + } + } + } + s.Logger.Infof("Finished synchronizing with cluster, fetched %d blocks, starting from block [%d], up until and including block [%d]", - blocksFetched, startSeq, lastPulledBlock.Header.Number) + blocksFetched, startHeight, lastPulledBlock.Header.Number) return lastPulledBlock, nil } diff --git a/orderer/consensus/smartbft/synchronizer_bft_test.go b/orderer/consensus/smartbft/synchronizer_bft_test.go index 93585ce5980..c936aff2363 100644 --- a/orderer/consensus/smartbft/synchronizer_bft_test.go +++ b/orderer/consensus/smartbft/synchronizer_bft_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package smartbft_test import ( + "context" "os" "sync" "testing" @@ -106,6 +107,7 @@ func TestBFTSynchronizer(t *testing.T) { LocalConfigCluster: localconfig.Cluster{}, BlockPullerFactory: bpf, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) @@ -155,6 +157,7 @@ func TestBFTSynchronizer(t *testing.T) { LocalConfigCluster: localconfig.Cluster{}, BlockPullerFactory: bpf, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) @@ -201,6 +204,7 @@ func TestBFTSynchronizer(t *testing.T) { LocalConfigCluster: localconfig.Cluster{}, BlockPullerFactory: bpf, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) @@ -261,6 +265,7 @@ func TestBFTSynchronizer(t *testing.T) { LocalConfigCluster: localconfig.Cluster{}, BlockPullerFactory: bpf, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) @@ -370,6 +375,7 @@ func TestBFTSynchronizer(t *testing.T) { VerifierFactory: fakeVerifierFactory, BFTDelivererFactory: fakeBFTDelivererFactory, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) @@ -506,6 +512,7 @@ func TestBFTSynchronizer(t *testing.T) { VerifierFactory: fakeVerifierFactory, BFTDelivererFactory: fakeBFTDelivererFactory, Logger: flogging.MustGetLogger("test.smartbft"), + Ctx: context.Background(), } require.NotNil(t, bftSynchronizer) diff --git a/orderer/consensus/smartbft/synchronizer_factory.go b/orderer/consensus/smartbft/synchronizer_factory.go index 222b95ffa99..1a771ab9e5e 100644 --- a/orderer/consensus/smartbft/synchronizer_factory.go +++ b/orderer/consensus/smartbft/synchronizer_factory.go @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0 package smartbft import ( + "context" + "github.com/hyperledger-labs/SmartBFT/pkg/api" "github.com/hyperledger-labs/SmartBFT/pkg/types" "github.com/hyperledger/fabric-lib-go/bccsp" @@ -35,6 +37,7 @@ type SynchronizerFactory interface { support consensus.ConsenterSupport, bccsp bccsp.BCCSP, clusterDialer *cluster.PredicateDialer, + syncCtx context.Context, ) api.Synchronizer } @@ -50,8 +53,9 @@ func (*synchronizerCreator) CreateSynchronizer( support consensus.ConsenterSupport, bccsp bccsp.BCCSP, clusterDialer *cluster.PredicateDialer, + syncCtx context.Context, ) api.Synchronizer { - return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer) + return newSynchronizer(logger, localConfigCluster, rtc, blockToDecision, pruneCommittedRequests, updateRuntimeConfig, support, bccsp, clusterDialer, syncCtx) } // newSynchronizer creates a new synchronizer @@ -65,6 +69,7 @@ func newSynchronizer( support consensus.ConsenterSupport, bccsp bccsp.BCCSP, clusterDialer *cluster.PredicateDialer, + syncCtx context.Context, ) api.Synchronizer { switch localConfigCluster.ReplicationPolicy { case "consensus": @@ -87,6 +92,7 @@ func newSynchronizer( VerifierFactory: &verifierCreator{}, BFTDelivererFactory: &bftDelivererCreator{}, Logger: logger, + Ctx: syncCtx, } case "simple": logger.Debug("Creating simple Synchronizer") @@ -106,6 +112,7 @@ func newSynchronizer( LatestConfig: func() (types.Configuration, []uint64) { return rtc.BFTConfig, rtc.Nodes }, + Ctx: syncCtx, } default: logger.Panicf("Unsupported Cluster.ReplicationPolicy: %s", localConfigCluster.ReplicationPolicy) diff --git a/orderer/consensus/smartbft/synchronizer_test.go b/orderer/consensus/smartbft/synchronizer_test.go index e57922059ea..0c16b70c4de 100644 --- a/orderer/consensus/smartbft/synchronizer_test.go +++ b/orderer/consensus/smartbft/synchronizer_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package smartbft_test import ( + "context" "os" "testing" @@ -80,6 +81,7 @@ func TestSynchronizerSync(t *testing.T) { BlockPullerFactory: bpf, Support: fakeCS, OnCommit: noopUpdateLastHash, + Ctx: context.Background(), } d := syn.Sync() @@ -143,6 +145,7 @@ func TestSynchronizerSync(t *testing.T) { BlockPullerFactory: bpf, Support: fakeCS, OnCommit: noopUpdateLastHash, + Ctx: context.Background(), } d := syn.Sync() @@ -205,6 +208,7 @@ func TestSynchronizerSync(t *testing.T) { BlockPullerFactory: bpf, Support: fakeCS, OnCommit: noopUpdateLastHash, + Ctx: context.Background(), } d := syn.Sync() @@ -266,6 +270,7 @@ func TestSynchronizerSync(t *testing.T) { BlockPullerFactory: bpf, Support: fakeCS, OnCommit: noopUpdateLastHash, + Ctx: context.Background(), } d := syn.Sync() @@ -329,6 +334,7 @@ func TestSynchronizerSync(t *testing.T) { BlockPullerFactory: bpf, Support: fakeCS, OnCommit: noopUpdateLastHash, + Ctx: context.Background(), } d := syn.Sync() diff --git a/orderer/consensus/smartbft/util_network_test.go b/orderer/consensus/smartbft/util_network_test.go index 80175038320..5c8d8600bc6 100644 --- a/orderer/consensus/smartbft/util_network_test.go +++ b/orderer/consensus/smartbft/util_network_test.go @@ -609,7 +609,7 @@ func createBFTChainUsingMocks(t *testing.T, node *Node, configInfo *ConfigInfo) } }).Maybe() synchronizerFactory := smartBFTMocks.NewSynchronizerFactory(t) - synchronizerFactory.EXPECT().CreateSynchronizer(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(synchronizerMock) + synchronizerFactory.EXPECT().CreateSynchronizer(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(synchronizerMock) localConfigCluster := localconfig.Cluster{ReplicationPolicy: "consensus"} clusterDialer := &cluster.PredicateDialer{}