Skip to content

Commit

Permalink
use hostClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Feb 1, 2024
1 parent 1f11c06 commit 0419d35
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/herumi/bls-go-binary v1.31.0
github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0
github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974
github.com/influxdata/influxdb v1.8.3
github.com/klauspost/reedsolomon v1.11.8
github.com/lithammer/shortuuid/v3 v3.0.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/herumi/bls-go-binary v1.31.0 h1:L1goQ2tMtGgpXCg5AwHAdJQpLs/pfnWWEc3Wog6OhmI=
github.com/herumi/bls-go-binary v1.31.0/go.mod h1:O4Vp1AfR4raRGwFeQpr9X/PQtncEicMoOe6BQt1oX0Y=
github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0 h1:wOau/5lP3w3DlozOYbDp6mbSSjcXaEGkWPlgUuEPvY0=
github.com/hitenjain14/fasthttp v0.0.0-20240131211206-9180af2dd7e0/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974 h1:oEjH9SSKBlzwDyYjzZaqRpxo7GlfUJCyRoOk7QHKSDs=
github.com/hitenjain14/fasthttp v0.0.0-20240201092245-8e4835c0e974/go.mod h1:RZMcXy7u4S+E97IXYTe7WHZ3+mCYOh4vys8PkIGZeXk=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c h1:DZfsyhDK1hnSS5lH8l+JggqzEleHteTYfutAiVlSUM8=
Expand Down
3 changes: 3 additions & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ func (a *Allocation) InitAllocation() {
a.downloadRequests = make([]*DownloadRequest, 0, 100)
a.mutex = &sync.Mutex{}
a.fullconsensus, a.consensusThreshold = a.getConsensuses()
for _, blobber := range a.Blobbers {
zboxutil.SetHostClient(blobber.ID, blobber.Baseurl)
}
a.startWorker(a.ctx)
InitCommitWorker(a.Blobbers)
InitBlockDownloader(a.Blobbers, downloadWorkerCount)
Expand Down
11 changes: 6 additions & 5 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) {
for _, blobber := range blobbers {
if _, ok := downloadBlockChan[blobber.ID]; !ok {
downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount)
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount)
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount, blobber.ID)
}
}
}

func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) {
func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int, id string) {
sem := semaphore.NewWeighted(int64(workers))
hostClient := zboxutil.GetHostClient(id)
for {
blockDownloadReq, open := <-blobberChan
if !open {
Expand All @@ -92,7 +93,7 @@ func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers in
continue
}
go func() {
blockDownloadReq.downloadBlobberBlock()
blockDownloadReq.downloadBlobberBlock(hostClient)
sem.Release(1)
}()
}
Expand All @@ -111,7 +112,7 @@ func (req *BlockDownloadRequest) splitData(buf []byte, lim int) [][]byte {
return chunks
}

func (req *BlockDownloadRequest) downloadBlobberBlock() {
func (req *BlockDownloadRequest) downloadBlobberBlock(hostClient *fasthttp.HostClient) {
if req.numBlocks <= 0 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: errors.New("invalid_request", "Invalid number of blocks for download")}
return
Expand Down Expand Up @@ -150,7 +151,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {
header.ToFastHeader(httpreq)

err = func() error {
statuscode, respBuf, err := zboxutil.FastHttpClient.GetWithRequestTimeout(httpreq, req.respBuf, 30*time.Second)
statuscode, respBuf, err := hostClient.GetWithRequestTimeout(httpreq, req.respBuf, 30*time.Second)
fasthttp.ReleaseRequest(httpreq)
if err != nil {
zlogger.Logger.Error("Error downloading block: ", err)
Expand Down
39 changes: 34 additions & 5 deletions zboxcore/zboxutil/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}

var Client HttpClient

var FastHttpClient *fasthttp.Client

var log logger.Logger
var (
Client HttpClient
FastHttpClient *fasthttp.Client
HostClientMap = make(map[string]*fasthttp.HostClient)
hostLock sync.RWMutex
log logger.Logger
)

func GetLogger() *logger.Logger {
return &log
Expand Down Expand Up @@ -125,6 +127,33 @@ func (pfe *proxyFromEnv) isLoopback(host string) (ok bool) {
return net.ParseIP(host).IsLoopback()
}

func SetHostClient(id, baseURL string) {
hostLock.Lock()
defer hostLock.Unlock()
if _, ok := HostClientMap[id]; !ok {
u, _ := url.Parse(baseURL)
host := fasthttp.AddMissingPort(u.Host, true)
HostClientMap[id] = &fasthttp.HostClient{
NoDefaultUserAgentHeader: true,
Addr: host,
MaxIdleConnDuration: 1 * time.Hour,
DisableHeaderNamesNormalizing: true,
DisablePathNormalizing: true,
Dial: (&fasthttp.TCPDialer{
Concurrency: 4096,
DNSCacheDuration: time.Hour,
}).Dial,
IsTLS: true,
}
}
}

func GetHostClient(id string) *fasthttp.HostClient {
hostLock.RLock()
defer hostLock.RUnlock()
return HostClientMap[id]
}

func (pfe *proxyFromEnv) Proxy(req *http.Request) (proxy *url.URL, err error) {
if pfe.isLoopback(req.URL.Host) {
switch req.URL.Scheme {
Expand Down

0 comments on commit 0419d35

Please sign in to comment.