Skip to content

Commit

Permalink
chore: filter remove all subscription from a peer that is leaving (#3267
Browse files Browse the repository at this point in the history
)

* waku/waku_filter_v2/protocol.nim keeps track of the filter-client connections in Table[PeerId, Connection]
* waku/waku_filter_v2/protocol.nim starts listening for peer-left events in order to completely remove the previous Connection instance. Also, a new Connection is added when the filter-service starts publishing to its peers.

---------
    
Co-authored-by: NagyZoltanPeter <[email protected]>
  • Loading branch information
Ivansete-status authored Jan 31, 2025
1 parent 1d6ef31 commit 46747fd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 46 deletions.
3 changes: 1 addition & 2 deletions tests/node/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ suite "Waku Filter - End to End":
)

discard await wakuFilter.subscriptions.addSubscription(
clientPeerId, filterCriteria.toHashSet(), peerManager
clientPeerId, filterCriteria.toHashSet()
)

let
Expand Down Expand Up @@ -605,7 +605,6 @@ suite "Waku Filter - End to End":
await wakuFilter.subscriptions.addSubscription(
peers[index].switch.peerInfo.peerId,
@[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet(),
peerManager,
)
).isOkOr:
assert false, $error
Expand Down
5 changes: 5 additions & 0 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,11 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))

proc addExtPeerEventHandler*(
pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind
) =
pm.switch.addPeerEventHandler(eventHandler, eventKind)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Initialization and Constructor #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
Expand Down
54 changes: 43 additions & 11 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type WakuFilter* = ref object of LPProtocol
messageCache: TimedCache[string]
peerRequestRateLimiter*: PerPeerRateLimiter
subscriptionsManagerFut: Future[void]
peerConnections: Table[PeerId, Connection]

proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
debug "pinging subscriber", peerId = peerId
Expand Down Expand Up @@ -69,7 +70,7 @@ proc subscribe(
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria

(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
(await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))

debug "correct subscription", peerId = peerId
Expand Down Expand Up @@ -166,24 +167,40 @@ proc handleSubscribeRequest*(
else:
return FilterSubscribeResponse.ok(request.requestId)

proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peer)
proc pushToPeer(
wf: WakuFilter, peerId: PeerId, buffer: seq[byte]
): Future[Result[void, string]] {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peerId)

if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peerId = shortLog(peer)
return
error "no addresses for peer", peerId = shortLog(peerId)
return err("no addresses for peer: " & $peerId)

let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
error "could not get connection by peer id", error = $error
return
let conn =
if wf.peerConnections.contains(peerId):
wf.peerConnections[peerId]
else:
## we never pushed a message before, let's dial then
let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "pushToPeer no connection to peer", peerId = shortLog(peerId)
return err("pushToPeer no connection to peer: " & shortLog(peerId))

let newConn = connRes.get()
wf.peerConnections[peerId] = newConn
newConn

await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peer), conn
debug "published successful", peerId = shortLog(peerId), conn
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)

return ok()

proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
) {.async.} =
Expand All @@ -208,11 +225,12 @@ proc pushToPeers(
msg_hash = msgHash

let bufferToPublish = messagePush.encode().buffer
var pushFuts: seq[Future[void]]
var pushFuts: seq[Future[Result[void, string]]]

for peerId in peers:
let pushFut = wf.pushToPeer(peerId, bufferToPublish)
pushFuts.add(pushFut)

await allFutures(pushFuts)

proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
Expand Down Expand Up @@ -324,6 +342,15 @@ proc initProtocolHandler(wf: WakuFilter) =
wf.handler = handler
wf.codec = WakuFilterSubscribeCodec

proc onPeerEventHandler(wf: WakuFilter, peerId: PeerId, event: PeerEvent) {.async.} =
## These events are dispatched nim-libp2p, triggerPeerEvents proc
case event.kind
of Left:
## Drop the previous known connection reference
wf.peerConnections.del(peerId)
else:
discard

proc new*(
T: type WakuFilter,
peerManager: PeerManager,
Expand All @@ -342,6 +369,11 @@ proc new*(
peerRequestRateLimiter: PerPeerRateLimiter(setting: rateLimitSetting),
)

proc peerEventHandler(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
wf.onPeerEventHandler(peerId, event)

peerManager.addExtPeerEventHandler(peerEventHandler, PeerEventKind.Left)

wf.initProtocolHandler()
setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
return wf
Expand Down
37 changes: 4 additions & 33 deletions waku/waku_filter_v2/subscriptions.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type

SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids

PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection]
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint]

FilterSubscriptions* = ref object
peersSubscribed*: Table[PeerID, PeerData]
Expand All @@ -46,7 +46,6 @@ proc new*(
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
): FilterSubscriptions =
## Create a new filter subscription object
return FilterSubscriptions(
peersSubscribed: initTable[PeerID, PeerData](),
subscriptions: initTable[FilterCriterion, SubscribedPeers](),
Expand Down Expand Up @@ -103,10 +102,6 @@ proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
debug "removePeer",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId

s.peersSubscribed.withValue(peerId, peerData):
debug "closing connection with peer", peerId = shortLog(peerId)
await peerData.connection.close()

s.peersSubscribed.del(peerId)

debug "removePeer after deletion",
Expand Down Expand Up @@ -146,15 +141,10 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
data.lastSeen = Moment.now()

proc addSubscription*(
s: FilterSubscriptions,
peerId: PeerID,
filterCriteria: FilterCriteria,
peerManager: PeerManager,
s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
): Future[Result[void, string]] {.async.} =
## Add a subscription for a given peer
##
## The peerManager is needed to establish the first Connection through
## /vac/waku/filter-push/2.0.0-beta1

var peerData: ptr PeerData

s.peersSubscribed.withValue(peerId, data):
Expand All @@ -168,17 +158,7 @@ proc addSubscription*(
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))

let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
return err("addSubscription no connection to peer: " & shortLog(peerId))

let newPeerData: PeerData =
(lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get())

debug "new WakuFilterPushCodec stream", conn = connRes.get()

let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))

for filterCriterion in filterCriteria:
Expand Down Expand Up @@ -216,14 +196,5 @@ proc removeSubscription*(
do:
return err("Peer has no subscriptions")

proc getConnectionByPeerId*(
s: FilterSubscriptions, peerId: PeerID
): Result[Connection, string] =
if not s.peersSubscribed.hasKey(peerId):
return err("peer not subscribed: " & shortLog(peerId))

let peerData = s.peersSubscribed.getOrDefault(peerId)
return ok(peerData.connection)

proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) =
s.subscriptionTimeout = newTimeout

0 comments on commit 46747fd

Please sign in to comment.