From 7a8efb2e9bbbe2ad56fa52cba33160c73d80e6fa Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Sat, 12 Sep 2020 12:28:32 -0400 Subject: [PATCH 01/15] fix flag name --- main/params.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main/params.go b/main/params.go index 8a6bf472ceca..9827f7994bce 100644 --- a/main/params.go +++ b/main/params.go @@ -194,7 +194,7 @@ func init() { fs.BoolVar(&Config.HTTPSEnabled, "http-tls-enabled", false, "Upgrade the HTTP server to HTTPs") fs.StringVar(&Config.HTTPSKeyFile, "http-tls-key-file", "", "TLS private key file for the HTTPs server") fs.StringVar(&Config.HTTPSCertFile, "http-tls-cert-file", "", "TLS certificate file for the HTTPs server") - fs.BoolVar(&Config.APIRequireAuthToken, "api-require-auth", false, "Require authorization token to call HTTP APIs") + fs.BoolVar(&Config.APIRequireAuthToken, "api-auth-required", false, "Require authorization token to call HTTP APIs") fs.StringVar(&Config.APIAuthPassword, "api-auth-password", "", "Password used to create/validate API authorization tokens. Can be changed via API call.") // Bootstrapping: @@ -427,7 +427,7 @@ func init() { Config.HTTPPort = uint16(*httpPort) if Config.APIRequireAuthToken { if Config.APIAuthPassword == "" { - errs.Add(errors.New("api-auth-password must be provided if api-require-auth is true")) + errs.Add(errors.New("api-auth-password must be provided if api-auth-required is true")) return } if !password.SufficientlyStrong(Config.APIAuthPassword, password.OK) { From 770907c8f987d94f3475462a5379ed2fef718a0a Mon Sep 17 00:00:00 2001 From: Gabriel Cardona Date: Sat, 12 Sep 2020 14:17:00 -0700 Subject: [PATCH 02/15] Pass Chain in to the log. --- api/info/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/info/service.go b/api/info/service.go index e4f8711a3b65..b234a0566dca 100644 --- a/api/info/service.go +++ b/api/info/service.go @@ -151,7 +151,7 @@ type IsBootstrappedResponse struct { // IsBootstrapped returns nil and sets [reply.IsBootstrapped] == true iff [args.Chain] exists and is done bootstrapping // Returns an error if the chain doesn't exist func (service *Info) IsBootstrapped(_ *http.Request, args *IsBootstrappedArgs, reply *IsBootstrappedResponse) error { - service.log.Info("Info: IsBootstrapped called") + service.log.Info("Info: IsBootstrapped called with chain: %s", args.Chain) if args.Chain == "" { return fmt.Errorf("argument 'chain' not given") } From fa74933e16da5d4d06dd745a13fb381ec7d94521 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 18:14:48 -0400 Subject: [PATCH 03/15] wip simplifying locking --- chains/awaiter.go | 58 -- chains/awaiter_test.go | 44 -- chains/manager.go | 44 +- network/network.go | 49 +- network/network_test.go | 741 ------------------ node/node.go | 63 +- .../avalanche/bootstrap/bootstrapper.go | 17 + snow/engine/common/bootstrapper.go | 36 +- snow/engine/common/config.go | 1 + snow/engine/common/engine.go | 6 + snow/engine/common/test_engine.go | 35 + snow/engine/snowman/bootstrap/bootstrapper.go | 17 + snow/networking/router/chain_router.go | 91 ++- snow/networking/router/handler.go | 22 + snow/networking/router/message.go | 6 + snow/networking/router/metrics.go | 3 + snow/networking/router/router.go | 9 +- snow/validators/connector.go | 9 +- vms/platformvm/service.go | 6 +- vms/platformvm/vm.go | 42 +- 20 files changed, 277 insertions(+), 1022 deletions(-) delete mode 100644 chains/awaiter.go delete mode 100644 chains/awaiter_test.go diff --git a/chains/awaiter.go b/chains/awaiter.go deleted file mode 100644 index 548d2088b780..000000000000 --- a/chains/awaiter.go +++ /dev/null @@ -1,58 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package chains - -import ( - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/utils/math" -) - -type awaitConnected struct { - connected func() - vdrs validators.Set - reqWeight uint64 - weight uint64 -} - -// NewAwaiter returns a new handler that will await for a sufficient number of -// validators to be connected. -func NewAwaiter(vdrs validators.Set, reqWeight uint64, connected func()) validators.Connector { - return &awaitConnected{ - vdrs: vdrs, - reqWeight: reqWeight, - connected: connected, - } -} - -func (a *awaitConnected) Connected(vdrID ids.ShortID) bool { - weight, ok := a.vdrs.GetWeight(vdrID) - if !ok { - return false - } - weight, err := math.Add64(weight, a.weight) - a.weight = weight - // If the error is non-nil, then an overflow error has occurred such that - // the required weight was surpassed. As per network.Handler interface, - // this handler should be removed and never called again after returning true. - if err == nil && a.weight < a.reqWeight { - return false - } - - go a.connected() - return true -} - -func (a *awaitConnected) Disconnected(vdrID ids.ShortID) bool { - if weight, ok := a.vdrs.GetWeight(vdrID); ok { - // TODO: Account for weight changes in a more robust manner. - - // Sub64 should rarely error since only validators that have added their - // weight can become disconnected. Because it is possible that there are - // changes to the validators set, we utilize that Sub64 returns 0 on - // error. - a.weight, _ = math.Sub64(a.weight, weight) - } - return false -} diff --git a/chains/awaiter_test.go b/chains/awaiter_test.go deleted file mode 100644 index ba127142dcf2..000000000000 --- a/chains/awaiter_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// (c) 2019-2020, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package chains - -import ( - "testing" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow/validators" -) - -func TestAwaiter(t *testing.T) { - vdrID0 := ids.NewShortID([20]byte{0}) - vdrID1 := ids.NewShortID([20]byte{1}) - vdrID2 := ids.NewShortID([20]byte{2}) - vdrID3 := ids.NewShortID([20]byte{3}) - - s := validators.NewSet() - s.AddWeight(vdrID0, 1) - s.AddWeight(vdrID1, 1) - s.AddWeight(vdrID3, 1) - - called := make(chan struct{}, 1) - aw := NewAwaiter(s, 3, func() { - called <- struct{}{} - }) - - if aw.Connected(vdrID0) { - t.Fatalf("shouldn't have finished handling yet") - } else if aw.Connected(vdrID1) { - t.Fatalf("shouldn't have finished handling yet") - } else if aw.Disconnected(vdrID1) { - t.Fatalf("shouldn't have finished handling yet") - } else if aw.Connected(vdrID1) { - t.Fatalf("shouldn't have finished handling yet") - } else if aw.Connected(vdrID2) { - t.Fatalf("shouldn't have finished handling yet") - } else if !aw.Connected(vdrID3) { - t.Fatalf("should have finished handling") - } - - <-called -} diff --git a/chains/manager.go b/chains/manager.go index 3866e2d531cb..34fd931f1050 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -351,28 +351,6 @@ func (m *manager) buildChain(chainParams ChainParameters) (*chain, error) { ctx.Log.Error("Chain with ID: %s was shutdown due to a panic", chainParams.ID) }) } - - reqWeight := (3*bootstrapWeight + 3) / 4 - if reqWeight == 0 { - if err := chain.Engine.Startup(); err != nil { - chain.Handler.Shutdown() - return nil, fmt.Errorf("failed to start consensus engine: %w", err) - } - } else { - awaiter := NewAwaiter(beacons, reqWeight, func() { - ctx.Lock.Lock() - defer ctx.Lock.Unlock() - if err := chain.Engine.Startup(); err != nil { - chain.Ctx.Log.Error("failed to start consensus engine: %s", err) - chain.Handler.Shutdown() - } - }) - go m.Net.RegisterConnector(awaiter) - } - - if connector, ok := vm.(validators.Connector); ok { - go m.Net.RegisterConnector(connector) - } return chain, nil } @@ -439,11 +417,12 @@ func (m *manager) createAvalancheChain( if err := engine.Initialize(aveng.Config{ Config: avbootstrap.Config{ Config: common.Config{ - Ctx: ctx, - Validators: validators, - Beacons: beacons, - Alpha: bootstrapWeight/2 + 1, // must be > 50% - Sender: &sender, + Ctx: ctx, + Validators: validators, + Beacons: beacons, + StartupAlpha: (3*bootstrapWeight + 3) / 4, + Alpha: bootstrapWeight/2 + 1, // must be > 50% + Sender: &sender, }, VtxBlocked: vtxBlocker, TxBlocked: txBlocker, @@ -519,11 +498,12 @@ func (m *manager) createSnowmanChain( if err := engine.Initialize(smeng.Config{ Config: smbootstrap.Config{ Config: common.Config{ - Ctx: ctx, - Validators: validators, - Beacons: beacons, - Alpha: bootstrapWeight/2 + 1, // must be > 50% - Sender: &sender, + Ctx: ctx, + Validators: validators, + Beacons: beacons, + StartupAlpha: (3*bootstrapWeight + 3) / 4, + Alpha: bootstrapWeight/2 + 1, // must be > 50% + Sender: &sender, }, Blocked: blocked, VM: vm, diff --git a/network/network.go b/network/network.go index a4867ff51081..ca931f86bd0c 100644 --- a/network/network.go +++ b/network/network.go @@ -72,12 +72,6 @@ type Network interface { // IP. Track(ip utils.IPDesc) - // Register a new connector that is called whenever a peer is connected to - // or disconnected from. If the connector returns true, then it will never - // be called again. Thread safety must be managed internally in the network. - // The handler will initially be called with this local node's ID. - RegisterConnector(h validators.Connector) - // Returns the description of the nodes this network is currently connected // to externally. Thread safety must be managed internally to the network. Peers() []PeerID @@ -138,9 +132,8 @@ type network struct { connectedIPs map[string]struct{} retryDelay map[string]time.Duration // TODO: bound the size of [myIPs] to avoid DoS. LRU caching would be ideal - myIPs map[string]struct{} // set of IPs that resulted in my ID. - peers map[[20]byte]*peer - handlers []validators.Connector + myIPs map[string]struct{} // set of IPs that resulted in my ID. + peers map[[20]byte]*peer } // NewDefaultNetwork returns a new Network implementation with the provided @@ -655,24 +648,6 @@ func (n *network) Dispatch() error { } } -// RegisterConnector implements the Network interface -func (n *network) RegisterConnector(h validators.Connector) { - n.stateLock.Lock() - defer n.stateLock.Unlock() - - if h.Connected(n.id) { - return - } - for _, peer := range n.peers { - if peer.connected { - if h.Connected(peer.id) { - return - } - } - } - n.handlers = append(n.handlers, h) -} - // IPs implements the Network interface func (n *network) Peers() []PeerID { n.stateLock.Lock() @@ -1039,15 +1014,7 @@ func (n *network) connected(p *peer) { n.connectedIPs[str] = struct{}{} } - for i := 0; i < len(n.handlers); { - if n.handlers[i].Connected(p.id) { - newLen := len(n.handlers) - 1 - n.handlers[i] = n.handlers[newLen] // remove the current handler - n.handlers = n.handlers[:newLen] - } else { - i++ - } - } + n.router.Connected(p.id) } // assumes the stateLock is held when called @@ -1068,14 +1035,6 @@ func (n *network) disconnected(p *peer) { } if p.connected { - for i := 0; i < len(n.handlers); { - if n.handlers[i].Disconnected(p.id) { - newLen := len(n.handlers) - 1 - n.handlers[i] = n.handlers[newLen] // remove the current handler - n.handlers = n.handlers[:newLen] - } else { - i++ - } - } + n.router.Disconnected(p.id) } } diff --git a/network/network_test.go b/network/network_test.go index 468b6838597e..08eda2b88c23 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -140,18 +140,6 @@ func (c *testConn) SetDeadline(time.Time) error { return nil } func (c *testConn) SetReadDeadline(time.Time) error { return nil } func (c *testConn) SetWriteDeadline(time.Time) error { return nil } -type testHandler struct { - connected func(ids.ShortID) bool - disconnected func(ids.ShortID) bool -} - -func (h *testHandler) Connected(id ids.ShortID) bool { - return h.connected != nil && h.connected(id) -} -func (h *testHandler) Disconnected(id ids.ShortID) bool { - return h.disconnected != nil && h.disconnected(id) -} - func TestNewDefaultNetwork(t *testing.T) { log := logging.NoLog{} ip := utils.IPDesc{ @@ -211,735 +199,6 @@ func TestNewDefaultNetwork(t *testing.T) { assert.Error(t, err) } -func TestEstablishConnection(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) - - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - -func TestDoubleTrack(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) - - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - -func TestDoubleClose(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) - - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) - - err = net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - -func TestRemoveHandlers(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) - - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - h3 := &testHandler{ - connected: func(id ids.ShortID) bool { - assert.Equal(t, id0, id) - return true - }, - } - h4 := &testHandler{ - connected: func(id ids.ShortID) bool { - return id.Equals(id0) - }, - } - - net0.RegisterConnector(h3) - net1.RegisterConnector(h4) - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - -func TestTrackConnected(t *testing.T) { - log := logging.NoLog{} - networkID := uint32(0) - appVersion := version.NewDefaultVersion("app", 0, 1, 0) - versionParser := version.NewDefaultParser() - - ip0 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 0, - } - id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) - ip1 := utils.IPDesc{ - IP: net.IPv6loopback, - Port: 1, - } - id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) - - listener0 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller0 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 0, - }, - outbounds: make(map[string]*testListener), - } - listener1 := &testListener{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - inbound: make(chan net.Conn, 1<<10), - closed: make(chan struct{}), - } - caller1 := &testDialer{ - addr: &net.TCPAddr{ - IP: net.IPv6loopback, - Port: 1, - }, - outbounds: make(map[string]*testListener), - } - - caller0.outbounds[ip1.String()] = listener1 - caller1.outbounds[ip0.String()] = listener0 - - serverUpgrader := NewIPUpgrader() - clientUpgrader := NewIPUpgrader() - - vdrs := validators.NewSet() - handler := router.Router(nil) - - net0 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id0, - ip0, - networkID, - appVersion, - versionParser, - listener0, - caller0, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net0) - - net1 := NewDefaultNetwork( - prometheus.NewRegistry(), - log, - id1, - ip1, - networkID, - appVersion, - versionParser, - listener1, - caller1, - serverUpgrader, - clientUpgrader, - vdrs, - vdrs, - handler, - ) - assert.NotNil(t, net1) - - var ( - wg0 sync.WaitGroup - wg1 sync.WaitGroup - ) - wg0.Add(1) - wg1.Add(1) - - h0 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id0) { - wg0.Done() - } - return false - }, - } - h1 := &testHandler{ - connected: func(id ids.ShortID) bool { - if !id.Equals(id1) { - wg1.Done() - } - return false - }, - } - - net0.RegisterConnector(h0) - net1.RegisterConnector(h1) - - net0.Track(ip1) - - go func() { - err := net0.Dispatch() - assert.Error(t, err) - }() - go func() { - err := net1.Dispatch() - assert.Error(t, err) - }() - - wg0.Wait() - wg1.Wait() - - net0.Track(ip1) - - err := net0.Close() - assert.NoError(t, err) - - err = net1.Close() - assert.NoError(t, err) -} - func TestTrackConnectedRace(t *testing.T) { log := logging.NoLog{} networkID := uint32(0) diff --git a/node/node.go b/node/node.go index 38019183f1bd..bf4909fc5a0f 100644 --- a/node/node.go +++ b/node/node.go @@ -31,6 +31,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/ipcs" "github.com/ava-labs/avalanchego/network" + "github.com/ava-labs/avalanchego/snow/networking/router" "github.com/ava-labs/avalanchego/snow/networking/timeout" "github.com/ava-labs/avalanchego/snow/triggers" "github.com/ava-labs/avalanchego/snow/validators" @@ -38,7 +39,6 @@ import ( "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/hashing" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/timer" "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms" @@ -161,6 +161,15 @@ func (n *Node) initNetworking() error { return err } + consensusRouter := n.Config.ConsensusRouter + if !n.Config.EnableStaking { + consensusRouter = &insecureValidatorManager{ + Router: consensusRouter, + vdrs: primaryNetworkValidators, + weight: n.Config.DisabledStakingWeight, + } + } + n.Net = network.NewDefaultNetwork( n.Config.ConsensusParams.Metrics, n.Log, @@ -175,16 +184,9 @@ func (n *Node) initNetworking() error { clientUpgrader, primaryNetworkValidators, n.beacons, - n.Config.ConsensusRouter, + consensusRouter, ) - if !n.Config.EnableStaking { - n.Net.RegisterConnector(&insecureValidatorManager{ - vdrs: primaryNetworkValidators, - weight: n.Config.DisabledStakingWeight, - }) - } - n.nodeCloser = utils.HandleSignals(func(os.Signal) { // errors are already logged internally if they are meaningful _ = n.Net.Close() @@ -194,20 +196,21 @@ func (n *Node) initNetworking() error { } type insecureValidatorManager struct { + router.Router vdrs validators.Set weight uint64 } -func (i *insecureValidatorManager) Connected(vdrID ids.ShortID) bool { +func (i *insecureValidatorManager) Connected(vdrID ids.ShortID) { _ = i.vdrs.AddWeight(vdrID, i.weight) - return false + i.Router.Connected(vdrID) } -func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) bool { +func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) { // Shouldn't error unless the set previously had an error, which should // never happen as described above _ = i.vdrs.RemoveWeight(vdrID, i.weight) - return false + i.Router.Disconnected(vdrID) } // Dispatch starts the node's servers. @@ -363,28 +366,30 @@ func (n *Node) initChains(genesisBytes []byte, avaxAssetID ids.ID) error { CustomBeacons: n.beacons, }) - bootstrapWeight := n.beacons.Weight() + // TODO: Introduce this back in - reqWeight := (3*bootstrapWeight + 3) / 4 + // bootstrapWeight := n.beacons.Weight() - if reqWeight == 0 { - return nil - } + // reqWeight := (3*bootstrapWeight + 3) / 4 - connectToBootstrapsTimeout := timer.NewTimer(func() { - n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") - go n.Net.Close() - }) + // if reqWeight == 0 { + // return nil + // } - awaiter := chains.NewAwaiter(n.beacons, reqWeight, func() { - n.Log.Info("Connected to required bootstrap nodes. Starting Platform Chain...") - connectToBootstrapsTimeout.Cancel() - }) + // connectToBootstrapsTimeout := timer.NewTimer(func() { + // n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") + // go n.Net.Close() + // }) + + // awaiter := chains.NewAwaiter(n.beacons, reqWeight, func() { + // n.Log.Info("Connected to required bootstrap nodes. Starting Platform Chain...") + // connectToBootstrapsTimeout.Cancel() + // }) - go connectToBootstrapsTimeout.Dispatch() - connectToBootstrapsTimeout.SetTimeoutIn(15 * time.Second) + // go connectToBootstrapsTimeout.Dispatch() + // connectToBootstrapsTimeout.SetTimeoutIn(15 * time.Second) - n.Net.RegisterConnector(awaiter) + // n.Net.RegisterConnector(awaiter) return nil } diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index 42d75394ee9e..af8515396910 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/triggers" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/formatting" ) @@ -395,3 +396,19 @@ func (b *Bootstrapper) executeAll(jobs *queue.Jobs, events *triggers.EventDispat b.Ctx.Log.Info("executed %d operations", numExecuted) return nil } + +// Connected implements the Engine interface. +func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Connected(validatorID) + } + return b.Bootstrapper.Connected(validatorID) +} + +// Disconnected implements the Engine interface. +func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Disconnected(validatorID) + } + return b.Bootstrapper.Disconnected(validatorID) +} diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go index cc0e8e4b49f6..02b551ed3bb4 100644 --- a/snow/engine/common/bootstrapper.go +++ b/snow/engine/common/bootstrapper.go @@ -34,6 +34,8 @@ const ( type Bootstrapper struct { Config + RequestID uint32 + // IDs of validators we have requested the accepted frontier from but haven't // received a reply from pendingAcceptedFrontier ids.ShortSet @@ -42,7 +44,8 @@ type Bootstrapper struct { pendingAccepted ids.ShortSet acceptedVotes map[[32]byte]uint64 - RequestID uint32 + // current weight + weight uint64 } // Initialize implements the Engine interface. @@ -170,3 +173,34 @@ func (b *Bootstrapper) Accepted(validatorID ids.ShortID, requestID uint32, conta return b.Bootstrapable.ForceAccepted(accepted) } + +// Connected implements the Engine interface. +func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { + weight, ok := b.Validators.GetWeight(validatorID) + if !ok { + return nil + } + weight, err := math.Add64(weight, b.weight) + if err != nil { + return err + } + b.weight = weight + if b.weight < b.StartupAlpha { + return nil + } + return b.Startup() +} + +// Disconnected implements the Engine interface. +func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { + if weight, ok := b.Validators.GetWeight(validatorID); ok { + // TODO: Account for weight changes in a more robust manner. + + // Sub64 should rarely error since only validators that have added their + // weight can become disconnected. Because it is possible that there are + // changes to the validators set, we utilize that Sub64 returns 0 on + // error. + b.weight, _ = math.Sub64(b.weight, weight) + } + return nil +} diff --git a/snow/engine/common/config.go b/snow/engine/common/config.go index 60e8074b65ba..4ddb6c2bdcd6 100644 --- a/snow/engine/common/config.go +++ b/snow/engine/common/config.go @@ -15,6 +15,7 @@ type Config struct { Validators validators.Set Beacons validators.Set + StartupAlpha uint64 Alpha uint64 Sender Sender Bootstrapable Bootstrapable diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index 872f7243a1c8..cc802fa9d8b2 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -292,4 +292,10 @@ type InternalHandler interface { // Notify this engine of a message from the virtual machine. Notify(Message) error + + // Notify this engine of a new peer. + Connected(validatorID ids.ShortID) error + + // Notify this engine of a removed peer. + Disconnected(validatorID ids.ShortID) error } diff --git a/snow/engine/common/test_engine.go b/snow/engine/common/test_engine.go index b3e3b7175e3a..3c65427f915d 100644 --- a/snow/engine/common/test_engine.go +++ b/snow/engine/common/test_engine.go @@ -44,6 +44,9 @@ type EngineTest struct { CantQueryFailed, CantChits bool + CantConnected, + CantDisconnected bool + IsBootstrappedF func() bool ContextF func() *snow.Context StartupF, GossipF, ShutdownF func() error @@ -54,6 +57,7 @@ type EngineTest struct { AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error GetAcceptedFrontierF, GetFailedF, GetAncestorsFailedF, QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error + ConnectedF, DisconnectedF func(validatorID ids.ShortID) error } var _ Engine = &EngineTest{} @@ -89,6 +93,9 @@ func (e *EngineTest) Default(cant bool) { e.CantPullQuery = cant e.CantQueryFailed = cant e.CantChits = cant + + e.CantConnected = cant + e.CantDisconnected = cant } // Context ... @@ -383,6 +390,34 @@ func (e *EngineTest) Chits(validatorID ids.ShortID, requestID uint32, containerI return errors.New("unexpectedly called Chits") } +// Connected ... +func (e *EngineTest) Connected(validatorID ids.ShortID) error { + if e.ConnectedF != nil { + return e.ConnectedF(validatorID) + } + if !e.CantConnected { + return nil + } + if e.T != nil { + e.T.Fatalf("Unexpectedly called Connected") + } + return errors.New("unexpectedly called Connected") +} + +// Disconnected ... +func (e *EngineTest) Disconnected(validatorID ids.ShortID) error { + if e.DisconnectedF != nil { + return e.DisconnectedF(validatorID) + } + if !e.CantDisconnected { + return nil + } + if e.T != nil { + e.T.Fatalf("Unexpectedly called Disconnected") + } + return errors.New("unexpectedly called Disconnected") +} + // IsBootstrapped ... func (e *EngineTest) IsBootstrapped() bool { if e.IsBootstrappedF != nil { diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index 7ecea2552c2b..d47d3609c032 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/queue" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" + "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/formatting" ) @@ -285,3 +286,19 @@ func (b *Bootstrapper) executeAll(jobs *queue.Jobs) error { b.Ctx.Log.Info("executed %d blocks", numExecuted) return nil } + +// Connected implements the Engine interface. +func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Connected(validatorID) + } + return b.Bootstrapper.Connected(validatorID) +} + +// Disconnected implements the Engine interface. +func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { + if connector, ok := b.VM.(validators.Connector); ok { + connector.Disconnected(validatorID) + } + return b.Bootstrapper.Disconnected(validatorID) +} diff --git a/snow/networking/router/chain_router.go b/snow/networking/router/chain_router.go index b8364e4bea7b..33f2a736adcb 100644 --- a/snow/networking/router/chain_router.go +++ b/snow/networking/router/chain_router.go @@ -19,6 +19,10 @@ const ( defaultCPUInterval = 5 * time.Second ) +var ( + _ Router = &ChainRouter{} +) + // ChainRouter routes incoming messages from the validator network // to the consensus engines that the messages are intended for. // Note that consensus engines are uniquely identified by the ID of the chain @@ -31,6 +35,7 @@ type ChainRouter struct { gossiper *timer.Repeater intervalNotifier *timer.Repeater closeTimeout time.Duration + peers ids.ShortSet } // Initialize the router. @@ -50,14 +55,43 @@ func (sr *ChainRouter) Initialize( sr.log = log sr.chains = make(map[[32]byte]*Handler) sr.timeouts = timeouts - sr.gossiper = timer.NewRepeater(sr.Gossip, gossipFrequency) - sr.intervalNotifier = timer.NewRepeater(sr.EndInterval, defaultCPUInterval) + sr.gossiper = timer.NewRepeater(sr.gossip, gossipFrequency) + sr.intervalNotifier = timer.NewRepeater(sr.endInterval, defaultCPUInterval) sr.closeTimeout = closeTimeout go log.RecoverAndPanic(sr.gossiper.Dispatch) go log.RecoverAndPanic(sr.intervalNotifier.Dispatch) } +// Shutdown shuts down this router +func (sr *ChainRouter) Shutdown() { + sr.lock.Lock() + prevChains := sr.chains + sr.chains = map[[32]byte]*Handler{} + sr.lock.Unlock() + + sr.gossiper.Stop() + sr.intervalNotifier.Stop() + + for _, chain := range prevChains { + chain.Shutdown() + } + + ticker := time.NewTicker(sr.closeTimeout) + timedout := false + for _, chain := range prevChains { + select { + case <-chain.closed: + case <-ticker.C: + timedout = true + } + } + if timedout { + sr.log.Warn("timed out while shutting down the chains") + } + ticker.Stop() +} + // AddChain registers the specified chain so that incoming // messages can be routed to it func (sr *ChainRouter) AddChain(chain *Handler) { @@ -68,6 +102,10 @@ func (sr *ChainRouter) AddChain(chain *Handler) { sr.log.Debug("registering chain %s with chain router", chainID) chain.toClose = func() { sr.RemoveChain(chainID) } sr.chains[chainID.Key()] = chain + + for _, validatorID := range sr.peers.List() { + chain.Connected(validatorID) + } } // RemoveChain removes the specified chain so that incoming @@ -335,39 +373,32 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ } } -// Shutdown shuts down this router -func (sr *ChainRouter) Shutdown() { +// Connected routes an incoming notification that a validator was just connected +func (sr *ChainRouter) Connected(validatorID ids.ShortID) { sr.lock.Lock() - prevChains := sr.chains - sr.chains = map[[32]byte]*Handler{} - sr.lock.Unlock() - - sr.gossiper.Stop() - sr.intervalNotifier.Stop() + defer sr.lock.Unlock() - for _, chain := range prevChains { - chain.Shutdown() + sr.peers.Add(validatorID) + for _, chain := range sr.chains { + chain.Connected(validatorID) } +} - ticker := time.NewTicker(sr.closeTimeout) - timedout := false - for _, chain := range prevChains { - select { - case <-chain.closed: - case <-ticker.C: - timedout = true - } - } - if timedout { - sr.log.Warn("timed out while shutting down the chains") +// Disconnected routes an incoming notification that a validator was connected +func (sr *ChainRouter) Disconnected(validatorID ids.ShortID) { + sr.lock.Lock() + defer sr.lock.Unlock() + + sr.peers.Remove(validatorID) + for _, chain := range sr.chains { + chain.Disconnected(validatorID) } - ticker.Stop() } // Gossip accepted containers -func (sr *ChainRouter) Gossip() { - sr.lock.Lock() - defer sr.lock.Unlock() +func (sr *ChainRouter) gossip() { + sr.lock.RLock() + defer sr.lock.RUnlock() for _, chain := range sr.chains { chain.Gossip() @@ -375,9 +406,9 @@ func (sr *ChainRouter) Gossip() { } // EndInterval notifies the chains that the current CPU interval has ended -func (sr *ChainRouter) EndInterval() { - sr.lock.Lock() - defer sr.lock.Unlock() +func (sr *ChainRouter) endInterval() { + sr.lock.RLock() + defer sr.lock.RUnlock() for _, chain := range sr.chains { chain.endInterval() diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 90905cefa0ce..b8a3fd5a3b79 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -438,6 +438,20 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { }) } +// Connected passes a new connection notification to the consensus engine +func (h *Handler) Connected(validatorID ids.ShortID) { + h.sendReliableMsg(message{ + messageType: gossipMsg, + }) +} + +// Disconnected passes a new connection notification to the consensus engine +func (h *Handler) Disconnected(validatorID ids.ShortID) { + h.sendReliableMsg(message{ + messageType: gossipMsg, + }) +} + // Gossip passes a gossip request to the consensus engine func (h *Handler) Gossip() { h.sendReliableMsg(message{ @@ -547,6 +561,14 @@ func (h *Handler) handleValidatorMsg(msg message, startTime time.Time) error { err = h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs) timeConsumed = h.clock.Time().Sub(startTime) h.chits.Observe(float64(timeConsumed.Nanoseconds())) + case connectedMsg: + err = h.engine.Connected(msg.validatorID) + timeConsumed = h.clock.Time().Sub(startTime) + h.connected.Observe(float64(timeConsumed.Nanoseconds())) + case disconnectedMsg: + err = h.engine.Disconnected(msg.validatorID) + timeConsumed = h.clock.Time().Sub(startTime) + h.disconnected.Observe(float64(timeConsumed.Nanoseconds())) } h.serviceQueue.UtilizeCPU(msg.validatorID, timeConsumed) diff --git a/snow/networking/router/message.go b/snow/networking/router/message.go index 12b613339700..9e8d33800a76 100644 --- a/snow/networking/router/message.go +++ b/snow/networking/router/message.go @@ -30,6 +30,8 @@ const ( pullQueryMsg chitsMsg queryFailedMsg + connectedMsg + disconnectedMsg notifyMsg gossipMsg getAncestorsMsg @@ -107,6 +109,10 @@ func (t msgType) String() string { return "Chits Message" case queryFailedMsg: return "Query Failed Message" + case connectedMsg: + return "Connected Message" + case disconnectedMsg: + return "Disconnected Message" case notifyMsg: return "Notify Message" case gossipMsg: diff --git a/snow/networking/router/metrics.go b/snow/networking/router/metrics.go index 29af9b2f4112..1db026e9e5e4 100644 --- a/snow/networking/router/metrics.go +++ b/snow/networking/router/metrics.go @@ -41,6 +41,7 @@ type metrics struct { getAncestors, multiPut, getAncestorsFailed, get, put, getFailed, pushQuery, pullQuery, chits, queryFailed, + connected, disconnected, notify, gossip, cpu, @@ -106,6 +107,8 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) m.pullQuery = initHistogram(namespace, "pull_query", registerer, &errs) m.chits = initHistogram(namespace, "chits", registerer, &errs) m.queryFailed = initHistogram(namespace, "query_failed", registerer, &errs) + m.connected = initHistogram(namespace, "connected", registerer, &errs) + m.disconnected = initHistogram(namespace, "disconnected", registerer, &errs) m.notify = initHistogram(namespace, "notify", registerer, &errs) m.gossip = initHistogram(namespace, "gossip", registerer, &errs) diff --git a/snow/networking/router/router.go b/snow/networking/router/router.go index 0d8593141a9e..0b0c5bbfd5ca 100644 --- a/snow/networking/router/router.go +++ b/snow/networking/router/router.go @@ -17,15 +17,15 @@ type Router interface { ExternalRouter InternalRouter - AddChain(chain *Handler) - RemoveChain(chainID ids.ID) - Shutdown() Initialize( log logging.Logger, timeouts *timeout.Manager, gossipFrequency, shutdownTimeout time.Duration, ) + Shutdown() + AddChain(chain *Handler) + RemoveChain(chainID ids.ID) } // ExternalRouter routes messages from the network to the @@ -51,4 +51,7 @@ type InternalRouter interface { GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) + + Connected(validatorID ids.ShortID) + Disconnected(validatorID ids.ShortID) } diff --git a/snow/validators/connector.go b/snow/validators/connector.go index 384c7006a7b6..bf99076548ab 100644 --- a/snow/validators/connector.go +++ b/snow/validators/connector.go @@ -3,12 +3,13 @@ package validators -import "github.com/ava-labs/avalanchego/ids" +import ( + "github.com/ava-labs/avalanchego/ids" +) // Connector represents a handler that is called when a connection is marked as // connected or disconnected type Connector interface { - // returns true if the handler should be removed - Connected(id ids.ShortID) bool - Disconnected(id ids.ShortID) bool + Connected(id ids.ShortID) + Disconnected(id ids.ShortID) } diff --git a/vms/platformvm/service.go b/vms/platformvm/service.go index 15cca410d197..35adb488f31c 100644 --- a/vms/platformvm/service.go +++ b/vms/platformvm/service.go @@ -620,9 +620,7 @@ func (service *Service) GetCurrentValidators(_ *http.Request, args *GetCurrentVa } uptime := json.Float32(rawUptime) - service.vm.uptimeLock.Lock() _, connected := service.vm.connections[nodeID.Key()] - service.vm.uptimeLock.Unlock() var rewardOwner *APIOwner owner, ok := staker.RewardsOwner.(*secp256k1fx.OutputOwners) @@ -723,9 +721,7 @@ func (service *Service) GetPendingValidators(_ *http.Request, args *GetPendingVa weight := json.Uint64(staker.Validator.Weight()) delegationFee := json.Float32(100 * float32(staker.Shares) / float32(PercentDenominator)) - service.vm.uptimeLock.Lock() _, connected := service.vm.connections[nodeID.Key()] - service.vm.uptimeLock.Unlock() reply.Validators = append(reply.Validators, APIPrimaryValidator{ APIStaker: APIStaker{ NodeID: staker.Validator.ID().PrefixedString(constants.NodeIDPrefix), @@ -877,7 +873,7 @@ func (service *Service) AddValidator(_ *http.Request, args *AddValidatorArgs, re nodeID, // Node ID rewardAddress, // Reward Address uint32(10000*args.DelegationFeeRate), // Shares - privKeys, // Private keys + privKeys, // Private keys ) if err != nil { return fmt.Errorf("couldn't create tx: %w", err) diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index c176dfa59c2e..6465d5154fb0 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -6,7 +6,6 @@ package platformvm import ( "errors" "fmt" - "sync" "time" "github.com/ava-labs/avalanchego/cache" @@ -202,7 +201,6 @@ type VM struct { bootstrappedTime time.Time - uptimeLock sync.Mutex connections map[[20]byte]time.Time } @@ -536,9 +534,6 @@ func (vm *VM) Shutdown() error { stopIter := stopDB.NewIterator() defer stopIter.Release() - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - for stopIter.Next() { // Iterates in order of increasing start time txBytes := stopIter.Value() @@ -818,40 +813,30 @@ func (vm *VM) CreateStaticHandlers() map[string]*common.HTTPHandler { } // Connected implements validators.Connector -func (vm *VM) Connected(vdrID ids.ShortID) bool { - // Locking is required here because this is called directly from the - // networking library. - - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - +func (vm *VM) Connected(vdrID ids.ShortID) { vm.connections[vdrID.Key()] = time.Unix(vm.clock.Time().Unix(), 0) - return false } // Disconnected implements validators.Connector -func (vm *VM) Disconnected(vdrID ids.ShortID) bool { - // Locking is required here because this is called directly from the - // networking library. - - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - +func (vm *VM) Disconnected(vdrID ids.ShortID) { vdrKey := vdrID.Key() - timeConnected := vm.connections[vdrKey] + timeConnected, ok := vm.connections[vdrKey] + if !ok { + return + } delete(vm.connections, vdrKey) if !vm.bootstrapped { - return false + return } txIntf, isValidator, err := vm.isValidator(vm.DB, constants.PrimaryNetworkID, vdrID) if err != nil || !isValidator { - return false + return } tx, ok := txIntf.(*UnsignedAddValidatorTx) if !ok { - return false + return } uptime, err := vm.uptime(vm.DB, vdrID) @@ -861,7 +846,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { LastUpdated: uint64(tx.StartTime().Unix()), } case err != nil: - return false + return } if timeConnected.Before(vm.bootstrappedTime) { @@ -875,7 +860,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { currentLocalTime := vm.clock.Time() if !currentLocalTime.After(lastUpdated) { - return false + return } uptime.UpDuration += uint64(currentLocalTime.Sub(timeConnected) / time.Second) @@ -887,7 +872,7 @@ func (vm *VM) Disconnected(vdrID ids.ShortID) bool { if err := vm.DB.Commit(); err != nil { vm.Ctx.Log.Error("failed to commit database changes") } - return false + return } // Check if there is a block ready to be added to consensus @@ -1376,9 +1361,6 @@ func (vm *VM) FormatAddress(chainID ids.ID, addr ids.ShortID) (string, error) { } func (vm *VM) calculateUptime(db database.Database, nodeID ids.ShortID, startTime time.Time) (float64, error) { - vm.uptimeLock.Lock() - defer vm.uptimeLock.Unlock() - uptime, err := vm.uptime(db, nodeID) switch { case err == database.ErrNotFound: From 29d1cb53b21ffc145533a051e2a891cb0a710014 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 18:41:15 -0400 Subject: [PATCH 04/15] cleaned up connection registry --- node/node.go | 86 +++++++++++++++++++++--------- snow/engine/common/bootstrapper.go | 4 +- snow/networking/router/handler.go | 6 ++- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/node/node.go b/node/node.go index 44c6270e70a4..14c300663324 100644 --- a/node/node.go +++ b/node/node.go @@ -39,6 +39,8 @@ import ( "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/hashing" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/timer" "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms" @@ -170,6 +172,26 @@ func (n *Node) initNetworking() error { } } + bootstrapWeight := n.beacons.Weight() + reqWeight := (3*bootstrapWeight + 3) / 4 + + if reqWeight > 0 { + timer := timer.NewTimer(func() { + n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") + go n.Net.Close() + }) + + go timer.Dispatch() + timer.SetTimeoutIn(15 * time.Second) + + consensusRouter = &beaconManager{ + Router: consensusRouter, + timer: timer, + beacons: n.beacons, + requiredWeight: reqWeight, + } + } + n.Net = network.NewDefaultNetwork( n.Config.ConsensusParams.Metrics, n.Log, @@ -213,6 +235,46 @@ func (i *insecureValidatorManager) Disconnected(vdrID ids.ShortID) { i.Router.Disconnected(vdrID) } +type beaconManager struct { + router.Router + timer *timer.Timer + beacons validators.Set + requiredWeight uint64 + weight uint64 +} + +func (b *beaconManager) Connected(vdrID ids.ShortID) { + weight, ok := b.beacons.GetWeight(vdrID) + if !ok { + b.Router.Connected(vdrID) + return + } + weight, err := math.Add64(weight, b.weight) + if err != nil { + b.timer.Cancel() + b.Router.Connected(vdrID) + return + } + b.weight = weight + if b.weight >= b.requiredWeight { + b.timer.Cancel() + } + b.Router.Connected(vdrID) +} + +func (b *beaconManager) Disconnected(vdrID ids.ShortID) { + if weight, ok := b.beacons.GetWeight(vdrID); ok { + // TODO: Account for weight changes in a more robust manner. + + // Sub64 should rarely error since only validators that have added their + // weight can become disconnected. Because it is possible that there are + // changes to the validators set, we utilize that Sub64 returns 0 on + // error. + b.weight, _ = math.Sub64(b.weight, weight) + } + b.Router.Disconnected(vdrID) +} + // Dispatch starts the node's servers. // Returns when the node exits. func (n *Node) Dispatch() error { @@ -366,30 +428,6 @@ func (n *Node) initChains(genesisBytes []byte, avaxAssetID ids.ID) error { CustomBeacons: n.beacons, }) - // TODO: Introduce this back in - - // bootstrapWeight := n.beacons.Weight() - - // reqWeight := (3*bootstrapWeight + 3) / 4 - - // if reqWeight == 0 { - // return nil - // } - - // connectToBootstrapsTimeout := timer.NewTimer(func() { - // n.Log.Fatal("Failed to connect to bootstrap nodes. Node shutting down...") - // go n.Net.Close() - // }) - - // awaiter := chains.NewAwaiter(n.beacons, reqWeight, func() { - // n.Log.Info("Connected to required bootstrap nodes. Starting Platform Chain...") - // connectToBootstrapsTimeout.Cancel() - // }) - - // go connectToBootstrapsTimeout.Dispatch() - // connectToBootstrapsTimeout.SetTimeoutIn(15 * time.Second) - - // n.Net.RegisterConnector(awaiter) return nil } diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go index 02b551ed3bb4..cf027403a010 100644 --- a/snow/engine/common/bootstrapper.go +++ b/snow/engine/common/bootstrapper.go @@ -176,7 +176,7 @@ func (b *Bootstrapper) Accepted(validatorID ids.ShortID, requestID uint32, conta // Connected implements the Engine interface. func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { - weight, ok := b.Validators.GetWeight(validatorID) + weight, ok := b.Beacons.GetWeight(validatorID) if !ok { return nil } @@ -193,7 +193,7 @@ func (b *Bootstrapper) Connected(validatorID ids.ShortID) error { // Disconnected implements the Engine interface. func (b *Bootstrapper) Disconnected(validatorID ids.ShortID) error { - if weight, ok := b.Validators.GetWeight(validatorID); ok { + if weight, ok := b.Beacons.GetWeight(validatorID); ok { // TODO: Account for weight changes in a more robust manner. // Sub64 should rarely error since only validators that have added their diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index b8a3fd5a3b79..1b6033f923db 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -441,14 +441,16 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { // Connected passes a new connection notification to the consensus engine func (h *Handler) Connected(validatorID ids.ShortID) { h.sendReliableMsg(message{ - messageType: gossipMsg, + messageType: connectedMsg, + validatorID: validatorID, }) } // Disconnected passes a new connection notification to the consensus engine func (h *Handler) Disconnected(validatorID ids.ShortID) { h.sendReliableMsg(message{ - messageType: gossipMsg, + messageType: disconnectedMsg, + validatorID: validatorID, }) } From 451609120e0e98cd11c2f05b0921d0953770e865 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 18:41:39 -0400 Subject: [PATCH 05/15] version bump --- node/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/node.go b/node/node.go index 14c300663324..7e30656f7699 100644 --- a/node/node.go +++ b/node/node.go @@ -64,7 +64,7 @@ var ( genesisHashKey = []byte("genesisID") // Version is the version of this code - Version = version.NewDefaultVersion(constants.PlatformName, 0, 8, 1) + Version = version.NewDefaultVersion(constants.PlatformName, 0, 8, 2) versionParser = version.NewDefaultParser() ) From c195e906fcabfb765b4a5e99f9757bf8481ff76a Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 18:46:20 -0400 Subject: [PATCH 06/15] formatted --- vms/platformvm/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vms/platformvm/service.go b/vms/platformvm/service.go index 35adb488f31c..95f9a7c3d450 100644 --- a/vms/platformvm/service.go +++ b/vms/platformvm/service.go @@ -873,7 +873,7 @@ func (service *Service) AddValidator(_ *http.Request, args *AddValidatorArgs, re nodeID, // Node ID rewardAddress, // Reward Address uint32(10000*args.DelegationFeeRate), // Shares - privKeys, // Private keys + privKeys, // Private keys ) if err != nil { return fmt.Errorf("couldn't create tx: %w", err) From c746b2c2d0a24285739baaec5c11229b00859446 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 19:05:48 -0400 Subject: [PATCH 07/15] added tests back in --- network/network_test.go | 570 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 570 insertions(+) diff --git a/network/network_test.go b/network/network_test.go index 08eda2b88c23..790896cc78fe 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -140,6 +140,23 @@ func (c *testConn) SetDeadline(time.Time) error { return nil } func (c *testConn) SetReadDeadline(time.Time) error { return nil } func (c *testConn) SetWriteDeadline(time.Time) error { return nil } +type testHandler struct { + router.Router + connected func(ids.ShortID) + disconnected func(ids.ShortID) +} + +func (h *testHandler) Connected(id ids.ShortID) { + if h.connected != nil { + h.connected(id) + } +} +func (h *testHandler) Disconnected(id ids.ShortID) { + if h.disconnected != nil { + h.disconnected(id) + } +} + func TestNewDefaultNetwork(t *testing.T) { log := logging.NoLog{} ip := utils.IPDesc{ @@ -199,6 +216,559 @@ func TestNewDefaultNetwork(t *testing.T) { assert.Error(t, err) } +func TestEstablishConnection(t *testing.T) { + log := logging.NoLog{} + networkID := uint32(0) + appVersion := version.NewDefaultVersion("app", 0, 1, 0) + versionParser := version.NewDefaultParser() + + ip0 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 0, + } + id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) + ip1 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 1, + } + id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) + + listener0 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller0 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + outbounds: make(map[string]*testListener), + } + listener1 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller1 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + outbounds: make(map[string]*testListener), + } + + caller0.outbounds[ip1.String()] = listener1 + caller1.outbounds[ip0.String()] = listener0 + + serverUpgrader := NewIPUpgrader() + clientUpgrader := NewIPUpgrader() + + vdrs := validators.NewSet() + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } + + net0 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id0, + ip0, + networkID, + appVersion, + versionParser, + listener0, + caller0, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler0, + ) + assert.NotNil(t, net0) + + net1 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id1, + ip1, + networkID, + appVersion, + versionParser, + listener1, + caller1, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler1, + ) + assert.NotNil(t, net1) + + net0.Track(ip1) + + go func() { + err := net0.Dispatch() + assert.Error(t, err) + }() + go func() { + err := net1.Dispatch() + assert.Error(t, err) + }() + + wg0.Wait() + wg1.Wait() + + err := net0.Close() + assert.NoError(t, err) + + err = net1.Close() + assert.NoError(t, err) +} + +func TestDoubleTrack(t *testing.T) { + log := logging.NoLog{} + networkID := uint32(0) + appVersion := version.NewDefaultVersion("app", 0, 1, 0) + versionParser := version.NewDefaultParser() + + ip0 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 0, + } + id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) + ip1 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 1, + } + id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) + + listener0 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller0 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + outbounds: make(map[string]*testListener), + } + listener1 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller1 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + outbounds: make(map[string]*testListener), + } + + caller0.outbounds[ip1.String()] = listener1 + caller1.outbounds[ip0.String()] = listener0 + + serverUpgrader := NewIPUpgrader() + clientUpgrader := NewIPUpgrader() + + vdrs := validators.NewSet() + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } + + net0 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id0, + ip0, + networkID, + appVersion, + versionParser, + listener0, + caller0, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler0, + ) + assert.NotNil(t, net0) + + net1 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id1, + ip1, + networkID, + appVersion, + versionParser, + listener1, + caller1, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler1, + ) + assert.NotNil(t, net1) + + net0.Track(ip1) + net0.Track(ip1) + + go func() { + err := net0.Dispatch() + assert.Error(t, err) + }() + go func() { + err := net1.Dispatch() + assert.Error(t, err) + }() + + wg0.Wait() + wg1.Wait() + + err := net0.Close() + assert.NoError(t, err) + + err = net1.Close() + assert.NoError(t, err) +} + +func TestDoubleClose(t *testing.T) { + log := logging.NoLog{} + networkID := uint32(0) + appVersion := version.NewDefaultVersion("app", 0, 1, 0) + versionParser := version.NewDefaultParser() + + ip0 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 0, + } + id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) + ip1 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 1, + } + id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) + + listener0 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller0 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + outbounds: make(map[string]*testListener), + } + listener1 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller1 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + outbounds: make(map[string]*testListener), + } + + caller0.outbounds[ip1.String()] = listener1 + caller1.outbounds[ip0.String()] = listener0 + + serverUpgrader := NewIPUpgrader() + clientUpgrader := NewIPUpgrader() + + vdrs := validators.NewSet() + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } + + net0 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id0, + ip0, + networkID, + appVersion, + versionParser, + listener0, + caller0, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler0, + ) + assert.NotNil(t, net0) + + net1 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id1, + ip1, + networkID, + appVersion, + versionParser, + listener1, + caller1, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler1, + ) + assert.NotNil(t, net1) + + net0.Track(ip1) + + go func() { + err := net0.Dispatch() + assert.Error(t, err) + }() + go func() { + err := net1.Dispatch() + assert.Error(t, err) + }() + + wg0.Wait() + wg1.Wait() + + err := net0.Close() + assert.NoError(t, err) + + err = net1.Close() + assert.NoError(t, err) + + err = net0.Close() + assert.NoError(t, err) + + err = net1.Close() + assert.NoError(t, err) +} + +func TestTrackConnected(t *testing.T) { + log := logging.NoLog{} + networkID := uint32(0) + appVersion := version.NewDefaultVersion("app", 0, 1, 0) + versionParser := version.NewDefaultParser() + + ip0 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 0, + } + id0 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip0.String()))) + ip1 := utils.IPDesc{ + IP: net.IPv6loopback, + Port: 1, + } + id1 := ids.NewShortID(hashing.ComputeHash160Array([]byte(ip1.String()))) + + listener0 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller0 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 0, + }, + outbounds: make(map[string]*testListener), + } + listener1 := &testListener{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + inbound: make(chan net.Conn, 1<<10), + closed: make(chan struct{}), + } + caller1 := &testDialer{ + addr: &net.TCPAddr{ + IP: net.IPv6loopback, + Port: 1, + }, + outbounds: make(map[string]*testListener), + } + + caller0.outbounds[ip1.String()] = listener1 + caller1.outbounds[ip0.String()] = listener0 + + serverUpgrader := NewIPUpgrader() + clientUpgrader := NewIPUpgrader() + + vdrs := validators.NewSet() + + var ( + wg0 sync.WaitGroup + wg1 sync.WaitGroup + ) + wg0.Add(1) + wg1.Add(1) + + handler0 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id0) { + wg0.Done() + } + }, + } + + handler1 := &testHandler{ + connected: func(id ids.ShortID) { + if !id.Equals(id1) { + wg1.Done() + } + }, + } + + net0 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id0, + ip0, + networkID, + appVersion, + versionParser, + listener0, + caller0, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler0, + ) + assert.NotNil(t, net0) + + net1 := NewDefaultNetwork( + prometheus.NewRegistry(), + log, + id1, + ip1, + networkID, + appVersion, + versionParser, + listener1, + caller1, + serverUpgrader, + clientUpgrader, + vdrs, + vdrs, + handler1, + ) + assert.NotNil(t, net1) + + net0.Track(ip1) + + go func() { + err := net0.Dispatch() + assert.Error(t, err) + }() + go func() { + err := net1.Dispatch() + assert.Error(t, err) + }() + + wg0.Wait() + wg1.Wait() + + net0.Track(ip1) + + err := net0.Close() + assert.NoError(t, err) + + err = net1.Close() + assert.NoError(t, err) +} + func TestTrackConnectedRace(t *testing.T) { log := logging.NoLog{} networkID := uint32(0) From a9550893bd5cd3aa2b8a26a90c46e5508e491cd6 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 19:41:20 -0400 Subject: [PATCH 08/15] added exported functions back in for testing --- snow/networking/router/chain_router.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/snow/networking/router/chain_router.go b/snow/networking/router/chain_router.go index 33f2a736adcb..2157a768553a 100644 --- a/snow/networking/router/chain_router.go +++ b/snow/networking/router/chain_router.go @@ -55,8 +55,8 @@ func (sr *ChainRouter) Initialize( sr.log = log sr.chains = make(map[[32]byte]*Handler) sr.timeouts = timeouts - sr.gossiper = timer.NewRepeater(sr.gossip, gossipFrequency) - sr.intervalNotifier = timer.NewRepeater(sr.endInterval, defaultCPUInterval) + sr.gossiper = timer.NewRepeater(sr.Gossip, gossipFrequency) + sr.intervalNotifier = timer.NewRepeater(sr.EndInterval, defaultCPUInterval) sr.closeTimeout = closeTimeout go log.RecoverAndPanic(sr.gossiper.Dispatch) @@ -396,7 +396,7 @@ func (sr *ChainRouter) Disconnected(validatorID ids.ShortID) { } // Gossip accepted containers -func (sr *ChainRouter) gossip() { +func (sr *ChainRouter) Gossip() { sr.lock.RLock() defer sr.lock.RUnlock() @@ -406,7 +406,7 @@ func (sr *ChainRouter) gossip() { } // EndInterval notifies the chains that the current CPU interval has ended -func (sr *ChainRouter) endInterval() { +func (sr *ChainRouter) EndInterval() { sr.lock.RLock() defer sr.lock.RUnlock() From a3d6d5202082de6763e0fa12cb073a8345d4280d Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 20:04:43 -0400 Subject: [PATCH 09/15] handle bootstrapping with no bootstrappers correctly --- snow/engine/avalanche/bootstrap/bootstrapper.go | 3 +-- snow/engine/common/bootstrapper.go | 6 +++++- snow/engine/snowman/bootstrap/bootstrapper.go | 3 +-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index af8515396910..a2010d39c9e3 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -95,8 +95,7 @@ func (b *Bootstrapper) Initialize( }) config.Bootstrapable = b - b.Bootstrapper.Initialize(config.Config) - return nil + return b.Bootstrapper.Initialize(config.Config) } // CurrentAcceptedFrontier returns the set of vertices that this node has accepted diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go index cf027403a010..b55f6f0db6b1 100644 --- a/snow/engine/common/bootstrapper.go +++ b/snow/engine/common/bootstrapper.go @@ -49,7 +49,7 @@ type Bootstrapper struct { } // Initialize implements the Engine interface. -func (b *Bootstrapper) Initialize(config Config) { +func (b *Bootstrapper) Initialize(config Config) error { b.Config = config for _, vdr := range b.Beacons.List() { @@ -59,6 +59,10 @@ func (b *Bootstrapper) Initialize(config Config) { } b.acceptedVotes = make(map[[32]byte]uint64) + if b.Config.StartupAlpha > 0 { + return nil + } + return b.Startup() } // Startup implements the Engine interface. diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index d47d3609c032..9d748cbbde7a 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -71,8 +71,7 @@ func (b *Bootstrapper) Initialize( }) config.Bootstrapable = b - b.Bootstrapper.Initialize(config.Config) - return nil + return b.Bootstrapper.Initialize(config.Config) } // CurrentAcceptedFrontier returns the last accepted block From 4ef1e5fbb182cff8ca3e48d23920aa125ed35981 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 12 Sep 2020 20:10:08 -0400 Subject: [PATCH 10/15] Add metrics to track the number of vtx/blocks put into MultiPut msgs --- network/network.go | 4 ++++ snow/engine/avalanche/metrics.go | 17 +++++++++++++++++ snow/engine/avalanche/transitive.go | 1 + snow/engine/snowman/metrics.go | 17 +++++++++++++++++ snow/engine/snowman/transitive.go | 1 + 5 files changed, 40 insertions(+) diff --git a/network/network.go b/network/network.go index a4867ff51081..e72d4ea4fb50 100644 --- a/network/network.go +++ b/network/network.go @@ -288,6 +288,10 @@ func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, sent = peer.send(msg) } if !sent { + n.log.Debug("failed to send GetAcceptedFrontier(%s, %s, %d)", + vID, + chainID, + requestID) n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) }) n.getAcceptedFrontier.numFailed.Inc() } else { diff --git a/snow/engine/avalanche/metrics.go b/snow/engine/avalanche/metrics.go index 212474a7200c..3624bf211713 100644 --- a/snow/engine/avalanche/metrics.go +++ b/snow/engine/avalanche/metrics.go @@ -11,6 +11,7 @@ import ( type metrics struct { numVtxRequests, numPendingVts, numMissingTxs prometheus.Gauge + getAncestorsVtxs prometheus.Histogram } // Initialize implements the Engine interface @@ -30,6 +31,22 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) Name: "missing_txs", Help: "Number of missing transactions", }) + m.getAncestorsVtxs = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_ancestors_vtxs", + Help: "The number of vertices fetched in a call to GetAncestors", + Buckets: []float64{ + 0, + 1, + 5, + 10, + 100, + 500, + 1000, + 1500, + 2000, + }, + }) errs := wrappers.Errs{} errs.Add( diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index b73d51ae75fa..d69f219d2957 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -186,6 +186,7 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.I } } + t.metrics.getAncestorsVtxs.Observe(float64(len(ancestorsBytes))) t.Sender.MultiPut(vdr, requestID, ancestorsBytes) return nil } diff --git a/snow/engine/snowman/metrics.go b/snow/engine/snowman/metrics.go index 5ab5fb520320..6b7485b043b9 100644 --- a/snow/engine/snowman/metrics.go +++ b/snow/engine/snowman/metrics.go @@ -11,6 +11,7 @@ import ( type metrics struct { numRequests, numBlocked prometheus.Gauge + getAncestorsBlks prometheus.Histogram } // Initialize the metrics @@ -25,6 +26,22 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) Name: "blocked", Help: "Number of blocks that are pending issuance", }) + m.getAncestorsBlks = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_ancestors_blks", + Help: "The number of blocks fetched in a call to GetAncestors", + Buckets: []float64{ + 0, + 1, + 5, + 10, + 100, + 500, + 1000, + 1500, + 2000, + }, + }) errs := wrappers.Errs{} errs.Add( diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index 2093ed6ef54f..38839132dd24 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -179,6 +179,7 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.I } } + t.metrics.getAncestorsBlks.Observe(float64(len(ancestorsBytes))) t.Sender.MultiPut(vdr, requestID, ancestorsBytes) return nil } From ec154d9824013f3755fa0657eb9d08f2e77d0486 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 12 Sep 2020 20:27:30 -0400 Subject: [PATCH 11/15] Change block to container --- snow/networking/router/handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 90905cefa0ce..9f84d99ac687 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -394,26 +394,26 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { } // PushQuery passes a PushQuery message received from the network to the consensus engine. -func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, blockID ids.ID, block []byte) bool { +func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, containerID ids.ID, container []byte) bool { return h.serviceQueue.PushMessage(message{ messageType: pushQueryMsg, validatorID: validatorID, requestID: requestID, deadline: deadline, - containerID: blockID, - container: block, + containerID: containerID, + container: container, received: h.clock.Time(), }) } // PullQuery passes a PullQuery message received from the network to the consensus engine. -func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, blockID ids.ID) bool { +func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, deadline time.Time, containerID ids.ID) bool { return h.serviceQueue.PushMessage(message{ messageType: pullQueryMsg, validatorID: validatorID, requestID: requestID, deadline: deadline, - containerID: blockID, + containerID: containerID, received: h.clock.Time(), }) } From aab065b5b97837e8864160fd3144b336131e214a Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 12 Sep 2020 20:27:51 -0400 Subject: [PATCH 12/15] Improve message string function --- snow/networking/router/message.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/snow/networking/router/message.go b/snow/networking/router/message.go index 12b613339700..8d6a41bf0e71 100644 --- a/snow/networking/router/message.go +++ b/snow/networking/router/message.go @@ -60,9 +60,26 @@ func (m message) String() string { sb.WriteString(fmt.Sprintf("\n messageType: %s", m.messageType)) sb.WriteString(fmt.Sprintf("\n validatorID: %s", m.validatorID)) sb.WriteString(fmt.Sprintf("\n requestID: %d", m.requestID)) - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) - if m.messageType == notifyMsg { + switch m.messageType { + case getAcceptedMsg: + sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) + case acceptedMsg: + sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) + case getMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case getAncestorsMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case putMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case multiPutMsg: + sb.WriteString(fmt.Sprintf("\n numContainers: %d", len(m.containers))) + case pushQueryMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case pullQueryMsg: + sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) + case chitsMsg: + sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) + case notifyMsg: sb.WriteString(fmt.Sprintf("\n notification: %s", m.notification)) } if !m.deadline.IsZero() { From 5689e5e29cee3660f362e6c5deb61755a3452ebd Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 20:37:54 -0400 Subject: [PATCH 13/15] fixed tests --- snow/engine/avalanche/transitive_test.go | 53 ++++++++++++++++++++++-- snow/engine/snowman/transitive_test.go | 52 ++++++++++++++--------- vms/platformvm/vm_test.go | 12 +++--- 3 files changed, 87 insertions(+), 30 deletions(-) diff --git a/snow/engine/avalanche/transitive_test.go b/snow/engine/avalanche/transitive_test.go index c0590751ee45..e9869c840730 100644 --- a/snow/engine/avalanche/transitive_test.go +++ b/snow/engine/avalanche/transitive_test.go @@ -853,8 +853,15 @@ func TestEngineRejectDoubleSpendTx(t *testing.T) { }, nil } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -939,8 +946,15 @@ func TestEngineRejectDoubleSpendIssuedTx(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -1136,8 +1150,15 @@ func TestEngineReissue(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -1268,8 +1289,15 @@ func TestEngineLargeIssue(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() @@ -2985,6 +3013,12 @@ func TestEngineAggressivePolling(t *testing.T) { manager := &vertex.TestManager{T: t} config.Manager = manager + vm := &vertex.TestVM{} + vm.T = t + config.VM = vm + + vm.Default(true) + gVtx := &avalanche.TestVertex{ TestDecidable: choices.TestDecidable{ IDV: ids.GenerateTestID(), @@ -3019,14 +3053,16 @@ func TestEngineAggressivePolling(t *testing.T) { BytesV: []byte{1}, } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} if err := te.Initialize(config); err != nil { t.Fatal(err) } - if err := te.finishBootstrapping(); err != nil { - t.Fatal(err) - } - te.Ctx.Bootstrapped() + + vm.CantBootstrapping = true + vm.CantBootstrapped = true parsed := new(bool) manager.ParseVertexF = func(b []byte) (avalanche.Vertex, error) { @@ -3056,6 +3092,8 @@ func TestEngineAggressivePolling(t *testing.T) { numPullQueries := new(int) sender.PullQueryF = func(ids.ShortSet, uint32, ids.ID) { *numPullQueries++ } + vm.CantPendingTxs = false + te.Put(vdr, 0, vtx.ID(), vtx.Bytes()) if *numPushQueries != 1 { @@ -3133,8 +3171,15 @@ func TestEngineDuplicatedIssuance(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true + te.finishBootstrapping() te.Ctx.Bootstrapped() diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 0805e200f9b3..b9946250b466 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -56,9 +56,8 @@ func setup(t *testing.T) (ids.ShortID, validators.Set, *common.SenderTest, *bloc vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - - te.Initialize(config) + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if !blkID.Equals(gBlk.ID()) { @@ -67,8 +66,11 @@ func setup(t *testing.T) (ids.ShortID, validators.Set, *common.SenderTest, *bloc return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -391,9 +393,8 @@ func TestEngineMultipleQuery(t *testing.T) { vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - te.Initialize(config) - + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if !blkID.Equals(gBlk.ID()) { t.Fatalf("Wrong block requested") @@ -401,8 +402,11 @@ func TestEngineMultipleQuery(t *testing.T) { return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -810,10 +814,14 @@ func TestVoteCanceling(t *testing.T) { } sender.CantGetAcceptedFrontier = false + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) - te.finishBootstrapping() - te.Ctx.Bootstrapped() + + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.LastAcceptedF = nil sender.CantGetAcceptedFrontier = true @@ -1530,9 +1538,8 @@ func TestEngineAggressivePolling(t *testing.T) { vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } sender.CantGetAcceptedFrontier = false - te := &Transitive{} - - te.Initialize(config) + vm.CantBootstrapping = false + vm.CantBootstrapped = false vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { if !blkID.Equals(gBlk.ID()) { @@ -1541,8 +1548,11 @@ func TestEngineAggressivePolling(t *testing.T) { return gBlk, nil } - te.finishBootstrapping() - te.Ctx.Bootstrapped() + te := &Transitive{} + te.Initialize(config) + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.GetBlockF = nil vm.LastAcceptedF = nil @@ -1649,10 +1659,14 @@ func TestEngineDoubleChit(t *testing.T) { panic("Should have errored") } + vm.CantBootstrapping = false + vm.CantBootstrapped = false + te := &Transitive{} te.Initialize(config) - te.finishBootstrapping() - te.Ctx.Bootstrapped() + + vm.CantBootstrapping = true + vm.CantBootstrapped = true vm.LastAcceptedF = nil sender.CantGetAcceptedFrontier = true diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index c31c9ca007f5..6af4b6fe8f84 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1666,6 +1666,11 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { sender.Initialize(ctx, externalSender, chainRouter, &timeoutManager) + reqID := new(uint32) + externalSender.GetAcceptedFrontierF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time) { + *reqID = requestID + } + // The engine handles consensus engine := smeng.Transitive{} engine.Initialize(smeng.Config{ @@ -1709,13 +1714,6 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { chainRouter.AddChain(handler) go ctx.Log.RecoverAndPanic(handler.Dispatch) - reqID := new(uint32) - externalSender.GetAcceptedFrontierF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time) { - *reqID = requestID - } - - engine.Startup() - externalSender.GetAcceptedFrontierF = nil externalSender.GetAcceptedF = func(_ ids.ShortSet, _ ids.ID, requestID uint32, _ time.Time, _ ids.Set) { *reqID = requestID From 85b37bdc439bd0b64509e72f35afa5c72bb3c499 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 12 Sep 2020 21:03:16 -0400 Subject: [PATCH 14/15] cleaned up switch case --- snow/networking/router/message.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/snow/networking/router/message.go b/snow/networking/router/message.go index 8d6a41bf0e71..4f55fa22eae8 100644 --- a/snow/networking/router/message.go +++ b/snow/networking/router/message.go @@ -61,24 +61,12 @@ func (m message) String() string { sb.WriteString(fmt.Sprintf("\n validatorID: %s", m.validatorID)) sb.WriteString(fmt.Sprintf("\n requestID: %d", m.requestID)) switch m.messageType { - case getAcceptedMsg: - sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) - case acceptedMsg: + case getAcceptedMsg, acceptedMsg, chitsMsg: sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) - case getMsg: - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - case getAncestorsMsg: - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - case putMsg: + case getMsg, getAncestorsMsg, putMsg, pushQueryMsg, pullQueryMsg: sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) case multiPutMsg: sb.WriteString(fmt.Sprintf("\n numContainers: %d", len(m.containers))) - case pushQueryMsg: - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - case pullQueryMsg: - sb.WriteString(fmt.Sprintf("\n containerID: %s", m.containerID)) - case chitsMsg: - sb.WriteString(fmt.Sprintf("\n containerIDs: %s", m.containerIDs)) case notifyMsg: sb.WriteString(fmt.Sprintf("\n notification: %s", m.notification)) } From c25dc794cf15feac39499b70c71990af6fe48d26 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 12 Sep 2020 22:02:18 -0400 Subject: [PATCH 15/15] Register new metrics --- snow/engine/avalanche/metrics.go | 1 + snow/engine/snowman/metrics.go | 1 + 2 files changed, 2 insertions(+) diff --git a/snow/engine/avalanche/metrics.go b/snow/engine/avalanche/metrics.go index 3624bf211713..a0bda312a015 100644 --- a/snow/engine/avalanche/metrics.go +++ b/snow/engine/avalanche/metrics.go @@ -53,6 +53,7 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) registerer.Register(m.numVtxRequests), registerer.Register(m.numPendingVts), registerer.Register(m.numMissingTxs), + registerer.Register(m.getAncestorsVtxs), ) return errs.Err } diff --git a/snow/engine/snowman/metrics.go b/snow/engine/snowman/metrics.go index 6b7485b043b9..77fb9af1385f 100644 --- a/snow/engine/snowman/metrics.go +++ b/snow/engine/snowman/metrics.go @@ -47,6 +47,7 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) errs.Add( registerer.Register(m.numRequests), registerer.Register(m.numBlocked), + registerer.Register(m.getAncestorsBlks), ) return errs.Err }