diff --git a/eth-node/types/mailserver.go b/eth-node/types/mailserver.go index 03a4a390247..3191cd3f17d 100644 --- a/eth-node/types/mailserver.go +++ b/eth-node/types/mailserver.go @@ -35,9 +35,9 @@ type MessagesRequest struct { } type StoreRequestCursor struct { - Digest []byte `json:"digest"` - ReceiverTime float64 `json:"receiverTime"` - SenderTime float64 `json:"senderTime"` + Digest []byte `json:"digest"` + ReceiverTime int64 `json:"receiverTime"` + SenderTime int64 `json:"senderTime"` } // SetDefaults sets the From and To defaults diff --git a/go.mod b/go.mod index 947bf5ebd6c..343891dc409 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b + github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18 github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 github.com/status-im/markdown v0.0.0-20210405121740-32e5a5055fb6 github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index baf78e25f00..5f9db57049f 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,7 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d h1:bE1UyBQ5aE6FjhNY4lbPtMqh7VDldoVkvZMtFEbd+CE= @@ -1142,6 +1143,7 @@ github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a h1:yVNJFSzkEG8sm github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a/go.mod h1:TPq+fcJOdGrkpZpXF4UVmFjYxH0gGqnxdgZ+OzAmvJk= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= @@ -1212,8 +1214,8 @@ github.com/status-im/go-ethereum v1.10.4-status.3 h1:RF618iSCvqJtXu3ZSg7XNg6MJaS github.com/status-im/go-ethereum v1.10.4-status.3/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.1 h1:09v9n6426NAojNOvdgegqrAotgffWW/UPDwrpJ85DNE= github.com/status-im/go-multiaddr-ethv4 v1.2.1/go.mod h1:SlBebvQcSUM5+/R/YfpfMuu5WyraW47XFmIqLYBmlKU= -github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b h1:g8DyQPMeWhHyxNVTyJmdP3Q4vf+WTZ5j7lMe9bPkrpE= -github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b/go.mod h1:dxqyVXn079tFLM50Q4olJGCiHcw52CJeLPesr+ruzQg= +github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18 h1:pykYGtdQZemOQaeqmOTYoERuG2CGRUjKrZhP6ThN37I= +github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18/go.mod h1:JVJzXmxDWPcSg2CAuVBw0WBMLSpyqtyvv/HnTkYky8U= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/vendor/github.com/status-im/go-waku/waku/persistence/store.go b/vendor/github.com/status-im/go-waku/waku/persistence/store.go index affc3947b96..78a40114a06 100644 --- a/vendor/github.com/status-im/go-waku/waku/persistence/store.go +++ b/vendor/github.com/status-im/go-waku/waku/persistence/store.go @@ -28,7 +28,7 @@ type DBStore struct { type StoredMessage struct { ID []byte PubsubTopic string - ReceiverTime float64 + ReceiverTime int64 Message *pb.WakuMessage } @@ -93,8 +93,8 @@ func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) { func (d *DBStore) createTable() error { sqlStmt := `CREATE TABLE IF NOT EXISTS message ( id BLOB PRIMARY KEY, - receiverTimestamp REAL NOT NULL, - senderTimestamp REAL NOT NULL, + receiverTimestamp INTEGER NOT NULL, + senderTimestamp INTEGER NOT NULL, contentTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL, payload BLOB, @@ -161,8 +161,8 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { for rows.Next() { var id []byte - var receiverTimestamp float64 - var senderTimestamp float64 + var receiverTimestamp int64 + var senderTimestamp int64 var contentTopic string var payload []byte var version uint32 diff --git a/vendor/github.com/status-im/go-waku/waku/v2/broadcast.go b/vendor/github.com/status-im/go-waku/waku/v2/broadcast.go index 88d99efb942..f734aafaec3 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/broadcast.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/broadcast.go @@ -7,10 +7,17 @@ import ( // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 // by Dustin Sallings (c) 2013, which was released under MIT license +type doneCh chan struct{} + +type chOperation struct { + ch chan<- *protocol.Envelope + done doneCh +} + type broadcaster struct { input chan *protocol.Envelope - reg chan chan<- *protocol.Envelope - unreg chan chan<- *protocol.Envelope + reg chan chOperation + unreg chan chOperation outputs map[chan<- *protocol.Envelope]bool } @@ -20,8 +27,12 @@ type broadcaster struct { type Broadcaster interface { // Register a new channel to receive broadcasts Register(chan<- *protocol.Envelope) + // Register a new channel to receive broadcasts and return a channel to wait until this operation is complete + WaitRegister(newch chan<- *protocol.Envelope) doneCh // Unregister a channel so that it no longer receives broadcasts. Unregister(chan<- *protocol.Envelope) + // Unregister a subscriptor channel and return a channel to wait until this operation is done + WaitUnregister(newch chan<- *protocol.Envelope) doneCh // Shut this broadcaster down. Close() // Submit a new object to all subscribers @@ -39,14 +50,23 @@ func (b *broadcaster) run() { select { case m := <-b.input: b.broadcast(m) - case ch, ok := <-b.reg: + case broadcastee, ok := <-b.reg: if ok { - b.outputs[ch] = true + b.outputs[broadcastee.ch] = true + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } } else { + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } return } - case ch := <-b.unreg: - delete(b.outputs, ch) + case broadcastee := <-b.unreg: + delete(b.outputs, broadcastee.ch) + if broadcastee.done != nil { + broadcastee.done <- struct{}{} + } } } } @@ -57,8 +77,8 @@ func (b *broadcaster) run() { func NewBroadcaster(buflen int) Broadcaster { b := &broadcaster{ input: make(chan *protocol.Envelope, buflen), - reg: make(chan chan<- *protocol.Envelope), - unreg: make(chan chan<- *protocol.Envelope), + reg: make(chan chOperation), + unreg: make(chan chOperation), outputs: make(map[chan<- *protocol.Envelope]bool), } @@ -67,14 +87,40 @@ func NewBroadcaster(buflen int) Broadcaster { return b } +// Register a subscriptor channel and return a channel to wait until this operation is done +func (b *broadcaster) WaitRegister(newch chan<- *protocol.Envelope) doneCh { + d := make(doneCh) + b.reg <- chOperation{ + ch: newch, + done: d, + } + return d +} + // Register a subscriptor channel func (b *broadcaster) Register(newch chan<- *protocol.Envelope) { - b.reg <- newch + b.reg <- chOperation{ + ch: newch, + done: nil, + } +} + +// Unregister a subscriptor channel and return a channel to wait until this operation is done +func (b *broadcaster) WaitUnregister(newch chan<- *protocol.Envelope) doneCh { + d := make(doneCh) + b.unreg <- chOperation{ + ch: newch, + done: d, + } + return d } // Unregister a subscriptor channel func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { - b.unreg <- newch + b.unreg <- chOperation{ + ch: newch, + done: nil, + } } // Closes the broadcaster. Used to stop receiving new subscribers diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go index 62e5499f429..da2a9d1a6e4 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/connectedness.go @@ -120,7 +120,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) { if !hasLightPush && protocol == string(lightpush.LightPushID_v20beta1) { hasLightPush = true } - if !hasStore && protocol == string(store.StoreID_v20beta3) { + if !hasStore && protocol == string(store.StoreID_v20beta4) { hasStore = true } if !hasFilter && protocol == string(filter.FilterID_v20beta1) { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index b8f10cdd2e2..24e732b3d5c 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -486,7 +486,7 @@ func (w *WakuNode) startStore() { case <-w.quit: return case <-ticker.C: - _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3), w.log) + _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), w.log) if err == nil { break peerVerif } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.pb.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.pb.go index 8930d3734fd..f0da3d1d52e 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.pb.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.pb.go @@ -4,7 +4,6 @@ package pb import ( - encoding_binary "encoding/binary" fmt "fmt" proto "github.com/golang/protobuf/proto" io "io" @@ -27,7 +26,7 @@ type WakuMessage struct { Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` ContentTopic string `protobuf:"bytes,2,opt,name=contentTopic,proto3" json:"contentTopic,omitempty"` Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` - Timestamp float64 `protobuf:"fixed64,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Timestamp int64 `protobuf:"zigzag64,10,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Proof []byte `protobuf:"bytes,21,opt,name=proof,proto3" json:"proof,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -88,7 +87,7 @@ func (m *WakuMessage) GetVersion() uint32 { return 0 } -func (m *WakuMessage) GetTimestamp() float64 { +func (m *WakuMessage) GetTimestamp() int64 { if m != nil { return m.Timestamp } @@ -117,11 +116,11 @@ var fileDescriptor_6f0a20862b3bf714 = []byte{ 0x82, 0x71, 0x85, 0x94, 0xb8, 0x78, 0x92, 0xf3, 0xf3, 0x4a, 0x52, 0xf3, 0x4a, 0x42, 0xf2, 0x0b, 0x32, 0x93, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0xc4, 0x40, 0xba, 0xcb, 0x52, 0x8b, 0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x15, 0x18, 0x35, 0x78, 0x83, 0x60, 0x5c, 0x21, 0x19, 0x2e, - 0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x16, 0x05, 0x46, 0x0d, 0xc6, + 0xce, 0x92, 0xcc, 0xdc, 0xd4, 0xe2, 0x92, 0xc4, 0xdc, 0x02, 0x09, 0x2e, 0x05, 0x46, 0x0d, 0xa1, 0x20, 0x84, 0x80, 0x90, 0x08, 0x17, 0x6b, 0x41, 0x51, 0x7e, 0x7e, 0x9a, 0x84, 0x28, 0xd8, 0x4e, 0x08, 0xc7, 0x49, 0xe0, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf1, 0x58, 0x8e, 0x21, 0x89, 0x0d, 0xec, 0x70, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, - 0x0a, 0xc6, 0x91, 0x28, 0xce, 0x00, 0x00, 0x00, + 0x33, 0x1b, 0x64, 0x81, 0xce, 0x00, 0x00, 0x00, } func (m *WakuMessage) Marshal() (dAtA []byte, err error) { @@ -158,10 +157,9 @@ func (m *WakuMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0xaa } if m.Timestamp != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Timestamp)))) + i = encodeVarintWakuMessage(dAtA, i, uint64((uint64(m.Timestamp)<<1)^uint64((m.Timestamp>>63)))) i-- - dAtA[i] = 0x21 + dAtA[i] = 0x50 } if m.Version != 0 { i = encodeVarintWakuMessage(dAtA, i, uint64(m.Version)) @@ -214,7 +212,7 @@ func (m *WakuMessage) Size() (n int) { n += 1 + sovWakuMessage(uint64(m.Version)) } if m.Timestamp != 0 { - n += 9 + n += 1 + sozWakuMessage(uint64(m.Timestamp)) } l = len(m.Proof) if l > 0 { @@ -346,17 +344,27 @@ func (m *WakuMessage) Unmarshal(dAtA []byte) error { break } } - case 4: - if wireType != 1 { + case 10: + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.Timestamp = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.Timestamp = int64(v) case 21: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Proof", wireType) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.proto b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.proto index 8b689cba4e3..dc79c976368 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.proto +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_message.proto @@ -6,6 +6,6 @@ message WakuMessage { bytes payload = 1; string contentTopic = 2; uint32 version = 3; - double timestamp = 4; + sint64 timestamp = 10; bytes proof = 21; } \ No newline at end of file diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.pb.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.pb.go index 144b92ec7e4..bc12c210d4a 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.pb.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.pb.go @@ -4,7 +4,6 @@ package pb import ( - encoding_binary "encoding/binary" fmt "fmt" proto "github.com/golang/protobuf/proto" io "io" @@ -75,8 +74,8 @@ func (HistoryResponse_Error) EnumDescriptor() ([]byte, []int) { type Index struct { Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` - ReceiverTime float64 `protobuf:"fixed64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"` - SenderTime float64 `protobuf:"fixed64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"` + ReceiverTime int64 `protobuf:"zigzag64,2,opt,name=receiverTime,proto3" json:"receiverTime,omitempty"` + SenderTime int64 `protobuf:"zigzag64,3,opt,name=senderTime,proto3" json:"senderTime,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -122,14 +121,14 @@ func (m *Index) GetDigest() []byte { return nil } -func (m *Index) GetReceiverTime() float64 { +func (m *Index) GetReceiverTime() int64 { if m != nil { return m.ReceiverTime } return 0 } -func (m *Index) GetSenderTime() float64 { +func (m *Index) GetSenderTime() int64 { if m != nil { return m.SenderTime } @@ -250,8 +249,8 @@ type HistoryQuery struct { PubsubTopic string `protobuf:"bytes,2,opt,name=pubsubTopic,proto3" json:"pubsubTopic,omitempty"` ContentFilters []*ContentFilter `protobuf:"bytes,3,rep,name=contentFilters,proto3" json:"contentFilters,omitempty"` PagingInfo *PagingInfo `protobuf:"bytes,4,opt,name=pagingInfo,proto3" json:"pagingInfo,omitempty"` - StartTime float64 `protobuf:"fixed64,5,opt,name=startTime,proto3" json:"startTime,omitempty"` - EndTime float64 `protobuf:"fixed64,6,opt,name=endTime,proto3" json:"endTime,omitempty"` + StartTime int64 `protobuf:"zigzag64,5,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime int64 `protobuf:"zigzag64,6,opt,name=endTime,proto3" json:"endTime,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -311,14 +310,14 @@ func (m *HistoryQuery) GetPagingInfo() *PagingInfo { return nil } -func (m *HistoryQuery) GetStartTime() float64 { +func (m *HistoryQuery) GetStartTime() int64 { if m != nil { return m.StartTime } return 0 } -func (m *HistoryQuery) GetEndTime() float64 { +func (m *HistoryQuery) GetEndTime() int64 { if m != nil { return m.EndTime } @@ -476,31 +475,31 @@ var fileDescriptor_ca6891f77a46e680 = []byte{ 0x25, 0x32, 0x4e, 0x63, 0x2c, 0x24, 0x93, 0x7d, 0xd4, 0xea, 0x35, 0x29, 0xe5, 0x4f, 0x8d, 0xee, 0x06, 0x50, 0x1e, 0x44, 0x21, 0x7d, 0xc4, 0xbb, 0x50, 0x09, 0xc5, 0x94, 0x54, 0xea, 0xb0, 0x16, 0x6b, 0xd7, 0xb9, 0xa9, 0xd0, 0x85, 0xba, 0xa4, 0x80, 0xc4, 0x7b, 0x92, 0x17, 0xe2, 0x9a, 0x9c, - 0x42, 0x8b, 0xb5, 0x19, 0xdf, 0xd0, 0xb0, 0x09, 0xa0, 0x28, 0x0a, 0x0d, 0x51, 0xd4, 0xc4, 0x9a, - 0xe2, 0x7e, 0x65, 0x00, 0x67, 0xfe, 0x54, 0x44, 0xd3, 0x41, 0xf4, 0x36, 0xc6, 0x7d, 0xa8, 0x26, - 0xfe, 0x94, 0xce, 0xc5, 0x27, 0xd2, 0x66, 0x25, 0xbe, 0xac, 0xf1, 0x21, 0x54, 0x82, 0x4c, 0xaa, - 0x58, 0x6a, 0xa3, 0x5a, 0xc7, 0xf2, 0x92, 0x89, 0xa7, 0x13, 0x72, 0xd3, 0xc0, 0x27, 0x60, 0x85, - 0x42, 0x52, 0x90, 0x8a, 0x38, 0xd2, 0x66, 0x8d, 0x8e, 0x93, 0x53, 0x2b, 0x07, 0xaf, 0xbf, 0xe8, - 0xf3, 0x15, 0xea, 0x1e, 0x80, 0xb5, 0xd4, 0xb1, 0x0e, 0xd5, 0xe3, 0x6e, 0xef, 0xf9, 0x65, 0x97, - 0xf7, 0xed, 0x1d, 0xac, 0xc1, 0xee, 0xe9, 0x88, 0xeb, 0x82, 0xb9, 0x47, 0xf0, 0x5f, 0x2f, 0x8e, - 0x52, 0x8a, 0xd2, 0x53, 0xf1, 0x2e, 0x25, 0x99, 0xaf, 0x20, 0x98, 0x0b, 0x17, 0x71, 0x22, 0x02, - 0x9d, 0xd9, 0xe2, 0x1b, 0x9a, 0xfb, 0x93, 0x41, 0xfd, 0x99, 0xc8, 0x37, 0x7e, 0xfb, 0x2a, 0x23, - 0x79, 0x8b, 0x2d, 0xa8, 0x25, 0xd9, 0x44, 0x65, 0x93, 0xf9, 0x37, 0x05, 0xfd, 0xcd, 0xba, 0x84, - 0x4f, 0xa1, 0x11, 0xac, 0xfb, 0x28, 0xa7, 0xd8, 0x2a, 0xb6, 0x6b, 0x9d, 0xff, 0xf3, 0x61, 0x36, - 0x12, 0xf0, 0x2d, 0x10, 0x3d, 0x80, 0x64, 0x39, 0xad, 0x53, 0xd2, 0x9b, 0x6a, 0x6c, 0xee, 0x80, - 0xaf, 0x11, 0x78, 0x1f, 0x2c, 0x95, 0xfa, 0x32, 0xd5, 0xf7, 0x29, 0xeb, 0xfb, 0xac, 0x04, 0x74, - 0x60, 0x97, 0xa2, 0x50, 0xf7, 0x2a, 0xba, 0xb7, 0x28, 0xdd, 0x6f, 0x0c, 0xf6, 0xcc, 0x54, 0x9c, - 0x54, 0x12, 0x47, 0x8a, 0xf0, 0x31, 0x54, 0xcd, 0x13, 0x52, 0x4e, 0x41, 0x07, 0xde, 0xcb, 0x9d, - 0x2f, 0xfd, 0xab, 0xec, 0xe5, 0x5c, 0xe7, 0x4b, 0x60, 0x2b, 0x68, 0xf1, 0x9f, 0x41, 0x0f, 0xa1, - 0x4c, 0x52, 0xc6, 0x52, 0xcf, 0xd4, 0xe8, 0xdc, 0xcb, 0xd1, 0xad, 0x00, 0xde, 0x49, 0x0e, 0xf0, - 0x39, 0xe7, 0x3e, 0x82, 0xb2, 0xae, 0xb1, 0x0a, 0xa5, 0xe1, 0x68, 0x78, 0x62, 0xef, 0x20, 0x42, - 0x63, 0x30, 0x7c, 0xd3, 0x7d, 0x31, 0xe8, 0x8f, 0x7b, 0xaf, 0xf9, 0xf9, 0x88, 0xdb, 0xcc, 0xfd, - 0xcc, 0x00, 0x16, 0xbf, 0x73, 0xd6, 0xc3, 0x07, 0x00, 0x92, 0x6e, 0x32, 0x52, 0xe9, 0x58, 0x84, - 0xe6, 0x9e, 0x96, 0x51, 0x06, 0x21, 0x1e, 0x40, 0xf9, 0x26, 0x3f, 0xa2, 0x79, 0x83, 0xf6, 0x5a, - 0x0a, 0x7d, 0x5c, 0x3e, 0x6f, 0xe3, 0x21, 0x54, 0xa5, 0x49, 0x65, 0x66, 0xbb, 0xf3, 0x97, 0xc0, - 0x7c, 0x09, 0x1d, 0xdb, 0xdf, 0x67, 0x4d, 0xf6, 0x63, 0xd6, 0x64, 0xbf, 0x66, 0x4d, 0xf6, 0xe5, - 0x77, 0x73, 0x67, 0x52, 0xd1, 0x7f, 0xc3, 0xa3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0x5c, - 0xa1, 0x0e, 0xb2, 0x03, 0x00, 0x00, + 0x42, 0x8b, 0xb5, 0x91, 0x6f, 0x68, 0xd8, 0x04, 0x50, 0x14, 0x85, 0x86, 0x28, 0x6a, 0x62, 0x4d, + 0x71, 0xbf, 0x32, 0x80, 0x33, 0x7f, 0x2a, 0xa2, 0xe9, 0x20, 0x7a, 0x1b, 0xe3, 0x3e, 0x54, 0x13, + 0x7f, 0x4a, 0xe7, 0xe2, 0x13, 0x69, 0xb3, 0x12, 0x5f, 0xd6, 0xf8, 0x10, 0x2a, 0x41, 0x26, 0x55, + 0x2c, 0xb5, 0x51, 0xad, 0x63, 0x79, 0xc9, 0xc4, 0xd3, 0x09, 0xb9, 0x69, 0xe0, 0x13, 0xb0, 0x42, + 0x21, 0x29, 0x48, 0x45, 0x1c, 0x69, 0xb3, 0x46, 0xc7, 0xc9, 0xa9, 0x95, 0x83, 0xd7, 0x5f, 0xf4, + 0xf9, 0x0a, 0x75, 0x0f, 0xc0, 0x5a, 0xea, 0x58, 0x87, 0xea, 0x71, 0xb7, 0xf7, 0xfc, 0xb2, 0xcb, + 0xfb, 0xf6, 0x0e, 0xd6, 0x60, 0xf7, 0x74, 0xc4, 0x75, 0xc1, 0xdc, 0x23, 0xf8, 0xaf, 0x17, 0x47, + 0x29, 0x45, 0xe9, 0xa9, 0x78, 0x97, 0x92, 0xcc, 0x57, 0x10, 0xcc, 0x85, 0x8b, 0x38, 0x11, 0x81, + 0xce, 0x6c, 0xf1, 0x0d, 0xcd, 0xfd, 0xc9, 0xa0, 0xfe, 0x4c, 0xe4, 0x1b, 0xbf, 0x7d, 0x95, 0x91, + 0xbc, 0xc5, 0x16, 0xd4, 0x92, 0x6c, 0xa2, 0xb2, 0xc9, 0xfc, 0x9b, 0x82, 0xfe, 0x66, 0x5d, 0xc2, + 0xa7, 0xd0, 0x08, 0xd6, 0x7d, 0x94, 0x53, 0x6c, 0x15, 0xdb, 0xb5, 0xce, 0xff, 0xf9, 0x30, 0x1b, + 0x09, 0xf8, 0x16, 0x88, 0x1e, 0x40, 0xb2, 0x9c, 0xd6, 0x29, 0xe9, 0x4d, 0x35, 0x36, 0x77, 0xc0, + 0xd7, 0x08, 0xbc, 0x0f, 0x96, 0x4a, 0x7d, 0x99, 0xea, 0xfb, 0x94, 0xf5, 0x7d, 0x56, 0x02, 0x3a, + 0xb0, 0x4b, 0x51, 0xa8, 0x7b, 0x15, 0xdd, 0x5b, 0x94, 0xee, 0x37, 0x06, 0x7b, 0x66, 0x2a, 0x4e, + 0x2a, 0x89, 0x23, 0x45, 0xf8, 0x18, 0xaa, 0xe6, 0x09, 0x29, 0xa7, 0xa0, 0x03, 0xef, 0xe5, 0xce, + 0x97, 0xfe, 0x55, 0xf6, 0x72, 0xae, 0xf3, 0x25, 0xb0, 0x15, 0xb4, 0xf8, 0xcf, 0xa0, 0x87, 0x50, + 0x26, 0x29, 0x63, 0xa9, 0x67, 0x6a, 0x74, 0xee, 0xe5, 0xe8, 0x56, 0x00, 0xef, 0x24, 0x07, 0xf8, + 0x9c, 0x73, 0x1f, 0x41, 0x59, 0xd7, 0x58, 0x85, 0xd2, 0x70, 0x34, 0x3c, 0xb1, 0x77, 0x10, 0xa1, + 0x31, 0x18, 0xbe, 0xe9, 0xbe, 0x18, 0xf4, 0xc7, 0xbd, 0xd7, 0xfc, 0x7c, 0xc4, 0x6d, 0xe6, 0x7e, + 0x66, 0x00, 0x8b, 0xdf, 0x39, 0xeb, 0xe1, 0x03, 0x00, 0x49, 0x37, 0x19, 0xa9, 0x74, 0x2c, 0x42, + 0x73, 0x4f, 0xcb, 0x28, 0x83, 0x10, 0x0f, 0xa0, 0x7c, 0x93, 0x1f, 0xd1, 0xbc, 0x41, 0x7b, 0x2d, + 0x85, 0x3e, 0x2e, 0x9f, 0xb7, 0xf1, 0x10, 0xaa, 0xd2, 0xa4, 0x32, 0xb3, 0xdd, 0xf9, 0x4b, 0x60, + 0xbe, 0x84, 0x8e, 0xed, 0xef, 0xb3, 0x26, 0xfb, 0x31, 0x6b, 0xb2, 0x5f, 0xb3, 0x26, 0xfb, 0xf2, + 0xbb, 0xb9, 0x33, 0xa9, 0xe8, 0xbf, 0xe1, 0xd1, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x8f, + 0x84, 0xdf, 0xb2, 0x03, 0x00, 0x00, } func (m *Index) Marshal() (dAtA []byte, err error) { @@ -528,16 +527,14 @@ func (m *Index) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.XXX_unrecognized) } if m.SenderTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.SenderTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.SenderTime)<<1)^uint64((m.SenderTime>>63)))) i-- - dAtA[i] = 0x19 + dAtA[i] = 0x18 } if m.ReceiverTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ReceiverTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.ReceiverTime)<<1)^uint64((m.ReceiverTime>>63)))) i-- - dAtA[i] = 0x11 + dAtA[i] = 0x10 } if len(m.Digest) > 0 { i -= len(m.Digest) @@ -657,16 +654,14 @@ func (m *HistoryQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) { copy(dAtA[i:], m.XXX_unrecognized) } if m.EndTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.EndTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.EndTime)<<1)^uint64((m.EndTime>>63)))) i-- - dAtA[i] = 0x31 + dAtA[i] = 0x30 } if m.StartTime != 0 { - i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.StartTime)))) + i = encodeVarintWakuStore(dAtA, i, uint64((uint64(m.StartTime)<<1)^uint64((m.StartTime>>63)))) i-- - dAtA[i] = 0x29 + dAtA[i] = 0x28 } if m.PagingInfo != nil { { @@ -842,10 +837,10 @@ func (m *Index) Size() (n int) { n += 1 + l + sovWakuStore(uint64(l)) } if m.ReceiverTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.ReceiverTime)) } if m.SenderTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.SenderTime)) } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -912,10 +907,10 @@ func (m *HistoryQuery) Size() (n int) { n += 1 + l + sovWakuStore(uint64(l)) } if m.StartTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.StartTime)) } if m.EndTime != 0 { - n += 9 + n += 1 + sozWakuStore(uint64(m.EndTime)) } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -1042,27 +1037,47 @@ func (m *Index) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field ReceiverTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.ReceiverTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.ReceiverTime = int64(v) case 3: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field SenderTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.SenderTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.SenderTime = int64(v) default: iNdEx = preIndex skippy, err := skipWakuStore(dAtA[iNdEx:]) @@ -1425,27 +1440,47 @@ func (m *HistoryQuery) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 5: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.StartTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.StartTime = int64(v) case 6: - if wireType != 1 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType) } var v uint64 - if (iNdEx + 8) > l { - return io.ErrUnexpectedEOF + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWakuStore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } - v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) - iNdEx += 8 - m.EndTime = float64(math.Float64frombits(v)) + v = (v >> 1) ^ uint64((int64(v&1)<<63)>>63) + m.EndTime = int64(v) default: iNdEx = preIndex skippy, err := skipWakuStore(dAtA[iNdEx:]) diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.proto b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.proto index 7e4c879d3bf..4d940cd87df 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.proto +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/pb/waku_store.proto @@ -6,8 +6,8 @@ import "waku_message.proto"; message Index { bytes digest = 1; - double receiverTime = 2; - double senderTime = 3; + sint64 receiverTime = 2; + sint64 senderTime = 3; } message PagingInfo { @@ -28,8 +28,8 @@ message HistoryQuery { string pubsubTopic = 2; repeated ContentFilter contentFilters = 3; PagingInfo pagingInfo = 4; // used for pagination - double startTime = 5; - double endTime = 6; + sint64 startTime = 5; + sint64 endTime = 6; } message HistoryResponse { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/subscription.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/subscription.go index 4adc0a835d0..b5b310de132 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/subscription.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/subscription.go @@ -8,6 +8,8 @@ import ( // Subscription handles the subscrition to a particular pubsub topic type Subscription struct { + sync.RWMutex + // C is channel used for receiving envelopes C chan *protocol.Envelope @@ -19,14 +21,14 @@ type Subscription struct { // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { subs.once.Do(func() { - subs.closed = true close(subs.quit) - close(subs.C) - }) + } // IsClosed determine whether a Subscription is still open for receiving messages func (subs *Subscription) IsClosed() bool { + subs.RLock() + defer subs.RUnlock() return subs.closed } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go index abd09b2f284..377c112d25d 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -300,9 +300,20 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * for { select { case <-subscription.quit: - if w.bcaster != nil { - w.bcaster.Unregister(subscription.C) // Remove from broadcast list - } + func() { + subscription.Lock() + defer subscription.Unlock() + + if subscription.closed { + return + } + subscription.closed = true + if w.bcaster != nil { + <-w.bcaster.WaitUnregister(subscription.C) // Remove from broadcast list + } + + close(subscription.C) + }() // TODO: if there are no more relay subscriptions, close the pubsub subscription case msg := <-subChannel: if msg == nil { diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go index 71333111afa..b029c2a011c 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/message_queue.go @@ -28,6 +28,7 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) error { var k [32]byte copy(k[:], msg.index.Digest) + if _, ok := self.seen[k]; ok { return ErrDuplicatedMessage } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index e4807cbc772..6b54ab828a1 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -26,8 +26,8 @@ import ( "github.com/status-im/go-waku/waku/v2/utils" ) -// StoreID_v20beta3 is the current Waku Store protocol identifier -const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") +// StoreID_v20beta4 is the current Waku Store protocol identifier +const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 100 @@ -193,8 +193,8 @@ type MessageProvider interface { type Query struct { Topic string ContentTopics []string - StartTime float64 - EndTime float64 + StartTime int64 + EndTime int64 } // Result represents a valid response from a store node @@ -266,7 +266,7 @@ func (store *WakuStore) Start(ctx context.Context) { store.ctx = ctx store.MsgC = make(chan *protocol.Envelope, 1024) - store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest) + store.h.SetStreamHandlerMatch(StoreID_v20beta4, protocol.PrefixTextMatch(string(StoreID_v20beta4)), store.onRequest) store.wg.Add(1) go store.storeIncomingMessages(ctx) @@ -296,7 +296,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { for _, storedMessage := range storedMessages { idx := &pb.Index{ Digest: storedMessage.ID, - ReceiverTime: float64(storedMessage.ReceiverTime), + ReceiverTime: storedMessage.ReceiverTime, } _ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message) @@ -309,21 +309,21 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg return store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } -func (store *WakuStore) storeMessage(env *protocol.Envelope) { +func (store *WakuStore) storeMessage(env *protocol.Envelope) error { index, err := computeIndex(env) if err != nil { store.log.Error("could not calculate message index", err) - return + return err } err = store.addToMessageQueue(env.PubsubTopic(), index, env.Message()) if err == ErrDuplicatedMessage { - return + return err } if store.msgProvider == nil { metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) - return + return err } // TODO: Move this to a separate go routine if DB writes becomes a bottleneck @@ -331,16 +331,17 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { if err != nil { store.log.Error("could not store message", err) metrics.RecordStoreError(store.ctx, "store_failure") - return + return err } metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) + return nil } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { defer store.wg.Done() for envelope := range store.MsgC { - store.storeMessage(envelope) + _ = store.storeMessage(envelope) } } @@ -445,7 +446,7 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3), params.s.log) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log) if err == nil { params.selectedPeer = *p } else { @@ -456,7 +457,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption { func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3), params.s.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log) if err == nil { params.selectedPeer = *p } else { @@ -503,7 +504,7 @@ func DefaultOptions() []HistoryRequestOption { func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) - connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) + connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { store.log.Error("Failed to connect to remote peer", err) return nil, err @@ -650,6 +651,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) if err == nil { resultChan <- result + return } store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) }() @@ -672,8 +674,8 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c return nil, ErrFailedQuery } -func (store *WakuStore) findLastSeen() float64 { - var lastSeenTime float64 = 0 +func (store *WakuStore) findLastSeen() int64 { + var lastSeenTime int64 = 0 for imsg := range store.messageQueue.Messages() { if imsg.msg.Timestamp > lastSeenTime { lastSeenTime = imsg.msg.Timestamp @@ -682,6 +684,13 @@ func (store *WakuStore) findLastSeen() float64 { return lastSeenTime } +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} + // Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online // messages are stored in the store node's messages field and in the message db // the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -698,9 +707,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList currentTime := utils.GetUnixEpoch() lastSeenTime := store.findLastSeen() - var offset float64 = 200000 + var offset int64 = int64(20 * time.Nanosecond) currentTime = currentTime + offset - lastSeenTime = math.Max(lastSeenTime-offset, 0) + lastSeenTime = max(lastSeenTime-offset, 0) rpc := &pb.HistoryQuery{ PubsubTopic: pubsubTopic, @@ -713,7 +722,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } if len(peerList) == 0 { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3), store.log) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log) if err != nil { store.log.Info("Error selecting peer: ", err) return -1, ErrNoPeersAvailable @@ -728,13 +737,16 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return -1, ErrFailedToResumeHistory } + msgCount := 0 for _, msg := range messages { - store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)) + if err = store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)); err == nil { + msgCount++ + } } store.log.Info("Retrieved messages since the last online time: ", len(messages)) - return len(messages), nil + return msgCount, nil } // TODO: queryWithAccounting @@ -748,7 +760,7 @@ func (store *WakuStore) Stop() { } if store.h != nil { - store.h.RemoveStreamHandler(StoreID_v20beta3) + store.h.RemoveStreamHandler(StoreID_v20beta4) } store.wg.Wait() diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go index c5366b08247..2ff67ed1d9f 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/enr.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "go.uber.org/zap" @@ -125,7 +126,8 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv } func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { - peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) + pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey()) + peerID, err := peer.IDFromPublicKey(pubKey) if err != nil { return nil, err } @@ -134,7 +136,8 @@ func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { } func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { - peerID, err := peer.IDFromPublicKey(&ECDSAPublicKey{node.Pubkey()}) + pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey()) + peerID, err := peer.IDFromPublicKey(pubKey) if err != nil { return nil, err } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go deleted file mode 100644 index 937adc49def..00000000000 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/public_key.go +++ /dev/null @@ -1,79 +0,0 @@ -package utils - -import ( - "crypto/ecdsa" - "crypto/subtle" - "encoding/asn1" - "errors" - "math/big" - - ethcrypto "github.com/ethereum/go-ethereum/crypto" - "github.com/libp2p/go-libp2p-core/crypto" - pb "github.com/libp2p/go-libp2p-core/crypto/pb" - "github.com/minio/sha256-simd" -) - -// Taken from: https://github.com/libp2p/go-libp2p-core/blob/094b0d3f8ba2934339cb35e1a875b11ab6d08839/crypto/ecdsa.go as -// they don't provide a way to set the key -var ErrNilSig = errors.New("sig is nil") - -// ECDSASig holds the r and s values of an ECDSA signature -type ECDSASig struct { - R, S *big.Int -} - -// ECDSAPublicKey is an implementation of an ECDSA public key -type ECDSAPublicKey struct { - pub *ecdsa.PublicKey -} - -// Type returns the key type -func (ePub *ECDSAPublicKey) Type() pb.KeyType { - return pb.KeyType_Secp256k1 -} - -// Raw returns x509 bytes from a public key -func (ePub *ECDSAPublicKey) Raw() ([]byte, error) { - return ethcrypto.CompressPubkey(ePub.pub), nil -} - -// Bytes returns the public key as protobuf bytes -func (ePub *ECDSAPublicKey) Bytes() ([]byte, error) { - return crypto.MarshalPublicKey(ePub) -} - -// Equals compares to public keys -func (ePub *ECDSAPublicKey) Equals(o crypto.Key) bool { - return basicEquals(ePub, o) -} - -// Verify compares data to a signature -func (ePub *ECDSAPublicKey) Verify(data, sigBytes []byte) (bool, error) { - sig := new(ECDSASig) - if _, err := asn1.Unmarshal(sigBytes, sig); err != nil { - return false, err - } - if sig == nil { - return false, ErrNilSig - } - - hash := sha256.Sum256(data) - - return ecdsa.Verify(ePub.pub, hash[:], sig.R, sig.S), nil -} - -func basicEquals(k1, k2 crypto.Key) bool { - if k1.Type() != k2.Type() { - return false - } - - a, err := k1.Raw() - if err != nil { - return false - } - b, err := k2.Raw() - if err != nil { - return false - } - return subtle.ConstantTimeCompare(a, b) == 1 -} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go b/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go index 1b28756e2c1..1634b3edd34 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/utils/time.go @@ -2,14 +2,13 @@ package utils import "time" -// GetUnixEpoch converts a time into a unix timestamp with the integer part -// representing seconds and the decimal part representing subseconds -func GetUnixEpochFrom(now time.Time) float64 { - return float64(now.UnixNano()) / float64(time.Second) +// GetUnixEpoch converts a time into a unix timestamp with nanoseconds +func GetUnixEpochFrom(now time.Time) int64 { + return now.UnixNano() } // GetUnixEpoch returns the current time in unix timestamp with the integer part // representing seconds and the decimal part representing subseconds -func GetUnixEpoch() float64 { +func GetUnixEpoch() int64 { return GetUnixEpochFrom(time.Now()) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 427c7a422df..d29f526a141 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -453,7 +453,7 @@ github.com/status-im/go-discover/discover/v4wire github.com/status-im/go-discover/discover/v5wire # github.com/status-im/go-multiaddr-ethv4 v1.2.1 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20220218174911-0db40c7de58b +# github.com/status-im/go-waku v0.0.0-20220224134018-cdc0c9c69d18 github.com/status-im/go-waku/waku/persistence github.com/status-im/go-waku/waku/try github.com/status-im/go-waku/waku/v2 diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 5008b962420..c76045093a6 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -377,7 +377,7 @@ func (w *Waku) addWakuV2Peers(cfg *Config) { log.Info("peer added successfully", peerID) } - w.addPeers(cfg.StoreNodes, store.StoreID_v20beta3, addToStore) + w.addPeers(cfg.StoreNodes, store.StoreID_v20beta4, addToStore) w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToStore) w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToStore) w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToStore) @@ -869,8 +869,8 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s } query := store.Query{ - StartTime: float64(from), - EndTime: float64(to), + StartTime: int64(from), + EndTime: int64(to), ContentTopics: strTopics, Topic: relay.DefaultWakuTopic, } @@ -1074,7 +1074,7 @@ func (w *Waku) AddStorePeer(address string) (string, error) { return "", err } - peerID, err := w.node.AddPeer(addr, store.StoreID_v20beta3) + peerID, err := w.node.AddPeer(addr, store.StoreID_v20beta4) if err != nil { return "", err }