diff --git a/common/config/config.go b/common/config/config.go index 6cf03ede..90266afa 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -42,6 +42,9 @@ type Configuration struct { EncryptAlg string `json:"EncryptAlg"` MaxLogSize int64 `json:"MaxLogSize"` MaxTxInBlock int `json:"MaxTransactionInBlock"` + DefaultMaxPeers uint `json:"DefaultMaxPeers"` + GetAddrMax uint `json:"GetAddrMax"` + MaxOutboundCnt uint `json:"MaxOutboundCnt"` } type ConfigFile struct { diff --git a/net/message/address.go b/net/message/address.go index 3c5be932..8e4b56db 100644 --- a/net/message/address.go +++ b/net/message/address.go @@ -92,7 +92,9 @@ func (msg addrReq) Handle(node Noder) error { // lock var addrstr []NodeAddr var count uint64 - addrstr, count = node.LocalNode().GetNeighborAddrs() + + addrstr = node.LocalNode().RandSelectAddresses() + count = uint64(len(addrstr)) buf, err := NewAddrs(addrstr, count) if err != nil { return err @@ -180,7 +182,8 @@ func (msg addr) Handle(node Noder) error { continue } - go node.LocalNode().Connect(address) + //save the node address in address list + node.LocalNode().AddAddressToKnownAddress(v) } return nil } diff --git a/net/message/verack.go b/net/message/verack.go index b5c69674..7b5e4ece 100644 --- a/net/message/verack.go +++ b/net/message/verack.go @@ -69,7 +69,9 @@ func (msg verACK) Handle(node Noder) error { // Fixme, there is a race condition here, // but it doesn't matter to access the invalid // node which will trigger a warning - node.ReqNeighborList() + if node.LocalNode().NeedMoreAddresses() { + node.ReqNeighborList() + } addr := node.GetAddr() port := node.GetPort() nodeAddr := addr + ":" + strconv.Itoa(int(port)) diff --git a/net/message/version.go b/net/message/version.go index aa0dd695..d0c7c416 100644 --- a/net/message/version.go +++ b/net/message/version.go @@ -197,6 +197,16 @@ func (msg version) Handle(node Noder) error { msg.P.Port, msg.P.Nonce, msg.P.Relay, msg.P.StartHeight) localNode.AddNbrNode(node) + ip, _ := node.GetAddr16() + addr := NodeAddr{ + Time: node.GetTime(), + Services: msg.P.Services, + IpAddr: ip, + Port: msg.P.Port, + ID: msg.P.Nonce, + } + localNode.AddAddressToKnownAddress(addr) + var buf []byte if s == INIT { node.SetState(HANDSHAKE) diff --git a/net/node/infoUpdate.go b/net/node/infoUpdate.go index b43ea1de..de9ffa5d 100644 --- a/net/node/infoUpdate.go +++ b/net/node/infoUpdate.go @@ -4,6 +4,7 @@ import ( "DNA/common/config" "DNA/common/log" "DNA/core/ledger" + "DNA/events" . "DNA/net/message" . "DNA/net/protocol" "math/rand" @@ -135,7 +136,9 @@ func (node *node) ConnectSeeds() { node.nbrNodes.Unlock() if found { if n.GetState() == ESTABLISH { - n.ReqNeighborList() + if node.LocalNode().NeedMoreAddresses() { + n.ReqNeighborList() + } } } else { //not found go node.Connect(nodeAddr) @@ -144,6 +147,22 @@ func (node *node) ConnectSeeds() { } } +func (node *node) ConnectNode() { + cntcount := node.nbrNodes.GetConnectionCnt() + if cntcount < node.GetMaxOutboundCnt() { + nbrAddr, _ := node.GetNeighborAddrs() + addrs := node.RandGetAddresses(nbrAddr) + for _, nodeAddr := range addrs { + addr := nodeAddr.IpAddr + port := nodeAddr.Port + var ip net.IP + ip = addr[:] + na := ip.To16().String() + ":" + strconv.Itoa(int(port)) + go node.Connect(na) + } + } +} + func getNodeAddr(n *node) NodeAddr { var addr NodeAddr addr.IpAddr, _ = n.GetAddr16() @@ -227,16 +246,25 @@ func (node *node) updateNodeInfo() { //close(quit) } +func (node *node) CheckConnCnt() { + //compare if connect count is larger than DefaultMaxPeers, disconnect one of the connection + if node.nbrNodes.GetConnectionCnt() > node.GetDefaultMaxPeers() { + disconnNode := node.RandGetANbr() + node.eventQueue.GetEvent("disconnect").Notify(events.EventNodeDisconnect, disconnNode) + } +} + func (node *node) updateConnection() { t := time.NewTimer(time.Second * CONNMONITOR) for { select { case <-t.C: node.ConnectSeeds() - node.TryConnect() + //node.TryConnect() + node.ConnectNode() + node.CheckConnCnt() t.Stop() t.Reset(time.Second * CONNMONITOR) } } - } diff --git a/net/node/knowaddrlist.go b/net/node/knowaddrlist.go new file mode 100644 index 00000000..32d905f1 --- /dev/null +++ b/net/node/knowaddrlist.go @@ -0,0 +1,266 @@ +package node + +import ( + "DNA/common/log" + . "DNA/net/protocol" + "math/rand" + "sync" + "time" +) + +const ( + // needAddressThreshold is the number of addresses under which the + // address manager will claim to need more addresses. + needAddressThreshold = 1000 + // numMissingDays is the number of days before which we assume an + // address has vanished if we have not seen it announced in that long. + numMissingDays = 30 + // numRetries is the number of tried without a single success before + // we assume an address is bad. + numRetries = 10 +) + +type KnownAddress struct { + srcAddr NodeAddr + lastattempt time.Time + lastDisconnect time.Time + attempts int +} + +type KnownAddressList struct { + sync.RWMutex + List map[uint64]*KnownAddress + addrCount uint64 +} + +func (ka *KnownAddress) LastAttempt() time.Time { + return ka.lastattempt +} + +func (ka *KnownAddress) increaseAttempts() { + ka.attempts++ +} + +func (ka *KnownAddress) updateLastAttempt() { + // set last tried time to now + ka.lastattempt = time.Now() +} + +func (ka *KnownAddress) updateLastDisconnect() { + // set last disconnect time to now + ka.lastDisconnect = time.Now() +} + +// chance returns the selection probability for a known address. The priority +// depends upon how recently the address has been seen, how recently it was last +// attempted and how often attempts to connect to it have failed. +func (ka *KnownAddress) chance() float64 { + now := time.Now() + lastAttempt := now.Sub(ka.lastattempt) + + if lastAttempt < 0 { + lastAttempt = 0 + } + + c := 1.0 + + // Very recent attempts are less likely to be retried. + if lastAttempt < 10*time.Minute { + c *= 0.01 + } + + // Failed attempts deprioritise. + for i := ka.attempts; i > 0; i-- { + c /= 1.5 + } + + return c +} + +// isBad returns true if the address in question has not been tried in the last +// minute and meets one of the following criteria: +// 1) Just tried in one minute +// 2) It hasn't been seen in over a month +// 3) It has failed at least ten times +// 4) It has failed ten times in the last week +// All addresses that meet these criteria are assumed to be worthless and not +// worth keeping hold of. +func (ka *KnownAddress) isBad() bool { + // just tried in one minute? + if ka.lastattempt.After(time.Now().Add(-1 * time.Minute)) { + return false + } + + // Over a month old? + if ka.srcAddr.Time < (time.Now().Add(-1 * numMissingDays * time.Hour * 24)).UnixNano() { + return true + } + + // Just disconnected in one minute? + if ka.lastDisconnect.After(time.Now().Add(-1 * time.Minute)) { + return true + } + + // tried too many times? + if ka.attempts >= numRetries { + return true + } + + return false +} + +func (ka *KnownAddress) SaveAddr(na NodeAddr) { + ka.srcAddr.Time = na.Time + ka.srcAddr.Services = na.Services + ka.srcAddr.IpAddr = na.IpAddr + ka.srcAddr.Port = na.Port + ka.srcAddr.ID = na.ID +} + +func (ka *KnownAddress) NetAddress() NodeAddr { + return ka.srcAddr +} + +func (ka *KnownAddress) GetID() uint64 { + return ka.srcAddr.ID +} + +func (al *KnownAddressList) NeedMoreAddresses() bool { + al.Lock() + defer al.Unlock() + + return al.addrCount < needAddressThreshold +} + +func (al *KnownAddressList) AddressExisted(uid uint64) bool { + _, ok := al.List[uid] + return ok +} + +func (al *KnownAddressList) UpdateLastDisconn(id uint64) { + ka := al.List[id] + ka.updateLastDisconnect() +} + +func (al *KnownAddressList) AddAddressToKnownAddress(na NodeAddr) { + al.Lock() + defer al.Unlock() + + ka := new(KnownAddress) + ka.SaveAddr(na) + if al.AddressExisted(ka.GetID()) { + log.Debug("Insert a existed addr\n") + } else { + al.List[ka.GetID()] = ka + al.addrCount++ + } +} + +func (al *KnownAddressList) DelAddressFromList(id uint64) bool { + al.Lock() + defer al.Unlock() + + _, ok := al.List[id] + if ok == false { + return false + } + delete(al.List, id) + return true +} + +func (al *KnownAddressList) GetAddressCnt() uint64 { + al.RLock() + defer al.RUnlock() + if al != nil { + return al.addrCount + } + return 0 +} + +func (al *KnownAddressList) init() { + al.List = make(map[uint64]*KnownAddress) +} + +func isInNbrList(id uint64, nbrAddrs []NodeAddr) bool { + for _, na := range nbrAddrs { + if id == na.ID { + return true + } + } + return false +} + +func (al *KnownAddressList) RandGetAddresses(nbrAddrs []NodeAddr) []NodeAddr { + al.RLock() + defer al.RUnlock() + var keys []uint64 + for k := range al.List { + isInNbr := isInNbrList(k, nbrAddrs) + isBad := al.List[k].isBad() + if isInNbr == false && isBad == false { + keys = append(keys, k) + } + } + addrLen := len(keys) + var i int + addrs := []NodeAddr{} + if MAXOUTBOUNDCNT-len(nbrAddrs) > addrLen { + for _, v := range keys { + ka, ok := al.List[v] + if !ok { + continue + } + addrs = append(addrs, ka.srcAddr) + } + } else { + order := rand.Perm(addrLen) + var count int + count = MAXOUTBOUNDCNT - len(nbrAddrs) + for i = 0; i < count; i++ { + for j, v := range keys { + if j == order[j] { + ka, ok := al.List[v] + if !ok { + continue + } + ka.increaseAttempts() + ka.updateLastAttempt() + addrs = append(addrs, ka.srcAddr) + keys = append(keys[:j], keys[j+1:]...) + break + } + } + } + } + + return addrs +} + +func (al *KnownAddressList) RandSelectAddresses() []NodeAddr { + al.RLock() + defer al.RUnlock() + var keys []uint64 + addrs := []NodeAddr{} + for k := range al.List { + keys = append(keys, k) + } + addrLen := len(keys) + + var count int + if MAXOUTBOUNDCNT > addrLen { + count = addrLen + } else { + count = MAXOUTBOUNDCNT + } + for i, v := range keys { + if i < count { + ka, ok := al.List[v] + if !ok { + continue + } + addrs = append(addrs, ka.srcAddr) + } + } + + return addrs +} diff --git a/net/node/node.go b/net/node/node.go index afc18daf..2c7f0528 100644 --- a/net/node/node.go +++ b/net/node/node.go @@ -53,6 +53,10 @@ type node struct { tryTimes uint32 ConnectingNodes RetryConnAddrs + KnownAddressList + MaxOutboundCnt uint + DefaultMaxPeers uint + GetAddrMax uint } type RetryConnAddrs struct { @@ -169,10 +173,14 @@ func InitNode(pubKey *crypto.PubKey) Noder { } log.Info(fmt.Sprintf("Init node ID to 0x%x", n.id)) n.nbrNodes.init() + n.KnownAddressList.init() n.local = n n.publicKey = pubKey n.TXNPool.init() n.eventQueue.init() + n.local.SetMaxOutboundCnt() + n.local.SetDefaultMaxPeers() + n.local.SetGetAddrMax() n.nodeDisconnectSubscriber = n.eventQueue.GetEvent("disconnect").Subscribe(events.EventNodeDisconnect, n.NodeDisconnect) go n.initConnection() go n.updateConnection() @@ -181,6 +189,42 @@ func InitNode(pubKey *crypto.PubKey) Noder { return n } +func (n *node) SetMaxOutboundCnt() { + if (Parameters.MaxOutboundCnt < MAXOUTBOUNDCNT) && (Parameters.MaxOutboundCnt > 0) { + n.MaxOutboundCnt = Parameters.MaxOutboundCnt + } else { + n.MaxOutboundCnt = MAXOUTBOUNDCNT + } +} + +func (n *node) SetGetAddrMax() { + if (Parameters.GetAddrMax < GETADDRMAX) && (Parameters.GetAddrMax > 0) { + n.GetAddrMax = Parameters.GetAddrMax + } else { + n.GetAddrMax = GETADDRMAX + } +} + +func (n *node) SetDefaultMaxPeers() { + if (Parameters.DefaultMaxPeers < DEFAULTMAXPEERS) && (Parameters.DefaultMaxPeers > 0) { + n.DefaultMaxPeers = Parameters.MaxOutboundCnt + } else { + n.DefaultMaxPeers = DEFAULTMAXPEERS + } +} + +func (n *node) GetGetAddrMax() uint { + return n.GetAddrMax +} + +func (n *node) GetMaxOutboundCnt() uint { + return n.MaxOutboundCnt +} + +func (n *node) GetDefaultMaxPeers() uint { + return n.DefaultMaxPeers +} + func (n *node) NodeDisconnect(v interface{}) { if node, ok := v.(*node); ok { node.SetState(INACTIVITY) @@ -488,5 +532,4 @@ func (node *node) RemoveFromRetryList(addr string) { log.Debug("remove addr from retry list", addr) } } - } diff --git a/net/node/nodeMap.go b/net/node/nodeMap.go index 60b5baa8..245322d8 100644 --- a/net/node/nodeMap.go +++ b/net/node/nodeMap.go @@ -154,3 +154,12 @@ func (node *node) GetNbrNodeCnt() uint32 { } return count } + +func (node *node) RandGetANbr() Noder { + for _, n := range node.nbrNodes.List { + if n.GetState() == ESTABLISH { + return n + } + } + return nil +} diff --git a/net/protocol/protocol.go b/net/protocol/protocol.go index 9aacb483..dd08c9a3 100644 --- a/net/protocol/protocol.go +++ b/net/protocol/protocol.go @@ -46,18 +46,22 @@ const ( ) const ( - HELLOTIMEOUT = 3 // Seconds - MAXHELLORETYR = 3 - MAXBUFLEN = 1024 * 16 // Fixme The maximum buffer to receive message - MAXCHANBUF = 512 - PROTOCOLVERSION = 0 - PERIODUPDATETIME = 3 // Time to update and sync information with other nodes - HEARTBEAT = 2 - KEEPALIVETIMEOUT = 3 - DIALTIMEOUT = 6 - CONNMONITOR = 6 - CONNMAXBACK = 4000 - MAXRETRYCOUNT = 3 + HELLOTIMEOUT = 3 // Seconds + MAXHELLORETYR = 3 + MAXBUFLEN = 1024 * 16 // Fixme The maximum buffer to receive message + MAXCHANBUF = 512 + PROTOCOLVERSION = 0 + PERIODUPDATETIME = 3 // Time to update and sync information with other nodes + HEARTBEAT = 2 + KEEPALIVETIMEOUT = 3 + DIALTIMEOUT = 6 + CONNMONITOR = 6 + CONNMAXBACK = 4000 + MAXRETRYCOUNT = 3 + NEEDADDRESSTHRESHOLD = 1000 + MAXOUTBOUNDCNT = 8 + DEFAULTMAXPEERS = 125 + GETADDRMAX = 2500 ) // The node state @@ -77,6 +81,7 @@ type Noder interface { GetID() uint64 Services() uint64 GetAddr() string + GetAddr16() ([16]byte, error) GetPort() uint16 GetHttpInfoPort() int SetHttpInfoPort(uint16) @@ -137,6 +142,15 @@ type Noder interface { RemoveAddrInConnectingList(addr string) AddInRetryList(addr string) RemoveFromRetryList(addr string) + GetAddressCnt() uint64 + AddAddressToKnownAddress(na NodeAddr) + RandGetAddresses(nbrAddrs []NodeAddr) []NodeAddr + GetDefaultMaxPeers() uint + GetMaxOutboundCnt() uint + GetGetAddrMax() uint + NeedMoreAddresses() bool + RandSelectAddresses() []NodeAddr + UpdateLastDisconn(id uint64) } func (msg *NodeAddr) Deserialization(p []byte) error {