From 53692b3673ec7fb2f36e9f203e12312750cfc134 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Tue, 24 Dec 2024 21:54:00 +0530 Subject: [PATCH 01/25] Blobber monitoring --- zboxcore/sdk/allocation.go | 7 +++++++ zboxcore/sdk/chunked_upload.go | 4 ++++ zboxcore/sdk/chunked_upload_chunk_reader.go | 3 +++ zboxcore/sdk/chunked_upload_process.go | 4 ++++ zboxcore/sdk/multi_operation_worker.go | 20 ++++++++++++++++++-- 5 files changed, 36 insertions(+), 2 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 779930ebb..865e73b33 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1035,6 +1035,13 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul if !a.isInitialized() { return notInitialized } + + TotalFormBuildTime = 0 // reset the time + TotalUploadTime = 0 // reset the time + TotalReadChunkTime = 0 // reset the time + TotalUploadBlobberTime = 0 // reset the time + TotalTime = 0 // reset the time + TotalReadTime = 0 // reset the time connectionID := zboxutil.NewConnectionId() var mo MultiOperation mo.allocationObj = a diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 1408fa539..4565a7323 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -431,8 +431,10 @@ func (su *ChunkedUpload) process() error { defer su.chunkReader.Close() defer su.ctxCncl(nil) for { + now := time.Now() chunks, err := su.readChunks(su.chunkNumber) + TotalReadTime += time.Since(now).Milliseconds() // chunk, err := su.chunkReader.Next() if err != nil { @@ -688,6 +690,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { return context.Cause(su.ctx) default: } + now := time.Now() consensus := Consensus{ RWMutex: &sync.RWMutex{}, consensusThresh: su.consensus.consensusThresh, @@ -744,6 +747,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(atomic.AddInt64(&su.progress.UploadLength, uploadLength)), nil) } } + atomic.AddInt64(&TotalUploadBlobberTime, time.Since(now).Milliseconds()) uploadData = UploadData{} // release memory return nil } diff --git a/zboxcore/sdk/chunked_upload_chunk_reader.go b/zboxcore/sdk/chunked_upload_chunk_reader.go index a54159b3c..c919b7f5f 100644 --- a/zboxcore/sdk/chunked_upload_chunk_reader.go +++ b/zboxcore/sdk/chunked_upload_chunk_reader.go @@ -5,6 +5,7 @@ import ( "math" "strconv" "sync" + "time" "github.com/0chain/errors" "github.com/0chain/gosdk/constants" @@ -198,11 +199,13 @@ func (r *chunkedUploadChunkReader) Next() (*ChunkData, error) { readLen int err error ) + now := time.Now() for readLen < len(chunkBytes) && err == nil { var nn int nn, err = r.fileReader.Read(chunkBytes[readLen:]) readLen += nn } + TotalReadChunkTime += time.Since(now).Milliseconds() if err != nil { if !errors.Is(err, io.EOF) { diff --git a/zboxcore/sdk/chunked_upload_process.go b/zboxcore/sdk/chunked_upload_process.go index 72a7560b7..3cf5023ee 100644 --- a/zboxcore/sdk/chunked_upload_process.go +++ b/zboxcore/sdk/chunked_upload_process.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" thrown "github.com/0chain/errors" "github.com/0chain/gosdk/zboxcore/zboxutil" @@ -86,6 +87,8 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, uploadLength: uploadLength, } + now := time.Now() + wgErrors := make(chan error, len(su.blobbers)) if len(fileShards) == 0 { return thrown.New("upload_failed", 
"Upload failed. No data to upload") @@ -139,6 +142,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, su.removeProgress() return thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err)) } + TotalFormBuildTime += time.Since(now).Milliseconds() if !lastBufferOnly { su.uploadWG.Add(1) select { diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index de797d145..fc9230a55 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -30,6 +30,15 @@ const ( StorageV2 = 1 ) +var ( + TotalUploadTime int64 + TotalTime int64 + TotalReadTime int64 + TotalReadChunkTime int64 + TotalFormBuildTime int64 + TotalUploadBlobberTime int64 +) + var BatchSize = 6 type MultiOperationOption func(mo *MultiOperation) @@ -164,6 +173,9 @@ func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) { func (mo *MultiOperation) Process() error { l.Logger.Debug("MultiOperation Process start") + TotalReadTime = 0 + TotalReadChunkTime = 0 + wg := &sync.WaitGroup{} if mo.allocationObj.StorageVersion == 0 { mo.changes = make([][]allocationchange.AllocationChange, len(mo.operations)) @@ -178,6 +190,7 @@ func (mo *MultiOperation) Process() error { if mo.allocationObj.StorageVersion != StorageV2 { mo.operationMask = zboxutil.NewUint128(0) } + now := time.Now() for idx, op := range mo.operations { uid := util.GetNewUUID() swg.Add() @@ -218,7 +231,8 @@ func (mo *MultiOperation) Process() error { }(op, idx) } swg.Wait() - + logger.Logger.Info("[process]", time.Since(now).Milliseconds()) + TotalUploadTime = time.Since(now).Milliseconds() if ctx.Err() != nil { err := context.Cause(ctx) return err @@ -351,6 +365,7 @@ func (mo *MultiOperation) Process() error { } wg.Wait() logger.Logger.Debug("[commitRequests]", time.Since(start).Milliseconds()) + // JAYASH: COMMIT REQUEST TIMINGS rollbackMask := zboxutil.NewUint128(0) errSlice := make([]error, len(commitReqs)) for idx, commitReq := range commitReqs { @@ -386,8 +401,9 @@ func (mo *MultiOperation) Process() error { } } - return nil + TotalTime = time.Since(now).Milliseconds() + return nil } func (mo *MultiOperation) commitV2() error { From da9280112026e07ea7194970ab66565d81ee3818 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Thu, 26 Dec 2024 22:37:23 +0530 Subject: [PATCH 02/25] Add kafk --- core/kafka/kafka.go | 163 ++++++++++++++++++++++++++++++++++++++++++++ go.mod | 27 ++++++-- go.sum | 76 +++++++++++++++++---- 3 files changed, 246 insertions(+), 20 deletions(-) create mode 100644 core/kafka/kafka.go diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go new file mode 100644 index 000000000..a924ff95f --- /dev/null +++ b/core/kafka/kafka.go @@ -0,0 +1,163 @@ +package kafka + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/IBM/sarama" + "go.uber.org/zap" +) + +type ProviderI interface { + PublishToKafka(topic string, key, message []byte) error + ReconnectWriter(topic string) error + CloseWriter(topic string) error + CloseAllWriters() error +} + +type KafkaProvider struct { + Host string + WriteTimeout time.Duration + Config *sarama.Config + mutex sync.RWMutex + writers map[string]sarama.AsyncProducer +} + +var ( + BlobberMonitoringKafkaTopic = "blobber_monitoring" + BlobberMonitoringKafka = NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) +) + +func NewKafkaProvider(host, username, password string, writeTimeout time.Duration) *KafkaProvider { + log.Println("Initializing Kafka provider", 
zap.String("host", host)) + + config := sarama.NewConfig() + config.Net.SASL.Enable = true + config.Net.SASL.User = username + config.Net.SASL.Password = password + config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Net.MaxOpenRequests = 1 + config.Producer.Idempotent = true + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = 5 + config.Metadata.AllowAutoTopicCreation = true + config.Producer.MaxMessageBytes = 10 * 1024 * 1024 + + return &KafkaProvider{ + Host: host, + WriteTimeout: writeTimeout, + Config: config, + writers: make(map[string]sarama.AsyncProducer), + } +} + +func (k *KafkaProvider) PublishToKafka(topic string, key, message []byte) error { + k.mutex.RLock() + writer, exists := k.writers[topic] + k.mutex.RUnlock() + + if !exists { + k.mutex.Lock() + writer, exists = k.writers[topic] + if !exists { + writer = k.createKafkaWriter(topic) + k.writers[topic] = writer + } + k.mutex.Unlock() + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.ByteEncoder(key), + Value: sarama.ByteEncoder(message), + } + + select { + case writer.Input() <- msg: + log.Println("Message published to Kafka", zap.String("topic", topic)) + case <-time.After(k.WriteTimeout): + log.Println("Failed to publish to Kafka: timeout", zap.String("topic", topic)) + return fmt.Errorf("timeout publishing to Kafka topic %s", topic) + } + + select { + case <-writer.Successes(): + log.Println("Message published to Kafka", zap.String("topic", topic)) + case err := <-writer.Errors(): + log.Println("Failed to publish to Kafka", zap.String("topic", topic), zap.Error(err)) + } + + return nil +} + +func PublishBlobberMonitoringLogsToKafka(key, message []byte) error { + return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) +} + +func (k *KafkaProvider) ReconnectWriter(topic string) error { + k.mutex.Lock() + defer k.mutex.Unlock() + + writer, exists := k.writers[topic] + if !exists { + return fmt.Errorf("no Kafka writer found for topic %v", topic) + } + + if err := writer.Close(); err != nil { + log.Println("Error closing Kafka writer", zap.Error(err)) + return fmt.Errorf("error closing Kafka connection for topic %v: %v", topic, err) + } + + k.writers[topic] = k.createKafkaWriter(topic) + return nil +} + +func (k *KafkaProvider) CloseWriter(topic string) error { + k.mutex.Lock() + defer k.mutex.Unlock() + + writer, exists := k.writers[topic] + if !exists { + return fmt.Errorf("no Kafka writer found for topic %v", topic) + } + + if err := writer.Close(); err != nil { + log.Println("Error closing Kafka writer", zap.Error(err)) + return err + } + + delete(k.writers, topic) + return nil +} + +func (k *KafkaProvider) CloseAllWriters() error { + k.mutex.Lock() + defer k.mutex.Unlock() + + for topic, writer := range k.writers { + if err := writer.Close(); err != nil { + log.Println("Error closing Kafka writer", zap.String("topic", topic), zap.Error(err)) + } + delete(k.writers, topic) + } + return nil +} + +func (k *KafkaProvider) createKafkaWriter(topic string) sarama.AsyncProducer { + producer, err := sarama.NewAsyncProducer([]string{k.Host}, k.Config) + if err != nil { + log.Fatalf("Failed to create Kafka producer: %v", err) + } + + go func() { + for err := range producer.Errors() { + log.Printf("Kafka error: %v\n", err) + } + }() + + return producer +} diff --git a/go.mod b/go.mod index 23e1cebfa..831cece90 100644 --- a/go.mod +++ b/go.mod @@ -29,9 +29,9 @@ 
require ( github.com/uptrace/bunrouter v1.0.20 go.dedis.ch/kyber/v3 v3.1.0 go.uber.org/zap v1.24.0 - golang.org/x/crypto v0.21.0 + golang.org/x/crypto v0.26.0 golang.org/x/image v0.14.0 - golang.org/x/sync v0.7.0 + golang.org/x/sync v0.8.0 gopkg.in/cheggaaa/pb.v1 v1.0.28 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v2 v2.4.0 @@ -39,6 +39,7 @@ require ( ) require ( + github.com/IBM/sarama v1.43.3 github.com/hack-pad/go-webworkers v0.1.0 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/minio/sha256-simd v1.0.1 @@ -56,6 +57,9 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect @@ -68,13 +72,21 @@ require ( github.com/golang-jwt/jwt/v4 v4.3.0 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-bexpr v0.1.10 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect github.com/huin/goupnp v1.0.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/matryer/is v1.4.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect @@ -84,8 +96,10 @@ require ( github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/tsdb v0.7.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rivo/uniseg v0.4.3 // indirect github.com/rjeczalik/notify v0.9.1 // indirect github.com/rs/cors v1.7.0 // indirect @@ -107,9 +121,8 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.dedis.ch/fixbuf v1.0.3 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect ) @@ -117,13 +130,13 @@ require ( require ( github.com/btcsuite/btcd/btcutil v1.1.3 github.com/hack-pad/safejs v0.1.1 - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // 
indirect github.com/remeh/sizedwaitgroup v1.0.0 github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/net v0.23.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/time v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index 91dc05e8c..387872597 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/Luzifer/go-openssl/v3 v3.1.0 h1:QqKqo6kYXGGUsvtUoCpRZm8lHw+jDfhbzr36gVj+/gw= github.com/Luzifer/go-openssl/v3 v3.1.0/go.mod h1:liy3FXuuS8hfDlYh1T+l78AwQ/NjZflJz0NDvjKhwDs= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -143,6 +145,12 @@ github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= @@ -159,6 +167,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -268,6 +278,8 @@ github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod 
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/h2non/filetype v1.1.4-0.20231228185113-6469358c2bcb h1:GlQyMv2C48qmfPItvAXFoyN341Swxp9JNVeUZxnmbJw= @@ -276,6 +288,8 @@ github.com/hack-pad/go-webworkers v0.1.0 h1:QHBJpkXJgW0QRi2iiUGcxwGnmy7lQJL0F8Uf github.com/hack-pad/go-webworkers v0.1.0/go.mod h1:/rmjjgnlw0CursmeqRtP0NGIqo8CR+0o6AtzFydUHJ4= github.com/hack-pad/safejs v0.1.1 h1:d5qPO0iQ7h2oVtpzGnLExE+Wn9AtytxIfltcS2b9KD8= github.com/hack-pad/safejs v0.1.1/go.mod h1:HdS+bKF1NrE72VoXZeWzxFOVQVUSqZJAG0xNCnb+Tio= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -283,8 +297,13 @@ github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/S github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-hclog v1.2.0 h1:La19f8d7WIlm4ogzNHB0JGqs5AUDAZ2UfCY4sJXcJdM= github.com/hashicorp/go-hclog v1.2.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.2 h1:AcYqCvkpalPnPF2pn0KamgwamS42TqUDDYFRKq/RAd0= github.com/hashicorp/go-retryablehttp v0.7.2/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs= @@ -319,6 +338,18 @@ github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mq github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod 
h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -334,8 +365,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -347,7 +378,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -418,6 +448,8 @@ github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 h1:6ob53CVz+ja2i7easAStApZJlh7sxyq3Cm7g1Di6iqA= github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -446,6 +478,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= @@ -537,6 +571,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= @@ -578,9 +613,11 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -622,6 +659,7 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= 
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -659,8 +697,11 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -681,8 +722,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -730,21 +772,28 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -804,6 +853,7 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From d42244810867398cb91f472d15fd1c3e9c24de10 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 00:10:39 +0530 Subject: [PATCH 03/25] Fix --- core/indexdb/indexdb.go | 68 ++++++++++++++++++++++++++ core/kafka/kafka.go | 4 +- zboxcore/sdk/chunked_upload_blobber.go | 18 +++++++ 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 
core/indexdb/indexdb.go diff --git a/core/indexdb/indexdb.go b/core/indexdb/indexdb.go new file mode 100644 index 000000000..4359aed81 --- /dev/null +++ b/core/indexdb/indexdb.go @@ -0,0 +1,68 @@ +//go:build js && wasm +// +build js,wasm + +package indexdb + +import "syscall/js" + +func writeToIndexedDB(this js.Value, args []js.Value) any { + dbName := args[0].String() + key := args[1].String() + value := args[2].String() + + // Open IndexedDB + openRequest := js.Global().Get("indexedDB").Call("open", dbName) + + openRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { + db := openRequest.Get("result") + tx := db.Call("transaction", "store", "readwrite") + store := tx.Call("objectStore", "store") + + store.Call("put", value, key) + tx.Call("oncomplete", js.FuncOf(func(this js.Value, args []js.Value) any { + js.Global().Call("console.log", "Data written to IndexedDB successfully") + return nil + })) + return nil + })) + + openRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { + js.Global().Call("console.error", "Error opening IndexedDB:", openRequest.Get("error")) + return nil + })) + + return nil +} + +func readFromIndexedDB(this js.Value, args []js.Value) any { + dbName := args[0].String() + key := args[1].String() + + // Open IndexedDB + openRequest := js.Global().Get("indexedDB").Call("open", dbName) + + openRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { + db := openRequest.Get("result") + tx := db.Call("transaction", "store", "readonly") + store := tx.Call("objectStore", "store") + + getRequest := store.Call("get", key) + getRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { + result := getRequest.Get("result") + js.Global().Call("console.log", "Data from IndexedDB:", result) + return nil + })) + getRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { + js.Global().Call("console.error", "Error reading from IndexedDB:", getRequest.Get("error")) + return nil + })) + return nil + })) + + openRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { + js.Global().Call("console.error", "Error opening IndexedDB:", openRequest.Get("error")) + return nil + })) + + return nil +} diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index a924ff95f..c80f4492e 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -94,8 +94,8 @@ func (k *KafkaProvider) PublishToKafka(topic string, key, message []byte) error return nil } -func PublishBlobberMonitoringLogsToKafka(key, message []byte) error { - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) +func PublishBlobberMonitoringLogsToKafka(key, message string) error { + return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, []byte(key), []byte(message)) } func (k *KafkaProvider) ReconnectWriter(topic string) error { diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 8a60737b2..b8f54f12c 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/0chain/gosdk/core/kafka" "io" "mime/multipart" "net/http" @@ -78,6 +79,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( shouldContinue bool ) var req *fasthttp.Request + now := time.Now() for i := 0; i < 3; i++ { req, err = zboxutil.NewFastUploadRequest( sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind].Bytes(), 
su.httpMethod, su.allocationObj.Owner) @@ -139,6 +141,22 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( break } + + kafkaObj := map[string]interface{}{ + "op": "upload", + "upload_time": time.Since(now).Milliseconds(), + "size": len(dataBuffers[ind].Bytes()), + } + + kafkaObjStr, err := json.Marshal(kafkaObj) + if err != nil { + logger.Logger.Error("Error publishing to kafka: ", err) + } + err = kafka.PublishBlobberMonitoringLogsToKafka(sb.blobber.ID+""+su.allocationObj.ID, string(kafkaObjStr)) + if err != nil { + logger.Logger.Error("Error publishing to kafka: ", err) + } + return err }) } From b30606ba1ec67f37ee4c37912c231716e7471821 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 19:51:05 +0530 Subject: [PATCH 04/25] Remove indexdb --- core/indexdb/indexdb.go | 68 ----------------------------------------- 1 file changed, 68 deletions(-) delete mode 100644 core/indexdb/indexdb.go diff --git a/core/indexdb/indexdb.go b/core/indexdb/indexdb.go deleted file mode 100644 index 4359aed81..000000000 --- a/core/indexdb/indexdb.go +++ /dev/null @@ -1,68 +0,0 @@ -//go:build js && wasm -// +build js,wasm - -package indexdb - -import "syscall/js" - -func writeToIndexedDB(this js.Value, args []js.Value) any { - dbName := args[0].String() - key := args[1].String() - value := args[2].String() - - // Open IndexedDB - openRequest := js.Global().Get("indexedDB").Call("open", dbName) - - openRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { - db := openRequest.Get("result") - tx := db.Call("transaction", "store", "readwrite") - store := tx.Call("objectStore", "store") - - store.Call("put", value, key) - tx.Call("oncomplete", js.FuncOf(func(this js.Value, args []js.Value) any { - js.Global().Call("console.log", "Data written to IndexedDB successfully") - return nil - })) - return nil - })) - - openRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { - js.Global().Call("console.error", "Error opening IndexedDB:", openRequest.Get("error")) - return nil - })) - - return nil -} - -func readFromIndexedDB(this js.Value, args []js.Value) any { - dbName := args[0].String() - key := args[1].String() - - // Open IndexedDB - openRequest := js.Global().Get("indexedDB").Call("open", dbName) - - openRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { - db := openRequest.Get("result") - tx := db.Call("transaction", "store", "readonly") - store := tx.Call("objectStore", "store") - - getRequest := store.Call("get", key) - getRequest.Set("onsuccess", js.FuncOf(func(this js.Value, args []js.Value) any { - result := getRequest.Get("result") - js.Global().Call("console.log", "Data from IndexedDB:", result) - return nil - })) - getRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { - js.Global().Call("console.error", "Error reading from IndexedDB:", getRequest.Get("error")) - return nil - })) - return nil - })) - - openRequest.Set("onerror", js.FuncOf(func(this js.Value, args []js.Value) any { - js.Global().Call("console.error", "Error opening IndexedDB:", openRequest.Get("error")) - return nil - })) - - return nil -} From 77c03a1f38129d16293ac99e3950fc32bc7dc5f8 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 19:52:17 +0530 Subject: [PATCH 05/25] Revert "Blobber monitoring" This reverts commit 53692b3673ec7fb2f36e9f203e12312750cfc134. 
--- zboxcore/sdk/allocation.go | 7 ------- zboxcore/sdk/chunked_upload.go | 4 ---- zboxcore/sdk/chunked_upload_chunk_reader.go | 3 --- zboxcore/sdk/chunked_upload_process.go | 4 ---- zboxcore/sdk/multi_operation_worker.go | 20 ++------------------ 5 files changed, 2 insertions(+), 36 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 865e73b33..779930ebb 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1035,13 +1035,6 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul if !a.isInitialized() { return notInitialized } - - TotalFormBuildTime = 0 // reset the time - TotalUploadTime = 0 // reset the time - TotalReadChunkTime = 0 // reset the time - TotalUploadBlobberTime = 0 // reset the time - TotalTime = 0 // reset the time - TotalReadTime = 0 // reset the time connectionID := zboxutil.NewConnectionId() var mo MultiOperation mo.allocationObj = a diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 4565a7323..1408fa539 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -431,10 +431,8 @@ func (su *ChunkedUpload) process() error { defer su.chunkReader.Close() defer su.ctxCncl(nil) for { - now := time.Now() chunks, err := su.readChunks(su.chunkNumber) - TotalReadTime += time.Since(now).Milliseconds() // chunk, err := su.chunkReader.Next() if err != nil { @@ -690,7 +688,6 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { return context.Cause(su.ctx) default: } - now := time.Now() consensus := Consensus{ RWMutex: &sync.RWMutex{}, consensusThresh: su.consensus.consensusThresh, @@ -747,7 +744,6 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { su.statusCallback.InProgress(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, int(atomic.AddInt64(&su.progress.UploadLength, uploadLength)), nil) } } - atomic.AddInt64(&TotalUploadBlobberTime, time.Since(now).Milliseconds()) uploadData = UploadData{} // release memory return nil } diff --git a/zboxcore/sdk/chunked_upload_chunk_reader.go b/zboxcore/sdk/chunked_upload_chunk_reader.go index c919b7f5f..a54159b3c 100644 --- a/zboxcore/sdk/chunked_upload_chunk_reader.go +++ b/zboxcore/sdk/chunked_upload_chunk_reader.go @@ -5,7 +5,6 @@ import ( "math" "strconv" "sync" - "time" "github.com/0chain/errors" "github.com/0chain/gosdk/constants" @@ -199,13 +198,11 @@ func (r *chunkedUploadChunkReader) Next() (*ChunkData, error) { readLen int err error ) - now := time.Now() for readLen < len(chunkBytes) && err == nil { var nn int nn, err = r.fileReader.Read(chunkBytes[readLen:]) readLen += nn } - TotalReadChunkTime += time.Since(now).Milliseconds() if err != nil { if !errors.Is(err, io.EOF) { diff --git a/zboxcore/sdk/chunked_upload_process.go b/zboxcore/sdk/chunked_upload_process.go index 3cf5023ee..72a7560b7 100644 --- a/zboxcore/sdk/chunked_upload_process.go +++ b/zboxcore/sdk/chunked_upload_process.go @@ -8,7 +8,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" thrown "github.com/0chain/errors" "github.com/0chain/gosdk/zboxcore/zboxutil" @@ -87,8 +86,6 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, uploadLength: uploadLength, } - now := time.Now() - wgErrors := make(chan error, len(su.blobbers)) if len(fileShards) == 0 { return thrown.New("upload_failed", "Upload failed. 
No data to upload") @@ -142,7 +139,6 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, su.removeProgress() return thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err)) } - TotalFormBuildTime += time.Since(now).Milliseconds() if !lastBufferOnly { su.uploadWG.Add(1) select { diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index fc9230a55..de797d145 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -30,15 +30,6 @@ const ( StorageV2 = 1 ) -var ( - TotalUploadTime int64 - TotalTime int64 - TotalReadTime int64 - TotalReadChunkTime int64 - TotalFormBuildTime int64 - TotalUploadBlobberTime int64 -) - var BatchSize = 6 type MultiOperationOption func(mo *MultiOperation) @@ -173,9 +164,6 @@ func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) { func (mo *MultiOperation) Process() error { l.Logger.Debug("MultiOperation Process start") - TotalReadTime = 0 - TotalReadChunkTime = 0 - wg := &sync.WaitGroup{} if mo.allocationObj.StorageVersion == 0 { mo.changes = make([][]allocationchange.AllocationChange, len(mo.operations)) @@ -190,7 +178,6 @@ func (mo *MultiOperation) Process() error { if mo.allocationObj.StorageVersion != StorageV2 { mo.operationMask = zboxutil.NewUint128(0) } - now := time.Now() for idx, op := range mo.operations { uid := util.GetNewUUID() swg.Add() @@ -231,8 +218,7 @@ func (mo *MultiOperation) Process() error { }(op, idx) } swg.Wait() - logger.Logger.Info("[process]", time.Since(now).Milliseconds()) - TotalUploadTime = time.Since(now).Milliseconds() + if ctx.Err() != nil { err := context.Cause(ctx) return err @@ -365,7 +351,6 @@ func (mo *MultiOperation) Process() error { } wg.Wait() logger.Logger.Debug("[commitRequests]", time.Since(start).Milliseconds()) - // JAYASH: COMMIT REQUEST TIMINGS rollbackMask := zboxutil.NewUint128(0) errSlice := make([]error, len(commitReqs)) for idx, commitReq := range commitReqs { @@ -401,9 +386,8 @@ func (mo *MultiOperation) Process() error { } } - TotalTime = time.Since(now).Milliseconds() - return nil + } func (mo *MultiOperation) commitV2() error { From 2c5513e55af9783737902c70b7751522c27e13c2 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 20:13:45 +0530 Subject: [PATCH 06/25] Fix --- core/kafka/kafka.go | 85 +++++++++++--------------- zboxcore/sdk/chunked_upload_blobber.go | 26 ++++++-- 2 files changed, 59 insertions(+), 52 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index c80f4492e..4e71f8d52 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -2,6 +2,7 @@ package kafka import ( "fmt" + "github.com/0chain/common/core/logging" "log" "sync" "time" @@ -11,7 +12,7 @@ import ( ) type ProviderI interface { - PublishToKafka(topic string, key, message []byte) error + PublishToKafka(topic string, key, message []byte) chan int64 ReconnectWriter(topic string) error CloseWriter(topic string) error CloseAllWriters() error @@ -21,8 +22,14 @@ type KafkaProvider struct { Host string WriteTimeout time.Duration Config *sarama.Config - mutex sync.RWMutex - writers map[string]sarama.AsyncProducer + mutex sync.RWMutex // Mutex for synchronizing access to writers map +} + +// map of kafka writers for each topic +var writers map[string]sarama.AsyncProducer + +func init() { + writers = make(map[string]sarama.AsyncProducer) } var ( @@ -51,21 +58,20 @@ func NewKafkaProvider(host, username, password string, writeTimeout time.Duratio Host: host, WriteTimeout: 
writeTimeout, Config: config, - writers: make(map[string]sarama.AsyncProducer), } } -func (k *KafkaProvider) PublishToKafka(topic string, key, message []byte) error { +func (k *KafkaProvider) PublishToKafka(topic string, key, message string) chan int64 { k.mutex.RLock() - writer, exists := k.writers[topic] + writer, exists := writers[topic] k.mutex.RUnlock() - + res := make(chan int64) if !exists { k.mutex.Lock() - writer, exists = k.writers[topic] + writer, exists = writers[topic] if !exists { writer = k.createKafkaWriter(topic) - k.writers[topic] = writer + writers[topic] = writer } k.mutex.Unlock() } @@ -76,61 +82,44 @@ func (k *KafkaProvider) PublishToKafka(topic string, key, message []byte) error Value: sarama.ByteEncoder(message), } - select { - case writer.Input() <- msg: - log.Println("Message published to Kafka", zap.String("topic", topic)) - case <-time.After(k.WriteTimeout): - log.Println("Failed to publish to Kafka: timeout", zap.String("topic", topic)) - return fmt.Errorf("timeout publishing to Kafka topic %s", topic) - } - - select { - case <-writer.Successes(): - log.Println("Message published to Kafka", zap.String("topic", topic)) - case err := <-writer.Errors(): - log.Println("Failed to publish to Kafka", zap.String("topic", topic), zap.Error(err)) - } - - return nil -} - -func PublishBlobberMonitoringLogsToKafka(key, message string) error { - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, []byte(key), []byte(message)) + writer.Input() <- msg + go func() { + r := <-writer.Successes() + res <- r.Offset + }() + return res } func (k *KafkaProvider) ReconnectWriter(topic string) error { k.mutex.Lock() defer k.mutex.Unlock() - - writer, exists := k.writers[topic] - if !exists { - return fmt.Errorf("no Kafka writer found for topic %v", topic) + writer := writers[topic] + if writer == nil { + return fmt.Errorf("no kafka writer found for the topic %v", topic) } if err := writer.Close(); err != nil { - log.Println("Error closing Kafka writer", zap.Error(err)) - return fmt.Errorf("error closing Kafka connection for topic %v: %v", topic, err) + logging.Logger.Error("error closing kafka connection", zap.String("topic", topic), zap.Error(err)) + return fmt.Errorf("error closing kafka connection for topic %v: %v", topic, err) } - k.writers[topic] = k.createKafkaWriter(topic) + writers[topic] = k.createKafkaWriter(topic) return nil } func (k *KafkaProvider) CloseWriter(topic string) error { k.mutex.Lock() - defer k.mutex.Unlock() + writer := writers[topic] + k.mutex.Unlock() - writer, exists := k.writers[topic] - if !exists { - return fmt.Errorf("no Kafka writer found for topic %v", topic) + if writer == nil { + return fmt.Errorf("no kafka writer found for the topic %v", topic) } if err := writer.Close(); err != nil { - log.Println("Error closing Kafka writer", zap.Error(err)) - return err + logging.Logger.Error("error closing kafka connection", zap.Error(err)) } - delete(k.writers, topic) return nil } @@ -138,11 +127,10 @@ func (k *KafkaProvider) CloseAllWriters() error { k.mutex.Lock() defer k.mutex.Unlock() - for topic, writer := range k.writers { + for topic, writer := range writers { if err := writer.Close(); err != nil { - log.Println("Error closing Kafka writer", zap.String("topic", topic), zap.Error(err)) + logging.Logger.Error("error closing kafka connection", zap.String("topic", topic), zap.Error(err)) } - delete(k.writers, topic) } return nil } @@ -150,12 +138,13 @@ func (k *KafkaProvider) CloseAllWriters() error { func (k *KafkaProvider) 
createKafkaWriter(topic string) sarama.AsyncProducer { producer, err := sarama.NewAsyncProducer([]string{k.Host}, k.Config) if err != nil { - log.Fatalf("Failed to create Kafka producer: %v", err) + logging.Logger.Panic(fmt.Sprintf("Failed to start Sarama producer: %v", err)) } go func() { for err := range producer.Errors() { - log.Printf("Kafka error: %v\n", err) + fmt.Println("kafka - failed to write access log entry:", err) + logging.Logger.Panic("kafka - failed to write access log entry:", zap.Error(err)) } }() diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index b8f54f12c..ad637811f 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/0chain/common/core/logging" "github.com/0chain/gosdk/core/kafka" "io" "mime/multipart" @@ -72,6 +73,8 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( eg, _ := errgroup.WithContext(ctx) + var results []chan int64 + for dataInd := 0; dataInd < len(dataBuffers); dataInd++ { ind := dataInd eg.Go(func() error { @@ -152,10 +155,8 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( if err != nil { logger.Logger.Error("Error publishing to kafka: ", err) } - err = kafka.PublishBlobberMonitoringLogsToKafka(sb.blobber.ID+""+su.allocationObj.ID, string(kafkaObjStr)) - if err != nil { - logger.Logger.Error("Error publishing to kafka: ", err) - } + res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, sb.blobber.ID+""+su.allocationObj.ID, string(kafkaObjStr)) + results = append(results, res) return err }) @@ -166,6 +167,23 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } consensus.Done() + //wait for all responses + timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) + defer cancelFunc() + sent := 0 +L: + for _, ch := range results { + select { + case <-ch: + sent++ + if sent == len(dataBuffers) { + break L + } + case <-timeout.Done(): + logging.Logger.Panic("Timeout to publish event to kafka") + } + } + if formData.ThumbnailBytesLen > 0 { sb.fileRef.ThumbnailSize = int64(formData.ThumbnailBytesLen) From f2fbc8a0ee8d4faf44d49492e86202c4a71eed17 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 20:54:59 +0530 Subject: [PATCH 07/25] Fix --- core/kafka/kafka.go | 7 ++----- zboxcore/sdk/chunked_upload_blobber.go | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index 4e71f8d52..41439a547 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -32,11 +32,6 @@ func init() { writers = make(map[string]sarama.AsyncProducer) } -var ( - BlobberMonitoringKafkaTopic = "blobber_monitoring" - BlobberMonitoringKafka = NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) -) - func NewKafkaProvider(host, username, password string, writeTimeout time.Duration) *KafkaProvider { log.Println("Initializing Kafka provider", zap.String("host", host)) @@ -82,6 +77,8 @@ func (k *KafkaProvider) PublishToKafka(topic string, key, message string) chan i Value: sarama.ByteEncoder(message), } + fmt.Println("Publishing to kafka", zap.String("topic", topic), zap.String("key", key), zap.String("message", message)) + writer.Input() <- msg go func() { r := <-writer.Successes() diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index ad637811f..6f09acf0c 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go 
+++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,9 +5,8 @@ import ( "context" "encoding/json" "fmt" - "github.com/0chain/common/core/logging" - "github.com/0chain/gosdk/core/kafka" "io" + "log" "mime/multipart" "net/http" "strings" @@ -149,14 +148,22 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( "op": "upload", "upload_time": time.Since(now).Milliseconds(), "size": len(dataBuffers[ind].Bytes()), + "alloc": su.allocationObj.ID, } kafkaObjStr, err := json.Marshal(kafkaObj) if err != nil { logger.Logger.Error("Error publishing to kafka: ", err) } - res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, sb.blobber.ID+""+su.allocationObj.ID, string(kafkaObjStr)) - results = append(results, res) + fmt.Println(kafkaObjStr) + // + //var ( + // BlobberMonitoringKafkaTopic = "blobber_monitoring" + // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) + //) + + //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) + //results = append(results, res) return err }) @@ -175,12 +182,13 @@ L: for _, ch := range results { select { case <-ch: + log.Println("Sent to Kafka : ", sent, " : lenDataBuffers : ", len(dataBuffers)) sent++ if sent == len(dataBuffers) { break L } case <-timeout.Done(): - logging.Logger.Panic("Timeout to publish event to kafka") + log.Panic("Timeout to publish event to kafka") } } From 99eba474b23aa3d2a4fbb80a0dbee85714bb74bf Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Fri, 27 Dec 2024 21:09:32 +0530 Subject: [PATCH 08/25] Add for download --- zboxcore/sdk/blockdownloadworker.go | 27 ++++++++++++++++++++++++++ zboxcore/sdk/chunked_upload_blobber.go | 16 +++++++-------- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 462b08675..ad9012a99 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -189,6 +189,33 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien return errors.New("response_error", string(respBuf)) } + //kafkaObj := map[string]interface{}{ + // "op": "download", + // "time": time.Since(now).Milliseconds(), + // "size": len(respBuf), + // "alloc": req.allocationID, + //} + //kafkaObjStr, err := json.Marshal(kafkaObj) + //if err != nil { + // log.Println("Error publishing to kafka: ", err) + //} + // + //var ( + // BlobberMonitoringKafkaTopic = "blobber_monitoring2" + // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) + //) + // + //timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) + //defer cancelFunc() + // + //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr)) + //select { + //case <-res: + // break + //case <-timeout.Done(): + // log.Panic("Timeout to publish event to kafka") + //} + dR := downloadResponse{} if req.shouldVerify { err = json.Unmarshal(respBuf, &dR) diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 6f09acf0c..7ca0a6232 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -145,23 +145,24 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } kafkaObj := map[string]interface{}{ - "op": "upload", - "upload_time": time.Since(now).Milliseconds(), - "size": len(dataBuffers[ind].Bytes()), - 
"alloc": su.allocationObj.ID, + "op": "upload", + "time": time.Since(now).Milliseconds(), + "size": len(dataBuffers[ind].Bytes()), + "alloc": su.allocationObj.ID, } kafkaObjStr, err := json.Marshal(kafkaObj) if err != nil { logger.Logger.Error("Error publishing to kafka: ", err) } + fmt.Println(kafkaObjStr) - // + //var ( - // BlobberMonitoringKafkaTopic = "blobber_monitoring" + // BlobberMonitoringKafkaTopic = "blobber_monitoring2" // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) //) - + // //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) //results = append(results, res) @@ -174,7 +175,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } consensus.Done() - //wait for all responses timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) defer cancelFunc() sent := 0 From 3b1a54c4345e84990f26ab7d58d3880d34dd0bf4 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Sat, 28 Dec 2024 23:32:40 +0530 Subject: [PATCH 09/25] Done --- core/kafka/kafka.go | 15 +++++++ zboxcore/sdk/blockdownloadworker.go | 54 +++++++++++++------------- zboxcore/sdk/chunked_upload_blobber.go | 26 ++++++------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index 41439a547..b2e198c6f 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -11,6 +11,21 @@ import ( "go.uber.org/zap" ) +var ( + BlobberMonitoringKafkaTopic = "monitor2" + BlobberMonitoringKafka = NewKafkaProvider("", "", "", 1*time.Minute) +) + +type BlobberMonitoring struct { + ID string `json:"id"` + Operation string `json:"operation"` + BlobberId string `json:"blobber_id"` + AllocationId string `json:"allocation_id"` + Size int64 `json:"size"` + TimeSpent int64 `json:"time_spent"` + Count int `json:"count"` +} + type ProviderI interface { PublishToKafka(topic string, key, message []byte) chan int64 ReconnectWriter(topic string) error diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index ad9012a99..fe34ede93 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/0chain/gosdk/core/kafka" + "log" "net/http" "sync" "syscall" @@ -189,32 +191,32 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien return errors.New("response_error", string(respBuf)) } - //kafkaObj := map[string]interface{}{ - // "op": "download", - // "time": time.Since(now).Milliseconds(), - // "size": len(respBuf), - // "alloc": req.allocationID, - //} - //kafkaObjStr, err := json.Marshal(kafkaObj) - //if err != nil { - // log.Println("Error publishing to kafka: ", err) - //} - // - //var ( - // BlobberMonitoringKafkaTopic = "blobber_monitoring2" - // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) - //) - // - //timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) - //defer cancelFunc() - // - //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr)) - //select { - //case <-res: - // break - //case <-timeout.Done(): - // log.Panic("Timeout to publish event to kafka") - //} + dnldSizeInMb := int64(len(respBuf)) / 1024 / 1024 + kafkaObj := kafka.BlobberMonitoring{ + ID: fmt.Sprintf("%s_%s_%d", req.blobber.ID, req.allocationID, dnldSizeInMb), + Operation: 
"upload", + BlobberId: req.blobber.ID, + TimeSpent: time.Since(now).Milliseconds(), + Size: dnldSizeInMb, + AllocationId: req.allocationID, + Count: 1, + } + + kafkaObjStr, err := json.Marshal(kafkaObj) + if err != nil { + log.Println("Error publishing to kafka: ", err) + } + + timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) + defer cancelFunc() + + res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr)) + select { + case <-res: + break + case <-timeout.Done(): + log.Panic("Timeout to publish event to kafka") + } dR := downloadResponse{} if req.shouldVerify { diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 7ca0a6232..5b8dd01b0 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/0chain/gosdk/core/kafka" "io" "log" "mime/multipart" @@ -144,11 +145,15 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( break } - kafkaObj := map[string]interface{}{ - "op": "upload", - "time": time.Since(now).Milliseconds(), - "size": len(dataBuffers[ind].Bytes()), - "alloc": su.allocationObj.ID, + uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 / 1024 + kafkaObj := kafka.BlobberMonitoring{ + ID: fmt.Sprintf("%s_%s_%d", sb.blobber.ID, su.allocationObj.ID, uploadSizeInMb), + Operation: "upload", + BlobberId: sb.blobber.ID, + TimeSpent: time.Since(now).Milliseconds(), + Size: uploadSizeInMb, + AllocationId: su.allocationObj.ID, + Count: 1, } kafkaObjStr, err := json.Marshal(kafkaObj) @@ -156,15 +161,8 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( logger.Logger.Error("Error publishing to kafka: ", err) } - fmt.Println(kafkaObjStr) - - //var ( - // BlobberMonitoringKafkaTopic = "blobber_monitoring2" - // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) - //) - // - //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) - //results = append(results, res) + res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) + results = append(results, res) return err }) From f13181d7d475acd89c4fae0b26b462f9b8bdf013 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Sun, 29 Dec 2024 21:37:58 +0530 Subject: [PATCH 10/25] Fix --- core/client/set.go | 5 +++++ core/conf/config.go | 5 +++++ core/conf/vars.go | 4 ++++ core/kafka/kafka.go | 5 +++-- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/client/set.go b/core/client/set.go index 3eb4d486e..2b86a3b16 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -279,6 +279,7 @@ func GetClient() *zcncrypto.Wallet { func InitSDK(walletJSON string, blockWorker, chainID, signatureScheme string, nonce int64, addWallet bool, + kafkaHost, kafkaUsername, kafkaPassword, kafkaTopic string, options ...int) error { if addWallet { @@ -318,6 +319,10 @@ func InitSDK(walletJSON string, MinSubmit: minSubmit, ConfirmationChainLength: confirmationChainLength, SharderConsensous: sharderConsensous, + KafkaHost: kafkaHost, + KafkaUsername: kafkaUsername, + KafkaPassword: kafkaPassword, + KafkaTopic: kafkaTopic, }) if err != nil { return err diff --git a/core/conf/config.go b/core/conf/config.go index b3d159f2c..c503d941c 100644 --- a/core/conf/config.go +++ b/core/conf/config.go @@ -78,6 +78,11 @@ type Config struct { 
SharderConsensous int `json:"sharder_consensous"` ZauthServer string `json:"zauth_server"` V *viper.Viper `json:"-"` + + KafkaHost string `json:"kafka_host"` + KafkaUsername string `json:"kafka_username"` + KafkaPassword string `json:"kafka_password"` + KafkaTopic string `json:"kafka_topic"` } // LoadConfigFile load and parse SDK Config from file diff --git a/core/conf/vars.go b/core/conf/vars.go index 8551c1cd9..bb7b29e62 100644 --- a/core/conf/vars.go +++ b/core/conf/vars.go @@ -39,6 +39,10 @@ func GetClientConfig() (*Config, error) { return cfg, nil } +func GetConfig() *Config { + return cfg +} + // InitClientConfig set global client config func InitClientConfig(c *Config) { onceCfg.Do(func() { diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index b2e198c6f..c92f5b1c3 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "fmt" "github.com/0chain/common/core/logging" + "github.com/0chain/gosdk/core/conf" "log" "sync" "time" @@ -12,8 +13,8 @@ import ( ) var ( - BlobberMonitoringKafkaTopic = "monitor2" - BlobberMonitoringKafka = NewKafkaProvider("", "", "", 1*time.Minute) + BlobberMonitoringKafkaTopic = conf.GetConfig().KafkaTopic + BlobberMonitoringKafka = NewKafkaProvider(conf.GetConfig().KafkaHost, conf.GetConfig().KafkaUsername, conf.GetConfig().KafkaPassword, 1*time.Minute) ) type BlobberMonitoring struct { From a81d15d19dfdc641205d15f885b69a4cceeb2505 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 03:57:08 +0530 Subject: [PATCH 11/25] Fix --- core/kafka/kafka.go | 15 ++++++++++----- zboxcore/sdk/blockdownloadworker.go | 2 +- zboxcore/sdk/chunked_upload_blobber.go | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index c92f5b1c3..4daa0f03f 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -12,11 +12,6 @@ import ( "go.uber.org/zap" ) -var ( - BlobberMonitoringKafkaTopic = conf.GetConfig().KafkaTopic - BlobberMonitoringKafka = NewKafkaProvider(conf.GetConfig().KafkaHost, conf.GetConfig().KafkaUsername, conf.GetConfig().KafkaPassword, 1*time.Minute) -) - type BlobberMonitoring struct { ID string `json:"id"` Operation string `json:"operation"` @@ -72,6 +67,16 @@ func NewKafkaProvider(host, username, password string, writeTimeout time.Duratio } } +func PublishToKafka(key, message string) chan int64 { + var ( + cfg = conf.GetConfig() + BlobberMonitoringKafkaTopic = cfg.KafkaTopic + BlobberMonitoringKafka = NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) + ) + + return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) +} + func (k *KafkaProvider) PublishToKafka(topic string, key, message string) chan int64 { k.mutex.RLock() writer, exists := writers[topic] diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index fe34ede93..79e125522 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -210,7 +210,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) defer cancelFunc() - res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr)) + res := kafka.PublishToKafka(req.blobber.ID, string(kafkaObjStr)) select { case <-res: break diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 
5b8dd01b0..e14b2e8f2 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -161,7 +161,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( logger.Logger.Error("Error publishing to kafka: ", err) } - res := kafka.BlobberMonitoringKafka.PublishToKafka(kafka.BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) + res := kafka.PublishToKafka(sb.blobber.ID, string(kafkaObjStr)) results = append(results, res) return err From 558266939fe24b3e06f7a223e6a7adbad7d56043 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 04:51:11 +0530 Subject: [PATCH 12/25] Fix --- core/kafka/kafka.go | 13 +++++++------ zboxcore/sdk/blockdownloadworker.go | 13 ++++++++----- zboxcore/sdk/chunked_upload_blobber.go | 4 +++- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index 4daa0f03f..cb9c5651a 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -68,12 +68,13 @@ func NewKafkaProvider(host, username, password string, writeTimeout time.Duratio } func PublishToKafka(key, message string) chan int64 { - var ( - cfg = conf.GetConfig() - BlobberMonitoringKafkaTopic = cfg.KafkaTopic - BlobberMonitoringKafka = NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) - ) - + cfg := conf.GetConfig() + if cfg == nil { + fmt.Println("Failed to get config") + return nil + } + BlobberMonitoringKafkaTopic := cfg.KafkaTopic + BlobberMonitoringKafka := NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) } diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 79e125522..4b6645689 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -211,11 +211,14 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien defer cancelFunc() res := kafka.PublishToKafka(req.blobber.ID, string(kafkaObjStr)) - select { - case <-res: - break - case <-timeout.Done(): - log.Panic("Timeout to publish event to kafka") + + if res != nil { + select { + case <-res: + break + case <-timeout.Done(): + log.Panic("Timeout to publish event to kafka") + } } dR := downloadResponse{} diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index e14b2e8f2..83ac9cbb0 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -162,7 +162,9 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } res := kafka.PublishToKafka(sb.blobber.ID, string(kafkaObjStr)) - results = append(results, res) + if res != nil { + results = append(results, res) + } return err }) From 9ee183b411bb4086847a79340c88ba8f992c8870 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 05:11:35 +0530 Subject: [PATCH 13/25] Debug --- core/kafka/kafka.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index cb9c5651a..a7ed2f29e 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -75,6 +75,14 @@ func PublishToKafka(key, message string) chan int64 { } BlobberMonitoringKafkaTopic := cfg.KafkaTopic BlobberMonitoringKafka := NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) + + fmt.Println("Kafka: ", cfg.KafkaTopic) + fmt.Println("Kafka: ", cfg.KafkaHost) + fmt.Println("Kafka: ", cfg.KafkaUsername) + fmt.Println("Kafka: ", 
cfg.KafkaPassword) + fmt.Println("Key : ", key) + fmt.Println("Message : ", message) + return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) } From 5534d6cb74caa1fc97c606f7e441c98c827e4f18 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 05:16:29 +0530 Subject: [PATCH 14/25] Debug --- core/client/set.go | 5 +++++ core/kafka/kafka.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/core/client/set.go b/core/client/set.go index 2b86a3b16..693124e91 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -311,6 +311,11 @@ func InitSDK(walletJSON string, sharderConsensous = options[4] } + fmt.Println("Kafka Host: ", kafkaHost) + fmt.Println("Kafka Username: ", kafkaUsername) + fmt.Println("Kafka Password: ", kafkaPassword) + fmt.Println("Kafka Topic: ", kafkaTopic) + err := Init(context.Background(), conf.Config{ BlockWorker: blockWorker, SignatureScheme: signatureScheme, diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index a7ed2f29e..197b4197d 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "fmt" "github.com/0chain/common/core/logging" + "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/conf" "log" "sync" @@ -73,6 +74,12 @@ func PublishToKafka(key, message string) chan int64 { fmt.Println("Failed to get config") return nil } + + if !client.IsSDKInitialized() { + fmt.Println("SDK is not initialized") + return nil + } + BlobberMonitoringKafkaTopic := cfg.KafkaTopic BlobberMonitoringKafka := NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) From 33f9d3f79b69b945b07b68a191a001b206145907 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 05:18:52 +0530 Subject: [PATCH 15/25] Debug --- core/kafka/kafka.go | 27 ------------------------- zboxcore/sdk/blockdownloadworker.go | 2 +- zboxcore/sdk/chunked_upload_blobber.go | 28 +++++++++++++++++++++++++- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go index 197b4197d..c3fac80d8 100644 --- a/core/kafka/kafka.go +++ b/core/kafka/kafka.go @@ -3,8 +3,6 @@ package kafka import ( "fmt" "github.com/0chain/common/core/logging" - "github.com/0chain/gosdk/core/client" - "github.com/0chain/gosdk/core/conf" "log" "sync" "time" @@ -68,31 +66,6 @@ func NewKafkaProvider(host, username, password string, writeTimeout time.Duratio } } -func PublishToKafka(key, message string) chan int64 { - cfg := conf.GetConfig() - if cfg == nil { - fmt.Println("Failed to get config") - return nil - } - - if !client.IsSDKInitialized() { - fmt.Println("SDK is not initialized") - return nil - } - - BlobberMonitoringKafkaTopic := cfg.KafkaTopic - BlobberMonitoringKafka := NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) - - fmt.Println("Kafka: ", cfg.KafkaTopic) - fmt.Println("Kafka: ", cfg.KafkaHost) - fmt.Println("Kafka: ", cfg.KafkaUsername) - fmt.Println("Kafka: ", cfg.KafkaPassword) - fmt.Println("Key : ", key) - fmt.Println("Message : ", message) - - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) -} - func (k *KafkaProvider) PublishToKafka(topic string, key, message string) chan int64 { k.mutex.RLock() writer, exists := writers[topic] diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 4b6645689..db28ab743 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -210,7 +210,7 @@ func 
(req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) defer cancelFunc() - res := kafka.PublishToKafka(req.blobber.ID, string(kafkaObjStr)) + res := PublishToKafka(req.blobber.ID, string(kafkaObjStr)) if res != nil { select { diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 83ac9cbb0..c46f907e9 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/0chain/gosdk/core/conf" "github.com/0chain/gosdk/core/kafka" "io" "log" @@ -29,6 +30,31 @@ import ( "golang.org/x/sync/errgroup" ) +func PublishToKafka(key, message string) chan int64 { + cfg := conf.GetConfig() + if cfg == nil { + fmt.Println("Failed to get config") + return nil + } + + if !client.IsSDKInitialized() { + fmt.Println("SDK is not initialized") + return nil + } + + BlobberMonitoringKafkaTopic := cfg.KafkaTopic + BlobberMonitoringKafka := kafka.NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) + + fmt.Println("Kafka: ", cfg.KafkaTopic) + fmt.Println("Kafka: ", cfg.KafkaHost) + fmt.Println("Kafka: ", cfg.KafkaUsername) + fmt.Println("Kafka: ", cfg.KafkaPassword) + fmt.Println("Key : ", key) + fmt.Println("Message : ", message) + + return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) +} + // ChunkedUploadBlobber client of blobber's upload type ChunkedUploadBlobber struct { writeMarkerMutex *WriteMarkerMutex @@ -161,7 +187,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( logger.Logger.Error("Error publishing to kafka: ", err) } - res := kafka.PublishToKafka(sb.blobber.ID, string(kafkaObjStr)) + res := PublishToKafka(sb.blobber.ID, string(kafkaObjStr)) if res != nil { results = append(results, res) } From 240eacc9c52b66873a0b676c86bc1040b7c61c34 Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 05:37:21 +0530 Subject: [PATCH 16/25] Fix --- core/client/set.go | 5 ----- core/conf/vars.go | 9 +++++---- zboxcore/sdk/chunked_upload_blobber.go | 13 +++---------- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/core/client/set.go b/core/client/set.go index 693124e91..2b86a3b16 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -311,11 +311,6 @@ func InitSDK(walletJSON string, sharderConsensous = options[4] } - fmt.Println("Kafka Host: ", kafkaHost) - fmt.Println("Kafka Username: ", kafkaUsername) - fmt.Println("Kafka Password: ", kafkaPassword) - fmt.Println("Kafka Topic: ", kafkaTopic) - err := Init(context.Background(), conf.Config{ BlockWorker: blockWorker, SignatureScheme: signatureScheme, diff --git a/core/conf/vars.go b/core/conf/vars.go index bb7b29e62..67d3592bd 100644 --- a/core/conf/vars.go +++ b/core/conf/vars.go @@ -39,10 +39,6 @@ func GetClientConfig() (*Config, error) { return cfg, nil } -func GetConfig() *Config { - return cfg -} - // InitClientConfig set global client config func InitClientConfig(c *Config) { onceCfg.Do(func() { @@ -60,6 +56,11 @@ func InitClientConfig(c *Config) { cfg.MinSubmit = DefaultMinSubmit } }) + + cfg.KafkaTopic = c.KafkaTopic + cfg.KafkaHost = c.KafkaHost + cfg.KafkaUsername = c.KafkaUsername + cfg.KafkaPassword = c.KafkaPassword } // Deprecated: Use client.Init() function. 
To normalize urls, use network.NormalizeURLs() method diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index c46f907e9..fe6288a2b 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -31,9 +31,9 @@ import ( ) func PublishToKafka(key, message string) chan int64 { - cfg := conf.GetConfig() - if cfg == nil { - fmt.Println("Failed to get config") + cfg, err := conf.GetClientConfig() + if err != nil { + fmt.Println("Error getting client config: ", err) return nil } @@ -45,13 +45,6 @@ func PublishToKafka(key, message string) chan int64 { BlobberMonitoringKafkaTopic := cfg.KafkaTopic BlobberMonitoringKafka := kafka.NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) - fmt.Println("Kafka: ", cfg.KafkaTopic) - fmt.Println("Kafka: ", cfg.KafkaHost) - fmt.Println("Kafka: ", cfg.KafkaUsername) - fmt.Println("Kafka: ", cfg.KafkaPassword) - fmt.Println("Key : ", key) - fmt.Println("Message : ", message) - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) } From 4cba83a0edd4b596872dbfeddd3b0b92eb12368d Mon Sep 17 00:00:00 2001 From: Jayash Satolia Date: Mon, 30 Dec 2024 05:59:58 +0530 Subject: [PATCH 17/25] Record in KB and nano second --- zboxcore/sdk/blockdownloadworker.go | 4 ++-- zboxcore/sdk/chunked_upload_blobber.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index db28ab743..0f5428dc3 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -191,12 +191,12 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien return errors.New("response_error", string(respBuf)) } - dnldSizeInMb := int64(len(respBuf)) / 1024 / 1024 + dnldSizeInMb := int64(len(respBuf)) / 1024 kafkaObj := kafka.BlobberMonitoring{ ID: fmt.Sprintf("%s_%s_%d", req.blobber.ID, req.allocationID, dnldSizeInMb), Operation: "upload", BlobberId: req.blobber.ID, - TimeSpent: time.Since(now).Milliseconds(), + TimeSpent: time.Since(now).Nanoseconds(), Size: dnldSizeInMb, AllocationId: req.allocationID, Count: 1, diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index fe6288a2b..9930bbc2a 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -164,12 +164,12 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( break } - uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 / 1024 + uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 kafkaObj := kafka.BlobberMonitoring{ ID: fmt.Sprintf("%s_%s_%d", sb.blobber.ID, su.allocationObj.ID, uploadSizeInMb), Operation: "upload", BlobberId: sb.blobber.ID, - TimeSpent: time.Since(now).Milliseconds(), + TimeSpent: time.Since(now).Nanoseconds(), Size: uploadSizeInMb, AllocationId: su.allocationObj.ID, Count: 1, From cb60894e92ac86e2b6088730dab0691592463e66 Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Wed, 19 Feb 2025 02:12:48 +0530 Subject: [PATCH 18/25] fix --- zboxcore/sdk/blockdownloadworker.go | 6 +++--- zboxcore/sdk/chunked_upload_blobber.go | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 0f5428dc3..f6773684d 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -4,13 +4,14 @@ import ( "context" "encoding/json" "fmt" - 
"github.com/0chain/gosdk/core/kafka" "log" "net/http" "sync" "syscall" "time" + "github.com/0chain/gosdk/core/kafka" + "github.com/0chain/errors" "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/common" @@ -193,8 +194,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien dnldSizeInMb := int64(len(respBuf)) / 1024 kafkaObj := kafka.BlobberMonitoring{ - ID: fmt.Sprintf("%s_%s_%d", req.blobber.ID, req.allocationID, dnldSizeInMb), - Operation: "upload", + Operation: "download", BlobberId: req.blobber.ID, TimeSpent: time.Since(now).Nanoseconds(), Size: dnldSizeInMb, diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 9930bbc2a..a170702cc 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -166,7 +166,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 kafkaObj := kafka.BlobberMonitoring{ - ID: fmt.Sprintf("%s_%s_%d", sb.blobber.ID, su.allocationObj.ID, uploadSizeInMb), Operation: "upload", BlobberId: sb.blobber.ID, TimeSpent: time.Since(now).Nanoseconds(), From 3e545c6d406274bec8b2fc07c9b77a50944a5f22 Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Thu, 20 Feb 2025 14:11:32 +0530 Subject: [PATCH 19/25] moved kafka publish to common --- zboxcore/sdk/chunked_upload_blobber.go | 18 ------------------ zboxcore/sdk/common.go | 21 +++++++++++++++++++++ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index a170702cc..568cf9f1e 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -30,24 +30,6 @@ import ( "golang.org/x/sync/errgroup" ) -func PublishToKafka(key, message string) chan int64 { - cfg, err := conf.GetClientConfig() - if err != nil { - fmt.Println("Error getting client config: ", err) - return nil - } - - if !client.IsSDKInitialized() { - fmt.Println("SDK is not initialized") - return nil - } - - BlobberMonitoringKafkaTopic := cfg.KafkaTopic - BlobberMonitoringKafka := kafka.NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) - - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) -} - // ChunkedUploadBlobber client of blobber's upload type ChunkedUploadBlobber struct { writeMarkerMutex *WriteMarkerMutex diff --git a/zboxcore/sdk/common.go b/zboxcore/sdk/common.go index 3f9b698b6..f88f56d06 100644 --- a/zboxcore/sdk/common.go +++ b/zboxcore/sdk/common.go @@ -14,6 +14,9 @@ import ( "github.com/0chain/errors" "github.com/0chain/gosdk/constants" + "github.com/0chain/gosdk/core/client" + "github.com/0chain/gosdk/core/conf" + "github.com/0chain/gosdk/core/kafka" "github.com/0chain/gosdk/zboxcore/blockchain" "github.com/0chain/gosdk/zboxcore/fileref" l "github.com/0chain/gosdk/zboxcore/logger" @@ -229,3 +232,21 @@ func (req *subDirRequest) processSubDirectories() error { return nil } + +func PublishToKafka(key, message string) chan int64 { + cfg, err := conf.GetClientConfig() + if err != nil { + fmt.Println("Error getting client config: ", err) + return nil + } + + if !client.IsSDKInitialized() { + fmt.Println("SDK is not initialized") + return nil + } + + BlobberMonitoringKafkaTopic := cfg.KafkaTopic + BlobberMonitoringKafka := kafka.NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) + + return 
BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) +} From 1d0422e46b9592aaab1c8bb6dcd8338f8772c02e Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Thu, 20 Feb 2025 19:12:52 +0530 Subject: [PATCH 20/25] added kakfa in params --- core/client/set.go | 9 +++++++-- mobilesdk/sdk/sdk.go | 9 +++++++++ wasmsdk/sdk.go | 6 +++++- zboxcore/sdk/chunked_upload_blobber.go | 4 ++-- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/core/client/set.go b/core/client/set.go index 44212a490..df05d73bf 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -46,11 +46,15 @@ type InitSdkOptions struct { AddWallet bool TxnFee *int MinConfirmation *int - MinSubmit *int ConfirmationChainLength *int + MinSubmit *int SharderConsensous *int ZboxHost string ZboxAppType string + KafkaHost string + KafkaUsername string + KafkaPassword string + KafkaTopic string } func init() { @@ -385,7 +389,8 @@ func InitSDK(walletJSON string, } func InitSDKWithWebApp(params InitSdkOptions) error { - err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, *params.MinConfirmation, *params.MinSubmit, *params.ConfirmationChainLength, *params.SharderConsensous) + err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, params.KafkaHost, params.KafkaUsername, params.KafkaPassword, params.KafkaTopic, + *params.MinConfirmation, *params.MinSubmit, *params.ConfirmationChainLength, *params.SharderConsensous) if err != nil { return err } diff --git a/mobilesdk/sdk/sdk.go b/mobilesdk/sdk/sdk.go index f5dd299c8..56cf1b002 100644 --- a/mobilesdk/sdk/sdk.go +++ b/mobilesdk/sdk/sdk.go @@ -44,6 +44,11 @@ type ChainConfig struct { ZboxHost string `json:"zbox_host"` // ZboxAppType app type name ZboxAppType string `json:"zbox_app_type"` + //kafka details + KafkaHost string `json:"kafka_host"` + KafkaUsername string `json:"kafka_username"` + KafkaPassword string `json:"kafka_password"` + KafkaTopic string `json:"kafka_topic"` } // StorageSDK - storage SDK config @@ -134,6 +139,10 @@ func InitStorageSDK(clientJson string, configJson string) (*StorageSDK, error) { AddWallet: true, ZboxHost: configObj.ZboxHost, ZboxAppType: configObj.ZboxAppType, + KafkaHost: configObj.KafkaHost, + KafkaUsername: configObj.KafkaUsername, + KafkaPassword: configObj.KafkaPassword, + KafkaTopic: configObj.KafkaTopic, } if err = client.InitSDKWithWebApp(params); err != nil { diff --git a/wasmsdk/sdk.go b/wasmsdk/sdk.go index 07f9974ad..63bf2df33 100644 --- a/wasmsdk/sdk.go +++ b/wasmsdk/sdk.go @@ -34,7 +34,7 @@ var CreateObjectURL func(buf []byte, mimeType string) string // - sharderconsensous is the number of sharders to reach consensus func initSDKs(chainID, blockWorker, signatureScheme string, minConfirmation, minSubmit, confirmationChainLength int, - zboxHost, zboxAppType string, sharderConsensous int) error { + zboxHost, zboxAppType string, sharderConsensous int, kafkaHost, kafkaUsername, kafkaPassword, kafkaTopic string) error { // Print the parameters beautified fmt.Printf("{ chainID: %s, blockWorker: %s, signatureScheme: %s, minConfirmation: %d, minSubmit: %d, confirmationChainLength: %d, zboxHost: %s, zboxAppType: %s, sharderConsensous: %d }\n", chainID, blockWorker, signatureScheme, minConfirmation, minSubmit, confirmationChainLength, zboxHost, zboxAppType, sharderConsensous) @@ -54,6 +54,10 @@ func initSDKs(chainID, blockWorker, signatureScheme string, ConfirmationChainLength: 
&confirmationChainLength, ZboxHost: zboxHost, ZboxAppType: zboxAppType, + KafkaHost: kafkaHost, + KafkaUsername: kafkaUsername, + KafkaPassword: kafkaPassword, + KafkaTopic: kafkaTopic, } err := client.InitSDKWithWebApp(params) diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 568cf9f1e..8059fe806 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -5,8 +5,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/0chain/gosdk/core/conf" - "github.com/0chain/gosdk/core/kafka" "io" "log" "mime/multipart" @@ -15,6 +13,8 @@ import ( "syscall" "time" + "github.com/0chain/gosdk/core/kafka" + "github.com/0chain/errors" thrown "github.com/0chain/errors" "github.com/0chain/gosdk/constants" From 6957a4c30ddcc3e78fefa8280ed0da1f3c4e750f Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Tue, 25 Feb 2025 20:35:16 +0530 Subject: [PATCH 21/25] added blobber mon changes --- core/kafka/kafka.go | 159 ------------------------- zboxcore/sdk/allocation.go | 35 ++++-- zboxcore/sdk/blockdownloadworker.go | 35 ++---- zboxcore/sdk/chunked_upload_blobber.go | 35 ++---- zboxcore/sdk/common.go | 21 ---- 5 files changed, 46 insertions(+), 239 deletions(-) delete mode 100644 core/kafka/kafka.go diff --git a/core/kafka/kafka.go b/core/kafka/kafka.go deleted file mode 100644 index c3fac80d8..000000000 --- a/core/kafka/kafka.go +++ /dev/null @@ -1,159 +0,0 @@ -package kafka - -import ( - "fmt" - "github.com/0chain/common/core/logging" - "log" - "sync" - "time" - - "github.com/IBM/sarama" - "go.uber.org/zap" -) - -type BlobberMonitoring struct { - ID string `json:"id"` - Operation string `json:"operation"` - BlobberId string `json:"blobber_id"` - AllocationId string `json:"allocation_id"` - Size int64 `json:"size"` - TimeSpent int64 `json:"time_spent"` - Count int `json:"count"` -} - -type ProviderI interface { - PublishToKafka(topic string, key, message []byte) chan int64 - ReconnectWriter(topic string) error - CloseWriter(topic string) error - CloseAllWriters() error -} - -type KafkaProvider struct { - Host string - WriteTimeout time.Duration - Config *sarama.Config - mutex sync.RWMutex // Mutex for synchronizing access to writers map -} - -// map of kafka writers for each topic -var writers map[string]sarama.AsyncProducer - -func init() { - writers = make(map[string]sarama.AsyncProducer) -} - -func NewKafkaProvider(host, username, password string, writeTimeout time.Duration) *KafkaProvider { - log.Println("Initializing Kafka provider", zap.String("host", host)) - - config := sarama.NewConfig() - config.Net.SASL.Enable = true - config.Net.SASL.User = username - config.Net.SASL.Password = password - config.Net.SASL.Mechanism = sarama.SASLTypePlaintext - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Net.MaxOpenRequests = 1 - config.Producer.Idempotent = true - config.Producer.RequiredAcks = sarama.WaitForAll - config.Producer.Retry.Max = 5 - config.Metadata.AllowAutoTopicCreation = true - config.Producer.MaxMessageBytes = 10 * 1024 * 1024 - - return &KafkaProvider{ - Host: host, - WriteTimeout: writeTimeout, - Config: config, - } -} - -func (k *KafkaProvider) PublishToKafka(topic string, key, message string) chan int64 { - k.mutex.RLock() - writer, exists := writers[topic] - k.mutex.RUnlock() - res := make(chan int64) - if !exists { - k.mutex.Lock() - writer, exists = writers[topic] - if !exists { - writer = k.createKafkaWriter(topic) - writers[topic] = writer - } - k.mutex.Unlock() - } 
- - msg := &sarama.ProducerMessage{ - Topic: topic, - Key: sarama.ByteEncoder(key), - Value: sarama.ByteEncoder(message), - } - - fmt.Println("Publishing to kafka", zap.String("topic", topic), zap.String("key", key), zap.String("message", message)) - - writer.Input() <- msg - go func() { - r := <-writer.Successes() - res <- r.Offset - }() - return res -} - -func (k *KafkaProvider) ReconnectWriter(topic string) error { - k.mutex.Lock() - defer k.mutex.Unlock() - writer := writers[topic] - if writer == nil { - return fmt.Errorf("no kafka writer found for the topic %v", topic) - } - - if err := writer.Close(); err != nil { - logging.Logger.Error("error closing kafka connection", zap.String("topic", topic), zap.Error(err)) - return fmt.Errorf("error closing kafka connection for topic %v: %v", topic, err) - } - - writers[topic] = k.createKafkaWriter(topic) - return nil -} - -func (k *KafkaProvider) CloseWriter(topic string) error { - k.mutex.Lock() - writer := writers[topic] - k.mutex.Unlock() - - if writer == nil { - return fmt.Errorf("no kafka writer found for the topic %v", topic) - } - - if err := writer.Close(); err != nil { - logging.Logger.Error("error closing kafka connection", zap.Error(err)) - } - - return nil -} - -func (k *KafkaProvider) CloseAllWriters() error { - k.mutex.Lock() - defer k.mutex.Unlock() - - for topic, writer := range writers { - if err := writer.Close(); err != nil { - logging.Logger.Error("error closing kafka connection", zap.String("topic", topic), zap.Error(err)) - } - } - return nil -} - -func (k *KafkaProvider) createKafkaWriter(topic string) sarama.AsyncProducer { - producer, err := sarama.NewAsyncProducer([]string{k.Host}, k.Config) - if err != nil { - logging.Logger.Panic(fmt.Sprintf("Failed to start Sarama producer: %v", err)) - } - - go func() { - for err := range producer.Errors() { - fmt.Println("kafka - failed to write access log entry:", err) - logging.Logger.Panic("kafka - failed to write access log entry:", zap.Error(err)) - } - }() - - return producer -} diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 0d606a533..9dca42de6 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -44,14 +44,16 @@ import ( ) var ( - noBLOBBERS = errors.New("", "No Blobbers set in this allocation") - notInitialized = errors.New("sdk_not_initialized", "Please call InitStorageSDK Init and use GetAllocation to get the allocation object") - IsWasm = false - MultiOpBatchSize = 50 - RepairBatchSize = 50 - Workdir string - logChanMap = make(map[string]chan logEntry) - logMapMutex = &sync.Mutex{} + noBLOBBERS = errors.New("", "No Blobbers set in this allocation") + notInitialized = errors.New("sdk_not_initialized", "Please call InitStorageSDK Init and use GetAllocation to get the allocation object") + IsWasm = false + MultiOpBatchSize = 50 + RepairBatchSize = 50 + Workdir string + logChanMap = make(map[string]chan logEntry) + logMapMutex = &sync.Mutex{} + LogBlobberMonitoring = false + LogBlobberMonitoringChan = make(chan BlobberMonitoring) ) const ( @@ -80,6 +82,17 @@ var GetFileInfo = func(localpath string) (os.FileInfo, error) { return sys.Files.Stat(localpath) } +func SetBlobberMonitoring(val bool) { + LogBlobberMonitoring = val +} + +type BlobberMonitoring struct { + BlobberId string `json:"blobber_id"` + Size int64 `json:"size"` + TimeSpent int64 `json:"time_spent"` + Count int `json:"count"` +} + // BlobberAllocationStats represents the blobber allocation statistics. 
type BlobberAllocationStats struct { BlobberID string @@ -3429,8 +3442,12 @@ func logWorker(key string, logChan chan logEntry) { sys.Files.StoreLogs(key, string(data)) } } - + func writeLogEntry(blobberURL string, log logEntry) { logChan := getLogChan(blobberURL) logChan <- log } + +func addBlobberMonitoringLog(log BlobberMonitoring) { + LogBlobberMonitoringChan <- log +} diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index d48ccca6d..974edc629 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -4,14 +4,11 @@ import ( "context" "encoding/json" "fmt" - "log" "net/http" "sync" "syscall" "time" - "github.com/0chain/gosdk/core/kafka" - "github.com/0chain/errors" "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/common" @@ -193,32 +190,14 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien } dnldSizeInMb := int64(len(respBuf)) / 1024 - kafkaObj := kafka.BlobberMonitoring{ - Operation: "download", - BlobberId: req.blobber.ID, - TimeSpent: time.Since(now).Nanoseconds(), - Size: dnldSizeInMb, - AllocationId: req.allocationID, - Count: 1, - } - - kafkaObjStr, err := json.Marshal(kafkaObj) - if err != nil { - log.Println("Error publishing to kafka: ", err) - } - - timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) - defer cancelFunc() - - res := PublishToKafka(req.blobber.ID, string(kafkaObjStr)) - - if res != nil { - select { - case <-res: - break - case <-timeout.Done(): - log.Panic("Timeout to publish event to kafka") + if LogBlobberMonitoring { + blobberMonitoringlog := BlobberMonitoring{ + BlobberId: req.blobber.ID, + TimeSpent: timeTaken, + Size: dnldSizeInMb, + Count: 1, } + addBlobberMonitoringLog(blobberMonitoringlog) } entry := logEntry{ diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 8059fe806..1ef61e4b0 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -13,8 +13,6 @@ import ( "syscall" "time" - "github.com/0chain/gosdk/core/kafka" - "github.com/0chain/errors" thrown "github.com/0chain/errors" "github.com/0chain/gosdk/constants" @@ -95,8 +93,10 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( err, shouldContinue = func() (err error, shouldContinue bool) { resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) + now := time.Now() err = zboxutil.FastHttpClient.DoTimeout(req, resp, su.uploadTimeOut) fasthttp.ReleaseRequest(req) + timeTaken := time.Since(now).Milliseconds() if err != nil { logger.Logger.Error("Upload : ", err, " baseurl: ", sb.blobber.Baseurl) if errors.Is(err, fasthttp.ErrConnectionClosed) || errors.Is(err, syscall.EPIPE) { @@ -105,6 +105,17 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( return fmt.Errorf("Error while doing reqeust. 
Error %s", err), false } + uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 + if LogBlobberMonitoring { + blobberMonitoringlog := BlobberMonitoring{ + BlobberId: sb.blobber.ID, + TimeSpent: timeTaken, + Size: uploadSizeInMb, + Count: 1, + } + addBlobberMonitoringLog(blobberMonitoringlog) + } + if resp.StatusCode() == http.StatusOK { return } @@ -146,26 +157,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( break } - uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 - kafkaObj := kafka.BlobberMonitoring{ - Operation: "upload", - BlobberId: sb.blobber.ID, - TimeSpent: time.Since(now).Nanoseconds(), - Size: uploadSizeInMb, - AllocationId: su.allocationObj.ID, - Count: 1, - } - - kafkaObjStr, err := json.Marshal(kafkaObj) - if err != nil { - logger.Logger.Error("Error publishing to kafka: ", err) - } - - res := PublishToKafka(sb.blobber.ID, string(kafkaObjStr)) - if res != nil { - results = append(results, res) - } - return err }) } diff --git a/zboxcore/sdk/common.go b/zboxcore/sdk/common.go index f88f56d06..3f9b698b6 100644 --- a/zboxcore/sdk/common.go +++ b/zboxcore/sdk/common.go @@ -14,9 +14,6 @@ import ( "github.com/0chain/errors" "github.com/0chain/gosdk/constants" - "github.com/0chain/gosdk/core/client" - "github.com/0chain/gosdk/core/conf" - "github.com/0chain/gosdk/core/kafka" "github.com/0chain/gosdk/zboxcore/blockchain" "github.com/0chain/gosdk/zboxcore/fileref" l "github.com/0chain/gosdk/zboxcore/logger" @@ -232,21 +229,3 @@ func (req *subDirRequest) processSubDirectories() error { return nil } - -func PublishToKafka(key, message string) chan int64 { - cfg, err := conf.GetClientConfig() - if err != nil { - fmt.Println("Error getting client config: ", err) - return nil - } - - if !client.IsSDKInitialized() { - fmt.Println("SDK is not initialized") - return nil - } - - BlobberMonitoringKafkaTopic := cfg.KafkaTopic - BlobberMonitoringKafka := kafka.NewKafkaProvider(cfg.KafkaHost, cfg.KafkaUsername, cfg.KafkaPassword, 1*time.Minute) - - return BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, key, message) -} From bff3fb594d346100c83d37283f29bf8ac513da3c Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Thu, 27 Feb 2025 14:54:22 +0530 Subject: [PATCH 22/25] go lint --- zboxcore/sdk/chunked_upload_blobber.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 1ef61e4b0..6e0727cbc 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -81,7 +81,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( shouldContinue bool ) var req *fasthttp.Request - now := time.Now() for i := 0; i < 3; i++ { req, err = zboxutil.NewFastUploadRequest( sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind].Bytes(), su.httpMethod, su.allocationObj.Owner) From 65a7395b8ef611af43465a3db7094d91aae81c66 Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Tue, 4 Mar 2025 14:10:43 +0530 Subject: [PATCH 23/25] added logs --- core/client/set.go | 8 ++------ core/conf/config.go | 5 ----- core/conf/vars.go | 5 ----- mobilesdk/sdk/sdk.go | 4 ---- wasmsdk/sdk.go | 4 ---- zboxcore/sdk/allocation.go | 3 ++- zboxcore/sdk/blockdownloadworker.go | 1 + zboxcore/sdk/chunked_upload_blobber.go | 1 + 8 files changed, 6 insertions(+), 25 deletions(-) diff --git a/core/client/set.go b/core/client/set.go index df05d73bf..3be0238fa 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -291,6 +291,7 @@ func PublicKey(clients ...string) 
string { } return client.wallets[clients[0]].ClientKey } + return client.wallet.ClientKey } @@ -336,7 +337,6 @@ func GetClient() *zcncrypto.Wallet { func InitSDK(walletJSON string, blockWorker, chainID, signatureScheme string, nonce int64, addWallet bool, - kafkaHost, kafkaUsername, kafkaPassword, kafkaTopic string, options ...int) error { if addWallet { @@ -376,10 +376,6 @@ func InitSDK(walletJSON string, MinSubmit: minSubmit, ConfirmationChainLength: confirmationChainLength, SharderConsensous: sharderConsensous, - KafkaHost: kafkaHost, - KafkaUsername: kafkaUsername, - KafkaPassword: kafkaPassword, - KafkaTopic: kafkaTopic, }) if err != nil { return err @@ -389,7 +385,7 @@ func InitSDK(walletJSON string, } func InitSDKWithWebApp(params InitSdkOptions) error { - err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, params.KafkaHost, params.KafkaUsername, params.KafkaPassword, params.KafkaTopic, + err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, *params.MinConfirmation, *params.MinSubmit, *params.ConfirmationChainLength, *params.SharderConsensous) if err != nil { return err diff --git a/core/conf/config.go b/core/conf/config.go index 5fb046c41..4fae43bec 100644 --- a/core/conf/config.go +++ b/core/conf/config.go @@ -78,11 +78,6 @@ type Config struct { SharderConsensous int `json:"sharder_consensous"` ZauthServer string `json:"zauth_server"` V *viper.Viper `json:"-"` - - KafkaHost string `json:"kafka_host"` - KafkaUsername string `json:"kafka_username"` - KafkaPassword string `json:"kafka_password"` - KafkaTopic string `json:"kafka_topic"` } // LoadConfigFile load and parse SDK Config from file diff --git a/core/conf/vars.go b/core/conf/vars.go index dce651a11..e31cbcd8f 100644 --- a/core/conf/vars.go +++ b/core/conf/vars.go @@ -61,11 +61,6 @@ func InitClientConfig(c *Config) { cfg.MinSubmit = DefaultMinSubmit } }) - - cfg.KafkaTopic = c.KafkaTopic - cfg.KafkaHost = c.KafkaHost - cfg.KafkaUsername = c.KafkaUsername - cfg.KafkaPassword = c.KafkaPassword } // Deprecated: Use client.Init() function. 
To normalize urls, use network.NormalizeURLs() method diff --git a/mobilesdk/sdk/sdk.go b/mobilesdk/sdk/sdk.go index 56cf1b002..e7c1b18f8 100644 --- a/mobilesdk/sdk/sdk.go +++ b/mobilesdk/sdk/sdk.go @@ -139,10 +139,6 @@ func InitStorageSDK(clientJson string, configJson string) (*StorageSDK, error) { AddWallet: true, ZboxHost: configObj.ZboxHost, ZboxAppType: configObj.ZboxAppType, - KafkaHost: configObj.KafkaHost, - KafkaUsername: configObj.KafkaUsername, - KafkaPassword: configObj.KafkaPassword, - KafkaTopic: configObj.KafkaTopic, } if err = client.InitSDKWithWebApp(params); err != nil { diff --git a/wasmsdk/sdk.go b/wasmsdk/sdk.go index 63bf2df33..27f5f7888 100644 --- a/wasmsdk/sdk.go +++ b/wasmsdk/sdk.go @@ -54,10 +54,6 @@ func initSDKs(chainID, blockWorker, signatureScheme string, ConfirmationChainLength: &confirmationChainLength, ZboxHost: zboxHost, ZboxAppType: zboxAppType, - KafkaHost: kafkaHost, - KafkaUsername: kafkaUsername, - KafkaPassword: kafkaPassword, - KafkaTopic: kafkaTopic, } err := client.InitSDKWithWebApp(params) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 9dca42de6..0f28f6a44 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -88,6 +88,7 @@ func SetBlobberMonitoring(val bool) { type BlobberMonitoring struct { BlobberId string `json:"blobber_id"` + Operation string `json:"operation"` Size int64 `json:"size"` TimeSpent int64 `json:"time_spent"` Count int `json:"count"` @@ -3442,7 +3443,7 @@ func logWorker(key string, logChan chan logEntry) { sys.Files.StoreLogs(key, string(data)) } } - + func writeLogEntry(blobberURL string, log logEntry) { logChan := getLogChan(blobberURL) logChan <- log diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 974edc629..2659167e2 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -193,6 +193,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien if LogBlobberMonitoring { blobberMonitoringlog := BlobberMonitoring{ BlobberId: req.blobber.ID, + Operation: "download", TimeSpent: timeTaken, Size: dnldSizeInMb, Count: 1, diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 6e0727cbc..e8e004431 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -108,6 +108,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( if LogBlobberMonitoring { blobberMonitoringlog := BlobberMonitoring{ BlobberId: sb.blobber.ID, + Operation: "upload", TimeSpent: timeTaken, Size: uploadSizeInMb, Count: 1, From c5793f7e914f1fcaad249a73e7539ccad40effcc Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Wed, 5 Mar 2025 19:32:13 +0530 Subject: [PATCH 24/25] convert to mb: --- zboxcore/sdk/blockdownloadworker.go | 3 ++- zboxcore/sdk/chunked_upload_blobber.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 2659167e2..918d38892 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "sync" "syscall" @@ -189,7 +190,7 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien return errors.New("response_error", string(respBuf)) } - dnldSizeInMb := int64(len(respBuf)) / 1024 + dnldSizeInMb := int64(math.Round(float64((len(respBuf))) / float64((1024 * 1024)))) if LogBlobberMonitoring { 
blobberMonitoringlog := BlobberMonitoring{ BlobberId: req.blobber.ID, diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index e8e004431..d17efe71f 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "math" "mime/multipart" "net/http" "strings" @@ -104,7 +105,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( return fmt.Errorf("Error while doing reqeust. Error %s", err), false } - uploadSizeInMb := int64(len(dataBuffers[ind].Bytes())) / 1024 + uploadSizeInMb := int64(math.Round(float64((len(dataBuffers[ind].Bytes()))) / float64((1024 * 1024)))) if LogBlobberMonitoring { blobberMonitoringlog := BlobberMonitoring{ BlobberId: sb.blobber.ID, From 3c56e0b431e0edd800fa1a9f0058c07450a4c695 Mon Sep 17 00:00:00 2001 From: smaulik13 Date: Wed, 5 Mar 2025 23:55:37 +0530 Subject: [PATCH 25/25] resolve conflict --- core/client/set.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/client/set.go b/core/client/set.go index 3be0238fa..e38c4619d 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -385,10 +385,16 @@ func InitSDK(walletJSON string, } func InitSDKWithWebApp(params InitSdkOptions) error { - err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, - *params.MinConfirmation, *params.MinSubmit, *params.ConfirmationChainLength, *params.SharderConsensous) - if err != nil { - return err + if params.MinConfirmation != nil && params.MinSubmit != nil && params.ConfirmationChainLength != nil && params.SharderConsensous != nil { + err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet, *params.MinConfirmation, *params.MinSubmit, *params.ConfirmationChainLength, *params.SharderConsensous) + if err != nil { + return err + } + } else { + err := InitSDK(params.WalletJSON, params.BlockWorker, params.ChainID, params.SignatureScheme, params.Nonce, params.AddWallet) + if err != nil { + return err + } } conf.SetZboxAppConfigs(params.ZboxHost, params.ZboxAppType) SetIsAppFlow(true)
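
After PATCH 21-25 the SDK no longer publishes blobber timings to Kafka itself: when LogBlobberMonitoring is enabled, each upload or download request emits one BlobberMonitoring entry (blobber id, operation, size rounded to MB, time spent, count) on the exported LogBlobberMonitoringChan, and the caller decides where to ship it. A minimal consumer-side sketch in Go follows; this is caller code, not part of the series, and the main package, JSON marshaling, and stdout sink are illustrative assumptions.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/0chain/gosdk/zboxcore/sdk"
)

func main() {
	// Drain the monitoring channel before enabling the flag: the channel is
	// created unbuffered in this series, so an unread send would stall the
	// upload/download goroutine that produced it.
	go func() {
		for entry := range sdk.LogBlobberMonitoringChan {
			line, err := json.Marshal(entry) // e.g. {"blobber_id":"...","operation":"upload",...}
			if err != nil {
				continue
			}
			fmt.Println(string(line)) // forward to Kafka, a file, or a metrics sink instead
		}
	}()

	sdk.SetBlobberMonitoring(true)

	// ... InitStorageSDK / InitSDKWithWebApp and the usual upload/download calls go here ...
}

Because addBlobberMonitoringLog sends on an unbuffered channel, a reader like the goroutine above must be running whenever monitoring is on; starting it before SetBlobberMonitoring(true) avoids blocking the first monitored request.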