diff --git a/Gopkg.lock b/Gopkg.lock index 0787c74fa63..fac08ea8b90 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -88,7 +88,7 @@ revision = "935e0e8a636ca4ba70b713f3e38a19e1b77739e8" [[projects]] - digest = "1:cf53a6be6cf3d947c0c7fb678237a7b34e06e4102411ecb843eedacb46759995" + digest = "1:1dc638d013f2a6f6ae13568584e8fee7e172cbf295e82064ded18bcf6d63c04c" name = "github.com/ethereum/go-ethereum" packages = [ ".", @@ -155,10 +155,9 @@ "rlp", "rpc", "trie", - "whisper/whisperv6", ] pruneopts = "T" - revision = "56c2fa69e0224e8f939a092b3aba20b23bb95c26" + revision = "31f0afca344c53678e98032d6306e0e4ff7b9be1" source = "github.com/status-im/go-ethereum" version = "v1.8.16" @@ -1102,7 +1101,6 @@ "github.com/ethereum/go-ethereum/params", "github.com/ethereum/go-ethereum/rlp", "github.com/ethereum/go-ethereum/rpc", - "github.com/ethereum/go-ethereum/whisper/whisperv6", "github.com/golang/mock/gomock", "github.com/golang/protobuf/proto", "github.com/libp2p/go-libp2p-crypto", diff --git a/vendor/github.com/ethereum/go-ethereum/_assets/patches/0038-ulc.patch b/vendor/github.com/ethereum/go-ethereum/_assets/patches/0038-ulc.patch new file mode 100644 index 00000000000..c7e367738e2 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/_assets/patches/0038-ulc.patch @@ -0,0 +1,2231 @@ +diff --git a/cmd/geth/config.go b/cmd/geth/config.go +index b0749d2..d724562 100644 +--- a/cmd/geth/config.go ++++ b/cmd/geth/config.go +@@ -124,6 +124,7 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { + } + + // Apply flags. ++ utils.SetULC(ctx, &cfg.Eth) + utils.SetNodeConfig(ctx, &cfg.Node) + stack, err := node.New(&cfg.Node) + if err != nil { +diff --git a/cmd/geth/main.go b/cmd/geth/main.go +index fae4b57..bdc624d 100644 +--- a/cmd/geth/main.go ++++ b/cmd/geth/main.go +@@ -83,6 +83,10 @@ var ( + utils.TxPoolAccountQueueFlag, + utils.TxPoolGlobalQueueFlag, + utils.TxPoolLifetimeFlag, ++ utils.ULCModeConfigFlag, ++ utils.OnlyAnnounceModeFlag, ++ utils.ULCTrustedNodesFlag, ++ utils.ULCMinTrustedFractionFlag, + utils.SyncModeFlag, + utils.GCModeFlag, + utils.LightServFlag, +diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go +index a2becd0..0df8fb8 100644 +--- a/cmd/utils/flags.go ++++ b/cmd/utils/flags.go +@@ -19,6 +19,7 @@ package utils + + import ( + "crypto/ecdsa" ++ "encoding/json" + "fmt" + "io/ioutil" + "math/big" +@@ -157,6 +158,23 @@ var ( + Usage: "Document Root for HTTPClient file scheme", + Value: DirectoryString{homeDir()}, + } ++ ULCModeConfigFlag = cli.StringFlag{ ++ Name: "les.ulcconfig", ++ Usage: "Config file to use for ULC mode", ++ } ++ OnlyAnnounceModeFlag = cli.BoolFlag{ ++ Name: "les.onlyannounce", ++ Usage: "LES server sends only announce", ++ } ++ ULCMinTrustedFractionFlag = cli.IntFlag{ ++ Name: "les.mintrustedfraction", ++ Usage: "LES server sends only announce", ++ } ++ ULCTrustedNodesFlag = cli.StringFlag{ ++ Name: "les.trusted", ++ Usage: "List of trusted nodes", ++ } ++ + defaultSyncMode = eth.DefaultConfig.SyncMode + SyncModeFlag = TextMarshalerFlag{ + Name: "syncmode", +@@ -816,6 +834,40 @@ func setIPC(ctx *cli.Context, cfg *node.Config) { + } + } + ++// SetULC setup ULC config from file if given. ++func SetULC(ctx *cli.Context, cfg *eth.Config) { ++ // ULC config isn't loaded from global config and ULC config and ULC trusted nodes are not defined. ++ if cfg.ULC == nil && !(ctx.GlobalIsSet(ULCModeConfigFlag.Name) || ctx.GlobalIsSet(ULCTrustedNodesFlag.Name)) { ++ return ++ } ++ cfg.ULC = ð.ULCConfig{} ++ ++ path := ctx.GlobalString(ULCModeConfigFlag.Name) ++ if path != "" { ++ cfgData, err := ioutil.ReadFile(path) ++ if err != nil { ++ Fatalf("Failed to unmarshal ULC configuration: %v", err) ++ } ++ ++ err = json.Unmarshal(cfgData, &cfg.ULC) ++ if err != nil { ++ Fatalf("Failed to unmarshal ULC configuration: %s", err.Error()) ++ } ++ } ++ ++ if trustedNodes := ctx.GlobalString(ULCTrustedNodesFlag.Name); trustedNodes != "" { ++ cfg.ULC.TrustedServers = strings.Split(trustedNodes, ",") ++ } ++ ++ if trustedFraction := ctx.GlobalInt(ULCMinTrustedFractionFlag.Name); trustedFraction > 0 { ++ cfg.ULC.MinTrustedFraction = trustedFraction ++ } ++ if cfg.ULC.MinTrustedFraction <= 0 && cfg.ULC.MinTrustedFraction > 100 { ++ log.Error("MinTrustedFraction is invalid", "MinTrustedFraction", cfg.ULC.MinTrustedFraction, "Changed to default", eth.DefaultUTCMinTrustedFraction) ++ cfg.ULC.MinTrustedFraction = eth.DefaultUTCMinTrustedFraction ++ } ++} ++ + // makeDatabaseHandles raises out the number of allowed file handles per process + // for Geth and returns half of the allowance to assign to the database. + func makeDatabaseHandles() int { +@@ -1140,6 +1192,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { + if ctx.GlobalIsSet(LightPeersFlag.Name) { + cfg.LightPeers = ctx.GlobalInt(LightPeersFlag.Name) + } ++ if ctx.GlobalIsSet(OnlyAnnounceModeFlag.Name) { ++ cfg.OnlyAnnounce = ctx.GlobalBool(OnlyAnnounceModeFlag.Name) ++ } + if ctx.GlobalIsSet(NetworkIdFlag.Name) { + cfg.NetworkId = ctx.GlobalUint64(NetworkIdFlag.Name) + } +diff --git a/core/headerchain.go b/core/headerchain.go +index d209311..8904dd8 100644 +--- a/core/headerchain.go ++++ b/core/headerchain.go +@@ -219,14 +219,18 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) + + // Generate the list of seal verification requests, and start the parallel verifier + seals := make([]bool, len(chain)) +- for i := 0; i < len(seals)/checkFreq; i++ { +- index := i*checkFreq + hc.rand.Intn(checkFreq) +- if index >= len(seals) { +- index = len(seals) - 1 ++ if checkFreq != 0 { ++ // In case of checkFreq == 0 all seals are left false. ++ for i := 0; i < len(seals)/checkFreq; i++ { ++ index := i*checkFreq + hc.rand.Intn(checkFreq) ++ if index >= len(seals) { ++ index = len(seals) - 1 ++ } ++ seals[index] = true + } +- seals[index] = true ++ // Last should always be verified to avoid junk. ++ seals[len(seals)-1] = true + } +- seals[len(seals)-1] = true // Last should always be verified to avoid junk + + abort, results := hc.engine.VerifyHeaders(hc, chain, seals) + defer close(abort) +diff --git a/eth/config.go b/eth/config.go +index efbaafb..7d1db9f 100644 +--- a/eth/config.go ++++ b/eth/config.go +@@ -87,8 +87,12 @@ type Config struct { + NoPruning bool + + // Light client options +- LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests +- LightPeers int `toml:",omitempty"` // Maximum number of LES client peers ++ LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests ++ LightPeers int `toml:",omitempty"` // Maximum number of LES client peers ++ OnlyAnnounce bool // Maximum number of LES client peers ++ ++ // Ultra Light client options ++ ULC *ULCConfig `toml:",omitempty"` + + // Database options + SkipBcVersionCheck bool `toml:"-"` +diff --git a/eth/gen_config.go b/eth/gen_config.go +index d401a91..6423cf5 100644 +--- a/eth/gen_config.go ++++ b/eth/gen_config.go +@@ -23,10 +23,12 @@ func (c Config) MarshalTOML() (interface{}, error) { + NetworkId uint64 + SyncMode downloader.SyncMode + NoPruning bool +- LightServ int `toml:",omitempty"` +- LightPeers int `toml:",omitempty"` +- SkipBcVersionCheck bool `toml:"-"` +- DatabaseHandles int `toml:"-"` ++ LightServ int `toml:",omitempty"` ++ LightPeers int `toml:",omitempty"` ++ OnlyAnnounce bool ++ ULC *ULCConfig `toml:",omitempty"` ++ SkipBcVersionCheck bool `toml:"-"` ++ DatabaseHandles int `toml:"-"` + DatabaseCache int + TrieCache int + TrieTimeout time.Duration +@@ -79,10 +81,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { + NetworkId *uint64 + SyncMode *downloader.SyncMode + NoPruning *bool +- LightServ *int `toml:",omitempty"` +- LightPeers *int `toml:",omitempty"` +- SkipBcVersionCheck *bool `toml:"-"` +- DatabaseHandles *int `toml:"-"` ++ LightServ *int `toml:",omitempty"` ++ LightPeers *int `toml:",omitempty"` ++ OnlyAnnounce *bool ++ ULC *ULCConfig `toml:",omitempty"` ++ SkipBcVersionCheck *bool `toml:"-"` ++ DatabaseHandles *int `toml:"-"` + DatabaseCache *int + TrieCache *int + TrieTimeout *time.Duration +@@ -122,6 +126,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { + if dec.LightPeers != nil { + c.LightPeers = *dec.LightPeers + } ++ if dec.OnlyAnnounce != nil { ++ c.OnlyAnnounce = *dec.OnlyAnnounce ++ } ++ if dec.ULC != nil { ++ c.ULC = dec.ULC ++ } + if dec.SkipBcVersionCheck != nil { + c.SkipBcVersionCheck = *dec.SkipBcVersionCheck + } +diff --git a/eth/ulc_config.go b/eth/ulc_config.go +new file mode 100644 +index 0000000..960cb0d +--- /dev/null ++++ b/eth/ulc_config.go +@@ -0,0 +1,9 @@ ++package eth ++ ++const DefaultUTCMinTrustedFraction = 75 ++ ++// ULCConfig is a Ultra Light client options. ++type ULCConfig struct { ++ TrustedServers []string `toml:",omitempty"` // A list of trusted servers ++ MinTrustedFraction int `toml:",omitempty"` // Minimum percentage of connected trusted servers to validate trusted (1-100) ++} +diff --git a/les/backend.go b/les/backend.go +index 8e9cca6..fd32e59 100644 +--- a/les/backend.go ++++ b/les/backend.go +@@ -108,8 +108,12 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { + bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), + } + ++ var trustedNodes []string ++ if leth.config.ULC != nil { ++ trustedNodes = leth.config.ULC.TrustedServers ++ } + leth.relay = NewLesTxRelay(peers, leth.reqDist) +- leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg) ++ leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes) + leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) + + leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) +@@ -135,10 +139,32 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { + } + + leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay) +- if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil { ++ ++ if leth.protocolManager, err = NewProtocolManager( ++ leth.chainConfig, ++ light.DefaultClientIndexerConfig, ++ true, ++ config.NetworkId, ++ leth.eventMux, ++ leth.engine, ++ leth.peers, ++ leth.blockchain, ++ nil, ++ chainDb, ++ leth.odr, ++ leth.relay, ++ leth.serverPool, ++ quitSync, ++ &leth.wg, ++ config.ULC); err != nil { + return nil, err + } ++ ++ if leth.protocolManager.isULCEnabled() { ++ leth.blockchain.DisableCheckFreq() ++ } + leth.ApiBackend = &LesApiBackend{leth, nil} ++ + gpoParams := config.GPO + if gpoParams.Default == nil { + gpoParams.Default = config.MinerGasPrice +diff --git a/les/fetcher.go b/les/fetcher.go +index cc539c4..1291c4f 100644 +--- a/les/fetcher.go ++++ b/les/fetcher.go +@@ -42,7 +42,7 @@ const ( + type lightFetcher struct { + pm *ProtocolManager + odr *LesOdr +- chain *light.LightChain ++ chain lightChain + + lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests + maxConfirmedTd *big.Int +@@ -51,11 +51,19 @@ type lightFetcher struct { + syncing bool + syncDone chan *peer + +- reqMu sync.RWMutex // reqMu protects access to sent header fetch requests +- requested map[uint64]fetchRequest +- deliverChn chan fetchResponse +- timeoutChn chan uint64 +- requestChn chan bool // true if initiated from outside ++ reqMu sync.RWMutex // reqMu protects access to sent header fetch requests ++ requested map[uint64]fetchRequest ++ deliverChn chan fetchResponse ++ timeoutChn chan uint64 ++ requestChn chan bool // true if initiated from outside ++ lastTrustedHeader *types.Header ++} ++ ++// lightChain extends the BlockChain interface by locking. ++type lightChain interface { ++ BlockChain ++ LockChain() ++ UnlockChain() + } + + // fetcherPeerInfo holds fetcher-specific information about each active peer +@@ -143,6 +151,7 @@ func (f *lightFetcher) syncLoop() { + rq *distReq + reqID uint64 + ) ++ + if !f.syncing && !(newAnnounce && s) { + rq, reqID = f.nextRequest() + } +@@ -205,8 +214,11 @@ func (f *lightFetcher) syncLoop() { + case p := <-f.syncDone: + f.lock.Lock() + p.Log().Debug("Done synchronising with peer") +- f.checkSyncedHeaders(p) ++ res, h, td := f.checkSyncedHeaders(p) + f.syncing = false ++ if res { ++ f.newHeaders(h, []*big.Int{td}) ++ } + f.lock.Unlock() + } + } +@@ -222,7 +234,6 @@ func (f *lightFetcher) registerPeer(p *peer) { + + f.lock.Lock() + defer f.lock.Unlock() +- + f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} + } + +@@ -275,8 +286,10 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { + fp.nodeCnt = 0 + fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode) + } ++ // check if the node count is too high to add new nodes, discard oldest ones if necessary + if n != nil { +- // check if the node count is too high to add new nodes, discard oldest ones if necessary ++ // n is now the reorg common ancestor, add a new branch of nodes ++ // check if the node count is too high to add new nodes + locked := false + for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { + if !locked { +@@ -320,6 +333,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { + fp.nodeByHash[n.hash] = n + } + } ++ + if n == nil { + // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed + if fp.root != nil { +@@ -400,25 +414,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { + // to be downloaded starting from the head backwards is also returned + func (f *lightFetcher) nextRequest() (*distReq, uint64) { + var ( +- bestHash common.Hash +- bestAmount uint64 ++ bestHash common.Hash ++ bestAmount uint64 ++ bestTd *big.Int ++ bestSyncing bool + ) +- bestTd := f.maxConfirmedTd +- bestSyncing := false ++ bestHash, bestAmount, bestTd, bestSyncing = f.findBestValues() + +- for p, fp := range f.peers { +- for hash, n := range fp.nodeByHash { +- if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) { +- amount := f.requestAmount(p, n) +- if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount { +- bestHash = hash +- bestAmount = amount +- bestTd = n.td +- bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) +- } +- } +- } +- } + if bestTd == f.maxConfirmedTd { + return nil, 0 + } +@@ -428,72 +430,131 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) { + var rq *distReq + reqID := genReqID() + if f.syncing { +- rq = &distReq{ +- getCost: func(dp distPeer) uint64 { +- return 0 +- }, +- canSend: func(dp distPeer) bool { +- p := dp.(*peer) +- f.lock.Lock() +- defer f.lock.Unlock() +- +- fp := f.peers[p] +- return fp != nil && fp.nodeByHash[bestHash] != nil +- }, +- request: func(dp distPeer) func() { +- go func() { +- p := dp.(*peer) +- p.Log().Debug("Synchronisation started") +- f.pm.synchronise(p) +- f.syncDone <- p +- }() +- return nil +- }, +- } ++ rq = f.newFetcherDistReqForSync(bestHash) + } else { +- rq = &distReq{ +- getCost: func(dp distPeer) uint64 { +- p := dp.(*peer) +- return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) +- }, +- canSend: func(dp distPeer) bool { ++ rq = f.newFetcherDistReq(bestHash, reqID, bestAmount) ++ } ++ return rq, reqID ++} ++ ++// findBestValues retrieves the best values for LES or ULC mode. ++func (f *lightFetcher) findBestValues() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) { ++ bestTd = f.maxConfirmedTd ++ bestSyncing = false ++ ++ for p, fp := range f.peers { ++ for hash, n := range fp.nodeByHash { ++ if f.checkKnownNode(p, n) || n.requested { ++ continue ++ } ++ ++ //if ulc mode is disabled, isTrustedHash returns true ++ amount := f.requestAmount(p, n) ++ if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && f.isTrustedHash(hash) { ++ bestHash = hash ++ bestTd = n.td ++ bestAmount = amount ++ bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) ++ } ++ } ++ } ++ return ++} ++ ++// isTrustedHash checks if the block can be trusted by the minimum trusted fraction. ++func (f *lightFetcher) isTrustedHash(hash common.Hash) bool { ++ if !f.pm.isULCEnabled() { ++ return true ++ } ++ ++ var numAgreed int ++ for p, fp := range f.peers { ++ if !p.isTrusted { ++ continue ++ } ++ if _, ok := fp.nodeByHash[hash]; !ok { ++ continue ++ } ++ ++ numAgreed++ ++ } ++ ++ return 100*numAgreed/len(f.pm.ulc.trustedKeys) >= f.pm.ulc.minTrustedFraction ++} ++ ++func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { ++ return &distReq{ ++ getCost: func(dp distPeer) uint64 { ++ return 0 ++ }, ++ canSend: func(dp distPeer) bool { ++ p := dp.(*peer) ++ if p.isOnlyAnnounce { ++ return false ++ } ++ ++ fp := f.peers[p] ++ return fp != nil && fp.nodeByHash[bestHash] != nil ++ }, ++ request: func(dp distPeer) func() { ++ if f.pm.isULCEnabled() { ++ //keep last trusted header before sync ++ f.setLastTrustedHeader(f.chain.CurrentHeader()) ++ } ++ go func() { + p := dp.(*peer) +- f.lock.Lock() +- defer f.lock.Unlock() ++ p.Log().Debug("Synchronisation started") ++ f.pm.synchronise(p) ++ f.syncDone <- p ++ }() ++ return nil ++ }, ++ } ++} + +- fp := f.peers[p] +- if fp == nil { +- return false +- } ++// newFetcherDistReq creates a new request for the distributor. ++func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq { ++ return &distReq{ ++ getCost: func(dp distPeer) uint64 { ++ p := dp.(*peer) ++ return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) ++ }, ++ canSend: func(dp distPeer) bool { ++ p := dp.(*peer) ++ if p.isOnlyAnnounce { ++ return false ++ } ++ ++ fp := f.peers[p] ++ if fp == nil { ++ return false ++ } ++ n := fp.nodeByHash[bestHash] ++ return n != nil && !n.requested ++ }, ++ request: func(dp distPeer) func() { ++ p := dp.(*peer) ++ ++ fp := f.peers[p] ++ if fp != nil { + n := fp.nodeByHash[bestHash] +- return n != nil && !n.requested +- }, +- request: func(dp distPeer) func() { +- p := dp.(*peer) +- f.lock.Lock() +- fp := f.peers[p] +- if fp != nil { +- n := fp.nodeByHash[bestHash] +- if n != nil { +- n.requested = true +- } ++ if n != nil { ++ n.requested = true + } +- f.lock.Unlock() +- +- cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) +- p.fcServer.QueueRequest(reqID, cost) +- f.reqMu.Lock() +- f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} +- f.reqMu.Unlock() +- go func() { +- time.Sleep(hardRequestTimeout) +- f.timeoutChn <- reqID +- }() +- return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } +- }, +- } ++ } ++ ++ cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) ++ p.fcServer.QueueRequest(reqID, cost) ++ f.reqMu.Lock() ++ f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} ++ f.reqMu.Unlock() ++ go func() { ++ time.Sleep(hardRequestTimeout) ++ f.timeoutChn <- reqID ++ }() ++ return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } ++ }, + } +- return rq, reqID + } + + // deliverHeaders delivers header download request responses for processing +@@ -511,6 +572,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo + for i, header := range resp.headers { + headers[int(req.amount)-1-i] = header + } ++ + if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { + if err == consensus.ErrFutureBlock { + return true +@@ -535,6 +597,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo + // downloaded and validated batch or headers + func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { + var maxTd *big.Int ++ + for p, fp := range f.peers { + if !f.checkAnnouncedHeaders(fp, headers, tds) { + p.Log().Debug("Inconsistent announcement") +@@ -544,6 +607,7 @@ func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { + maxTd = fp.confirmedTd + } + } ++ + if maxTd != nil { + f.updateMaxConfirmedTd(maxTd) + } +@@ -625,28 +689,111 @@ func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*typ + // checkSyncedHeaders updates peer's block tree after synchronisation by marking + // downloaded headers as known. If none of the announced headers are found after + // syncing, the peer is dropped. +-func (f *lightFetcher) checkSyncedHeaders(p *peer) { ++func (f *lightFetcher) checkSyncedHeaders(p *peer) (bool, []*types.Header, *big.Int) { + fp := f.peers[p] + if fp == nil { + p.Log().Debug("Unknown peer to check sync headers") +- return ++ return false, nil, nil + } ++ ++ var h *types.Header ++ if f.pm.isULCEnabled() { ++ var unapprovedHashes []common.Hash ++ // Overwrite last announced for ULC mode ++ h, unapprovedHashes = f.lastTrustedTreeNode(p) ++ //rollback untrusted blocks ++ f.chain.Rollback(unapprovedHashes) ++ } ++ + n := fp.lastAnnounced + var td *big.Int ++ trustedHeaderExisted := false ++ ++ //find last trusted block + for n != nil { +- if td = f.chain.GetTd(n.hash, n.number); td != nil { ++ //we found last trusted header ++ if n.hash == h.Hash() { ++ trustedHeaderExisted = true ++ } ++ if td = f.chain.GetTd(n.hash, n.number); td != nil && trustedHeaderExisted { ++ break ++ } ++ ++ //break if we found last trusted hash before sync ++ if f.lastTrustedHeader == nil { ++ break ++ } ++ if n.hash == f.lastTrustedHeader.Hash() { + break + } + n = n.parent + } +- // now n is the latest downloaded header after syncing +- if n == nil { ++ ++ // Now n is the latest downloaded/approved header after syncing ++ if n == nil && !p.isTrusted { + p.Log().Debug("Synchronisation failed") + go f.pm.removePeer(p.id) +- } else { +- header := f.chain.GetHeader(n.hash, n.number) +- f.newHeaders([]*types.Header{header}, []*big.Int{td}) ++ return false, nil, nil + } ++ header := f.chain.GetHeader(n.hash, n.number) ++ return true, []*types.Header{header}, td ++} ++ ++// lastTrustedTreeNode return last approved treeNode and a list of unapproved hashes ++func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Hash) { ++ unapprovedHashes := make([]common.Hash, 0) ++ current := f.chain.CurrentHeader() ++ ++ if f.lastTrustedHeader == nil { ++ return current, unapprovedHashes ++ } ++ ++ canonical := f.chain.CurrentHeader() ++ if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() { ++ canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64()) ++ } ++ commonAncestor := rawdb.FindCommonAncestor(f.pm.chainDb, canonical, f.lastTrustedHeader) ++ if commonAncestor == nil { ++ log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash()) ++ return current, unapprovedHashes ++ } ++ ++ for !f.isStopValidationTree(current, commonAncestor) { ++ if f.isTrustedHash(current.Hash()) { ++ break ++ } ++ unapprovedHashes = append(unapprovedHashes, current.Hash()) ++ current = f.chain.GetHeader(current.ParentHash, current.Number.Uint64()-1) ++ } ++ return current, unapprovedHashes ++} ++ ++//isStopValidationTree found when we should stop on finding last trusted header ++func (f *lightFetcher) isStopValidationTree(current *types.Header, commonAncestor *types.Header) bool { ++ if current == nil { ++ return true ++ } ++ ++ currentHash := current.Hash() ++ ancestorHash := commonAncestor.Hash() ++ ++ //found lastTrustedHeader ++ if currentHash == f.lastTrustedHeader.Hash() { ++ return true ++ } ++ ++ //found common ancestor between lastTrustedHeader and ++ if current.Hash() == ancestorHash { ++ return true ++ } ++ ++ return false ++} ++ ++func (f *lightFetcher) setLastTrustedHeader(h *types.Header) { ++ f.lock.Lock() ++ defer f.lock.Unlock() ++ f.lastTrustedHeader = h + } + + // checkKnownNode checks if a block tree node is known (downloaded and validated) +@@ -738,6 +885,7 @@ func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { + if f.lastUpdateStats != nil { + f.lastUpdateStats.next = newEntry + } ++ + f.lastUpdateStats = newEntry + for p := range f.peers { + f.checkUpdateStats(p, newEntry) +@@ -760,6 +908,7 @@ func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { + p.Log().Debug("Unknown peer to check update stats") + return + } ++ + if newEntry != nil && fp.firstUpdateStats == nil { + fp.firstUpdateStats = newEntry + } +diff --git a/les/fetcher_test.go b/les/fetcher_test.go +new file mode 100644 +index 0000000..fd60ec2 +--- /dev/null ++++ b/les/fetcher_test.go +@@ -0,0 +1,149 @@ ++package les ++ ++import ( ++ "crypto/rand" ++ "math/big" ++ "testing" ++ ++ "github.com/ethereum/go-ethereum/common" ++ "github.com/ethereum/go-ethereum/core/types" ++ "github.com/ethereum/go-ethereum/p2p" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++) ++ ++func TestFetcherULCPeerSelector(t *testing.T) { ++ ++ var ( ++ id1 discover.NodeID ++ id2 discover.NodeID ++ id3 discover.NodeID ++ id4 discover.NodeID ++ ) ++ rand.Read(id1[:]) ++ rand.Read(id2[:]) ++ rand.Read(id3[:]) ++ rand.Read(id4[:]) ++ ++ ftn1 := &fetcherTreeNode{ ++ hash: common.HexToHash("1"), ++ td: big.NewInt(1), ++ } ++ ftn2 := &fetcherTreeNode{ ++ hash: common.HexToHash("2"), ++ td: big.NewInt(2), ++ parent: ftn1, ++ } ++ ftn3 := &fetcherTreeNode{ ++ hash: common.HexToHash("3"), ++ td: big.NewInt(3), ++ parent: ftn2, ++ } ++ lf := lightFetcher{ ++ pm: &ProtocolManager{ ++ ulc: &ulc{ ++ trustedKeys: map[string]struct{}{ ++ id1.String(): {}, ++ id2.String(): {}, ++ id3.String(): {}, ++ id4.String(): {}, ++ }, ++ minTrustedFraction: 70, ++ }, ++ }, ++ maxConfirmedTd: ftn1.td, ++ ++ peers: map[*peer]*fetcherPeerInfo{ ++ { ++ id: "peer1", ++ Peer: p2p.NewPeer(id1, "peer1", []p2p.Cap{}), ++ isTrusted: true, ++ }: { ++ nodeByHash: map[common.Hash]*fetcherTreeNode{ ++ ftn1.hash: ftn1, ++ ftn2.hash: ftn2, ++ }, ++ }, ++ { ++ Peer: p2p.NewPeer(id2, "peer2", []p2p.Cap{}), ++ id: "peer2", ++ isTrusted: true, ++ }: { ++ nodeByHash: map[common.Hash]*fetcherTreeNode{ ++ ftn1.hash: ftn1, ++ ftn2.hash: ftn2, ++ }, ++ }, ++ { ++ id: "peer3", ++ Peer: p2p.NewPeer(id3, "peer3", []p2p.Cap{}), ++ isTrusted: true, ++ }: { ++ nodeByHash: map[common.Hash]*fetcherTreeNode{ ++ ftn1.hash: ftn1, ++ ftn2.hash: ftn2, ++ ftn3.hash: ftn3, ++ }, ++ }, ++ { ++ id: "peer4", ++ Peer: p2p.NewPeer(id4, "peer4", []p2p.Cap{}), ++ isTrusted: true, ++ }: { ++ nodeByHash: map[common.Hash]*fetcherTreeNode{ ++ ftn1.hash: ftn1, ++ }, ++ }, ++ }, ++ chain: &lightChainStub{ ++ tds: map[common.Hash]*big.Int{}, ++ headers: map[common.Hash]*types.Header{ ++ ftn1.hash: {}, ++ ftn2.hash: {}, ++ ftn3.hash: {}, ++ }, ++ }, ++ } ++ bestHash, bestAmount, bestTD, sync := lf.findBestValues() ++ ++ if bestTD == nil { ++ t.Fatal("Empty result") ++ } ++ ++ if bestTD.Cmp(ftn2.td) != 0 { ++ t.Fatal("bad td", bestTD) ++ } ++ if bestHash != ftn2.hash { ++ t.Fatal("bad hash", bestTD) ++ } ++ ++ _, _ = bestAmount, sync ++} ++ ++type lightChainStub struct { ++ BlockChain ++ tds map[common.Hash]*big.Int ++ headers map[common.Hash]*types.Header ++ insertHeaderChainAssertFunc func(chain []*types.Header, checkFreq int) (int, error) ++} ++ ++func (l *lightChainStub) GetHeader(hash common.Hash, number uint64) *types.Header { ++ if h, ok := l.headers[hash]; ok { ++ return h ++ } ++ ++ return nil ++} ++ ++func (l *lightChainStub) LockChain() {} ++func (l *lightChainStub) UnlockChain() {} ++ ++func (l *lightChainStub) GetTd(hash common.Hash, number uint64) *big.Int { ++ if td, ok := l.tds[hash]; ok { ++ return td ++ } ++ return nil ++} ++ ++func (l *lightChainStub) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { ++ return l.insertHeaderChainAssertFunc(chain, checkFreq) ++} +diff --git a/les/handler.go b/les/handler.go +index e884afb..ea6455a 100644 +--- a/les/handler.go ++++ b/les/handler.go +@@ -33,6 +33,7 @@ import ( + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" ++ "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" +@@ -119,12 +120,29 @@ type ProtocolManager struct { + + // wait group is used for graceful shutdowns during downloading + // and processing +- wg *sync.WaitGroup ++ wg *sync.WaitGroup ++ ulc *ulc + } + + // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable + // with the ethereum network. +-func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { ++func NewProtocolManager( ++ chainConfig *params.ChainConfig, ++ indexerConfig *light.IndexerConfig, ++ lightSync bool, ++ networkId uint64, ++ mux *event.TypeMux, ++ engine consensus.Engine, ++ peers *peerSet, ++ blockchain BlockChain, ++ txpool txPool, ++ chainDb ethdb.Database, ++ odr *LesOdr, ++ txrelay *LesTxRelay, ++ serverPool *serverPool, ++ quitSync chan struct{}, ++ wg *sync.WaitGroup, ++ ulcConfig *eth.ULCConfig) (*ProtocolManager, error) { + // Create the protocol manager with the base fields + manager := &ProtocolManager{ + lightSync: lightSync, +@@ -149,6 +167,10 @@ func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.In + manager.reqDist = odr.retriever.dist + } + ++ if ulcConfig != nil { ++ manager.ulc = newULC(ulcConfig) ++ } ++ + removePeer := manager.removePeer + if disableClientRemovePeer { + removePeer = func(id string) {} +@@ -238,7 +260,11 @@ func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWrit + } + + func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +- return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) ++ var isTrusted bool ++ if pm.isULCEnabled() { ++ isTrusted = pm.ulc.isTrusted(p.ID()) ++ } ++ return newPeer(pv, nv, isTrusted, p, newMeteredMsgWriter(rw)) + } + + // handle is the callback invoked to manage the life cycle of a les peer. When +@@ -280,6 +306,7 @@ func (pm *ProtocolManager) handle(p *peer) error { + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } ++ + // Register the peer locally + if err := pm.peers.Register(p); err != nil { + p.Log().Error("Light Ethereum peer registration failed", "err", err) +@@ -291,6 +318,7 @@ func (pm *ProtocolManager) handle(p *peer) error { + } + pm.removePeer(p.id) + }() ++ + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if pm.lightSync { + p.lock.Lock() +@@ -375,7 +403,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { + // Block header query, collect the requested headers and reply + case AnnounceMsg: + p.Log().Trace("Received announce message") +- if p.requestAnnounceType == announceTypeNone { ++ if p.announceType == announceTypeNone { + return errResp(ErrUnexpectedResponse, "") + } + +@@ -384,7 +412,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { + return errResp(ErrDecode, "%v: %v", msg, err) + } + +- if p.requestAnnounceType == announceTypeSigned { ++ if p.announceType == announceTypeSigned { + if err := req.checkSignature(p.pubKey); err != nil { + p.Log().Trace("Invalid announcement signature", "err", err) + return err +@@ -1179,6 +1207,14 @@ func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus { + return stats + } + ++// isULCEnabled returns true if we can use ULC ++func (pm *ProtocolManager) isULCEnabled() bool { ++ if pm.ulc == nil || len(pm.ulc.trustedKeys) == 0 { ++ return false ++ } ++ return true ++} ++ + // downloaderPeerNotify implements peerSetNotify + type downloaderPeerNotify ProtocolManager + +@@ -1223,7 +1259,8 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip + return peer.GetRequestCost(GetBlockHeadersMsg, amount) + }, + canSend: func(dp distPeer) bool { +- return dp.(*peer) == pc.peer ++ p := dp.(*peer) ++ return p == pc.peer + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) +@@ -1250,5 +1287,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { + + func (d *downloaderPeerNotify) unregisterPeer(p *peer) { + pm := (*ProtocolManager)(d) +- pm.downloader.UnregisterPeer(p.id) ++ if pm.ulc == nil || p.isTrusted { ++ pm.downloader.UnregisterPeer(p.id) ++ } + } +diff --git a/les/handler_test.go b/les/handler_test.go +index 43be7f4..72ba266 100644 +--- a/les/handler_test.go ++++ b/les/handler_test.go +@@ -494,7 +494,7 @@ func TestGetBloombitsProofs(t *testing.T) { + + func TestTransactionStatusLes2(t *testing.T) { + db := ethdb.NewMemDatabase() +- pm := newTestProtocolManagerMust(t, false, 0, nil, nil, nil, db) ++ pm := newTestProtocolManagerMust(t, false, 0, nil, nil, nil, db, nil) + chain := pm.blockchain.(*core.BlockChain) + config := core.DefaultTxPoolConfig + config.Journal = "" +diff --git a/les/helper_test.go b/les/helper_test.go +index 29496d6..5c37763 100644 +--- a/les/helper_test.go ++++ b/les/helper_test.go +@@ -146,7 +146,7 @@ func testRCL() RequestCostList { + // newTestProtocolManager creates a new protocol manager for testing purposes, + // with the given number of blocks already known, potential notification + // channels for different events and relative chain indexers array. +-func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) (*ProtocolManager, error) { ++func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) (*ProtocolManager, error) { + var ( + evmux = new(event.TypeMux) + engine = ethash.NewFaker() +@@ -176,7 +176,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor + if lightSync { + indexConfig = light.TestClientIndexerConfig + } +- pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup)) ++ pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig) + if err != nil { + return nil, err + } +@@ -200,8 +200,8 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor + // with the given number of blocks already known, potential notification + // channels for different events and relative chain indexers array. In case of an error, the constructor force- + // fails the test. +-func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) *ProtocolManager { +- pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db) ++func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database, ulcConfig *eth.ULCConfig) *ProtocolManager { ++ pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db, ulcConfig) + if err != nil { + t.Fatalf("Failed to create protocol manager: %v", err) + } +@@ -343,7 +343,7 @@ func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*cor + db := ethdb.NewMemDatabase() + cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) + +- pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db) ++ pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db, nil) + peer, _ := newTestPeer(t, "peer", protocol, pm, true) + + cIndexer.Start(pm.blockchain.(*core.BlockChain)) +@@ -383,8 +383,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers fun + lcIndexer, lbIndexer, lbtIndexer := testIndexers(ldb, odr, light.TestClientIndexerConfig) + odr.SetIndexers(lcIndexer, lbtIndexer, lbIndexer) + +- pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db) +- lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb) ++ pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db, nil) ++ lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb, nil) + + startIndexers := func(clientMode bool, pm *ProtocolManager) { + if clientMode { +diff --git a/les/odr.go b/les/odr.go +index 9def05a..f759235 100644 +--- a/les/odr.go ++++ b/les/odr.go +@@ -109,7 +109,10 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) +- return lreq.CanSend(p) ++ if !p.isOnlyAnnounce { ++ return lreq.CanSend(p) ++ } ++ return false + }, + request: func(dp distPeer) func() { + p := dp.(*peer) +diff --git a/les/peer.go b/les/peer.go +index 70c863c..c7fa59c 100644 +--- a/les/peer.go ++++ b/les/peer.go +@@ -58,7 +58,7 @@ type peer struct { + version int // Protocol version negotiated + network uint64 // Network ID being on + +- announceType, requestAnnounceType uint64 ++ announceType uint64 + + id string + +@@ -76,9 +76,12 @@ type peer struct { + fcServer *flowcontrol.ServerNode // nil if the peer is client only + fcServerParams *flowcontrol.ServerParams + fcCosts requestCostTable ++ ++ isTrusted bool ++ isOnlyAnnounce bool + } + +-func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { ++func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + id := p.ID() + pubKey, _ := id.Pubkey() + +@@ -90,6 +93,7 @@ func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *pe + network: network, + id: fmt.Sprintf("%x", id[:8]), + announceChn: make(chan announceData, 20), ++ isTrusted: isTrusted, + } + } + +@@ -405,23 +409,32 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis + send = send.add("headNum", headNum) + send = send.add("genesisHash", genesis) + if server != nil { +- send = send.add("serveHeaders", nil) +- send = send.add("serveChainSince", uint64(0)) +- send = send.add("serveStateSince", uint64(0)) +- send = send.add("txRelay", nil) ++ if !server.onlyAnnounce { ++ //only announce server. It sends only announse requests ++ send = send.add("serveHeaders", nil) ++ send = send.add("serveChainSince", uint64(0)) ++ send = send.add("serveStateSince", uint64(0)) ++ send = send.add("txRelay", nil) ++ } + send = send.add("flowControl/BL", server.defParams.BufLimit) + send = send.add("flowControl/MRR", server.defParams.MinRecharge) + list := server.fcCostStats.getCurrentList() + send = send.add("flowControl/MRC", list) + p.fcCosts = list.decode() + } else { +- p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented +- send = send.add("announceType", p.requestAnnounceType) ++ //on client node ++ p.announceType = announceTypeSimple ++ if p.isTrusted { ++ p.announceType = announceTypeSigned ++ } ++ send = send.add("announceType", p.announceType) + } ++ + recvList, err := p.sendReceiveHandshake(send) + if err != nil { + return err + } ++ + recv := recvList.decode() + + var rGenesis, rHash common.Hash +@@ -456,25 +469,33 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis + if int(rVersion) != p.version { + return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version) + } ++ + if server != nil { + // until we have a proper peer connectivity API, allow LES connection to other servers + /*if recv.get("serveStateSince", nil) == nil { + return errResp(ErrUselessPeer, "wanted client, got server") + }*/ + if recv.get("announceType", &p.announceType) != nil { ++ //set default announceType on server side + p.announceType = announceTypeSimple + } + p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) + } else { ++ //mark OnlyAnnounce server if "serveHeaders", "serveChainSince", "serveStateSince" or "txRelay" fields don't exist + if recv.get("serveChainSince", nil) != nil { +- return errResp(ErrUselessPeer, "peer cannot serve chain") ++ p.isOnlyAnnounce = true + } + if recv.get("serveStateSince", nil) != nil { +- return errResp(ErrUselessPeer, "peer cannot serve state") ++ p.isOnlyAnnounce = true + } + if recv.get("txRelay", nil) != nil { +- return errResp(ErrUselessPeer, "peer cannot relay transactions") ++ p.isOnlyAnnounce = true + } ++ ++ if p.isOnlyAnnounce && !p.isTrusted { ++ return errResp(ErrUselessPeer, "peer cannot serve requests") ++ } ++ + params := &flowcontrol.ServerParams{} + if err := recv.get("flowControl/BL", ¶ms.BufLimit); err != nil { + return err +@@ -490,7 +511,6 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis + p.fcServer = flowcontrol.NewServerNode(params) + p.fcCosts = MRC.decode() + } +- + p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} + return nil + } +@@ -580,8 +600,10 @@ func (ps *peerSet) Unregister(id string) error { + for _, n := range peers { + n.unregisterPeer(p) + } ++ + p.sendQueue.quit() + p.Peer.Disconnect(p2p.DiscUselessPeer) ++ + return nil + } + } +diff --git a/les/peer_test.go b/les/peer_test.go +new file mode 100644 +index 0000000..e88d9c9 +--- /dev/null ++++ b/les/peer_test.go +@@ -0,0 +1,303 @@ ++package les ++ ++import ( ++ "crypto/rand" ++ "math/big" ++ "testing" ++ ++ "github.com/ethereum/go-ethereum/common" ++ "github.com/ethereum/go-ethereum/les/flowcontrol" ++ "github.com/ethereum/go-ethereum/p2p" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++ "github.com/ethereum/go-ethereum/rlp" ++) ++ ++const ( ++ test_networkid = 10 ++ protocol_version = 2123 ++) ++ ++var ( ++ hash = common.HexToHash("some string") ++ genesis = common.HexToHash("genesis hash") ++ headNum = uint64(1234) ++ td = big.NewInt(123) ++) ++ ++//ulc connects to trusted peer and send announceType=announceTypeSigned ++func TestPeerHandshakeSetAnnounceTypeToAnnounceTypeSignedForTrustedPeer(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ ++ //peer to connect(on ulc side) ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ isTrusted: true, ++ rw: &rwStub{ ++ WriteHook: func(recvList keyValueList) { ++ //checking that ulc sends to peer allowedRequests=onlyAnnounceRequests and announceType = announceTypeSigned ++ recv := recvList.decode() ++ var reqType uint64 ++ ++ err := recv.get("announceType", &reqType) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ if reqType != announceTypeSigned { ++ t.Fatal("Expected announceTypeSigned") ++ } ++ }, ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("serveHeaders", nil) ++ l = l.add("serveChainSince", uint64(0)) ++ l = l.add("serveStateSince", uint64(0)) ++ l = l.add("txRelay", nil) ++ l = l.add("flowControl/BL", uint64(0)) ++ l = l.add("flowControl/MRR", uint64(0)) ++ l = l.add("flowControl/MRC", RequestCostList{}) ++ ++ return l ++ }, ++ }, ++ network: test_networkid, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, nil) ++ if err != nil { ++ t.Fatalf("Handshake error: %s", err) ++ } ++ ++ if p.announceType != announceTypeSigned { ++ t.Fatal("Incorrect announceType") ++ } ++} ++ ++func TestPeerHandshakeAnnounceTypeSignedForTrustedPeersPeerNotInTrusted(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ rw: &rwStub{ ++ WriteHook: func(recvList keyValueList) { ++ //checking that ulc sends to peer allowedRequests=noRequests and announceType != announceTypeSigned ++ recv := recvList.decode() ++ var reqType uint64 ++ ++ err := recv.get("announceType", &reqType) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ if reqType == announceTypeSigned { ++ t.Fatal("Expected not announceTypeSigned") ++ } ++ }, ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("serveHeaders", nil) ++ l = l.add("serveChainSince", uint64(0)) ++ l = l.add("serveStateSince", uint64(0)) ++ l = l.add("txRelay", nil) ++ l = l.add("flowControl/BL", uint64(0)) ++ l = l.add("flowControl/MRR", uint64(0)) ++ l = l.add("flowControl/MRC", RequestCostList{}) ++ ++ return l ++ }, ++ }, ++ network: test_networkid, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, nil) ++ if err != nil { ++ t.Fatal(err) ++ } ++ if p.announceType == announceTypeSigned { ++ t.Fatal("Incorrect announceType") ++ } ++} ++ ++func TestPeerHandshakeDefaultAllRequests(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ ++ s := generateLesServer() ++ ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ rw: &rwStub{ ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("announceType", uint64(announceTypeSigned)) ++ l = l.add("allowedRequests", uint64(0)) ++ ++ return l ++ }, ++ }, ++ network: test_networkid, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, s) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ if p.isOnlyAnnounce { ++ t.Fatal("Incorrect announceType") ++ } ++} ++ ++func TestPeerHandshakeServerSendOnlyAnnounceRequestsHeaders(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ ++ s := generateLesServer() ++ s.onlyAnnounce = true ++ ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ rw: &rwStub{ ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("announceType", uint64(announceTypeSigned)) ++ ++ return l ++ }, ++ WriteHook: func(l keyValueList) { ++ for _, v := range l { ++ if v.Key == "serveHeaders" || ++ v.Key == "serveChainSince" || ++ v.Key == "serveStateSince" || ++ v.Key == "txRelay" { ++ t.Fatalf("%v exists", v.Key) ++ } ++ } ++ }, ++ }, ++ network: test_networkid, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, s) ++ if err != nil { ++ t.Fatal(err) ++ } ++} ++func TestPeerHandshakeClientReceiveOnlyAnnounceRequestsHeaders(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ rw: &rwStub{ ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("flowControl/BL", uint64(0)) ++ l = l.add("flowControl/MRR", uint64(0)) ++ l = l.add("flowControl/MRC", RequestCostList{}) ++ ++ l = l.add("announceType", uint64(announceTypeSigned)) ++ ++ return l ++ }, ++ }, ++ network: test_networkid, ++ isTrusted: true, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, nil) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ if !p.isOnlyAnnounce { ++ t.Fatal("isOnlyAnnounce must be true") ++ } ++} ++ ++func TestPeerHandshakeClientReturnErrorOnUselessPeer(t *testing.T) { ++ var id discover.NodeID ++ rand.Read(id[:]) ++ ++ p := peer{ ++ Peer: p2p.NewPeer(id, "test peer", []p2p.Cap{}), ++ version: protocol_version, ++ rw: &rwStub{ ++ ReadHook: func(l keyValueList) keyValueList { ++ l = l.add("flowControl/BL", uint64(0)) ++ l = l.add("flowControl/MRR", uint64(0)) ++ l = l.add("flowControl/MRC", RequestCostList{}) ++ ++ l = l.add("announceType", uint64(announceTypeSigned)) ++ ++ return l ++ }, ++ }, ++ network: test_networkid, ++ } ++ ++ err := p.Handshake(td, hash, headNum, genesis, nil) ++ if err == nil { ++ t.FailNow() ++ } ++} ++ ++func generateLesServer() *LesServer { ++ s := &LesServer{ ++ defParams: &flowcontrol.ServerParams{ ++ BufLimit: uint64(300000000), ++ MinRecharge: uint64(50000), ++ }, ++ fcManager: flowcontrol.NewClientManager(1, 2, 3), ++ fcCostStats: &requestCostStats{ ++ stats: make(map[uint64]*linReg, len(reqList)), ++ }, ++ } ++ for _, code := range reqList { ++ s.fcCostStats.stats[code] = &linReg{cnt: 100} ++ } ++ return s ++} ++ ++type rwStub struct { ++ ReadHook func(l keyValueList) keyValueList ++ WriteHook func(l keyValueList) ++} ++ ++func (s *rwStub) ReadMsg() (p2p.Msg, error) { ++ payload := keyValueList{} ++ payload = payload.add("protocolVersion", uint64(protocol_version)) ++ payload = payload.add("networkId", uint64(test_networkid)) ++ payload = payload.add("headTd", td) ++ payload = payload.add("headHash", hash) ++ payload = payload.add("headNum", headNum) ++ payload = payload.add("genesisHash", genesis) ++ ++ if s.ReadHook != nil { ++ payload = s.ReadHook(payload) ++ } ++ ++ size, p, err := rlp.EncodeToReader(payload) ++ if err != nil { ++ return p2p.Msg{}, err ++ } ++ ++ return p2p.Msg{ ++ Size: uint32(size), ++ Payload: p, ++ }, nil ++} ++ ++func (s *rwStub) WriteMsg(m p2p.Msg) error { ++ recvList := keyValueList{} ++ if err := m.Decode(&recvList); err != nil { ++ return err ++ } ++ ++ if s.WriteHook != nil { ++ s.WriteHook(recvList) ++ } ++ ++ return nil ++} +diff --git a/les/server.go b/les/server.go +index 2fa0456..2ded3c1 100644 +--- a/les/server.go ++++ b/les/server.go +@@ -41,17 +41,34 @@ import ( + type LesServer struct { + lesCommons + +- fcManager *flowcontrol.ClientManager // nil if our node is client only +- fcCostStats *requestCostStats +- defParams *flowcontrol.ServerParams +- lesTopics []discv5.Topic +- privateKey *ecdsa.PrivateKey +- quitSync chan struct{} ++ fcManager *flowcontrol.ClientManager // nil if our node is client only ++ fcCostStats *requestCostStats ++ defParams *flowcontrol.ServerParams ++ lesTopics []discv5.Topic ++ privateKey *ecdsa.PrivateKey ++ quitSync chan struct{} ++ onlyAnnounce bool + } + + func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { + quitSync := make(chan struct{}) +- pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) ++ pm, err := NewProtocolManager( ++ eth.BlockChain().Config(), ++ light.DefaultServerIndexerConfig, ++ false, ++ config.NetworkId, ++ eth.EventMux(), ++ eth.Engine(), ++ newPeerSet(), ++ eth.BlockChain(), ++ eth.TxPool(), ++ eth.ChainDb(), ++ nil, ++ nil, ++ nil, ++ quitSync, ++ new(sync.WaitGroup), ++ config.ULC) + if err != nil { + return nil, err + } +@@ -70,8 +87,9 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { + bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), + protocolManager: pm, + }, +- quitSync: quitSync, +- lesTopics: lesTopics, ++ quitSync: quitSync, ++ lesTopics: lesTopics, ++ onlyAnnounce: config.OnlyAnnounce, + } + + logger := log.New() +@@ -289,10 +307,8 @@ func (s *requestCostStats) getCurrentList() RequestCostList { + defer s.lock.Unlock() + + list := make(RequestCostList, len(reqList)) +- //fmt.Println("RequestCostList") + for idx, code := range reqList { + b, m := s.stats[code].calc() +- //fmt.Println(code, s.stats[code].cnt, b/1000000, m/1000000) + if m < 0 { + b += m + m = 0 +diff --git a/les/serverpool.go b/les/serverpool.go +index 1a4c752..3a2dcea 100644 +--- a/les/serverpool.go ++++ b/les/serverpool.go +@@ -125,22 +125,22 @@ type serverPool struct { + discNodes chan *discv5.Node + discLookups chan bool + ++ trustedNodes []string + entries map[discover.NodeID]*poolEntry + timeout, enableRetry chan *poolEntry + adjustStats chan poolStatAdjust + +- connCh chan *connReq +- disconnCh chan *disconnReq +- registerCh chan *registerReq +- +- knownQueue, newQueue poolEntryQueue +- knownSelect, newSelect *weightedRandomSelect +- knownSelected, newSelected int +- fastDiscover bool ++ knownQueue, newQueue, trustedQueue poolEntryQueue ++ knownSelect, newSelect *weightedRandomSelect ++ knownSelected, newSelected int ++ fastDiscover bool ++ connCh chan *connReq ++ disconnCh chan *disconnReq ++ registerCh chan *registerReq + } + + // newServerPool creates a new serverPool instance +-func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool { ++func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup, trustedNodes []string) *serverPool { + pool := &serverPool{ + db: db, + quit: quit, +@@ -155,7 +155,10 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s + knownSelect: newWeightedRandomSelect(), + newSelect: newWeightedRandomSelect(), + fastDiscover: true, ++ trustedNodes: trustedNodes, + } ++ ++ pool.trustedQueue = newPoolEntryQueue(maxKnownEntries, nil) + pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) + pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) + return pool +@@ -431,7 +434,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16 + } + addr.lastSeen = now + entry.addrSelect.update(addr) +- if !entry.known { ++ if !entry.known || !entry.trusted { + pool.newQueue.setLatest(entry) + } + return entry +@@ -459,6 +462,30 @@ func (pool *serverPool) loadNodes() { + pool.knownQueue.setLatest(e) + pool.knownSelect.update((*knownEntry)(e)) + } ++ ++ for _, trusted := range pool.parseTrustedServers() { ++ e := pool.findOrNewNode(trusted.ID, trusted.IP, trusted.TCP) ++ e.trusted = true ++ e.dialed = &poolEntryAddress{ip: trusted.IP, port: trusted.TCP} ++ pool.entries[e.id] = e ++ pool.trustedQueue.setLatest(e) ++ } ++ ++} ++ ++// parseTrustedServers returns valid and parsed by discovery enodes. ++func (pool *serverPool) parseTrustedServers() []*discover.Node { ++ nodes := make([]*discover.Node, 0, len(pool.trustedNodes)) ++ ++ for _, enode := range pool.trustedNodes { ++ node, err := discover.ParseNode(enode) ++ if err != nil { ++ log.Warn("Trusted node URL invalid", "enode", enode, "err", err) ++ continue ++ } ++ nodes = append(nodes, node) ++ } ++ return nodes + } + + // saveNodes saves known nodes and their statistics into the database. Nodes are +@@ -516,6 +543,10 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) { + // checkDial checks if new dials can/should be made. It tries to select servers both + // based on good statistics and recent discovery. + func (pool *serverPool) checkDial() { ++ for _, e := range pool.trustedQueue.queue { ++ pool.dial(e, false) ++ } ++ + fillWithKnownSelects := !pool.fastDiscover + for pool.knownSelected < targetKnownSelect { + entry := pool.knownSelect.choose() +@@ -552,17 +583,26 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { + return + } + entry.state = psDialed +- entry.knownSelected = knownSelected +- if knownSelected { +- pool.knownSelected++ +- } else { +- pool.newSelected++ ++ ++ if !entry.trusted { ++ entry.knownSelected = knownSelected ++ if knownSelected { ++ pool.knownSelected++ ++ } else { ++ pool.newSelected++ ++ } ++ addr := entry.addrSelect.choose().(*poolEntryAddress) ++ entry.dialed = addr + } +- addr := entry.addrSelect.choose().(*poolEntryAddress) +- log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) +- entry.dialed = addr ++ ++ state := "known" ++ if entry.trusted { ++ state = "trusted" ++ } ++ log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey(), "set", len(entry.addr), state, knownSelected) ++ + go func() { +- pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) ++ pool.server.AddPeer(discover.NewNode(entry.id, entry.dialed.ip, entry.dialed.port, entry.dialed.port)) + select { + case <-pool.quit: + case <-time.After(dialTimeout): +@@ -609,6 +649,7 @@ type poolEntry struct { + + lastDiscovered mclock.AbsTime + known, knownSelected bool ++ trusted bool + connectStats, delayStats poolStats + responseStats, timeoutStats poolStats + state int +@@ -804,7 +845,7 @@ func (q *poolEntryQueue) setLatest(entry *poolEntry) { + if q.queue[entry.queueIdx] == entry { + delete(q.queue, entry.queueIdx) + } else { +- if len(q.queue) == q.maxCnt { ++ if len(q.queue) == q.maxCnt && q.removeFromPool != nil { + e := q.fetchOldest() + q.remove(e) + q.removeFromPool(e) +diff --git a/les/serverpool_test.go b/les/serverpool_test.go +new file mode 100644 +index 0000000..ca989e6 +--- /dev/null ++++ b/les/serverpool_test.go +@@ -0,0 +1,67 @@ ++package les ++ ++import ( ++ "math/rand" ++ "sync" ++ "testing" ++ ++ "github.com/ethereum/go-ethereum/ethdb" ++ "github.com/ethereum/go-ethereum/p2p" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++ "github.com/ethereum/go-ethereum/rlp" ++) ++ ++func TestLoadTrustedNodes(t *testing.T) { ++ node := discover.Node{ ++ ID: discover.NodeID{}, ++ } ++ rand.Read(node.ID[:]) ++ ++ var wg sync.WaitGroup ++ q := make(chan struct{}) ++ sp := newServerPool(&dbStub{}, q, &wg, []string{node.String()}) ++ sp.server = &p2p.Server{} ++ sp.server.TrustedNodes = []*discover.Node{ ++ &node, ++ } ++ ++ sp.loadNodes() ++ ++ if len(sp.entries) == 0 { ++ t.Fatal("empty nodes") ++ } ++ if _, ok := sp.entries[node.ID]; !ok { ++ t.Fatal("empty entries") ++ } ++ if len(sp.trustedQueue.queue) != 1 { ++ t.Fatal("incorrect trustedQueue.queue") ++ } ++ if sp.trustedQueue.queue[sp.entries[node.ID].queueIdx] != sp.entries[node.ID] { ++ t.Fatal("not exist") ++ } ++ if sp.trustedQueue.newPtr != 1 { ++ t.Fatal("incorrect ptr") ++ } ++} ++ ++type dbStub struct{} ++ ++func (db *dbStub) Put(key []byte, value []byte) error { ++ return nil ++} ++func (db *dbStub) Get(key []byte) ([]byte, error) { ++ list := make([]*poolEntry, 0) ++ return rlp.EncodeToBytes(&list) ++} ++func (db *dbStub) Has(key []byte) (bool, error) { ++ return false, nil ++} ++func (db *dbStub) Delete(key []byte) error { ++ return nil ++} ++func (db *dbStub) Close() { ++ ++} ++func (db *dbStub) NewBatch() ethdb.Batch { ++ return nil ++} +diff --git a/les/sync.go b/les/sync.go +index eb15537..1ac6455 100644 +--- a/les/sync.go ++++ b/les/sync.go +@@ -31,6 +31,7 @@ func (pm *ProtocolManager) syncer() { + // Start and ensure cleanup of sync mechanisms + //pm.fetcher.Start() + //defer pm.fetcher.Stop() ++ defer pm.downloader.Terminate() + + // Wait for different events to fire synchronisation operations + //forceSync := time.Tick(forceSyncCycle) +diff --git a/les/txrelay.go b/les/txrelay.go +index 7a02cc8..38fc66a 100644 +--- a/les/txrelay.go ++++ b/les/txrelay.go +@@ -121,7 +121,10 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { + return peer.GetRequestCost(SendTxMsg, len(ll)) + }, + canSend: func(dp distPeer) bool { +- return dp.(*peer) == pp ++ if !dp.(*peer).isOnlyAnnounce { ++ return dp.(*peer) == pp ++ } ++ return false + }, + request: func(dp distPeer) func() { + peer := dp.(*peer) +diff --git a/les/ulc.go b/les/ulc.go +new file mode 100644 +index 0000000..c96181f +--- /dev/null ++++ b/les/ulc.go +@@ -0,0 +1,36 @@ ++package les ++ ++import ( ++ "github.com/ethereum/go-ethereum/eth" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++) ++ ++type ulc struct { ++ trustedKeys map[string]struct{} ++ minTrustedFraction int ++} ++ ++func newULC(ulcConfig *eth.ULCConfig) *ulc { ++ if ulcConfig == nil { ++ return nil ++ } ++ ++ m := make(map[string]struct{}, len(ulcConfig.TrustedServers)) ++ for _, id := range ulcConfig.TrustedServers { ++ node, err := discover.ParseNode(id) ++ if err != nil { ++ continue ++ } ++ m[node.ID.String()] = struct{}{} ++ } ++ ++ return &ulc{m, ulcConfig.MinTrustedFraction} ++} ++ ++func (u *ulc) isTrusted(p discover.NodeID) bool { ++ if u.trustedKeys == nil { ++ return false ++ } ++ _, ok := u.trustedKeys[p.String()] ++ return ok ++} +diff --git a/les/ulc_test.go b/les/ulc_test.go +new file mode 100644 +index 0000000..1df0add +--- /dev/null ++++ b/les/ulc_test.go +@@ -0,0 +1,245 @@ ++package les ++ ++import ( ++ "fmt" ++ "reflect" ++ "testing" ++ "time" ++ ++ "github.com/ethereum/go-ethereum/core" ++ "github.com/ethereum/go-ethereum/crypto" ++ "github.com/ethereum/go-ethereum/eth" ++ "github.com/ethereum/go-ethereum/ethdb" ++ "github.com/ethereum/go-ethereum/light" ++ "github.com/ethereum/go-ethereum/p2p" ++ "github.com/ethereum/go-ethereum/p2p/discover" ++) ++ ++func TestULCSyncWithOnePeer(t *testing.T) { ++ f := newFullPeerPair(t, 1, 4, testChainGen) ++ ulcConfig := ð.ULCConfig{ ++ MinTrustedFraction: 100, ++ TrustedServers: []string{f.ID.String()}, ++ } ++ ++ l := newLightPeer(t, ulcConfig) ++ ++ if reflect.DeepEqual(f.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("blocks are equal") ++ } ++ ++ _, _, err := connectPeers(f, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ l.PM.fetcher.lock.Lock() ++ l.PM.fetcher.nextRequest() ++ l.PM.fetcher.lock.Unlock() ++ ++ if !reflect.DeepEqual(f.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("sync doesn't work") ++ } ++} ++ ++func TestULCReceiveAnnounce(t *testing.T) { ++ f := newFullPeerPair(t, 1, 4, testChainGen) ++ ulcConfig := ð.ULCConfig{ ++ MinTrustedFraction: 100, ++ TrustedServers: []string{f.ID.String()}, ++ } ++ ++ key, err := crypto.GenerateKey() ++ ID := discover.PubkeyID(&key.PublicKey) ++ l := newLightPeer(t, ulcConfig) ++ l.ID = ID ++ ++ fPeer, lPeer, err := connectPeers(f, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ l.PM.synchronise(fPeer) ++ ++ //check that the sync is finished correctly ++ if !reflect.DeepEqual(f.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("sync doesn't work") ++ } ++ ++ l.PM.peers.lock.Lock() ++ if len(l.PM.peers.peers) == 0 { ++ t.Fatal("peer list should not be empty") ++ } ++ l.PM.peers.lock.Unlock() ++ ++ //send a signed announce message(payload doesn't matter) ++ announce := announceData{} ++ announce.sign(key) ++ lPeer.SendAnnounce(announce) ++ ++ l.PM.peers.lock.Lock() ++ if len(l.PM.peers.peers) == 0 { ++ t.Fatal("peer list after receiving message should not be empty") ++ } ++ l.PM.peers.lock.Unlock() ++} ++ ++func TestULCShouldNotSyncWithTwoPeersOneHaveEmptyChain(t *testing.T) { ++ f1 := newFullPeerPair(t, 1, 4, testChainGen) ++ f2 := newFullPeerPair(t, 2, 0, nil) ++ ulcConf := &ulc{minTrustedFraction: 100, trustedKeys: make(map[string]struct{})} ++ ulcConf.trustedKeys[f1.ID.String()] = struct{}{} ++ ulcConf.trustedKeys[f2.ID.String()] = struct{}{} ++ ulcConfig := ð.ULCConfig{ ++ MinTrustedFraction: 100, ++ TrustedServers: []string{f1.ID.String(), f2.ID.String()}, ++ } ++ l := newLightPeer(t, ulcConfig) ++ l.PM.ulc.minTrustedFraction = 100 ++ ++ _, _, err := connectPeers(f1, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ _, _, err = connectPeers(f2, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ l.PM.fetcher.lock.Lock() ++ l.PM.fetcher.nextRequest() ++ l.PM.fetcher.lock.Unlock() ++ ++ if reflect.DeepEqual(f1.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("Incorrect hash: second peer has empty chain") ++ } ++} ++ ++func TestULCShouldNotSyncWithThreePeersOneHaveEmptyChain(t *testing.T) { ++ f1 := newFullPeerPair(t, 1, 4, testChainGen) ++ f2 := newFullPeerPair(t, 2, 4, testChainGen) ++ f3 := newFullPeerPair(t, 3, 0, nil) ++ ulcConf := &ulc{minTrustedFraction: 60, trustedKeys: make(map[string]struct{})} ++ ulcConf.trustedKeys[f1.ID.String()] = struct{}{} ++ ulcConf.trustedKeys[f2.ID.String()] = struct{}{} ++ ulcConfig := ð.ULCConfig{ ++ MinTrustedFraction: 60, ++ TrustedServers: []string{f1.ID.String(), f2.ID.String()}, ++ } ++ l := newLightPeer(t, ulcConfig) ++ l.PM.ulc.minTrustedFraction = 60 ++ ++ _, _, err := connectPeers(f1, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ _, _, err = connectPeers(f2, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ _, _, err = connectPeers(f3, l, 2) ++ if err != nil { ++ t.Fatal(err) ++ } ++ ++ l.PM.fetcher.lock.Lock() ++ l.PM.fetcher.nextRequest() ++ l.PM.fetcher.lock.Unlock() ++ ++ if !reflect.DeepEqual(f1.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("Incorrect hash") ++ } ++ if !reflect.DeepEqual(f2.PM.blockchain.CurrentHeader().Hash(), l.PM.blockchain.CurrentHeader().Hash()) { ++ t.Fatal("Incorrect hash") ++ } ++} ++ ++type pairPeer struct { ++ Name string ++ ID discover.NodeID ++ PM *ProtocolManager ++} ++ ++func connectPeers(full, light pairPeer, version int) (*peer, *peer, error) { ++ // Create a message pipe to communicate through ++ app, net := p2p.MsgPipe() ++ ++ peerLight := full.PM.newPeer(version, NetworkId, p2p.NewPeer(light.ID, light.Name, nil), net) ++ peerFull := light.PM.newPeer(version, NetworkId, p2p.NewPeer(full.ID, full.Name, nil), app) ++ ++ // Start the peerLight on a new thread ++ errc1 := make(chan error, 1) ++ errc2 := make(chan error, 1) ++ go func() { ++ select { ++ case light.PM.newPeerCh <- peerFull: ++ errc1 <- light.PM.handle(peerFull) ++ case <-light.PM.quitSync: ++ errc1 <- p2p.DiscQuitting ++ } ++ }() ++ go func() { ++ select { ++ case full.PM.newPeerCh <- peerLight: ++ errc2 <- full.PM.handle(peerLight) ++ case <-full.PM.quitSync: ++ errc2 <- p2p.DiscQuitting ++ } ++ }() ++ ++ select { ++ case <-time.After(time.Millisecond * 100): ++ case err := <-errc1: ++ return nil, nil, fmt.Errorf("peerLight handshake error: %v", err) ++ case err := <-errc2: ++ return nil, nil, fmt.Errorf("peerFull handshake error: %v", err) ++ } ++ ++ return peerFull, peerLight, nil ++} ++ ++// newFullPeerPair creates node with full sync mode ++func newFullPeerPair(t *testing.T, index int, numberOfblocks int, chainGen func(int, *core.BlockGen)) pairPeer { ++ db := ethdb.NewMemDatabase() ++ ++ pmFull := newTestProtocolManagerMust(t, false, numberOfblocks, chainGen, nil, nil, db, nil) ++ ++ peerPairFull := pairPeer{ ++ Name: "full node", ++ PM: pmFull, ++ } ++ key, err := crypto.GenerateKey() ++ if err != nil { ++ t.Fatal("generate key err:", err) ++ } ++ ID := discover.PubkeyID(&key.PublicKey) ++ peerPairFull.ID = ID ++ return peerPairFull ++} ++ ++// newLightPeer creates node with light sync mode ++func newLightPeer(t *testing.T, ulcConfig *eth.ULCConfig) pairPeer { ++ peers := newPeerSet() ++ dist := newRequestDistributor(peers, make(chan struct{})) ++ rm := newRetrieveManager(peers, dist, nil) ++ ldb := ethdb.NewMemDatabase() ++ ++ odr := NewLesOdr(ldb, light.DefaultClientIndexerConfig, rm) ++ ++ pmLight := newTestProtocolManagerMust(t, true, 0, nil, odr, peers, ldb, ulcConfig) ++ peerPairLight := pairPeer{ ++ Name: "ulc node", ++ PM: pmLight, ++ } ++ ++ key, err := crypto.GenerateKey() ++ if err != nil { ++ t.Fatal("generate key err:", err) ++ } ++ ID := discover.PubkeyID(&key.PublicKey) ++ ++ peerPairLight.ID = ID ++ ++ return peerPairLight ++} +diff --git a/light/lightchain.go b/light/lightchain.go +index 8e2734c..0b5571b 100644 +--- a/light/lightchain.go ++++ b/light/lightchain.go +@@ -71,6 +71,8 @@ type LightChain struct { + wg sync.WaitGroup + + engine consensus.Engine ++ ++ disableCheckFreq bool + } + + // NewLightChain returns a fully initialised light chain using information +@@ -355,6 +357,9 @@ func (self *LightChain) postChainEvents(events []interface{}) { + // In the case of a light chain, InsertHeaderChain also creates and posts light + // chain events when necessary. + func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { ++ if self.disableCheckFreq { ++ checkFreq = 0 ++ } + start := time.Now() + if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil { + return i, err +@@ -533,3 +538,17 @@ func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri + func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) + } ++ ++//DisableCheckFreq disables header validation. It needs for ULC ++func (self *LightChain) DisableCheckFreq() { ++ self.mu.Lock() ++ defer self.mu.Unlock() ++ self.disableCheckFreq = true ++} ++ ++//EnableCheckFreq enables header validation ++func (self *LightChain) EnableCheckFreq() { ++ self.mu.Lock() ++ defer self.mu.Unlock() ++ self.disableCheckFreq = false ++} +diff --git a/mobile/geth.go b/mobile/geth.go +index e3e2e90..4d674b2 100644 +--- a/mobile/geth.go ++++ b/mobile/geth.go +@@ -76,6 +76,9 @@ type NodeConfig struct { + + // Listening address of pprof server. + PprofAddress string ++ ++ // Ultra Light client options ++ ULC *eth.ULCConfig + } + + // defaultNodeConfig contains the default node configuration values to use if all diff --git a/vendor/github.com/ethereum/go-ethereum/_assets/patches/README.md b/vendor/github.com/ethereum/go-ethereum/_assets/patches/README.md index c642e26f490..2c3e9e6068e 100644 --- a/vendor/github.com/ethereum/go-ethereum/_assets/patches/README.md +++ b/vendor/github.com/ethereum/go-ethereum/_assets/patches/README.md @@ -1,13 +1,22 @@ Status Patches for geth (go-ethereum) ===================================== -status-go uses [go-ethereum](https://github.com/status-im/go-ethereum) as its dependency. As any other Go dependency `go-ethereum` code is vendored and stored in `vendor/` folder. +We keep changes in patches because it gives as a clear picture. In case of merge conflicts, thanks to patches we can easily figure out how the final code should look like. -However, there are a few changes has been made to the upstream, that are specific to Status and should not be merged to the upstream. We keep those changes as a set of patches, that can be applied upon each next release of `go-ethereum`. Patched version of `go-ethereum` is available in vendor folder. +## Syncing with upstream -We try to minimize number and amount of changes in those patches as much as possible, and whereas possible, to contribute changes into the upstream. +When a new geth version is released, we need to merge it to an appropriate branch and apply patches. -# Creating patches +The format of branches looks like this: `patched/1.8`, `patched/1.9`, and so on. + +In order to sync the upstream, follow this instruction: +1. Revert existing patches: `$ _assets/patches/patcher -r`, +1. Merge a new release: `$ git merge upstream/v1.8.16` where `v1.8.16` is a tag with a new release, +1. Apply patches back: `$ _assets/patches/patcher`. + +In the last step, some patches might be invalid. In such a case, they need to be fixed before proceeding. + +## Creating patches Instructions for creating a patch from the command line: @@ -16,23 +25,6 @@ Instructions for creating a patch from the command line: 1. Create a patch `git diff --relative=vendor/github.com/ethereum/go-ethereum > _assets/patches/geth/0000-name-of-the-patch.patch` 1. Commit changes. -# Updating patches - -1. Tweak the patch file. -1. Run `make dep-ensure` to re-apply patches. - -# Removing patches - -1. Remove the patch file -1. Remove the link from [this README] (./README.md) -1. Run `make dep-ensure` to re-apply patches. - -# Updating - -When a new stable release of `go-ethereum` comes out, we need to upgrade our vendored copy. We use `dep` for vendoring, so for upgrading: - -- Change target branch for `go-ethereum` in `Gopkg.toml`. -- `dep ensure -update github.com/ethereum/go-ethereum` -- `make dep-ensure` +## How to fix a patch? -This will ensure that dependency is upgraded and fully patched. Upon success, you can do `make vendor-check` after committing all the changes, in order to ensure that all changes are valid. +TBD diff --git a/vendor/github.com/ethereum/go-ethereum/cmd/geth/config.go b/vendor/github.com/ethereum/go-ethereum/cmd/geth/config.go index b0749d23291..d724562eb9b 100644 --- a/vendor/github.com/ethereum/go-ethereum/cmd/geth/config.go +++ b/vendor/github.com/ethereum/go-ethereum/cmd/geth/config.go @@ -124,6 +124,7 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { } // Apply flags. + utils.SetULC(ctx, &cfg.Eth) utils.SetNodeConfig(ctx, &cfg.Node) stack, err := node.New(&cfg.Node) if err != nil { diff --git a/vendor/github.com/ethereum/go-ethereum/cmd/geth/main.go b/vendor/github.com/ethereum/go-ethereum/cmd/geth/main.go index fae4b571819..bdc624d5619 100644 --- a/vendor/github.com/ethereum/go-ethereum/cmd/geth/main.go +++ b/vendor/github.com/ethereum/go-ethereum/cmd/geth/main.go @@ -83,6 +83,10 @@ var ( utils.TxPoolAccountQueueFlag, utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, + utils.ULCModeConfigFlag, + utils.OnlyAnnounceModeFlag, + utils.ULCTrustedNodesFlag, + utils.ULCMinTrustedFractionFlag, utils.SyncModeFlag, utils.GCModeFlag, utils.LightServFlag, diff --git a/vendor/github.com/ethereum/go-ethereum/cmd/utils/flags.go b/vendor/github.com/ethereum/go-ethereum/cmd/utils/flags.go index a2becd08b89..0df8fb89db0 100644 --- a/vendor/github.com/ethereum/go-ethereum/cmd/utils/flags.go +++ b/vendor/github.com/ethereum/go-ethereum/cmd/utils/flags.go @@ -19,6 +19,7 @@ package utils import ( "crypto/ecdsa" + "encoding/json" "fmt" "io/ioutil" "math/big" @@ -157,6 +158,23 @@ var ( Usage: "Document Root for HTTPClient file scheme", Value: DirectoryString{homeDir()}, } + ULCModeConfigFlag = cli.StringFlag{ + Name: "les.ulcconfig", + Usage: "Config file to use for ULC mode", + } + OnlyAnnounceModeFlag = cli.BoolFlag{ + Name: "les.onlyannounce", + Usage: "LES server sends only announce", + } + ULCMinTrustedFractionFlag = cli.IntFlag{ + Name: "les.mintrustedfraction", + Usage: "LES server sends only announce", + } + ULCTrustedNodesFlag = cli.StringFlag{ + Name: "les.trusted", + Usage: "List of trusted nodes", + } + defaultSyncMode = eth.DefaultConfig.SyncMode SyncModeFlag = TextMarshalerFlag{ Name: "syncmode", @@ -816,6 +834,40 @@ func setIPC(ctx *cli.Context, cfg *node.Config) { } } +// SetULC setup ULC config from file if given. +func SetULC(ctx *cli.Context, cfg *eth.Config) { + // ULC config isn't loaded from global config and ULC config and ULC trusted nodes are not defined. + if cfg.ULC == nil && !(ctx.GlobalIsSet(ULCModeConfigFlag.Name) || ctx.GlobalIsSet(ULCTrustedNodesFlag.Name)) { + return + } + cfg.ULC = ð.ULCConfig{} + + path := ctx.GlobalString(ULCModeConfigFlag.Name) + if path != "" { + cfgData, err := ioutil.ReadFile(path) + if err != nil { + Fatalf("Failed to unmarshal ULC configuration: %v", err) + } + + err = json.Unmarshal(cfgData, &cfg.ULC) + if err != nil { + Fatalf("Failed to unmarshal ULC configuration: %s", err.Error()) + } + } + + if trustedNodes := ctx.GlobalString(ULCTrustedNodesFlag.Name); trustedNodes != "" { + cfg.ULC.TrustedServers = strings.Split(trustedNodes, ",") + } + + if trustedFraction := ctx.GlobalInt(ULCMinTrustedFractionFlag.Name); trustedFraction > 0 { + cfg.ULC.MinTrustedFraction = trustedFraction + } + if cfg.ULC.MinTrustedFraction <= 0 && cfg.ULC.MinTrustedFraction > 100 { + log.Error("MinTrustedFraction is invalid", "MinTrustedFraction", cfg.ULC.MinTrustedFraction, "Changed to default", eth.DefaultUTCMinTrustedFraction) + cfg.ULC.MinTrustedFraction = eth.DefaultUTCMinTrustedFraction + } +} + // makeDatabaseHandles raises out the number of allowed file handles per process // for Geth and returns half of the allowance to assign to the database. func makeDatabaseHandles() int { @@ -1140,6 +1192,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(LightPeersFlag.Name) { cfg.LightPeers = ctx.GlobalInt(LightPeersFlag.Name) } + if ctx.GlobalIsSet(OnlyAnnounceModeFlag.Name) { + cfg.OnlyAnnounce = ctx.GlobalBool(OnlyAnnounceModeFlag.Name) + } if ctx.GlobalIsSet(NetworkIdFlag.Name) { cfg.NetworkId = ctx.GlobalUint64(NetworkIdFlag.Name) } diff --git a/vendor/github.com/ethereum/go-ethereum/core/headerchain.go b/vendor/github.com/ethereum/go-ethereum/core/headerchain.go index d2093113c0e..8904dd887b6 100644 --- a/vendor/github.com/ethereum/go-ethereum/core/headerchain.go +++ b/vendor/github.com/ethereum/go-ethereum/core/headerchain.go @@ -219,14 +219,18 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) // Generate the list of seal verification requests, and start the parallel verifier seals := make([]bool, len(chain)) - for i := 0; i < len(seals)/checkFreq; i++ { - index := i*checkFreq + hc.rand.Intn(checkFreq) - if index >= len(seals) { - index = len(seals) - 1 + if checkFreq != 0 { + // In case of checkFreq == 0 all seals are left false. + for i := 0; i < len(seals)/checkFreq; i++ { + index := i*checkFreq + hc.rand.Intn(checkFreq) + if index >= len(seals) { + index = len(seals) - 1 + } + seals[index] = true } - seals[index] = true + // Last should always be verified to avoid junk. + seals[len(seals)-1] = true } - seals[len(seals)-1] = true // Last should always be verified to avoid junk abort, results := hc.engine.VerifyHeaders(hc, chain, seals) defer close(abort) diff --git a/vendor/github.com/ethereum/go-ethereum/eth/config.go b/vendor/github.com/ethereum/go-ethereum/eth/config.go index efbaafb6a2d..7d1db9f82b8 100644 --- a/vendor/github.com/ethereum/go-ethereum/eth/config.go +++ b/vendor/github.com/ethereum/go-ethereum/eth/config.go @@ -87,8 +87,12 @@ type Config struct { NoPruning bool // Light client options - LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests - LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests + LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + OnlyAnnounce bool // Maximum number of LES client peers + + // Ultra Light client options + ULC *ULCConfig `toml:",omitempty"` // Database options SkipBcVersionCheck bool `toml:"-"` diff --git a/vendor/github.com/ethereum/go-ethereum/eth/gen_config.go b/vendor/github.com/ethereum/go-ethereum/eth/gen_config.go index d401a917d2f..6423cf54edb 100644 --- a/vendor/github.com/ethereum/go-ethereum/eth/gen_config.go +++ b/vendor/github.com/ethereum/go-ethereum/eth/gen_config.go @@ -23,10 +23,12 @@ func (c Config) MarshalTOML() (interface{}, error) { NetworkId uint64 SyncMode downloader.SyncMode NoPruning bool - LightServ int `toml:",omitempty"` - LightPeers int `toml:",omitempty"` - SkipBcVersionCheck bool `toml:"-"` - DatabaseHandles int `toml:"-"` + LightServ int `toml:",omitempty"` + LightPeers int `toml:",omitempty"` + OnlyAnnounce bool + ULC *ULCConfig `toml:",omitempty"` + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` DatabaseCache int TrieCache int TrieTimeout time.Duration @@ -79,10 +81,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { NetworkId *uint64 SyncMode *downloader.SyncMode NoPruning *bool - LightServ *int `toml:",omitempty"` - LightPeers *int `toml:",omitempty"` - SkipBcVersionCheck *bool `toml:"-"` - DatabaseHandles *int `toml:"-"` + LightServ *int `toml:",omitempty"` + LightPeers *int `toml:",omitempty"` + OnlyAnnounce *bool + ULC *ULCConfig `toml:",omitempty"` + SkipBcVersionCheck *bool `toml:"-"` + DatabaseHandles *int `toml:"-"` DatabaseCache *int TrieCache *int TrieTimeout *time.Duration @@ -122,6 +126,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.LightPeers != nil { c.LightPeers = *dec.LightPeers } + if dec.OnlyAnnounce != nil { + c.OnlyAnnounce = *dec.OnlyAnnounce + } + if dec.ULC != nil { + c.ULC = dec.ULC + } if dec.SkipBcVersionCheck != nil { c.SkipBcVersionCheck = *dec.SkipBcVersionCheck } diff --git a/vendor/github.com/ethereum/go-ethereum/eth/ulc_config.go b/vendor/github.com/ethereum/go-ethereum/eth/ulc_config.go new file mode 100644 index 00000000000..960cb0da564 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/eth/ulc_config.go @@ -0,0 +1,9 @@ +package eth + +const DefaultUTCMinTrustedFraction = 75 + +// ULCConfig is a Ultra Light client options. +type ULCConfig struct { + TrustedServers []string `toml:",omitempty"` // A list of trusted servers + MinTrustedFraction int `toml:",omitempty"` // Minimum percentage of connected trusted servers to validate trusted (1-100) +} diff --git a/vendor/github.com/ethereum/go-ethereum/les/backend.go b/vendor/github.com/ethereum/go-ethereum/les/backend.go index 8e9cca6f632..fd32e59f9f2 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/backend.go +++ b/vendor/github.com/ethereum/go-ethereum/les/backend.go @@ -108,8 +108,12 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), } + var trustedNodes []string + if leth.config.ULC != nil { + trustedNodes = leth.config.ULC.TrustedServers + } leth.relay = NewLesTxRelay(peers, leth.reqDist) - leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg) + leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) @@ -135,10 +139,32 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { } leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay) - if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil { + + if leth.protocolManager, err = NewProtocolManager( + leth.chainConfig, + light.DefaultClientIndexerConfig, + true, + config.NetworkId, + leth.eventMux, + leth.engine, + leth.peers, + leth.blockchain, + nil, + chainDb, + leth.odr, + leth.relay, + leth.serverPool, + quitSync, + &leth.wg, + config.ULC); err != nil { return nil, err } + + if leth.protocolManager.isULCEnabled() { + leth.blockchain.DisableCheckFreq() + } leth.ApiBackend = &LesApiBackend{leth, nil} + gpoParams := config.GPO if gpoParams.Default == nil { gpoParams.Default = config.MinerGasPrice diff --git a/vendor/github.com/ethereum/go-ethereum/les/fetcher.go b/vendor/github.com/ethereum/go-ethereum/les/fetcher.go index cc539c42bf5..1291c4f1bda 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/fetcher.go +++ b/vendor/github.com/ethereum/go-ethereum/les/fetcher.go @@ -42,7 +42,7 @@ const ( type lightFetcher struct { pm *ProtocolManager odr *LesOdr - chain *light.LightChain + chain lightChain lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests maxConfirmedTd *big.Int @@ -51,11 +51,19 @@ type lightFetcher struct { syncing bool syncDone chan *peer - reqMu sync.RWMutex // reqMu protects access to sent header fetch requests - requested map[uint64]fetchRequest - deliverChn chan fetchResponse - timeoutChn chan uint64 - requestChn chan bool // true if initiated from outside + reqMu sync.RWMutex // reqMu protects access to sent header fetch requests + requested map[uint64]fetchRequest + deliverChn chan fetchResponse + timeoutChn chan uint64 + requestChn chan bool // true if initiated from outside + lastTrustedHeader *types.Header +} + +// lightChain extends the BlockChain interface by locking. +type lightChain interface { + BlockChain + LockChain() + UnlockChain() } // fetcherPeerInfo holds fetcher-specific information about each active peer @@ -143,6 +151,7 @@ func (f *lightFetcher) syncLoop() { rq *distReq reqID uint64 ) + if !f.syncing && !(newAnnounce && s) { rq, reqID = f.nextRequest() } @@ -205,8 +214,11 @@ func (f *lightFetcher) syncLoop() { case p := <-f.syncDone: f.lock.Lock() p.Log().Debug("Done synchronising with peer") - f.checkSyncedHeaders(p) + res, h, td := f.checkSyncedHeaders(p) f.syncing = false + if res { + f.newHeaders(h, []*big.Int{td}) + } f.lock.Unlock() } } @@ -222,7 +234,6 @@ func (f *lightFetcher) registerPeer(p *peer) { f.lock.Lock() defer f.lock.Unlock() - f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} } @@ -275,8 +286,10 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { fp.nodeCnt = 0 fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode) } + // check if the node count is too high to add new nodes, discard oldest ones if necessary if n != nil { - // check if the node count is too high to add new nodes, discard oldest ones if necessary + // n is now the reorg common ancestor, add a new branch of nodes + // check if the node count is too high to add new nodes locked := false for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { if !locked { @@ -320,6 +333,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) { fp.nodeByHash[n.hash] = n } } + if n == nil { // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed if fp.root != nil { @@ -400,25 +414,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // to be downloaded starting from the head backwards is also returned func (f *lightFetcher) nextRequest() (*distReq, uint64) { var ( - bestHash common.Hash - bestAmount uint64 + bestHash common.Hash + bestAmount uint64 + bestTd *big.Int + bestSyncing bool ) - bestTd := f.maxConfirmedTd - bestSyncing := false + bestHash, bestAmount, bestTd, bestSyncing = f.findBestValues() - for p, fp := range f.peers { - for hash, n := range fp.nodeByHash { - if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) { - amount := f.requestAmount(p, n) - if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount { - bestHash = hash - bestAmount = amount - bestTd = n.td - bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) - } - } - } - } if bestTd == f.maxConfirmedTd { return nil, 0 } @@ -428,72 +430,131 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) { var rq *distReq reqID := genReqID() if f.syncing { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - return 0 - }, - canSend: func(dp distPeer) bool { - p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - return fp != nil && fp.nodeByHash[bestHash] != nil - }, - request: func(dp distPeer) func() { - go func() { - p := dp.(*peer) - p.Log().Debug("Synchronisation started") - f.pm.synchronise(p) - f.syncDone <- p - }() - return nil - }, - } + rq = f.newFetcherDistReqForSync(bestHash) } else { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - p := dp.(*peer) - return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - }, - canSend: func(dp distPeer) bool { + rq = f.newFetcherDistReq(bestHash, reqID, bestAmount) + } + return rq, reqID +} + +// findBestValues retrieves the best values for LES or ULC mode. +func (f *lightFetcher) findBestValues() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) { + bestTd = f.maxConfirmedTd + bestSyncing = false + + for p, fp := range f.peers { + for hash, n := range fp.nodeByHash { + if f.checkKnownNode(p, n) || n.requested { + continue + } + + //if ulc mode is disabled, isTrustedHash returns true + amount := f.requestAmount(p, n) + if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && f.isTrustedHash(hash) { + bestHash = hash + bestTd = n.td + bestAmount = amount + bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) + } + } + } + return +} + +// isTrustedHash checks if the block can be trusted by the minimum trusted fraction. +func (f *lightFetcher) isTrustedHash(hash common.Hash) bool { + if !f.pm.isULCEnabled() { + return true + } + + var numAgreed int + for p, fp := range f.peers { + if !p.isTrusted { + continue + } + if _, ok := fp.nodeByHash[hash]; !ok { + continue + } + + numAgreed++ + } + + return 100*numAgreed/len(f.pm.ulc.trustedKeys) >= f.pm.ulc.minTrustedFraction +} + +func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { + return &distReq{ + getCost: func(dp distPeer) uint64 { + return 0 + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + if p.isOnlyAnnounce { + return false + } + + fp := f.peers[p] + return fp != nil && fp.nodeByHash[bestHash] != nil + }, + request: func(dp distPeer) func() { + if f.pm.isULCEnabled() { + //keep last trusted header before sync + f.setLastTrustedHeader(f.chain.CurrentHeader()) + } + go func() { p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() + p.Log().Debug("Synchronisation started") + f.pm.synchronise(p) + f.syncDone <- p + }() + return nil + }, + } +} - fp := f.peers[p] - if fp == nil { - return false - } +// newFetcherDistReq creates a new request for the distributor. +func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq { + return &distReq{ + getCost: func(dp distPeer) uint64 { + p := dp.(*peer) + return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + }, + canSend: func(dp distPeer) bool { + p := dp.(*peer) + if p.isOnlyAnnounce { + return false + } + + fp := f.peers[p] + if fp == nil { + return false + } + n := fp.nodeByHash[bestHash] + return n != nil && !n.requested + }, + request: func(dp distPeer) func() { + p := dp.(*peer) + + fp := f.peers[p] + if fp != nil { n := fp.nodeByHash[bestHash] - return n != nil && !n.requested - }, - request: func(dp distPeer) func() { - p := dp.(*peer) - f.lock.Lock() - fp := f.peers[p] - if fp != nil { - n := fp.nodeByHash[bestHash] - if n != nil { - n.requested = true - } + if n != nil { + n.requested = true } - f.lock.Unlock() - - cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - p.fcServer.QueueRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } - }, - } + } + + cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) + p.fcServer.QueueRequest(reqID, cost) + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} + f.reqMu.Unlock() + go func() { + time.Sleep(hardRequestTimeout) + f.timeoutChn <- reqID + }() + return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } + }, } - return rq, reqID } // deliverHeaders delivers header download request responses for processing @@ -511,6 +572,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo for i, header := range resp.headers { headers[int(req.amount)-1-i] = header } + if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { if err == consensus.ErrFutureBlock { return true @@ -535,6 +597,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo // downloaded and validated batch or headers func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { var maxTd *big.Int + for p, fp := range f.peers { if !f.checkAnnouncedHeaders(fp, headers, tds) { p.Log().Debug("Inconsistent announcement") @@ -544,6 +607,7 @@ func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { maxTd = fp.confirmedTd } } + if maxTd != nil { f.updateMaxConfirmedTd(maxTd) } @@ -625,28 +689,111 @@ func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*typ // checkSyncedHeaders updates peer's block tree after synchronisation by marking // downloaded headers as known. If none of the announced headers are found after // syncing, the peer is dropped. -func (f *lightFetcher) checkSyncedHeaders(p *peer) { +func (f *lightFetcher) checkSyncedHeaders(p *peer) (bool, []*types.Header, *big.Int) { fp := f.peers[p] if fp == nil { p.Log().Debug("Unknown peer to check sync headers") - return + return false, nil, nil } + + var h *types.Header + if f.pm.isULCEnabled() { + var unapprovedHashes []common.Hash + // Overwrite last announced for ULC mode + h, unapprovedHashes = f.lastTrustedTreeNode(p) + //rollback untrusted blocks + f.chain.Rollback(unapprovedHashes) + } + n := fp.lastAnnounced var td *big.Int + trustedHeaderExisted := false + + //find last trusted block for n != nil { - if td = f.chain.GetTd(n.hash, n.number); td != nil { + //we found last trusted header + if n.hash == h.Hash() { + trustedHeaderExisted = true + } + if td = f.chain.GetTd(n.hash, n.number); td != nil && trustedHeaderExisted { + break + } + + //break if we found last trusted hash before sync + if f.lastTrustedHeader == nil { + break + } + if n.hash == f.lastTrustedHeader.Hash() { break } n = n.parent } - // now n is the latest downloaded header after syncing - if n == nil { + + // Now n is the latest downloaded/approved header after syncing + if n == nil && !p.isTrusted { p.Log().Debug("Synchronisation failed") go f.pm.removePeer(p.id) - } else { - header := f.chain.GetHeader(n.hash, n.number) - f.newHeaders([]*types.Header{header}, []*big.Int{td}) + return false, nil, nil } + header := f.chain.GetHeader(n.hash, n.number) + return true, []*types.Header{header}, td +} + +// lastTrustedTreeNode return last approved treeNode and a list of unapproved hashes +func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Hash) { + unapprovedHashes := make([]common.Hash, 0) + current := f.chain.CurrentHeader() + + if f.lastTrustedHeader == nil { + return current, unapprovedHashes + } + + canonical := f.chain.CurrentHeader() + if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() { + canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64()) + } + commonAncestor := rawdb.FindCommonAncestor(f.pm.chainDb, canonical, f.lastTrustedHeader) + if commonAncestor == nil { + log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash()) + return current, unapprovedHashes + } + + for !f.isStopValidationTree(current, commonAncestor) { + if f.isTrustedHash(current.Hash()) { + break + } + unapprovedHashes = append(unapprovedHashes, current.Hash()) + current = f.chain.GetHeader(current.ParentHash, current.Number.Uint64()-1) + } + return current, unapprovedHashes +} + +//isStopValidationTree found when we should stop on finding last trusted header +func (f *lightFetcher) isStopValidationTree(current *types.Header, commonAncestor *types.Header) bool { + if current == nil { + return true + } + + currentHash := current.Hash() + ancestorHash := commonAncestor.Hash() + + //found lastTrustedHeader + if currentHash == f.lastTrustedHeader.Hash() { + return true + } + + //found common ancestor between lastTrustedHeader and + if current.Hash() == ancestorHash { + return true + } + + return false +} + +func (f *lightFetcher) setLastTrustedHeader(h *types.Header) { + f.lock.Lock() + defer f.lock.Unlock() + f.lastTrustedHeader = h } // checkKnownNode checks if a block tree node is known (downloaded and validated) @@ -738,6 +885,7 @@ func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { if f.lastUpdateStats != nil { f.lastUpdateStats.next = newEntry } + f.lastUpdateStats = newEntry for p := range f.peers { f.checkUpdateStats(p, newEntry) @@ -760,6 +908,7 @@ func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { p.Log().Debug("Unknown peer to check update stats") return } + if newEntry != nil && fp.firstUpdateStats == nil { fp.firstUpdateStats = newEntry } diff --git a/vendor/github.com/ethereum/go-ethereum/les/handler.go b/vendor/github.com/ethereum/go-ethereum/les/handler.go index e884afbb6d8..ea6455a385f 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/handler.go +++ b/vendor/github.com/ethereum/go-ethereum/les/handler.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -119,12 +120,29 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing - wg *sync.WaitGroup + wg *sync.WaitGroup + ulc *ulc } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { +func NewProtocolManager( + chainConfig *params.ChainConfig, + indexerConfig *light.IndexerConfig, + lightSync bool, + networkId uint64, + mux *event.TypeMux, + engine consensus.Engine, + peers *peerSet, + blockchain BlockChain, + txpool txPool, + chainDb ethdb.Database, + odr *LesOdr, + txrelay *LesTxRelay, + serverPool *serverPool, + quitSync chan struct{}, + wg *sync.WaitGroup, + ulcConfig *eth.ULCConfig) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ lightSync: lightSync, @@ -149,6 +167,10 @@ func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.In manager.reqDist = odr.retriever.dist } + if ulcConfig != nil { + manager.ulc = newULC(ulcConfig) + } + removePeer := manager.removePeer if disableClientRemovePeer { removePeer = func(id string) {} @@ -238,7 +260,11 @@ func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWrit } func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) + var isTrusted bool + if pm.isULCEnabled() { + isTrusted = pm.ulc.isTrusted(p.ID()) + } + return newPeer(pv, nv, isTrusted, p, newMeteredMsgWriter(rw)) } // handle is the callback invoked to manage the life cycle of a les peer. When @@ -280,6 +306,7 @@ func (pm *ProtocolManager) handle(p *peer) error { if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } + // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Light Ethereum peer registration failed", "err", err) @@ -291,6 +318,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } pm.removePeer(p.id) }() + // Register the peer in the downloader. If the downloader considers it banned, we disconnect if pm.lightSync { p.lock.Lock() @@ -375,7 +403,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Block header query, collect the requested headers and reply case AnnounceMsg: p.Log().Trace("Received announce message") - if p.requestAnnounceType == announceTypeNone { + if p.announceType == announceTypeNone { return errResp(ErrUnexpectedResponse, "") } @@ -384,7 +412,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "%v: %v", msg, err) } - if p.requestAnnounceType == announceTypeSigned { + if p.announceType == announceTypeSigned { if err := req.checkSignature(p.pubKey); err != nil { p.Log().Trace("Invalid announcement signature", "err", err) return err @@ -1179,6 +1207,14 @@ func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus { return stats } +// isULCEnabled returns true if we can use ULC +func (pm *ProtocolManager) isULCEnabled() bool { + if pm.ulc == nil || len(pm.ulc.trustedKeys) == 0 { + return false + } + return true +} + // downloaderPeerNotify implements peerSetNotify type downloaderPeerNotify ProtocolManager @@ -1223,7 +1259,8 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip return peer.GetRequestCost(GetBlockHeadersMsg, amount) }, canSend: func(dp distPeer) bool { - return dp.(*peer) == pc.peer + p := dp.(*peer) + return p == pc.peer }, request: func(dp distPeer) func() { peer := dp.(*peer) @@ -1250,5 +1287,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) { func (d *downloaderPeerNotify) unregisterPeer(p *peer) { pm := (*ProtocolManager)(d) - pm.downloader.UnregisterPeer(p.id) + if pm.ulc == nil || p.isTrusted { + pm.downloader.UnregisterPeer(p.id) + } } diff --git a/vendor/github.com/ethereum/go-ethereum/les/odr.go b/vendor/github.com/ethereum/go-ethereum/les/odr.go index 9def05a6760..f7592354de6 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/odr.go +++ b/vendor/github.com/ethereum/go-ethereum/les/odr.go @@ -109,7 +109,10 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro }, canSend: func(dp distPeer) bool { p := dp.(*peer) - return lreq.CanSend(p) + if !p.isOnlyAnnounce { + return lreq.CanSend(p) + } + return false }, request: func(dp distPeer) func() { p := dp.(*peer) diff --git a/vendor/github.com/ethereum/go-ethereum/les/peer.go b/vendor/github.com/ethereum/go-ethereum/les/peer.go index 70c863c2ff1..c7fa59c697f 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/peer.go +++ b/vendor/github.com/ethereum/go-ethereum/les/peer.go @@ -58,7 +58,7 @@ type peer struct { version int // Protocol version negotiated network uint64 // Network ID being on - announceType, requestAnnounceType uint64 + announceType uint64 id string @@ -76,9 +76,12 @@ type peer struct { fcServer *flowcontrol.ServerNode // nil if the peer is client only fcServerParams *flowcontrol.ServerParams fcCosts requestCostTable + + isTrusted bool + isOnlyAnnounce bool } -func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(version int, network uint64, isTrusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() pubKey, _ := id.Pubkey() @@ -90,6 +93,7 @@ func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *pe network: network, id: fmt.Sprintf("%x", id[:8]), announceChn: make(chan announceData, 20), + isTrusted: isTrusted, } } @@ -405,23 +409,32 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis send = send.add("headNum", headNum) send = send.add("genesisHash", genesis) if server != nil { - send = send.add("serveHeaders", nil) - send = send.add("serveChainSince", uint64(0)) - send = send.add("serveStateSince", uint64(0)) - send = send.add("txRelay", nil) + if !server.onlyAnnounce { + //only announce server. It sends only announse requests + send = send.add("serveHeaders", nil) + send = send.add("serveChainSince", uint64(0)) + send = send.add("serveStateSince", uint64(0)) + send = send.add("txRelay", nil) + } send = send.add("flowControl/BL", server.defParams.BufLimit) send = send.add("flowControl/MRR", server.defParams.MinRecharge) list := server.fcCostStats.getCurrentList() send = send.add("flowControl/MRC", list) p.fcCosts = list.decode() } else { - p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented - send = send.add("announceType", p.requestAnnounceType) + //on client node + p.announceType = announceTypeSimple + if p.isTrusted { + p.announceType = announceTypeSigned + } + send = send.add("announceType", p.announceType) } + recvList, err := p.sendReceiveHandshake(send) if err != nil { return err } + recv := recvList.decode() var rGenesis, rHash common.Hash @@ -456,25 +469,33 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis if int(rVersion) != p.version { return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version) } + if server != nil { // until we have a proper peer connectivity API, allow LES connection to other servers /*if recv.get("serveStateSince", nil) == nil { return errResp(ErrUselessPeer, "wanted client, got server") }*/ if recv.get("announceType", &p.announceType) != nil { + //set default announceType on server side p.announceType = announceTypeSimple } p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) } else { + //mark OnlyAnnounce server if "serveHeaders", "serveChainSince", "serveStateSince" or "txRelay" fields don't exist if recv.get("serveChainSince", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot serve chain") + p.isOnlyAnnounce = true } if recv.get("serveStateSince", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot serve state") + p.isOnlyAnnounce = true } if recv.get("txRelay", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot relay transactions") + p.isOnlyAnnounce = true } + + if p.isOnlyAnnounce && !p.isTrusted { + return errResp(ErrUselessPeer, "peer cannot serve requests") + } + params := &flowcontrol.ServerParams{} if err := recv.get("flowControl/BL", ¶ms.BufLimit); err != nil { return err @@ -490,7 +511,6 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis p.fcServer = flowcontrol.NewServerNode(params) p.fcCosts = MRC.decode() } - p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} return nil } @@ -580,8 +600,10 @@ func (ps *peerSet) Unregister(id string) error { for _, n := range peers { n.unregisterPeer(p) } + p.sendQueue.quit() p.Peer.Disconnect(p2p.DiscUselessPeer) + return nil } } diff --git a/vendor/github.com/ethereum/go-ethereum/les/server.go b/vendor/github.com/ethereum/go-ethereum/les/server.go index 2fa0456d695..2ded3c184b4 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/server.go +++ b/vendor/github.com/ethereum/go-ethereum/les/server.go @@ -41,17 +41,34 @@ import ( type LesServer struct { lesCommons - fcManager *flowcontrol.ClientManager // nil if our node is client only - fcCostStats *requestCostStats - defParams *flowcontrol.ServerParams - lesTopics []discv5.Topic - privateKey *ecdsa.PrivateKey - quitSync chan struct{} + fcManager *flowcontrol.ClientManager // nil if our node is client only + fcCostStats *requestCostStats + defParams *flowcontrol.ServerParams + lesTopics []discv5.Topic + privateKey *ecdsa.PrivateKey + quitSync chan struct{} + onlyAnnounce bool } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { quitSync := make(chan struct{}) - pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) + pm, err := NewProtocolManager( + eth.BlockChain().Config(), + light.DefaultServerIndexerConfig, + false, + config.NetworkId, + eth.EventMux(), + eth.Engine(), + newPeerSet(), + eth.BlockChain(), + eth.TxPool(), + eth.ChainDb(), + nil, + nil, + nil, + quitSync, + new(sync.WaitGroup), + config.ULC) if err != nil { return nil, err } @@ -70,8 +87,9 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), protocolManager: pm, }, - quitSync: quitSync, - lesTopics: lesTopics, + quitSync: quitSync, + lesTopics: lesTopics, + onlyAnnounce: config.OnlyAnnounce, } logger := log.New() @@ -289,10 +307,8 @@ func (s *requestCostStats) getCurrentList() RequestCostList { defer s.lock.Unlock() list := make(RequestCostList, len(reqList)) - //fmt.Println("RequestCostList") for idx, code := range reqList { b, m := s.stats[code].calc() - //fmt.Println(code, s.stats[code].cnt, b/1000000, m/1000000) if m < 0 { b += m m = 0 diff --git a/vendor/github.com/ethereum/go-ethereum/les/serverpool.go b/vendor/github.com/ethereum/go-ethereum/les/serverpool.go index 1a4c7522937..3a2dcea740a 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/serverpool.go +++ b/vendor/github.com/ethereum/go-ethereum/les/serverpool.go @@ -125,22 +125,22 @@ type serverPool struct { discNodes chan *discv5.Node discLookups chan bool + trustedNodes []string entries map[discover.NodeID]*poolEntry timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust - connCh chan *connReq - disconnCh chan *disconnReq - registerCh chan *registerReq - - knownQueue, newQueue poolEntryQueue - knownSelect, newSelect *weightedRandomSelect - knownSelected, newSelected int - fastDiscover bool + knownQueue, newQueue, trustedQueue poolEntryQueue + knownSelect, newSelect *weightedRandomSelect + knownSelected, newSelected int + fastDiscover bool + connCh chan *connReq + disconnCh chan *disconnReq + registerCh chan *registerReq } // newServerPool creates a new serverPool instance -func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool { +func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup, trustedNodes []string) *serverPool { pool := &serverPool{ db: db, quit: quit, @@ -155,7 +155,10 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s knownSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(), fastDiscover: true, + trustedNodes: trustedNodes, } + + pool.trustedQueue = newPoolEntryQueue(maxKnownEntries, nil) pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) return pool @@ -431,7 +434,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16 } addr.lastSeen = now entry.addrSelect.update(addr) - if !entry.known { + if !entry.known || !entry.trusted { pool.newQueue.setLatest(entry) } return entry @@ -459,6 +462,30 @@ func (pool *serverPool) loadNodes() { pool.knownQueue.setLatest(e) pool.knownSelect.update((*knownEntry)(e)) } + + for _, trusted := range pool.parseTrustedServers() { + e := pool.findOrNewNode(trusted.ID, trusted.IP, trusted.TCP) + e.trusted = true + e.dialed = &poolEntryAddress{ip: trusted.IP, port: trusted.TCP} + pool.entries[e.id] = e + pool.trustedQueue.setLatest(e) + } + +} + +// parseTrustedServers returns valid and parsed by discovery enodes. +func (pool *serverPool) parseTrustedServers() []*discover.Node { + nodes := make([]*discover.Node, 0, len(pool.trustedNodes)) + + for _, enode := range pool.trustedNodes { + node, err := discover.ParseNode(enode) + if err != nil { + log.Warn("Trusted node URL invalid", "enode", enode, "err", err) + continue + } + nodes = append(nodes, node) + } + return nodes } // saveNodes saves known nodes and their statistics into the database. Nodes are @@ -516,6 +543,10 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) { // checkDial checks if new dials can/should be made. It tries to select servers both // based on good statistics and recent discovery. func (pool *serverPool) checkDial() { + for _, e := range pool.trustedQueue.queue { + pool.dial(e, false) + } + fillWithKnownSelects := !pool.fastDiscover for pool.knownSelected < targetKnownSelect { entry := pool.knownSelect.choose() @@ -552,17 +583,26 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { return } entry.state = psDialed - entry.knownSelected = knownSelected - if knownSelected { - pool.knownSelected++ - } else { - pool.newSelected++ + + if !entry.trusted { + entry.knownSelected = knownSelected + if knownSelected { + pool.knownSelected++ + } else { + pool.newSelected++ + } + addr := entry.addrSelect.choose().(*poolEntryAddress) + entry.dialed = addr } - addr := entry.addrSelect.choose().(*poolEntryAddress) - log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) - entry.dialed = addr + + state := "known" + if entry.trusted { + state = "trusted" + } + log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey(), "set", len(entry.addr), state, knownSelected) + go func() { - pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) + pool.server.AddPeer(discover.NewNode(entry.id, entry.dialed.ip, entry.dialed.port, entry.dialed.port)) select { case <-pool.quit: case <-time.After(dialTimeout): @@ -609,6 +649,7 @@ type poolEntry struct { lastDiscovered mclock.AbsTime known, knownSelected bool + trusted bool connectStats, delayStats poolStats responseStats, timeoutStats poolStats state int @@ -804,7 +845,7 @@ func (q *poolEntryQueue) setLatest(entry *poolEntry) { if q.queue[entry.queueIdx] == entry { delete(q.queue, entry.queueIdx) } else { - if len(q.queue) == q.maxCnt { + if len(q.queue) == q.maxCnt && q.removeFromPool != nil { e := q.fetchOldest() q.remove(e) q.removeFromPool(e) diff --git a/vendor/github.com/ethereum/go-ethereum/les/sync.go b/vendor/github.com/ethereum/go-ethereum/les/sync.go index eb155377cc3..1ac64558520 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/sync.go +++ b/vendor/github.com/ethereum/go-ethereum/les/sync.go @@ -31,6 +31,7 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms //pm.fetcher.Start() //defer pm.fetcher.Stop() + defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations //forceSync := time.Tick(forceSyncCycle) diff --git a/vendor/github.com/ethereum/go-ethereum/les/txrelay.go b/vendor/github.com/ethereum/go-ethereum/les/txrelay.go index 7a02cc837e6..38fc66af9dc 100644 --- a/vendor/github.com/ethereum/go-ethereum/les/txrelay.go +++ b/vendor/github.com/ethereum/go-ethereum/les/txrelay.go @@ -121,7 +121,10 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) { return peer.GetRequestCost(SendTxMsg, len(ll)) }, canSend: func(dp distPeer) bool { - return dp.(*peer) == pp + if !dp.(*peer).isOnlyAnnounce { + return dp.(*peer) == pp + } + return false }, request: func(dp distPeer) func() { peer := dp.(*peer) diff --git a/vendor/github.com/ethereum/go-ethereum/les/ulc.go b/vendor/github.com/ethereum/go-ethereum/les/ulc.go new file mode 100644 index 00000000000..c96181f8f00 --- /dev/null +++ b/vendor/github.com/ethereum/go-ethereum/les/ulc.go @@ -0,0 +1,36 @@ +package les + +import ( + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type ulc struct { + trustedKeys map[string]struct{} + minTrustedFraction int +} + +func newULC(ulcConfig *eth.ULCConfig) *ulc { + if ulcConfig == nil { + return nil + } + + m := make(map[string]struct{}, len(ulcConfig.TrustedServers)) + for _, id := range ulcConfig.TrustedServers { + node, err := discover.ParseNode(id) + if err != nil { + continue + } + m[node.ID.String()] = struct{}{} + } + + return &ulc{m, ulcConfig.MinTrustedFraction} +} + +func (u *ulc) isTrusted(p discover.NodeID) bool { + if u.trustedKeys == nil { + return false + } + _, ok := u.trustedKeys[p.String()] + return ok +} diff --git a/vendor/github.com/ethereum/go-ethereum/light/lightchain.go b/vendor/github.com/ethereum/go-ethereum/light/lightchain.go index 8e2734c2d94..0b5571bb5bf 100644 --- a/vendor/github.com/ethereum/go-ethereum/light/lightchain.go +++ b/vendor/github.com/ethereum/go-ethereum/light/lightchain.go @@ -71,6 +71,8 @@ type LightChain struct { wg sync.WaitGroup engine consensus.Engine + + disableCheckFreq bool } // NewLightChain returns a fully initialised light chain using information @@ -355,6 +357,9 @@ func (self *LightChain) postChainEvents(events []interface{}) { // In the case of a light chain, InsertHeaderChain also creates and posts light // chain events when necessary. func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { + if self.disableCheckFreq { + checkFreq = 0 + } start := time.Now() if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil { return i, err @@ -533,3 +538,17 @@ func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return self.scope.Track(new(event.Feed).Subscribe(ch)) } + +//DisableCheckFreq disables header validation. It needs for ULC +func (self *LightChain) DisableCheckFreq() { + self.mu.Lock() + defer self.mu.Unlock() + self.disableCheckFreq = true +} + +//EnableCheckFreq enables header validation +func (self *LightChain) EnableCheckFreq() { + self.mu.Lock() + defer self.mu.Unlock() + self.disableCheckFreq = false +} diff --git a/vendor/github.com/ethereum/go-ethereum/mobile/geth.go b/vendor/github.com/ethereum/go-ethereum/mobile/geth.go index e3e2e905da3..4d674b26726 100644 --- a/vendor/github.com/ethereum/go-ethereum/mobile/geth.go +++ b/vendor/github.com/ethereum/go-ethereum/mobile/geth.go @@ -76,6 +76,9 @@ type NodeConfig struct { // Listening address of pprof server. PprofAddress string + + // Ultra Light client options + ULC *eth.ULCConfig } // defaultNodeConfig contains the default node configuration values to use if all