diff --git a/go.mod b/go.mod index d744751e2..f0c63e051 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.2 github.com/hashicorp/golang-lru/v2 v2.0.1 github.com/herumi/bls-go-binary v1.31.0 - github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0 + github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974 github.com/influxdata/influxdb v1.8.3 github.com/klauspost/reedsolomon v1.11.8 github.com/lithammer/shortuuid/v3 v3.0.7 diff --git a/go.sum b/go.sum index 230cc6834..81c21f03e 100644 --- a/go.sum +++ b/go.sum @@ -299,8 +299,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/herumi/bls-go-binary v1.31.0 h1:L1goQ2tMtGgpXCg5AwHAdJQpLs/pfnWWEc3Wog6OhmI= github.com/herumi/bls-go-binary v1.31.0/go.mod h1:O4Vp1AfR4raRGwFeQpr9X/PQtncEicMoOe6BQt1oX0Y= -github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0 h1:wOau/5lP3w3DlozOYbDp6mbSSjcXaEGkWPlgUuEPvY0= -github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk= +github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974 h1:oEjH9SSKBlzwDyYjzZaqRpxo7GlfUJCyRoOk7QHKSDs= +github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c h1:DZfsyhDK1hnSS5lH8l+JggqzEleHteTYfutAiVlSUM8= diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 649d554cf..c2a4c0f46 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -309,6 +309,9 @@ func (a *Allocation) InitAllocation() { a.downloadRequests = make([]*DownloadRequest, 0, 100) a.mutex = &sync.Mutex{} a.fullconsensus, a.consensusThreshold = a.getConsensuses() + for _, blobber := range a.Blobbers { + zboxutil.SetHostClient(blobber.ID, blobber.Baseurl) + } a.startWorker(a.ctx) InitCommitWorker(a.Blobbers) InitBlockDownloader(a.Blobbers, downloadWorkerCount) diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index f0394b24b..81b3f2375 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -75,13 +75,14 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) { for _, blobber := range blobbers { if _, ok := downloadBlockChan[blobber.ID]; !ok { downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount) - go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount) + go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount, blobber.ID) } } } -func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) { +func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int, id string) { sem := semaphore.NewWeighted(int64(workers)) + hostClient := zboxutil.GetHostClient(id) for { blockDownloadReq, open := <-blobberChan if !open { @@ -92,7 +93,7 @@ func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers in continue } go func() { - blockDownloadReq.downloadBlobberBlock() + blockDownloadReq.downloadBlobberBlock(hostClient) sem.Release(1) }() } @@ -111,7 +112,7 @@ func (req *BlockDownloadRequest) splitData(buf []byte, lim int) [][]byte { return chunks } -func (req *BlockDownloadRequest) downloadBlobberBlock() { +func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostClient) { if req.numBlocks <= 0 { req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: errors.New("invalid_request", "Invalid number of blocks for download")} return @@ -150,7 +151,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() { header.ToFastHeader(httpreq) err = func() error { - statuscode, respBuf, err := zboxutil.FastHttpClient.GetWithRequestTimeout(httpreq, req.respBuf, 30*time.Second) + statuscode, respBuf, err := hostClient.GetWithRequestTimeout(httpreq, req.respBuf, 30*time.Second) fasthttp.ReleaseRequest(httpreq) if err != nil { zlogger.Logger.Error("Error downloading block: ", err) diff --git a/zboxcore/zboxutil/http.go b/zboxcore/zboxutil/http.go index 77071175c..2d347a65a 100644 --- a/zboxcore/zboxutil/http.go +++ b/zboxcore/zboxutil/http.go @@ -39,11 +39,13 @@ type HttpClient interface { Do(req *http.Request) (*http.Response, error) } -var Client HttpClient - -var FastHttpClient *fasthttp.Client - -var log logger.Logger +var ( + Client HttpClient + FastHttpClient *fasthttp.Client + HostClientMap = make(map[string]*fasthttp.HostClient) + hostLock sync.RWMutex + log logger.Logger +) func GetLogger() *logger.Logger { return &log @@ -125,6 +127,33 @@ func (pfe *proxyFromEnv) isLoopback(host string) (ok bool) { return net.ParseIP(host).IsLoopback() } +func SetHostClient(id, baseURL string) { + hostLock.Lock() + defer hostLock.Unlock() + if _, ok := HostClientMap[id]; !ok { + u, _ := url.Parse(baseURL) + host := fasthttp.AddMissingPort(u.Host, true) + HostClientMap[id] = &fasthttp.HostClient{ + NoDefaultUserAgentHeader: true, + Addr: host, + MaxIdleConnDuration: 1 * time.Hour, + DisableHeaderNamesNormalizing: true, + DisablePathNormalizing: true, + Dial: (&fasthttp.TCPDialer{ + Concurrency: 4096, + DNSCacheDuration: time.Hour, + }).Dial, + IsTLS: true, + } + } +} + +func GetHostClient(id string) *fasthttp.HostClient { + hostLock.RLock() + defer hostLock.RUnlock() + return HostClientMap[id] +} + func (pfe *proxyFromEnv) Proxy(req *http.Request) (proxy *url.URL, err error) { if pfe.isLoopback(req.URL.Host) { switch req.URL.Scheme {