diff --git a/api/info/service.go b/api/info/service.go
index e4f8711a3b65..b234a0566dca 100644
--- a/api/info/service.go
+++ b/api/info/service.go
@@ -151,7 +151,7 @@ type IsBootstrappedResponse struct {
// IsBootstrapped returns nil and sets [reply.IsBootstrapped] == true iff [args.Chain] exists and is done bootstrapping
// Returns an error if the chain doesn't exist
func (service *Info) IsBootstrapped(_ *http.Request, args *IsBootstrappedArgs, reply *IsBootstrappedResponse) error {
-    service.log.Info("Info: IsBootstrapped called")
+    service.log.Info("Info: IsBootstrapped called with chain: %s", args.Chain)
    if args.Chain == "" {
        return fmt.Errorf("argument 'chain' not given")
    }
diff --git a/chains/awaiter.go b/chains/awaiter.go
deleted file mode 100644
index 548d2088b780..000000000000
--- a/chains/awaiter.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
-// See the file LICENSE for licensing terms.
-
-package chains
-
-import (
-    "github.com/ava-labs/avalanchego/ids"
-    "github.com/ava-labs/avalanchego/snow/validators"
-    "github.com/ava-labs/avalanchego/utils/math"
-)
-
-type awaitConnected struct {
-    connected func()
-    vdrs validators.Set
-    reqWeight uint64
-    weight uint64
-}
-
-// NewAwaiter returns a new handler that will await for a sufficient number of
-// validators to be connected.
-func NewAwaiter(vdrs validators.Set, reqWeight uint64, connected func()) validators.Connector {
-    return &awaitConnected{
-        vdrs: vdrs,
-        reqWeight: reqWeight,
-        connected: connected,
-    }
-}
-
-func (a *awaitConnected) Connected(vdrID ids.ShortID) bool {
-    weight, ok := a.vdrs.GetWeight(vdrID)
-    if !ok {
-        return false
-    }
-    weight, err := math.Add64(weight, a.weight)
-    a.weight = weight
-    // If the error is non-nil, then an overflow error has occurred such that
-    // the required weight was surpassed. As per network.Handler interface,
-    // this handler should be removed and never called again after returning true.
-    if err == nil && a.weight < a.reqWeight {
-        return false
-    }
-
-    go a.connected()
-    return true
-}
-
-func (a *awaitConnected) Disconnected(vdrID ids.ShortID) bool {
-    if weight, ok := a.vdrs.GetWeight(vdrID); ok {
-        // TODO: Account for weight changes in a more robust manner.
-
-        // Sub64 should rarely error since only validators that have added their
-        // weight can become disconnected. Because it is possible that there are
-        // changes to the validators set, we utilize that Sub64 returns 0 on
-        // error.
-        a.weight, _ = math.Sub64(a.weight, weight)
-    }
-    return false
-}
diff --git a/chains/awaiter_test.go b/chains/awaiter_test.go
deleted file mode 100644
index ba127142dcf2..000000000000
--- a/chains/awaiter_test.go
+++ /dev/null
@@ -1,44 +0,0 @@
-// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
-// See the file LICENSE for licensing terms.
-
-package chains
-
-import (
-    "testing"
-
-    "github.com/ava-labs/avalanchego/ids"
-    "github.com/ava-labs/avalanchego/snow/validators"
-)
-
-func TestAwaiter(t *testing.T) {
-    vdrID0 := ids.NewShortID([20]byte{0})
-    vdrID1 := ids.NewShortID([20]byte{1})
-    vdrID2 := ids.NewShortID([20]byte{2})
-    vdrID3 := ids.NewShortID([20]byte{3})
-
-    s := validators.NewSet()
-    s.AddWeight(vdrID0, 1)
-    s.AddWeight(vdrID1, 1)
-    s.AddWeight(vdrID3, 1)
-
-    called := make(chan struct{}, 1)
-    aw := NewAwaiter(s, 3, func() {
-        called <- struct{}{}
-    })
-
-    if aw.Connected(vdrID0) {
-        t.Fatalf("shouldn't have finished handling yet")
-    } else if aw.Connected(vdrID1) {
-        t.Fatalf("shouldn't have finished handling yet")
-    } else if aw.Disconnected(vdrID1) {
-        t.Fatalf("shouldn't have finished handling yet")
-    } else if aw.Connected(vdrID1) {
-        t.Fatalf("shouldn't have finished handling yet")
-    } else if aw.Connected(vdrID2) {
-        t.Fatalf("shouldn't have finished handling yet")
-    } else if !aw.Connected(vdrID3) {
-        t.Fatalf("should have finished handling")
-    }
-
-    <-called
-}
diff --git a/chains/manager.go b/chains/manager.go
index 3866e2d531cb..34fd931f1050 100644
--- a/chains/manager.go
+++ b/chains/manager.go
@@ -351,28 +351,6 @@ func (m *manager) buildChain(chainParams ChainParameters) (*chain, error) {
            ctx.Log.Error("Chain with ID: %s was shutdown due to a panic", chainParams.ID)
        })
    }
-
-    reqWeight := (3*bootstrapWeight + 3) / 4
-    if reqWeight == 0 {
-        if err := chain.Engine.Startup(); err != nil {
-            chain.Handler.Shutdown()
-            return nil, fmt.Errorf("failed to start consensus engine: %w", err)
-        }
-    } else {
-        awaiter := NewAwaiter(beacons, reqWeight, func() {
-            ctx.Lock.Lock()
-            defer ctx.Lock.Unlock()
-            if err := chain.Engine.Startup(); err != nil {
-                chain.Ctx.Log.Error("failed to start consensus engine: %s", err)
-                chain.Handler.Shutdown()
-            }
-        })
-        go m.Net.RegisterConnector(awaiter)
-    }
-
-    if connector, ok := vm.(validators.Connector); ok {
-        go m.Net.RegisterConnector(connector)
-    }
    return chain, nil
}
@@ -439,11 +417,12 @@ func (m *manager) createAvalancheChain(
    if err := engine.Initialize(aveng.Config{
        Config: avbootstrap.Config{
            Config: common.Config{
-                Ctx: ctx,
-                Validators: validators,
-                Beacons: beacons,
-                Alpha: bootstrapWeight/2 + 1, // must be > 50%
-                Sender: &sender,
+                Ctx: ctx,
+                Validators: validators,
+                Beacons: beacons,
+                StartupAlpha: (3*bootstrapWeight + 3) / 4,
+                Alpha: bootstrapWeight/2 + 1, // must be > 50%
+                Sender: &sender,
            },
            VtxBlocked: vtxBlocker,
            TxBlocked: txBlocker,
@@ -519,11 +498,12 @@ func (m *manager) createSnowmanChain(
    if err := engine.Initialize(smeng.Config{
        Config: smbootstrap.Config{
            Config: common.Config{
-                Ctx: ctx,
-                Validators: validators,
-                Beacons: beacons,
-                Alpha: bootstrapWeight/2 + 1, // must be > 50%
-                Sender: &sender,
+                Ctx: ctx,
+                Validators: validators,
+                Beacons: beacons,
+                StartupAlpha: (3*bootstrapWeight + 3) / 4,
+                Alpha: bootstrapWeight/2 + 1, // must be > 50%
+                Sender: &sender,
            },
            Blocked: blocked,
            VM: vm,
diff --git a/main/params.go b/main/params.go
index 8a6bf472ceca..9827f7994bce 100644
--- a/main/params.go
+++ b/main/params.go
@@ -194,7 +194,7 @@ func init() {
    fs.BoolVar(&Config.HTTPSEnabled, "http-tls-enabled", false, "Upgrade the HTTP server to HTTPs")
    fs.StringVar(&Config.HTTPSKeyFile, "http-tls-key-file", "", "TLS private key file for the HTTPs server")
    fs.StringVar(&Config.HTTPSCertFile, "http-tls-cert-file", "", "TLS certificate file for the HTTPs server")
-    fs.BoolVar(&Config.APIRequireAuthToken, "api-require-auth", false, "Require authorization token to call HTTP APIs")
+    fs.BoolVar(&Config.APIRequireAuthToken, "api-auth-required", false, "Require authorization token to call HTTP APIs")
    fs.StringVar(&Config.APIAuthPassword, "api-auth-password", "", "Password used to create/validate API authorization tokens. Can be changed via API call.")
    // Bootstrapping:
@@ -427,7 +427,7 @@ func init() {
    Config.HTTPPort = uint16(*httpPort)
    if Config.APIRequireAuthToken {
        if Config.APIAuthPassword == "" {
-            errs.Add(errors.New("api-auth-password must be provided if api-require-auth is true"))
+            errs.Add(errors.New("api-auth-password must be provided if api-auth-required is true"))
            return
        }
        if !password.SufficientlyStrong(Config.APIAuthPassword, password.OK) {
diff --git a/network/network.go b/network/network.go
index a4867ff51081..f4ed5d6ed978 100644
--- a/network/network.go
+++ b/network/network.go
@@ -72,12 +72,6 @@ type Network interface {
    // IP.
    Track(ip utils.IPDesc)
-    // Register a new connector that is called whenever a peer is connected to
-    // or disconnected from. If the connector returns true, then it will never
-    // be called again. Thread safety must be managed internally in the network.
-    // The handler will initially be called with this local node's ID.
-    RegisterConnector(h validators.Connector)
-
    // Returns the description of the nodes this network is currently connected
    // to externally. Thread safety must be managed internally to the network.
    Peers() []PeerID
@@ -138,9 +132,8 @@ type network struct {
    connectedIPs map[string]struct{}
    retryDelay map[string]time.Duration
    // TODO: bound the size of [myIPs] to avoid DoS. LRU caching would be ideal
-    myIPs map[string]struct{} // set of IPs that resulted in my ID.
-    peers map[[20]byte]*peer
-    handlers []validators.Connector
+    myIPs map[string]struct{} // set of IPs that resulted in my ID.
+ peers map[[20]byte]*peer } // NewDefaultNetwork returns a new Network implementation with the provided @@ -288,6 +281,10 @@ func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, sent = peer.send(msg) } if !sent { + n.log.Debug("failed to send GetAcceptedFrontier(%s, %s, %d)", + vID, + chainID, + requestID) n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) }) n.getAcceptedFrontier.numFailed.Inc() } else { @@ -655,24 +652,6 @@ func (n *network) Dispatch() error { } } -// RegisterConnector implements the Network interface -func (n *network) RegisterConnector(h validators.Connector) { - n.stateLock.Lock() - defer n.stateLock.Unlock() - - if h.Connected(n.id) { - return - } - for _, peer := range n.peers { - if peer.connected { - if h.Connected(peer.id) { - return - } - } - } - n.handlers = append(n.handlers, h) -} - // IPs implements the Network interface func (n *network) Peers() []PeerID { n.stateLock.Lock() @@ -1039,15 +1018,7 @@ func (n *network) connected(p *peer) { n.connectedIPs[str] = struct{}{} } - for i := 0; i < len(n.handlers); { - if n.handlers[i].Connected(p.id) { - newLen := len(n.handlers) - 1 - n.handlers[i] = n.handlers[newLen] // remove the current handler - n.handlers = n.handlers[:newLen] - } else { - i++ - } - } + n.router.Connected(p.id) } // assumes the stateLock is held when called @@ -1068,14 +1039,6 @@ func (n *network) disconnected(p *peer) { } if p.connected { - for i := 0; i < len(n.handlers); { - if n.handlers[i].Disconnected(p.id) { - newLen := len(n.handlers) - 1 - n.handlers[i] = n.handlers[newLen] // remove the current handler - n.handlers = n.handlers[:newLen] - } else { - i++ - } - } + n.router.Disconnected(p.id) } } diff --git a/network/network_test.go b/network/network_test.go index 468b6838597e..790896cc78fe 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -141,15 +141,20 @@ func (c *testConn) SetReadDeadline(time.Time) error { return nil } func (c *testConn) SetWriteDeadline(time.Time) error { return nil } type testHandler struct { - connected func(ids.ShortID) bool - disconnected func(ids.ShortID) bool + router.Router + connected func(ids.ShortID) + disconnected func(ids.ShortID) } -func (h *testHandler) Connected(id ids.ShortID) bool { - return h.connected != nil && h.connected(id) +func (h *testHandler) Connected(id ids.ShortID) { + if h.connected != nil { + h.connected(id) + } } -func (h *testHandler) Disconnected(id ids.ShortID) bool { - return h.disconnected != nil && h.disconnected(id) +func (h *testHandler) Disconnected(id ids.ShortID) { + if h.disconnected != nil { + h.disconnected(id) + } } func TestNewDefaultNetwork(t *testing.T) { @@ -266,7 +271,29 @@ func TestEstablishConnection(t *testing.T) { clientUpgrader := NewIPUpgrader() vdrs := validators.NewSet() - handler := router.Router(nil) + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } net0 := NewDefaultNetwork( prometheus.NewRegistry(), @@ -282,7 +309,7 @@ func TestEstablishConnection(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler0, ) assert.NotNil(t, net0) @@ -300,37 +327,10 @@ func TestEstablishConnection(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler1, ) assert.NotNil(t, net1) - var ( - wg0 sync.WaitGroup - wg1 
sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - net0.Track(ip1) go func() { @@ -407,7 +407,29 @@ func TestDoubleTrack(t *testing.T) { clientUpgrader := NewIPUpgrader() vdrs := validators.NewSet() - handler := router.Router(nil) + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } net0 := NewDefaultNetwork( prometheus.NewRegistry(), @@ -423,7 +445,7 @@ func TestDoubleTrack(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler0, ) assert.NotNil(t, net0) @@ -441,37 +463,10 @@ func TestDoubleTrack(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler1, ) assert.NotNil(t, net1) - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - net0.Track(ip1) net0.Track(ip1) @@ -549,7 +544,29 @@ func TestDoubleClose(t *testing.T) { clientUpgrader := NewIPUpgrader() vdrs := validators.NewSet() - handler := router.Router(nil) + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } net0 := NewDefaultNetwork( prometheus.NewRegistry(), @@ -565,7 +582,7 @@ func TestDoubleClose(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler0, ) assert.NotNil(t, net0) @@ -583,37 +600,10 @@ func TestDoubleClose(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler1, ) assert.NotNil(t, net1) - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - net0.Track(ip1) go func() { @@ -641,7 +631,7 @@ func TestDoubleClose(t *testing.T) { assert.NoError(t, err) } -func TestRemoveHandlers(t *testing.T) { +func TestTrackConnected(t *testing.T) { log := logging.NoLog{} networkID := uint32(0) appVersion := version.NewDefaultVersion("app", 0, 1, 0) @@ -696,43 +686,6 @@ func TestRemoveHandlers(t *testing.T) { clientUpgrader := NewIPUpgrader() vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - 
ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) var ( wg0 sync.WaitGroup @@ -741,118 +694,21 @@ func TestRemoveHandlers(t *testing.T) { wg0.Add(1) wg1.Add(1) - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { + handler0 := &testHandler{ + connected: func(id ids.ShortID) { if !id.Equals(id0) { wg0.Done() } - return false }, } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { if !id.Equals(id1) { wg1.Done() } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - h3 := &testHandler{ - connected: func(id ids.ShortID) bool { - assert.Equal(t, id0, id) - return true }, } - h4 := &testHandler{ - connected: func(id ids.ShortID) bool { - return id.Equals(id0) - }, - } - - net0.RegisterConnector(h3) - net1.RegisterConnector(h4) - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - -func TestTrackConnected(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) net0 := NewDefaultNetwork( prometheus.NewRegistry(), @@ -868,7 +724,7 @@ func TestTrackConnected(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler0, ) assert.NotNil(t, net0) @@ -886,37 +742,10 @@ func TestTrackConnected(t *testing.T) { clientUpgrader, vdrs, vdrs, - handler, + handler1, ) assert.NotNil(t, net1) - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - net0.Track(ip1) go func() { diff --git a/node/node.go b/node/node.go index 06facc41109b..7e30656f7699 100644 --- a/node/node.go +++ b/node/node.go @@ -31,6 +31,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/ipcs" 
"github.com/ava-labs/avalanchego/network" + "github.com/ava-labs/avalanchego/snow/networking/router" "github.com/ava-labs/avalanchego/snow/networking/timeout" "github.com/ava-labs/avalanchego/snow/triggers" "github.com/ava-labs/avalanchego/snow/validators" @@ -38,6 +39,7 @@ import ( "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/hashing" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/timer" "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/avalanchego/version" @@ -62,7 +64,7 @@ var ( genesisHashKey = []byte("genesisID") // Version is the version of this code - Version = version.NewDefaultVersion(constants.PlatformName, 0, 8, 1) + Version = version.NewDefaultVersion(constants.PlatformName, 0, 8, 2) versionParser = version.NewDefaultParser() ) @@ -161,6 +163,35 @@ func (n *Node) initNetworking() error { return err } + consensusRouter := n.Config.ConsensusRouter + if !n.Config.EnableStaking { + consensusRouter = &insecureValidatorManager{ + Router: consensusRouter, + vdrs: primaryNetworkValidators, + weight: n.Config.DisabledStakingWeight, + } + } + + bootstrapWeight := n.beacons.Weight() + reqWeight := (3*bootstrapWeight + 3) / 4 + + if reqWeight > 0 { + timer := timer.NewTimer(func() { + n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") + go n.Net.Close() + }) + + go timer.Dispatch() + timer.SetTimeoutIn(15 * time.Second) + + consensusRouter = &beaconManager{ + Router: consensusRouter, + timer: timer, + beacons: n.beacons, + requiredWeight: reqWeight, + } + } + n.Net = network.NewDefaultNetwork( n.Config.ConsensusParams.Metrics, n.Log, @@ -175,16 +206,9 @@ func (n *Node) initNetworking() error { clientUpgrader, primaryNetworkValidators, n.beacons, - n.Config.ConsensusRouter, + consensusRouter, ) - if !n.Config.EnableStaking { - n.Net.RegisterConnector(&insecureValidatorManager{ - vdrs: primaryNetworkValidators, - weight: n.Config.DisabledStakingWeight, - }) - } - n.nodeCloser = utils.HandleSignals(func(os.Signal) { // errors are already logged internally if they are meaningful _ = n.Net.Close() @@ -194,20 +218,61 @@ func (n *Node) initNetworking() error { } type insecureValidatorManager struct { + router.Router vdrs validators.Set weight uint64 } -func (i *insecureValidatorManager) Connected(vdrID ids.ShortID) bool { +func (i *insecureValidatorManager) Connected(vdrID ids.ShortID) { _ = i.vdrs.AddWeight(vdrID, i.weight) - return false + i.Router.Connected(vdrID) } -func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) bool { +func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) { // Shouldn't error unless the set previously had an error, which should // never happen as described above _ = i.vdrs.RemoveWeight(vdrID, i.weight) - return false + i.Router.Disconnected(vdrID) +} + +type beaconManager struct { + router.Router + timer *timer.Timer + beacons validators.Set + requiredWeight uint64 + weight uint64 +} + +func (b *beaconManager) Connected(vdrID ids.ShortID) { + weight, ok := b.beacons.GetWeight(vdrID) + if !ok { + b.Router.Connected(vdrID) + return + } + weight, err := math.Add64(weight, b.weight) + if err != nil { + b.timer.Cancel() + b.Router.Connected(vdrID) + return + } + b.weight = weight + if b.weight >= b.requiredWeight { + b.timer.Cancel() + } + b.Router.Connected(vdrID) +} + +func (b *beaconManager) Disconnected(vdrID ids.ShortID) { + if weight, ok := b.beacons.GetWeight(vdrID); 
ok { + // TODO: Account for weight changes in a more robust manner. + + // Sub64 should rarely error since only validators that have added their + // weight can become disconnected. Because it is possible that there are + // changes to the validators set, we utilize that Sub64 returns 0 on + // error. + b.weight, _ = math.Sub64(b.weight, weight) + } + b.Router.Disconnected(vdrID) } // Dispatch starts the node's servers. @@ -363,28 +428,6 @@ func (n *Node) initChains(genesisBytes []byte, avaxAssetID ids.ID) error { CustomBeacons: n.beacons, }) - bootstrapWeight := n.beacons.Weight() - - reqWeight := (3*bootstrapWeight + 3) / 4 - - if reqWeight == 0 { - return nil - } - - connectToBootstrapsTimeout := timer.NewTimer(func() { - n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") - go n.Net.Close() - }) - - awaiter := chains.NewAwaiter(n.beacons, reqWeight, func() { - n.Log.Info("Connected to required bootstrap nodes. Starting Platform Chain...") - connectToBootstrapsTimeout.Cancel() - }) - - go connectToBootstrapsTimeout.Dispatch() - connectToBootstrapsTimeout.SetTimeoutIn(15 * time.Second) - - n.Net.RegisterConnector(awaiter) return nil } diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index 42d75394ee9e..a2010d39c9e3 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/triggers" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/formatting" ) @@ -94,8 +95,7 @@ func (b *Bootstrapper) Initialize( }) config.Bootstrapable = b - b.Bootstrapper.Initialize(config.Config) - return nil + return b.Bootstrapper.Initialize(config.Config) } // CurrentAcceptedFrontier returns the set of vertices that this node has accepted @@ -395,3 +395,19 @@ func (b *Bootstrapper) executeAll(jobs *queue.Jobs, events *triggers.EventDispat b.Ctx.Log.Info("executed %d operations", numExecuted) return nil } + +// Connected implements the Engine interface. +func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Connected(validatorID) + } + return b.Bootstrapper.Connected(validatorID) +} + +// Disconnected implements the Engine interface. 
+func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Disconnected(validatorID) + } + return b.Bootstrapper.Disconnected(validatorID) +} diff --git a/snow/engine/avalanche/metrics.go b/snow/engine/avalanche/metrics.go index 212474a7200c..a0bda312a015 100644 --- a/snow/engine/avalanche/metrics.go +++ b/snow/engine/avalanche/metrics.go @@ -11,6 +11,7 @@ import ( type metrics struct { numVtxRequests, numPendingVts, numMissingTxs prometheus.Gauge + getAncestorsVtxs prometheus.Histogram } // Initialize implements the Engine interface @@ -30,12 +31,29 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) Name: "missing_txs", Help: "Number of missing transactions", }) + m.getAncestorsVtxs = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_ancestors_vtxs", + Help: "The number of vertices fetched in a call to GetAncestors", + Buckets: []float64{ + 0, + 1, + 5, + 10, + 100, + 500, + 1000, + 1500, + 2000, + }, + }) errs := wrappers.Errs{} errs.Add( registerer.Register(m.numVtxRequests), registerer.Register(m.numPendingVts), registerer.Register(m.numMissingTxs), + registerer.Register(m.getAncestorsVtxs), ) return errs.Err } diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index b73d51ae75fa..d69f219d2957 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -186,6 +186,7 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.I } } + t.metrics.getAncestorsVtxs.Observe(float64(len(ancestorsBytes))) t.Sender.MultiPut(vdr, requestID, ancestorsBytes) return nil } diff --git a/snow/engine/avalanche/transitive_test.go b/snow/engine/avalanche/transitive_test.go index c0590751ee45..e9869c840730 100644 --- a/snow/engine/avalanche/transitive_test.go +++ b/snow/engine/avalanche/transitive_test.go @@ -853,8 +853,15 @@ func TestEngineRejectDoubleSpendTx(t *testing.T) { }, nil } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -939,8 +946,15 @@ func TestEngineRejectDoubleSpendIssuedTx(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -1136,8 +1150,15 @@ func TestEngineReissue(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -1268,8 +1289,15 @@ func TestEngineLargeIssue(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -2985,6 +3013,12 @@ func TestEngineAggressivePolling(t *testing.T) { manager := &vertex.TestManager{T: t} config.Manager = manager + vm := &vertex.TestVM{} + vm.T = t + config.VM = vm + + vm.Default(true) + gVtx := &avalanche.TestVertex{ TestDecidable: choices.TestDecidable{ IDV: ids.GenerateTestID(), @@ -3019,14 +3053,16 @@ func 
TestEngineAggressivePolling(t *testing.T) {
        BytesV: []byte{1},
    }
+    vm.CantBootstrapping = false
+    vm.CantBootstrapped = false
+
    te := &Transitive{}
    if err := te.Initialize(config); err != nil {
        t.Fatal(err)
    }
-    if err := te.finishBootstrapping(); err != nil {
-        t.Fatal(err)
-    }
-    te.Ctx.Bootstrapped()
+
+    vm.CantBootstrapping = true
+    vm.CantBootstrapped = true
    parsed := new(bool)
    manager.ParseVertexF = func(b []byte) (avalanche.Vertex, error) {
@@ -3056,6 +3092,8 @@ func TestEngineAggressivePolling(t *testing.T) {
    numPullQueries := new(int)
    sender.PullQueryF = func(ids.ShortSet, uint32, ids.ID) { *numPullQueries++ }
+    vm.CantPendingTxs = false
+
    te.Put(vdr, 0, vtx.ID(), vtx.Bytes())
    if *numPushQueries != 1 {
@@ -3133,8 +3171,15 @@ func TestEngineDuplicatedIssuance(t *testing.T) {
        panic("Should have errored")
    }
+    vm.CantBootstrapping = false
+    vm.CantBootstrapped = false
+
    te := &Transitive{}
    te.Initialize(config)
+
+    vm.CantBootstrapping = true
+    vm.CantBootstrapped = true
+
    te.finishBootstrapping()
    te.Ctx.Bootstrapped()
diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go
index cc0e8e4b49f6..b55f6f0db6b1 100644
--- a/snow/engine/common/bootstrapper.go
+++ b/snow/engine/common/bootstrapper.go
@@ -34,6 +34,8 @@ const (
type Bootstrapper struct {
    Config
+    RequestID uint32
+
    // IDs of validators we have requested the accepted frontier from but haven't
    // received a reply from
    pendingAcceptedFrontier ids.ShortSet
@@ -42,11 +44,12 @@ type Bootstrapper struct {
    pendingAccepted ids.ShortSet
    acceptedVotes map[[32]byte]uint64
-    RequestID uint32
+    // current weight
+    weight uint64
}
// Initialize implements the Engine interface.
-func (b *Bootstrapper) Initialize(config Config) {
+func (b *Bootstrapper) Initialize(config Config) error {
    b.Config = config
    for _, vdr := range b.Beacons.List() {
@@ -56,6 +59,10 @@ func (b *Bootstrapper) Initialize(config Config) {
    }
    b.acceptedVotes = make(map[[32]byte]uint64)
+    if b.Config.StartupAlpha > 0 {
+        return nil
+    }
+    return b.Startup()
}
// Startup implements the Engine interface.
@@ -170,3 +177,34 @@ func (b *Bootstrapper) Accepted(validatorID ids.ShortID, requestID uint32, conta
    return b.Bootstrapable.ForceAccepted(accepted)
}
+
+// Connected implements the Engine interface.
+func (b *Bootstrapper) Connected(validatorID ids.ShortID) error {
+    weight, ok := b.Beacons.GetWeight(validatorID)
+    if !ok {
+        return nil
+    }
+    weight, err := math.Add64(weight, b.weight)
+    if err != nil {
+        return err
+    }
+    b.weight = weight
+    if b.weight < b.StartupAlpha {
+        return nil
+    }
+    return b.Startup()
+}
+
+// Disconnected implements the Engine interface.
+func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error {
+    if weight, ok := b.Beacons.GetWeight(validatorID); ok {
+        // TODO: Account for weight changes in a more robust manner.
+
+        // Sub64 should rarely error since only validators that have added their
+        // weight can become disconnected. Because it is possible that there are
+        // changes to the validators set, we utilize that Sub64 returns 0 on
+        // error.
+        b.weight, _ = math.Sub64(b.weight, weight)
+    }
+    return nil
+}
diff --git a/snow/engine/common/config.go b/snow/engine/common/config.go
index 60e8074b65ba..4ddb6c2bdcd6 100644
--- a/snow/engine/common/config.go
+++ b/snow/engine/common/config.go
@@ -15,6 +15,7 @@ type Config struct {
    Validators validators.Set
    Beacons validators.Set
+    StartupAlpha uint64
    Alpha uint64
    Sender Sender
    Bootstrapable Bootstrapable
diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go
index 872f7243a1c8..cc802fa9d8b2 100644
--- a/snow/engine/common/engine.go
+++ b/snow/engine/common/engine.go
@@ -292,4 +292,10 @@ type InternalHandler interface {
    // Notify this engine of a message from the virtual machine.
    Notify(Message) error
+
+    // Notify this engine of a new peer.
+    Connected(validatorID ids.ShortID) error
+
+    // Notify this engine of a removed peer.
+    Disconnected(validatorID ids.ShortID) error
}
diff --git a/snow/engine/common/test_engine.go b/snow/engine/common/test_engine.go
index b3e3b7175e3a..3c65427f915d 100644
--- a/snow/engine/common/test_engine.go
+++ b/snow/engine/common/test_engine.go
@@ -44,6 +44,9 @@ type EngineTest struct {
    CantQueryFailed, CantChits bool
+    CantConnected,
+    CantDisconnected bool
+
    IsBootstrappedF func() bool
    ContextF func() *snow.Context
    StartupF, GossipF, ShutdownF func() error
@@ -54,6 +57,7 @@ type EngineTest struct {
    AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error
    GetAcceptedFrontierF, GetFailedF, GetAncestorsFailedF, QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error
+    ConnectedF, DisconnectedF func(validatorID ids.ShortID) error
}
var _ Engine = &EngineTest{}
@@ -89,6 +93,9 @@ func (e *EngineTest) Default(cant bool) {
    e.CantPullQuery = cant
    e.CantQueryFailed = cant
    e.CantChits = cant
+
+    e.CantConnected = cant
+    e.CantDisconnected = cant
}
// Context ...
@@ -383,6 +390,34 @@ func (e *EngineTest) Chits(validatorID ids.ShortID, requestID uint32, containerI
    return errors.New("unexpectedly called Chits")
}
+
+// Connected ...
+func (e *EngineTest) Connected(validatorID ids.ShortID) error {
+    if e.ConnectedF != nil {
+        return e.ConnectedF(validatorID)
+    }
+    if !e.CantConnected {
+        return nil
+    }
+    if e.T != nil {
+        e.T.Fatalf("Unexpectedly called Connected")
+    }
+    return errors.New("unexpectedly called Connected")
+}
+
+// Disconnected ...
+func (e *EngineTest) Disconnected(validatorID ids.ShortID) error {
+    if e.DisconnectedF != nil {
+        return e.DisconnectedF(validatorID)
+    }
+    if !e.CantDisconnected {
+        return nil
+    }
+    if e.T != nil {
+        e.T.Fatalf("Unexpectedly called Disconnected")
+    }
+    return errors.New("unexpectedly called Disconnected")
+}
+
// IsBootstrapped ...
func (e *EngineTest) IsBootstrapped() bool { if e.IsBootstrappedF != nil { diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index 7ecea2552c2b..9d748cbbde7a 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/formatting" ) @@ -70,8 +71,7 @@ func (b *Bootstrapper) Initialize( }) config.Bootstrapable = b - b.Bootstrapper.Initialize(config.Config) - return nil + return b.Bootstrapper.Initialize(config.Config) } // CurrentAcceptedFrontier returns the last accepted block @@ -285,3 +285,19 @@ func (b *Bootstrapper) executeAll(jobs *queue.Jobs) error { b.Ctx.Log.Info("executed %d blocks", numExecuted) return nil } + +// Connected implements the Engine interface. +func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Connected(validatorID) + } + return b.Bootstrapper.Connected(validatorID) +} + +// Disconnected implements the Engine interface. +func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Disconnected(validatorID) + } + return b.Bootstrapper.Disconnected(validatorID) +} diff --git a/snow/engine/snowman/metrics.go b/snow/engine/snowman/metrics.go index 5ab5fb520320..77fb9af1385f 100644 --- a/snow/engine/snowman/metrics.go +++ b/snow/engine/snowman/metrics.go @@ -11,6 +11,7 @@ import ( type metrics struct { numRequests, numBlocked prometheus.Gauge + getAncestorsBlks prometheus.Histogram } // Initialize the metrics @@ -25,11 +26,28 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) Name: "blocked", Help: "Number of blocks that are pending issuance", }) + m.getAncestorsBlks = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_ancestors_blks", + Help: "The number of blocks fetched in a call to GetAncestors", + Buckets: []float64{ + 0, + 1, + 5, + 10, + 100, + 500, + 1000, + 1500, + 2000, + }, + }) errs := wrappers.Errs{} errs.Add( registerer.Register(m.numRequests), registerer.Register(m.numBlocked), + registerer.Register(m.getAncestorsBlks), ) return errs.Err } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 2093ed6ef54f..38839132dd24 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -179,6 +179,7 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.I } } + t.metrics.getAncestorsBlks.Observe(float64(len(ancestorsBytes))) t.Sender.MultiPut(vdr, requestID, ancestorsBytes) return nil } diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 0805e200f9b3..b9946250b466 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -56,9 +56,8 @@ func setup(t *testing.T) (ids.ShortID, validators.Set, *common.SenderTest, *bloc vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - - te.Initialize(config) + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if 
!blkID.Equals(gBlk.ID()) { @@ -67,8 +66,11 @@ func setup(t *testing.T) (ids.ShortID, validators.Set, *common.SenderTest, *bloc return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -391,9 +393,8 @@ func TestEngineMultipleQuery(t *testing.T) { vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - te.Initialize(config) - + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if !blkID.Equals(gBlk.ID()) { t.Fatalf("Wrong block requested") @@ -401,8 +402,11 @@ func TestEngineMultipleQuery(t *testing.T) { return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -810,10 +814,14 @@ func TestVoteCanceling(t *testing.T) { } sender.CantGetAcceptedFrontier = false + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) - te.finishBootstrapping() - te.Ctx.Bootstrapped() + + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.LastAcceptedF = nil sender.CantGetAcceptedFrontier = true @@ -1530,9 +1538,8 @@ func TestEngineAggressivePolling(t *testing.T) { vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - - te.Initialize(config) + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if !blkID.Equals(gBlk.ID()) { @@ -1541,8 +1548,11 @@ func TestEngineAggressivePolling(t *testing.T) { return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -1649,10 +1659,14 @@ func TestEngineDoubleChit(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) - te.finishBootstrapping() - te.Ctx.Bootstrapped() + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.LastAcceptedF = nil sender.CantGetAcceptedFrontier = true diff --git a/snow/networking/router/chain_router.go b/snow/networking/router/chain_router.go index b8364e4bea7b..2157a768553a 100644 --- a/snow/networking/router/chain_router.go +++ b/snow/networking/router/chain_router.go @@ -19,6 +19,10 @@ const ( defaultCPUInterval = 5 * time.Second ) +var ( + _ Router = &ChainRouter{} +) + // ChainRouter routes incoming messages from the validator network // to the consensus engines that the messages are intended for. // Note that consensus engines are uniquely identified by the ID of the chain @@ -31,6 +35,7 @@ type ChainRouter struct { gossiper *timer.Repeater intervalNotifier *timer.Repeater closeTimeout time.Duration + peers ids.ShortSet } // Initialize the router. 
@@ -58,6 +63,35 @@ func (sr *ChainRouter) Initialize(
    go log.RecoverAndPanic(sr.intervalNotifier.Dispatch)
}
+// Shutdown shuts down this router
+func (sr *ChainRouter) Shutdown() {
+    sr.lock.Lock()
+    prevChains := sr.chains
+    sr.chains = map[[32]byte]*Handler{}
+    sr.lock.Unlock()
+
+    sr.gossiper.Stop()
+    sr.intervalNotifier.Stop()
+
+    for _, chain := range prevChains {
+        chain.Shutdown()
+    }
+
+    ticker := time.NewTicker(sr.closeTimeout)
+    timedout := false
+    for _, chain := range prevChains {
+        select {
+        case <-chain.closed:
+        case <-ticker.C:
+            timedout = true
+        }
+    }
+    if timedout {
+        sr.log.Warn("timed out while shutting down the chains")
+    }
+    ticker.Stop()
+}
+
// AddChain registers the specified chain so that incoming
// messages can be routed to it
func (sr *ChainRouter) AddChain(chain *Handler) {
@@ -68,6 +102,10 @@ func (sr *ChainRouter) AddChain(chain *Handler) {
    sr.log.Debug("registering chain %s with chain router", chainID)
    chain.toClose = func() { sr.RemoveChain(chainID) }
    sr.chains[chainID.Key()] = chain
+
+    for _, validatorID := range sr.peers.List() {
+        chain.Connected(validatorID)
+    }
}
// RemoveChain removes the specified chain so that incoming
@@ -335,39 +373,32 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
    }
}
-// Shutdown shuts down this router
-func (sr *ChainRouter) Shutdown() {
+// Connected routes an incoming notification that a validator was just connected
+func (sr *ChainRouter) Connected(validatorID ids.ShortID) {
    sr.lock.Lock()
-    prevChains := sr.chains
-    sr.chains = map[[32]byte]*Handler{}
-    sr.lock.Unlock()
-
-    sr.gossiper.Stop()
-    sr.intervalNotifier.Stop()
+    defer sr.lock.Unlock()
-    for _, chain := range prevChains {
-        chain.Shutdown()
+    sr.peers.Add(validatorID)
+    for _, chain := range sr.chains {
+        chain.Connected(validatorID)
    }
+}
-    ticker := time.NewTicker(sr.closeTimeout)
-    timedout := false
-    for _, chain := range prevChains {
-        select {
-        case <-chain.closed:
-        case <-ticker.C:
-            timedout = true
-        }
-    }
-    if timedout {
-        sr.log.Warn("timed out while shutting down the chains")
+// Disconnected routes an incoming notification that a validator was disconnected
+func (sr *ChainRouter) Disconnected(validatorID ids.ShortID) {
+    sr.lock.Lock()
+    defer sr.lock.Unlock()
+
+    sr.peers.Remove(validatorID)
+    for _, chain := range sr.chains {
+        chain.Disconnected(validatorID)
    }
-    ticker.Stop()
}
// Gossip accepted containers
func (sr *ChainRouter) Gossip() {
-    sr.lock.Lock()
-    defer sr.lock.Unlock()
+    sr.lock.RLock()
+    defer sr.lock.RUnlock()
    for _, chain := range sr.chains {
        chain.Gossip()
@@ -376,8 +407,8 @@ func (sr *ChainRouter) Gossip() {
// EndInterval notifies the chains that the current CPU interval has ended
func (sr *ChainRouter) EndInterval() {
-    sr.lock.Lock()
-    defer sr.lock.Unlock()
+    sr.lock.RLock()
+    defer sr.lock.RUnlock()
    for _, chain := range sr.chains {
        chain.endInterval()
diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go
index 90905cefa0ce..647efde3ef2d 100644
--- a/snow/networking/router/handler.go
+++ b/snow/networking/router/handler.go
@@ -394,26 +394,26 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
}
// PushQuery passes a PushQuery message received from the network to the consensus engine.
-func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, blockID ids.ID, block []byte) bool { +func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, containerID ids.ID, container []byte) bool { return h.serviceQueue.PushMessage(message{ messageType: pushQueryMsg, validatorID: validatorID, requestID: requestID, deadline: deadline, - containerID: blockID, - container: block, + containerID: containerID, + container: container, received: h.clock.Time(), }) } // PullQuery passes a PullQuery message received from the network to the consensus engine. -func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, blockID ids.ID) bool { +func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, containerID ids.ID) bool { return h.serviceQueue.PushMessage(message{ messageType: pullQueryMsg, validatorID: validatorID, requestID: requestID, deadline: deadline, - containerID: blockID, + containerID: containerID, received: h.clock.Time(), }) } @@ -438,6 +438,22 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { }) } +// Connected passes a new connection notification to the consensus engine +func (h *Handler) Connected(validatorID ids.ShortID) { + h.sendReliableMsg(message{ + messageType: connectedMsg, + validatorID: validatorID, + }) +} + +// Disconnected passes a new connection notification to the consensus engine +func (h *Handler) Disconnected(validatorID ids.ShortID) { + h.sendReliableMsg(message{ + messageType: disconnectedMsg, + validatorID: validatorID, + }) +} + // Gossip passes a gossip request to the consensus engine func (h *Handler) Gossip() { h.sendReliableMsg(message{ @@ -547,6 +563,14 @@ func (h *Handler) handleValidatorMsg(msg message, startTime time.Time) error { err = h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs) timeConsumed = h.clock.Time().Sub(startTime) h.chits.Observe(float64(timeConsumed.Nanoseconds())) + case connectedMsg: + err = h.engine.Connected(msg.validatorID) + timeConsumed = h.clock.Time().Sub(startTime) + h.connected.Observe(float64(timeConsumed.Nanoseconds())) + case disconnectedMsg: + err = h.engine.Disconnected(msg.validatorID) + timeConsumed = h.clock.Time().Sub(startTime) + h.disconnected.Observe(float64(timeConsumed.Nanoseconds())) } h.serviceQueue.UtilizeCPU(msg.validatorID, timeConsumed) diff --git a/snow/networking/router/message.go b/snow/networking/router/message.go index 12b613339700..dabba401c420 100644 --- a/snow/networking/router/message.go +++ b/snow/networking/router/message.go @@ -30,6 +30,8 @@ const ( pullQueryMsg chitsMsg queryFailedMsg + connectedMsg + disconnectedMsg notifyMsg gossipMsg getAncestorsMsg @@ -60,9 +62,14 @@ func (m message) String() string { sb.WriteString(fmt.Sprintf("\n messageType: %s", m.messageType)) sb.WriteString(fmt.Sprintf("\n validatorID: %s", m.validatorID)) sb.WriteString(fmt.Sprintf("\n requestID: %d", m.requestID)) - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) - if m.messageType == notifyMsg { + switch m.messageType { + case getAcceptedMsg, acceptedMsg, chitsMsg: + sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) + case getMsg, getAncestorsMsg, putMsg, pushQueryMsg, pullQueryMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case multiPutMsg: + sb.WriteString(fmt.Sprintf("\n numContainers: %d", len(m.containers))) + 
case notifyMsg: sb.WriteString(fmt.Sprintf("\n notification: %s", m.notification)) } if !m.deadline.IsZero() { @@ -107,6 +114,10 @@ func (t msgType) String() string { return "Chits Message" case queryFailedMsg: return "Query Failed Message" + case connectedMsg: + return "Connected Message" + case disconnectedMsg: + return "Disconnected Message" case notifyMsg: return "Notify Message" case gossipMsg: diff --git a/snow/networking/router/metrics.go b/snow/networking/router/metrics.go index 29af9b2f4112..1db026e9e5e4 100644 --- a/snow/networking/router/metrics.go +++ b/snow/networking/router/metrics.go @@ -41,6 +41,7 @@ type metrics struct { getAncestors, multiPut, getAncestorsFailed, get, put, getFailed, pushQuery, pullQuery, chits, queryFailed, + connected, disconnected, notify, gossip, cpu, @@ -106,6 +107,8 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) m.pullQuery = initHistogram(namespace, "pull_query", registerer, &errs) m.chits = initHistogram(namespace, "chits", registerer, &errs) m.queryFailed = initHistogram(namespace, "query_failed", registerer, &errs) + m.connected = initHistogram(namespace, "connected", registerer, &errs) + m.disconnected = initHistogram(namespace, "disconnected", registerer, &errs) m.notify = initHistogram(namespace, "notify", registerer, &errs) m.gossip = initHistogram(namespace, "gossip", registerer, &errs) diff --git a/snow/networking/router/router.go b/snow/networking/router/router.go index 0d8593141a9e..0b0c5bbfd5ca 100644 --- a/snow/networking/router/router.go +++ b/snow/networking/router/router.go @@ -17,15 +17,15 @@ type Router interface { ExternalRouter InternalRouter - AddChain(chain *Handler) - RemoveChain(chainID ids.ID) - Shutdown() Initialize( log logging.Logger, timeouts *timeout.Manager, gossipFrequency, shutdownTimeout time.Duration, ) + Shutdown() + AddChain(chain *Handler) + RemoveChain(chainID ids.ID) } // ExternalRouter routes messages from the network to the @@ -51,4 +51,7 @@ type InternalRouter interface { GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) + + Connected(validatorID ids.ShortID) + Disconnected(validatorID ids.ShortID) } diff --git a/snow/validators/connector.go b/snow/validators/connector.go index 384c7006a7b6..bf99076548ab 100644 --- a/snow/validators/connector.go +++ b/snow/validators/connector.go @@ -3,12 +3,13 @@ package validators -import "github.com/ava-labs/avalanchego/ids" +import ( + "github.com/ava-labs/avalanchego/ids" +) // Connector represents a handler that is called when a connection is marked as // connected or disconnected type Connector interface { - // returns true if the handler should be removed - Connected(id ids.ShortID) bool - Disconnected(id ids.ShortID) bool + Connected(id ids.ShortID) + Disconnected(id ids.ShortID) } diff --git a/vms/platformvm/service.go b/vms/platformvm/service.go index 15cca410d197..95f9a7c3d450 100644 --- a/vms/platformvm/service.go +++ b/vms/platformvm/service.go @@ -620,9 +620,7 @@ func (service *Service) GetCurrentValidators(_ *http.Request, args *GetCurrentVa } uptime := json.Float32(rawUptime) - service.vm.uptimeLock.Lock() _, connected := service.vm.connections[nodeID.Key()] - service.vm.uptimeLock.Unlock() var rewardOwner *APIOwner owner, ok := staker.RewardsOwner.(*secp256k1fx.OutputOwners) @@ -723,9 +721,7 @@ func (service *Service) GetPendingValidators(_ 
*http.Request, args *GetPendingVa weight := json.Uint64(staker.Validator.Weight()) delegationFee := json.Float32(100 * float32(staker.Shares) / float32(PercentDenominator)) - service.vm.uptimeLock.Lock() _, connected := service.vm.connections[nodeID.Key()] - service.vm.uptimeLock.Unlock() reply.Validators = append(reply.Validators, APIPrimaryValidator{ APIStaker: APIStaker{ NodeID: staker.Validator.ID().PrefixedString(constants.NodeIDPrefix), diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index c176dfa59c2e..6465d5154fb0 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -6,7 +6,6 @@ package platformvm import ( "errors" "fmt" - "sync" "time" "github.com/ava-labs/avalanchego/cache" @@ -202,7 +201,6 @@ type VM struct { bootstrappedTime time.Time - uptimeLock sync.Mutex connections map[[20]byte]time.Time } @@ -536,9 +534,6 @@ func (vm *VM) Shutdown() error { stopIter := stopDB.NewIterator() defer stopIter.Release() - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - for stopIter.Next() { // Iterates in order of increasing start time txBytes := stopIter.Value() @@ -818,40 +813,30 @@ func (vm *VM) CreateStaticHandlers() map[string]*common.HTTPHandler { } // Connected implements validators.Connector -func (vm *VM) Connected(vdrID ids.ShortID) bool { - // Locking is required here because this is called directly from the - // networking library. - - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - +func (vm *VM) Connected(vdrID ids.ShortID) { vm.connections[vdrID.Key()] = time.Unix(vm.clock.Time().Unix(), 0) - return false } // Disconnected implements validators.Connector -func (vm *VM) Disconnected(vdrID ids.ShortID) bool { - // Locking is required here because this is called directly from the - // networking library. - - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - +func (vm *VM) Disconnected(vdrID ids.ShortID) { vdrKey := vdrID.Key() - timeConnected := vm.connections[vdrKey] + timeConnected, ok := vm.connections[vdrKey] + if !ok { + return + } delete(vm.connections, vdrKey) if !vm.bootstrapped { - return false + return } txIntf, isValidator, err := vm.isValidator(vm.DB, constants.PrimaryNetworkID, vdrID) if err != nil || !isValidator { - return false + return } tx, ok := txIntf.(*UnsignedAddValidatorTx) if !ok { - return false + return } uptime, err := vm.uptime(vm.DB, vdrID) @@ -861,7 +846,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { LastUpdated: uint64(tx.StartTime().Unix()), } case err != nil: - return false + return } if timeConnected.Before(vm.bootstrappedTime) { @@ -875,7 +860,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { currentLocalTime := vm.clock.Time() if !currentLocalTime.After(lastUpdated) { - return false + return } uptime.UpDuration += uint64(currentLocalTime.Sub(timeConnected) / time.Second) @@ -887,7 +872,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { if err := vm.DB.Commit(); err != nil { vm.Ctx.Log.Error("failed to commit database changes") } - return false + return } // Check if there is a block ready to be added to consensus @@ -1376,9 +1361,6 @@ func (vm *VM) FormatAddress(chainID ids.ID, addr ids.ShortID) (string, error) { } func (vm *VM) calculateUptime(db database.Database, nodeID ids.ShortID, startTime time.Time) (float64, error) { - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - uptime, err := vm.uptime(db, nodeID) switch { case err == database.ErrNotFound: diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index c31c9ca007f5..6af4b6fe8f84 100644 --- 
a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1666,6 +1666,11 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { sender.Initialize(ctx, externalSender, chainRouter, &timeoutManager) + reqID := new(uint32) + externalSender.GetAcceptedFrontierF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time) { + *reqID = requestID + } + // The engine handles consensus engine := smeng.Transitive{} engine.Initialize(smeng.Config{ @@ -1709,13 +1714,6 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { chainRouter.AddChain(handler) go ctx.Log.RecoverAndPanic(handler.Dispatch) - reqID := new(uint32) - externalSender.GetAcceptedFrontierF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time) { - *reqID = requestID - } - - engine.Startup() - externalSender.GetAcceptedFrontierF = nil externalSender.GetAcceptedF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time, _ ids.Set) { *reqID = requestID