From caf9741de8f3cf5601dfe18e918ebf72d388d4d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Wed, 12 Feb 2025 21:35:52 +0100 Subject: [PATCH 01/25] chore: add raising exceptions to services setup --- .../connectivity/autonat/service.nim | 4 ++- libp2p/services/autorelayservice.nim | 33 +++++++++++-------- libp2p/services/hpservice.nim | 4 ++- libp2p/services/wildcardresolverservice.nim | 2 +- libp2p/switch.nim | 4 ++- 5 files changed, 30 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 94698f7494..48fc802e94 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -198,7 +198,9 @@ proc addressMapper( addrs.add(processedMA) return addrs -method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = +method setup*( + self: AutonatService, switch: Switch +): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = self.addressMapper = proc( listenAddrs: seq[MultiAddress] ): Future[seq[MultiAddress]] {.async.} = diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index ccaa393222..d39e953a3f 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -36,7 +36,7 @@ proc isRunning*(self: AutoRelayService): bool = proc addressMapper( self: AutoRelayService, listenAddrs: seq[MultiAddress] -): Future[seq[MultiAddress]] {.async.} = +): Future[seq[MultiAddress]] {.async: (raises: []).} = return concat(toSeq(self.relayAddresses.values)) & listenAddrs proc reserveAndUpdate( @@ -58,7 +58,9 @@ proc reserveAndUpdate( self.onReservation(concat(toSeq(self.relayAddresses.values))) await sleepAsync chronos.seconds(ttl - 30) -method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async.} = +method setup*( + self: AutoRelayService, switch: Switch +): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = self.addressMapper = proc( listenAddrs: seq[MultiAddress] ): Future[seq[MultiAddress]] {.async.} = @@ -87,19 +89,24 @@ proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} = self.backingOff.keepItIf(it != pid) self.peerAvailable.fire() -proc innerRun(self: AutoRelayService, switch: Switch) {.async.} = +proc innerRun( + self: AutoRelayService, switch: Switch +) {.async: (raises: [CancelledError]).} = while true: # Remove relayPeers that failed let peers = toSeq(self.relayPeers.keys()) for k in peers: - if self.relayPeers[k].finished(): - self.relayPeers.del(k) - self.relayAddresses.del(k) - if not self.onReservation.isNil(): - self.onReservation(concat(toSeq(self.relayAddresses.values))) - # To avoid ddosing our peers in certain conditions - self.backingOff.add(k) - asyncSpawn self.manageBackedOff(k) + try: + if self.relayPeers[k].finished(): + self.relayPeers.del(k) + self.relayAddresses.del(k) + if not self.onReservation.isNil(): + self.onReservation(concat(toSeq(self.relayAddresses.values))) + # To avoid ddosing our peers in certain conditions + self.backingOff.add(k) + asyncSpawn self.manageBackedOff(k) + except KeyError: + discard # Get all connected relayPeers self.peerAvailable.clear() @@ -115,9 +122,9 @@ proc innerRun(self: AutoRelayService, switch: Switch) {.async.} = break self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch) - if self.relayPeers.len() > 0: + try: await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() - else: + except ValueError: await self.peerAvailable.wait() method run*(self: AutoRelayService, switch: Switch) {.async.} = diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index 272c99b1af..74583ad440 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -89,7 +89,9 @@ proc newConnectedPeerHandler( except CatchableError as err: debug "Hole punching failed during dcutr", err = err.msg -method setup*(self: HPService, switch: Switch): Future[bool] {.async.} = +method setup*( + self: HPService, switch: Switch +): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = var hasBeenSetup = await procCall Service(self).setup(switch) hasBeenSetup = hasBeenSetup and await self.autonatService.setup(switch) diff --git a/libp2p/services/wildcardresolverservice.nim b/libp2p/services/wildcardresolverservice.nim index 151abd2ae1..3bd213c84e 100644 --- a/libp2p/services/wildcardresolverservice.nim +++ b/libp2p/services/wildcardresolverservice.nim @@ -148,7 +148,7 @@ proc expandWildcardAddresses( method setup*( self: WildcardAddressResolverService, switch: Switch -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = ## Sets up the `WildcardAddressResolverService`. ## ## This method adds the address mapper to the peer's list of address mappers. diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 4ab9fedd65..781afb0e69 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -64,7 +64,9 @@ type Service* = ref object of RootObj inUse: bool -method setup*(self: Service, switch: Switch): Future[bool] {.base, async.} = +method setup*( + self: Service, switch: Switch +): Future[bool] {.base, async: (raises: [CancelledError, CatchableError]).} = if self.inUse: warn "service setup has already been called" return false From 08fe20e76a6516fb631edcb0187778b23c1ea230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 11:19:20 +0100 Subject: [PATCH 02/25] chore: add exceptions to stop --- libp2p/protocols/connectivity/autonat/service.nim | 4 +++- libp2p/services/autorelayservice.nim | 4 +++- libp2p/services/hpservice.nim | 4 +++- libp2p/services/wildcardresolverservice.nim | 2 +- libp2p/switch.nim | 4 +++- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 48fc802e94..71b622fce2 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -227,7 +227,9 @@ method run*(self: AutonatService, switch: Switch) {.async, public.} = trace "Running AutonatService" await askConnectedPeers(self, switch) -method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} = +method stop*( + self: AutonatService, switch: Switch +): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = info "Stopping AutonatService" let hasBeenStopped = await procCall Service(self).stop(switch) if hasBeenStopped: diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index d39e953a3f..c1e15bb108 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -134,7 +134,9 @@ method run*(self: AutoRelayService, switch: Switch) {.async.} = self.running = true self.runner = self.innerRun(switch) -method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async.} = +method stop*( + self: AutoRelayService, switch: Switch +): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = let hasBeenStopped = await procCall Service(self).stop(switch) if hasBeenStopped: self.running = false diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index 74583ad440..b36c1f50e6 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -126,7 +126,9 @@ method setup*( method run*(self: HPService, switch: Switch) {.async, public.} = await self.autonatService.run(switch) -method stop*(self: HPService, switch: Switch): Future[bool] {.async, public.} = +method stop*( + self: HPService, switch: Switch +): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = discard await self.autonatService.stop(switch) if not isNil(self.newConnectedPeerHandler): switch.connManager.removePeerEventHandler( diff --git a/libp2p/services/wildcardresolverservice.nim b/libp2p/services/wildcardresolverservice.nim index 3bd213c84e..2cbc1f036d 100644 --- a/libp2p/services/wildcardresolverservice.nim +++ b/libp2p/services/wildcardresolverservice.nim @@ -181,7 +181,7 @@ method run*(self: WildcardAddressResolverService, switch: Switch) {.async, publi method stop*( self: WildcardAddressResolverService, switch: Switch -): Future[bool] {.async, public.} = +): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = ## Stops the WildcardAddressResolverService. ## ## Handles the shutdown process of the WildcardAddressResolverService for a given switch. diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 781afb0e69..fe6c471362 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -76,7 +76,9 @@ method setup*( method run*(self: Service, switch: Switch) {.base, async.} = doAssert(false, "Not implemented!") -method stop*(self: Service, switch: Switch): Future[bool] {.base, async.} = +method stop*( + self: Service, switch: Switch +): Future[bool] {.base, async: (raises: [CancelledError, CatchableError]).} = if not self.inUse: warn "service is already stopped" return false From d5b2b57317c4fde6d6c208a79569c057bb20231d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 11:30:48 +0100 Subject: [PATCH 03/25] chore: add exceptions list to run method --- libp2p/protocols/connectivity/autonat/service.nim | 4 +++- libp2p/services/autorelayservice.nim | 4 +++- libp2p/services/hpservice.nim | 4 +++- libp2p/services/wildcardresolverservice.nim | 4 +++- libp2p/switch.nim | 4 +++- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 71b622fce2..00a6b5b00f 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -223,7 +223,9 @@ method setup*( switch.peerInfo.addressMappers.add(self.addressMapper) return hasBeenSetup -method run*(self: AutonatService, switch: Switch) {.async, public.} = +method run*( + self: AutonatService, switch: Switch +) {.public, async: (raises: [CancelledError, CatchableError]).} = trace "Running AutonatService" await askConnectedPeers(self, switch) diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index c1e15bb108..94b02473db 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -127,7 +127,9 @@ proc innerRun( except ValueError: await self.peerAvailable.wait() -method run*(self: AutoRelayService, switch: Switch) {.async.} = +method run*( + self: AutoRelayService, switch: Switch +) {.async: (raises: [CancelledError, CatchableError]).} = if self.running: trace "Autorelay is already running" return diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index b36c1f50e6..15c1d9c090 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -123,7 +123,9 @@ method setup*( self.autonatService.statusAndConfidenceHandler(self.onNewStatusHandler) return hasBeenSetup -method run*(self: HPService, switch: Switch) {.async, public.} = +method run*( + self: HPService, switch: Switch +) {.public, async: (raises: [CancelledError, CatchableError]).} = await self.autonatService.run(switch) method stop*( diff --git a/libp2p/services/wildcardresolverservice.nim b/libp2p/services/wildcardresolverservice.nim index 2cbc1f036d..090cf02410 100644 --- a/libp2p/services/wildcardresolverservice.nim +++ b/libp2p/services/wildcardresolverservice.nim @@ -170,7 +170,9 @@ method setup*( switch.peerInfo.addressMappers.add(self.addressMapper) return hasBeenSetup -method run*(self: WildcardAddressResolverService, switch: Switch) {.async, public.} = +method run*( + self: WildcardAddressResolverService, switch: Switch +) {.public, async: (raises: [CancelledError, CatchableError]).} = ## Runs the WildcardAddressResolverService for a given switch. ## ## It updates the peer information for the provided switch by running the registered address mapper. Any other diff --git a/libp2p/switch.nim b/libp2p/switch.nim index fe6c471362..af01beba5d 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -73,7 +73,9 @@ method setup*( self.inUse = true return true -method run*(self: Service, switch: Switch) {.base, async.} = +method run*( + self: Service, switch: Switch +) {.base, async: (raises: [CancelledError, CatchableError]).} = doAssert(false, "Not implemented!") method stop*( From 68e28667eab4a6d20db4e04852f3026a318a855f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 15:03:48 +0100 Subject: [PATCH 04/25] fix: remove CatchableError --- libp2p/services/autorelayservice.nim | 6 +++--- libp2p/services/hpservice.nim | 6 +++--- libp2p/services/wildcardresolverservice.nim | 6 +++--- libp2p/switch.nim | 8 +++----- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index 94b02473db..a58b85ed7e 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -60,7 +60,7 @@ proc reserveAndUpdate( method setup*( self: AutoRelayService, switch: Switch -): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.async: (raises: [CancelledError]).} = self.addressMapper = proc( listenAddrs: seq[MultiAddress] ): Future[seq[MultiAddress]] {.async.} = @@ -129,7 +129,7 @@ proc innerRun( method run*( self: AutoRelayService, switch: Switch -) {.async: (raises: [CancelledError, CatchableError]).} = +) {.async: (raises: [CancelledError]).} = if self.running: trace "Autorelay is already running" return @@ -138,7 +138,7 @@ method run*( method stop*( self: AutoRelayService, switch: Switch -): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.public, async: (raises: [CancelledError]).} = let hasBeenStopped = await procCall Service(self).stop(switch) if hasBeenStopped: self.running = false diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index 15c1d9c090..be36752d40 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -91,7 +91,7 @@ proc newConnectedPeerHandler( method setup*( self: HPService, switch: Switch -): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.async: (raises: [CancelledError]).} = var hasBeenSetup = await procCall Service(self).setup(switch) hasBeenSetup = hasBeenSetup and await self.autonatService.setup(switch) @@ -125,12 +125,12 @@ method setup*( method run*( self: HPService, switch: Switch -) {.public, async: (raises: [CancelledError, CatchableError]).} = +) {.public, async: (raises: [CancelledError]).} = await self.autonatService.run(switch) method stop*( self: HPService, switch: Switch -): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.public, async: (raises: [CancelledError]).} = discard await self.autonatService.stop(switch) if not isNil(self.newConnectedPeerHandler): switch.connManager.removePeerEventHandler( diff --git a/libp2p/services/wildcardresolverservice.nim b/libp2p/services/wildcardresolverservice.nim index 090cf02410..fe385b7aaf 100644 --- a/libp2p/services/wildcardresolverservice.nim +++ b/libp2p/services/wildcardresolverservice.nim @@ -148,7 +148,7 @@ proc expandWildcardAddresses( method setup*( self: WildcardAddressResolverService, switch: Switch -): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.async: (raises: [CancelledError]).} = ## Sets up the `WildcardAddressResolverService`. ## ## This method adds the address mapper to the peer's list of address mappers. @@ -172,7 +172,7 @@ method setup*( method run*( self: WildcardAddressResolverService, switch: Switch -) {.public, async: (raises: [CancelledError, CatchableError]).} = +) {.public, async: (raises: [CancelledError]).} = ## Runs the WildcardAddressResolverService for a given switch. ## ## It updates the peer information for the provided switch by running the registered address mapper. Any other @@ -183,7 +183,7 @@ method run*( method stop*( self: WildcardAddressResolverService, switch: Switch -): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.public, async: (raises: [CancelledError]).} = ## Stops the WildcardAddressResolverService. ## ## Handles the shutdown process of the WildcardAddressResolverService for a given switch. diff --git a/libp2p/switch.nim b/libp2p/switch.nim index af01beba5d..84da1c2a28 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -66,21 +66,19 @@ type method setup*( self: Service, switch: Switch -): Future[bool] {.base, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.base, async: (raises: [CancelledError]).} = if self.inUse: warn "service setup has already been called" return false self.inUse = true return true -method run*( - self: Service, switch: Switch -) {.base, async: (raises: [CancelledError, CatchableError]).} = +method run*(self: Service, switch: Switch) {.base, async: (raises: [CancelledError]).} = doAssert(false, "Not implemented!") method stop*( self: Service, switch: Switch -): Future[bool] {.base, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.base, async: (raises: [CancelledError]).} = if not self.inUse: warn "service is already stopped" return false From 66b4cf1d8af65dc38b7fdc20514afc462d7adc3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 15:39:30 +0100 Subject: [PATCH 05/25] chore: more effort --- libp2p/peerinfo.nim | 4 ++-- .../connectivity/autonat/service.nim | 22 ++++++++++--------- libp2p/services/autorelayservice.nim | 2 +- libp2p/services/hpservice.nim | 9 +++++--- libp2p/services/wildcardresolverservice.nim | 2 +- tests/testautonatservice.nim | 20 ++++++++--------- tests/testpeerinfo.nim | 2 +- 7 files changed, 33 insertions(+), 28 deletions(-) diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index ba8f00a083..fe42f75f59 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -22,7 +22,7 @@ type PeerInfoError* = object of LPError AddressMapper* = proc(listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {. - gcsafe, raises: [] + gcsafe, async: (raises: [CancelledError]) .} ## A proc that expected to resolve the listen addresses into dialable addresses PeerInfo* {.public.} = ref object @@ -52,7 +52,7 @@ func shortLog*(p: PeerInfo): auto = chronicles.formatIt(PeerInfo): shortLog(it) -proc update*(p: PeerInfo) {.async.} = +proc update*(p: PeerInfo) {.async: (raises: [CancelledError]).} = # p.addrs.len == 0 overrides addrs only if it is the first time update is being executed or if the field is empty. # p.addressMappers.len == 0 is for when all addressMappers have been removed, # and we wish to have addrs in its initial state, i.e., a copy of listenAddrs. diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 00a6b5b00f..95860c61c2 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -50,7 +50,7 @@ type StatusAndConfidenceHandler* = proc( networkReachability: NetworkReachability, confidence: Opt[float] - ): Future[void] {.gcsafe, raises: [].} + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} proc new*( T: typedesc[AutonatService], @@ -79,7 +79,7 @@ proc new*( enableAddressMapper: enableAddressMapper, ) -proc callHandler(self: AutonatService) {.async.} = +proc callHandler(self: AutonatService) {.async: (raises: [CancelledError]).} = if not isNil(self.statusAndConfidenceHandler): await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) @@ -92,7 +92,7 @@ proc doesPeerHaveIncomingConn(switch: Switch, peerId: PeerId): bool = proc handleAnswer( self: AutonatService, ans: NetworkReachability -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = if ans == Unknown: return @@ -127,7 +127,7 @@ proc handleAnswer( proc askPeer( self: AutonatService, switch: Switch, peerId: PeerId -): Future[NetworkReachability] {.async.} = +): Future[NetworkReachability] {.async: (raises: [CancelledError]).} = logScope: peerId = $peerId @@ -160,7 +160,9 @@ proc askPeer( await switch.peerInfo.update() return ans -proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = +proc askConnectedPeers( + self: AutonatService, switch: Switch +) {.async: (raises: [CancelledError]).} = trace "Asking peers for reachability" var peers = switch.connectedPeers(Direction.Out) self.rng.shuffle(peers) @@ -181,7 +183,7 @@ proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.asy proc addressMapper( self: AutonatService, peerStore: PeerStore, listenAddrs: seq[MultiAddress] -): Future[seq[MultiAddress]] {.async.} = +): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = if self.networkReachability != NetworkReachability.Reachable: return listenAddrs @@ -200,10 +202,10 @@ proc addressMapper( method setup*( self: AutonatService, switch: Switch -): Future[bool] {.async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.async: (raises: [CancelledError]).} = self.addressMapper = proc( listenAddrs: seq[MultiAddress] - ): Future[seq[MultiAddress]] {.async.} = + ): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = return await addressMapper(self, switch.peerStore, listenAddrs) info "Setting up AutonatService" @@ -225,13 +227,13 @@ method setup*( method run*( self: AutonatService, switch: Switch -) {.public, async: (raises: [CancelledError, CatchableError]).} = +) {.public, async: (raises: [CancelledError]).} = trace "Running AutonatService" await askConnectedPeers(self, switch) method stop*( self: AutonatService, switch: Switch -): Future[bool] {.public, async: (raises: [CancelledError, CatchableError]).} = +): Future[bool] {.public, async: (raises: [CancelledError]).} = info "Stopping AutonatService" let hasBeenStopped = await procCall Service(self).stop(switch) if hasBeenStopped: diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index a58b85ed7e..abbba0d1e7 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -63,7 +63,7 @@ method setup*( ): Future[bool] {.async: (raises: [CancelledError]).} = self.addressMapper = proc( listenAddrs: seq[MultiAddress] - ): Future[seq[MultiAddress]] {.async.} = + ): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = return await addressMapper(self, listenAddrs) let hasBeenSetUp = await procCall Service(self).setup(switch) diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index be36752d40..bc5f5cf177 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -96,8 +96,11 @@ method setup*( hasBeenSetup = hasBeenSetup and await self.autonatService.setup(switch) if hasBeenSetup: - let dcutrProto = Dcutr.new(switch) - switch.mount(dcutrProto) + try: + let dcutrProto = Dcutr.new(switch) + switch.mount(dcutrProto) + except LPError as err: + trace "Failed to mount Dcutr", err = err.msg self.newConnectedPeerHandler = proc(peerId: PeerId, event: PeerEvent) {.async.} = await newConnectedPeerHandler(self, switch, peerId, event) @@ -108,7 +111,7 @@ method setup*( self.onNewStatusHandler = proc( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.NotReachable and not self.autoRelayService.isRunning(): discard await self.autoRelayService.setup(switch) diff --git a/libp2p/services/wildcardresolverservice.nim b/libp2p/services/wildcardresolverservice.nim index fe385b7aaf..07de23f0d1 100644 --- a/libp2p/services/wildcardresolverservice.nim +++ b/libp2p/services/wildcardresolverservice.nim @@ -161,7 +161,7 @@ method setup*( ## - A `Future[bool]` that resolves to `true` if the setup was successful, otherwise `false`. self.addressMapper = proc( listenAddrs: seq[MultiAddress] - ): Future[seq[MultiAddress]] {.async.} = + ): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = return expandWildcardAddresses(self.networkInterfaceProvider, listenAddrs) debug "Setting up WildcardAddressResolverService" diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim index a61c5f7eff..5738b86b5d 100644 --- a/tests/testautonatservice.nim +++ b/tests/testautonatservice.nim @@ -99,7 +99,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: @@ -148,7 +148,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: @@ -196,7 +196,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: @@ -241,7 +241,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: if not awaiter.finished: @@ -303,7 +303,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: @@ -353,7 +353,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler1( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter1.finished: @@ -361,7 +361,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler2( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter2.finished: @@ -405,7 +405,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler1( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter1.finished: @@ -454,7 +454,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: if not awaiter.finished: @@ -498,7 +498,7 @@ suite "Autonat Service": proc statusAndConfidenceHandler( networkReachability: NetworkReachability, confidence: Opt[float] - ) {.async.} = + ) {.async: (raises: [CancelledError]).} = fail() check autonatService.networkReachability == NetworkReachability.Unknown diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 87b5f3fccf..38c4b4fa29 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -74,7 +74,7 @@ suite "PeerInfo": ] multiAddresses2 = @[MultiAddress.init("/ip4/8.8.8.8/tcp/33").tryGet()] - proc addressMapper(input: seq[MultiAddress]): Future[seq[MultiAddress]] {.async.} = + proc addressMapper(input: seq[MultiAddress]): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = check input == multiAddresses await sleepAsync(0.seconds) return multiAddresses2 From 5a72a86315110293b7298349c3091483caf0a4c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 15:45:13 +0100 Subject: [PATCH 06/25] style: fix --- tests/testpeerinfo.nim | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 38c4b4fa29..7421d6de2f 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -74,7 +74,9 @@ suite "PeerInfo": ] multiAddresses2 = @[MultiAddress.init("/ip4/8.8.8.8/tcp/33").tryGet()] - proc addressMapper(input: seq[MultiAddress]): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = + proc addressMapper( + input: seq[MultiAddress] + ): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} = check input == multiAddresses await sleepAsync(0.seconds) return multiAddresses2 From 331ebab934c3725954725bdcd1b94c4d6ce11233 Mon Sep 17 00:00:00 2001 From: vladopajic Date: Thu, 13 Feb 2025 16:19:21 +0100 Subject: [PATCH 07/25] chore: discard to raiseAssert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: richΛrd --- libp2p/services/autorelayservice.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index abbba0d1e7..509f4d9bee 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -106,7 +106,7 @@ proc innerRun( self.backingOff.add(k) asyncSpawn self.manageBackedOff(k) except KeyError: - discard + raiseAssert "checked with in" # Get all connected relayPeers self.peerAvailable.clear() From 3e999f60ea7f1dbf1318a48278ce31095759f913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 16:23:09 +0100 Subject: [PATCH 08/25] chore: add if --- libp2p/services/autorelayservice.nim | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index 509f4d9bee..5797e30aec 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -122,9 +122,12 @@ proc innerRun( break self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch) - try: - await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() - except ValueError: + if self.relayPeers.len() > 0: + try: + await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() + except ValueError: + raiseAssert "checked with relayPeers.len()" + else: await self.peerAvailable.wait() method run*( From 0b53b9d4392ea87d7c2ac8d6cdc5688d14ebdedb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 13 Feb 2025 16:27:10 +0100 Subject: [PATCH 09/25] style: fix --- libp2p/services/autorelayservice.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index 5797e30aec..ea6a834724 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -124,9 +124,9 @@ proc innerRun( if self.relayPeers.len() > 0: try: - await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() + await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() except ValueError: - raiseAssert "checked with relayPeers.len()" + raiseAssert "checked with relayPeers.len()" else: await self.peerAvailable.wait() From 49e88dfaa7ef9220630d09d58ca7a5a4df89ddae Mon Sep 17 00:00:00 2001 From: vladopajic Date: Fri, 14 Feb 2025 11:11:08 +0100 Subject: [PATCH 10/25] chore: change log level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: richΛrd --- libp2p/services/hpservice.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/services/hpservice.nim b/libp2p/services/hpservice.nim index bc5f5cf177..e7ea276e2d 100644 --- a/libp2p/services/hpservice.nim +++ b/libp2p/services/hpservice.nim @@ -100,7 +100,7 @@ method setup*( let dcutrProto = Dcutr.new(switch) switch.mount(dcutrProto) except LPError as err: - trace "Failed to mount Dcutr", err = err.msg + error "Failed to mount Dcutr", err = err.msg self.newConnectedPeerHandler = proc(peerId: PeerId, event: PeerEvent) {.async.} = await newConnectedPeerHandler(self, switch, peerId, event) From bb07f60ea1dc8c64dcc4a1a57ad3d91c56b2d520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 11:43:42 +0100 Subject: [PATCH 11/25] chore: add raised errors to dialer --- libp2p/dial.nim | 17 ++++++---- libp2p/dialer.nim | 79 ++++++++++++++++++++++++++++------------------- 2 files changed, 58 insertions(+), 38 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 9d5012e321..65b2f82239 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -15,7 +15,10 @@ import peerid, stream/connection, transports/transport export results -type Dial* = ref object of RootObj +type + Dial* = ref object of RootObj + + DialFailedError* = object of LPError method connect*( self: Dial, @@ -24,7 +27,7 @@ method connect*( forceDial = false, reuseConnection = true, dir = Direction.Out, -) {.async, base.} = +) {.base, async: (raises: [DialFailedError, CancelledError]).} = ## connect remote peer without negotiating ## a protocol ## @@ -33,14 +36,14 @@ method connect*( method connect*( self: Dial, address: MultiAddress, allowUnknownPeerId = false -): Future[PeerId] {.async, base.} = +): Future[PeerId] {.base, async: (raises: [DialFailedError, CancelledError]).} = ## Connects to a peer and retrieve its PeerId doAssert(false, "Not implemented!") method dial*( self: Dial, peerId: PeerId, protos: seq[string] -): Future[Connection] {.async, base.} = +): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError]).} = ## create a protocol stream over an ## existing connection ## @@ -53,7 +56,7 @@ method dial*( addrs: seq[MultiAddress], protos: seq[string], forceDial = false, -): Future[Connection] {.async, base.} = +): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError]).} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -65,5 +68,7 @@ method addTransport*(self: Dial, transport: Transport) {.base.} = method tryDial*( self: Dial, peerId: PeerId, addrs: seq[MultiAddress] -): Future[Opt[MultiAddress]] {.async, base.} = +): Future[Opt[MultiAddress]] {. + base, async: (raises: [DialFailedError, CancelledError]) +.} = doAssert(false, "Not implemented!") diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 81a7c83fd0..2a7bbff32b 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -36,16 +36,13 @@ declareCounter(libp2p_total_dial_attempts, "total attempted dials") declareCounter(libp2p_successful_dials, "dialed successful peers") declareCounter(libp2p_failed_dials, "failed dials") -type - DialFailedError* = object of LPError - - Dialer* = ref object of Dial - localPeerId*: PeerId - connManager: ConnManager - dialLock: Table[PeerId, AsyncLock] - transports: seq[Transport] - peerStore: PeerStore - nameResolver: NameResolver +type Dialer* = ref object of Dial + localPeerId*: PeerId + connManager: ConnManager + dialLock: Table[PeerId, AsyncLock] + transports: seq[Transport] + peerStore: PeerStore + nameResolver: NameResolver proc dialAndUpgrade( self: Dialer, @@ -53,7 +50,7 @@ proc dialAndUpgrade( hostname: string, address: MultiAddress, dir = Direction.Out, -): Future[Muxer] {.async.} = +): Future[Muxer] {.async: (raises: [CancelledError]).} = for transport in self.transports: # for each transport if transport.handles(address): # check if it can dial it trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname @@ -105,7 +102,9 @@ proc dialAndUpgrade( proc expandDnsAddr( self: Dialer, peerId: Opt[PeerId], address: MultiAddress -): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} = +): Future[seq[(MultiAddress, Opt[PeerId])]] {. + async: (raises: [CancelledError, MaError, TransportAddressError, LPError]) +.} = if not DNSADDR.matchPartial(address): return @[(address, peerId)] if isNil(self.nameResolver): @@ -115,7 +114,10 @@ proc expandDnsAddr( let toResolve = if peerId.isSome: - address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet() + try: + address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet() + except ResultError[void]: + raiseAssert "checked with if" else: address resolved = await self.nameResolver.resolveDnsAddr(toResolve) @@ -132,7 +134,9 @@ proc expandDnsAddr( proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress], dir = Direction.Out -): Future[Muxer] {.async.} = +): Future[Muxer] {. + async: (raises: [CancelledError, MaError, TransportAddressError, LPError]) +.} = debug "Dialing peer", peerId = peerId.get(default(PeerId)), addrs for rawAddress in addrs: @@ -169,9 +173,9 @@ proc internalConnect( forceDial: bool, reuseConnection = true, dir = Direction.Out, -): Future[Muxer] {.async.} = +): Future[Muxer] {.async: (raises: [DialFailedError, CancelledError]).} = if Opt.some(self.localPeerId) == peerId: - raise newException(CatchableError, "can't dial self!") + raise newException(DialFailedError, "can't dial self!") # Ensure there's only one in-flight attempt per peer let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) @@ -189,7 +193,8 @@ proc internalConnect( await self.dialAndUpgrade(peerId, addrs, dir) except CatchableError as exc: slot.release() - raise exc + raise newException(DialFailedError, exc.msg) + slot.trackMuxer(muxed) if isNil(muxed): # None of the addresses connected raise newException(DialFailedError, "Unable to establish outgoing link") @@ -202,14 +207,19 @@ proc internalConnect( PeerEvent(kind: PeerEventKind.Identified, initiator: true), ) except CatchableError as exc: - trace "Failed to finish outgoung upgrade", description = exc.msg + trace "Failed to finish outgoing upgrade", description = exc.msg await muxed.close() - raise exc + raise newException(DialFailedError, "Failed to finish outgoing upgrade") return muxed + except TooManyConnectionsError as exc: + raise newException(DialFailedError, exc.msg) finally: if lock.locked(): - lock.release() + try: + lock.release() + except: + raiseAssert "checked with if" method connect*( self: Dialer, @@ -218,7 +228,7 @@ method connect*( forceDial = false, reuseConnection = true, dir = Direction.Out, -) {.async.} = +) {.async: (raises: [DialFailedError, CancelledError]).} = ## connect remote peer without negotiating ## a protocol ## @@ -231,7 +241,7 @@ method connect*( method connect*( self: Dialer, address: MultiAddress, allowUnknownPeerId = false -): Future[PeerId] {.async.} = +): Future[PeerId] {.async: (raises: [DialFailedError, CancelledError]).} = ## Connects to a peer and retrieve its PeerId parseFullAddress(address).toOpt().withValue(fullAddress): @@ -249,7 +259,7 @@ method connect*( proc negotiateStream( self: Dialer, conn: Connection, protos: seq[string] -): Future[Connection] {.async.} = +): Future[Connection] {.async: (raises: [CatchableError]).} = trace "Negotiating stream", conn, protos let selected = await MultistreamSelect.select(conn, protos) if not protos.contains(selected): @@ -260,7 +270,7 @@ proc negotiateStream( method tryDial*( self: Dialer, peerId: PeerId, addrs: seq[MultiAddress] -): Future[Opt[MultiAddress]] {.async.} = +): Future[Opt[MultiAddress]] {.async: (raises: [DialFailedError, CancelledError]).} = ## Create a protocol stream in order to check ## if a connection is possible. ## Doesn't use the Connection Manager to save it. @@ -280,17 +290,22 @@ method tryDial*( method dial*( self: Dialer, peerId: PeerId, protos: seq[string] -): Future[Connection] {.async.} = +): Future[Connection] {.async: (raises: [DialFailedError, CancelledError]).} = ## create a protocol stream over an ## existing connection ## trace "Dialing (existing)", peerId, protos - let stream = await self.connManager.getStream(peerId) - if stream.isNil: - raise newException(DialFailedError, "Couldn't get muxed stream") - return await self.negotiateStream(stream, protos) + try: + let stream = await self.connManager.getStream(peerId) + return await self.negotiateStream(stream, protos) + except CancelledError as exc: + trace "Dial canceled" + raise exc + except CatchableError as exc: + trace "Error canceled", description = exc.msg + raise newException(DialFailedError, exc.msg) method dial*( self: Dialer, @@ -298,7 +313,7 @@ method dial*( addrs: seq[MultiAddress], protos: seq[string], forceDial = false, -): Future[Connection] {.async.} = +): Future[Connection] {.async: (raises: [DialFailedError, CancelledError]).} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -307,7 +322,7 @@ method dial*( conn: Muxer stream: Connection - proc cleanup() {.async.} = + proc cleanup() {.async: (raises: []).} = if not (isNil(stream)): await stream.closeWithEOF() @@ -331,7 +346,7 @@ method dial*( except CatchableError as exc: debug "Error dialing", conn, description = exc.msg await cleanup() - raise exc + raise newException(DialFailedError, exc.msg) method addTransport*(self: Dialer, t: Transport) = self.transports &= t From a34b1229e483ca9308127ba22de4669aa5956ed6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 11:52:58 +0100 Subject: [PATCH 12/25] chore: add to switch --- libp2p/switch.nim | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 84da1c2a28..f043763178 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -139,14 +139,16 @@ method connect*( forceDial = false, reuseConnection = true, dir = Direction.Out, -): Future[void] {.public.} = +): Future[void] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = ## Connects to a peer without opening a stream to it s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir) method connect*( s: Switch, address: MultiAddress, allowUnknownPeerId = false -): Future[PeerId] = +): Future[PeerId] {.async: (raises: [DialFailedError, CancelledError], raw: true).} = ## Connects to a peer and retrieve its PeerId ## ## If the P2P part is missing from the MA and `allowUnknownPeerId` is set @@ -157,12 +159,18 @@ method connect*( method dial*( s: Switch, peerId: PeerId, protos: seq[string] -): Future[Connection] {.public.} = +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = ## Open a stream to a connected peer with the specified `protos` s.dialer.dial(peerId, protos) -proc dial*(s: Switch, peerId: PeerId, proto: string): Future[Connection] {.public.} = +proc dial*( + s: Switch, peerId: PeerId, proto: string +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = ## Open a stream to a connected peer with the specified `proto` dial(s, peerId, @[proto]) @@ -173,7 +181,9 @@ method dial*( addrs: seq[MultiAddress], protos: seq[string], forceDial = false, -): Future[Connection] {.public.} = +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = ## Connected to a peer and open a stream ## with the specified `protos` @@ -181,7 +191,9 @@ method dial*( proc dial*( s: Switch, peerId: PeerId, addrs: seq[MultiAddress], proto: string -): Future[Connection] {.public.} = +): Future[Connection] {. + public, async: (raises: [DialFailedError, CancelledError], raw: true) +.} = ## Connected to a peer and open a stream ## with the specified `proto` From cb06d1f2913181673bc83b614e9ad197dd75e6f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 14:53:33 +0100 Subject: [PATCH 13/25] chore: autonat tryDial --- .../protocols/connectivity/autonat/server.nim | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 1b35ed0dfe..6767a811f5 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -40,7 +40,7 @@ proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} proc sendResponseError( conn: Connection, status: ResponseStatus, text: string = "" -) {.async.} = +) {.async: (raises: [CancelledError]).} = let pb = AutonatDialResponse( status: status, text: @@ -50,7 +50,10 @@ proc sendResponseError( Opt.some(text), ma: Opt.none(MultiAddress), ).encode() - await conn.writeLp(pb.buffer) + try: + await conn.writeLp(pb.buffer) + except LPStreamError as exc: + trace "autonat failed to send response error", description = exc.msg, conn proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = let pb = AutonatDialResponse( @@ -58,9 +61,11 @@ proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = ).encode() await conn.writeLp(pb.buffer) -proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = +proc tryDial( + autonat: Autonat, conn: Connection, addrs: seq[MultiAddress] +) {.async: (raises: [DialFailedError, CancelledError]).} = await autonat.sem.acquire() - var futs: seq[Future[Opt[MultiAddress]]] + var futs: seq[Future[Opt[MultiAddress]].Raising([DialFailedError, CancelledError])] try: # This is to bypass the per peer max connections limit let outgoingConnection = @@ -71,18 +76,24 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy return # Safer to always try to cancel cause we aren't sure if the connection was established defer: - outgoingConnection.cancel() + outgoingConnection.cancelSoon() + # tryDial is to bypass the global max connections limit futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) - let fut = await anyCompleted(futs).wait(autonat.dialTimeout) - let ma = await fut - ma.withValue(maddr): - await conn.sendResponseOk(maddr) - else: - await conn.sendResponseError(DialError, "Missing observed address") + while true: + let raceFut = await one(futs).wait(autonat.dialTimeout) + if raceFut.completed: + let ma = await raceFut + ma.withValue(maddr): + await conn.sendResponseOk(maddr) + else: + await conn.sendResponseError(DialError, "Missing observed address") + break + else: + futs.del(futs.find(raceFut)) except CancelledError as exc: raise exc - except AllFuturesFailedError as exc: + except ValueError as exc: debug "All dial attempts failed", addrs, description = exc.msg await conn.sendResponseError(DialError, "All dial attempts failed") except AsyncTimeoutError as exc: @@ -163,8 +174,6 @@ proc new*( await autonat.handleDial(conn, msg) except CancelledError as exc: raise exc - except CatchableError as exc: - debug "exception in autonat handler", description = exc.msg, conn finally: trace "exiting autonat handler", conn await conn.close() From 7562c3024128f6c6539904835b2031b1ccec3775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 15:13:44 +0100 Subject: [PATCH 14/25] chore: fix tests --- tests/stubs/switchstub.nim | 4 ++-- tests/testdcutr.nim | 12 ++++++------ tests/testhpservice.nim | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/stubs/switchstub.nim b/tests/stubs/switchstub.nim index 8c6e829420..88ae8f7d15 100644 --- a/tests/stubs/switchstub.nim +++ b/tests/stubs/switchstub.nim @@ -26,7 +26,7 @@ type forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} method connect*( self: SwitchStub, @@ -35,7 +35,7 @@ method connect*( forceDial = false, reuseConnection = true, dir = Direction.Out, -) {.async.} = +) {.async: (raises: [DialFailedError, CancelledError]).} = if (self.connectStub != nil): await self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir) else: diff --git a/tests/testdcutr.nim b/tests/testdcutr.nim index 4ec29e58de..580aa5b676 100644 --- a/tests/testdcutr.nim +++ b/tests/testdcutr.nim @@ -114,7 +114,7 @@ suite "Dcutr": forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} = + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} = await sleepAsync(100.millis) let behindNATSwitch = SwitchStub.new(newStandardSwitch(), connectTimeoutProc) @@ -136,8 +136,8 @@ suite "Dcutr": forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} = - raise newException(CatchableError, "error") + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} = + raise newException(DialFailedError, "error") let behindNATSwitch = SwitchStub.new(newStandardSwitch(), connectErrorProc) let publicSwitch = newStandardSwitch() @@ -193,7 +193,7 @@ suite "Dcutr": forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} = + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} = await sleepAsync(100.millis) await ductrServerTest(connectProc) @@ -206,8 +206,8 @@ suite "Dcutr": forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} = - raise newException(CatchableError, "error") + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} = + raise newException(DialFailedError, "error") await ductrServerTest(connectProc) diff --git a/tests/testhpservice.nim b/tests/testhpservice.nim index 27fb4711d7..bfe5d5eecf 100644 --- a/tests/testhpservice.nim +++ b/tests/testhpservice.nim @@ -293,8 +293,8 @@ suite "Hole Punching": forceDial = false, reuseConnection = true, dir = Direction.Out, - ): Future[void] {.async.} = + ): Future[void] {.async: (raises: [DialFailedError, CancelledError]).} = self.connectStub = nil # this stub should be called only once - raise newException(CatchableError, "error") + raise newException(DialFailedError, "error") await holePunchingTest(connectStub, nil, Reachable) From d406f6a71ec891e5c322dc6d9040afcb75d9794d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 15:36:57 +0100 Subject: [PATCH 15/25] chore: more effort --- .../protocols/connectivity/autonat/server.nim | 19 +++++-------- .../protocols/connectivity/dcutr/client.nim | 2 +- .../protocols/connectivity/dcutr/server.nim | 2 +- libp2p/utils/future.nim | 28 +++++++++++++++++-- 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 6767a811f5..7298b8e204 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -80,20 +80,15 @@ proc tryDial( # tryDial is to bypass the global max connections limit futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) - while true: - let raceFut = await one(futs).wait(autonat.dialTimeout) - if raceFut.completed: - let ma = await raceFut - ma.withValue(maddr): - await conn.sendResponseOk(maddr) - else: - await conn.sendResponseError(DialError, "Missing observed address") - break - else: - futs.del(futs.find(raceFut)) + let raceFut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) + let ma = await raceFut + ma.withValue(maddr): + await conn.sendResponseOk(maddr) + else: + await conn.sendResponseError(DialError, "Missing observed address") except CancelledError as exc: raise exc - except ValueError as exc: + except AllFuturesFailedError as exc: debug "All dial attempts failed", addrs, description = exc.msg await conn.sendResponseError(DialError, "All dial attempts failed") except AsyncTimeoutError as exc: diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim index 19e0df4de6..7dd7efd684 100644 --- a/libp2p/protocols/connectivity/dcutr/client.nim +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -79,7 +79,7 @@ proc startSync*( ) ) try: - discard await anyCompleted(futs).wait(self.connectTimeout) + discard await anyCompletedCatchable(futs).wait(self.connectTimeout) debug "Dcutr initiator has directly connected to the remote peer." finally: for fut in futs: diff --git a/libp2p/protocols/connectivity/dcutr/server.nim b/libp2p/protocols/connectivity/dcutr/server.nim index 00e8ddbc08..f3bbdb0a31 100644 --- a/libp2p/protocols/connectivity/dcutr/server.nim +++ b/libp2p/protocols/connectivity/dcutr/server.nim @@ -71,7 +71,7 @@ proc new*( ) ) try: - discard await anyCompleted(futs).wait(connectTimeout) + discard await anyCompletedCatchable(futs).wait(connectTimeout) debug "Dcutr receiver has directly connected to the remote peer." finally: for fut in futs: diff --git a/libp2p/utils/future.nim b/libp2p/utils/future.nim index 6d079c5ce6..2659e36ccd 100644 --- a/libp2p/utils/future.nim +++ b/libp2p/utils/future.nim @@ -23,7 +23,27 @@ proc anyCompleted*[T]( while true: var raceFut: Future[T] + try: + raceFut = await one(requests) + if raceFut.completed: + return raceFut + except ValueError: + raise newException( + AllFuturesFailedError, "None of the futures completed successfully" + ) + requests.del(requests.find(raceFut)) + +proc anyCompletedCatchable*[T]( + futs: seq[T] +): Future[T] {.async: (raises: [AllFuturesFailedError, CancelledError]).} = + ## Returns a future that will complete with the first future that completes. + ## If all futures fail or futs is empty, the returned future will fail with AllFuturesFailedError. + + var requests = futs + + while true: + var raceFut: T try: raceFut = await one(requests) if raceFut.completed: @@ -32,6 +52,10 @@ proc anyCompleted*[T]( raise newException( AllFuturesFailedError, "None of the futures completed successfully" ) + except CancelledError as exc: + raise exc + except CatchableError: + discard + + requests.del(requests.find(raceFut)) - let index = requests.find(raceFut) - requests.del(index) From 1726725ecb0b59db678912ea3887f6c3c66d9735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 15:39:33 +0100 Subject: [PATCH 16/25] chore: style fix --- libp2p/utils/future.nim | 2 -- 1 file changed, 2 deletions(-) diff --git a/libp2p/utils/future.nim b/libp2p/utils/future.nim index 2659e36ccd..d94657b31c 100644 --- a/libp2p/utils/future.nim +++ b/libp2p/utils/future.nim @@ -33,7 +33,6 @@ proc anyCompleted*[T]( ) requests.del(requests.find(raceFut)) - proc anyCompletedCatchable*[T]( futs: seq[T] ): Future[T] {.async: (raises: [AllFuturesFailedError, CancelledError]).} = @@ -58,4 +57,3 @@ proc anyCompletedCatchable*[T]( discard requests.del(requests.find(raceFut)) - From 295a760a5eaef07e63fd452a5ae842ac4e20c159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 16:00:16 +0100 Subject: [PATCH 17/25] chore: improvement --- libp2p/dial.nim | 1 - libp2p/dialer.nim | 75 ++++++++++--------- .../protocols/connectivity/autonat/server.nim | 16 ++-- tests/testdialer.nim | 23 ++++++ tests/testswitch.nim | 5 +- 5 files changed, 76 insertions(+), 44 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 65b2f82239..45146ada48 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -17,7 +17,6 @@ export results type Dial* = ref object of RootObj - DialFailedError* = object of LPError method connect*( diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 2a7bbff32b..66ee8ef1f0 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -179,47 +179,54 @@ proc internalConnect( # Ensure there's only one in-flight attempt per peer let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) - try: - await lock.acquire() - - if reuseConnection: - peerId.withValue(peerId): - self.tryReusingConnection(peerId).withValue(mux): - return mux - - let slot = self.connManager.getOutgoingSlot(forceDial) - let muxed = + await lock.acquire() + defer: + if lock.locked(): try: - await self.dialAndUpgrade(peerId, addrs, dir) - except CatchableError as exc: - slot.release() - raise newException(DialFailedError, exc.msg) + lock.release() + except: + raiseAssert "checked with if" + + if reuseConnection: + peerId.withValue(peerId): + self.tryReusingConnection(peerId).withValue(mux): + return mux - slot.trackMuxer(muxed) - if isNil(muxed): # None of the addresses connected - raise newException(DialFailedError, "Unable to establish outgoing link") + let slot = + try: + self.connManager.getOutgoingSlot(forceDial) + except TooManyConnectionsError as exc: + raise newException(DialFailedError, exc.msg) + let muxed = try: - self.connManager.storeMuxer(muxed) - await self.peerStore.identify(muxed) - await self.connManager.triggerPeerEvents( - muxed.connection.peerId, - PeerEvent(kind: PeerEventKind.Identified, initiator: true), - ) + await self.dialAndUpgrade(peerId, addrs, dir) + except CancelledError as exc: + slot.release() + raise exc except CatchableError as exc: - trace "Failed to finish outgoing upgrade", description = exc.msg - await muxed.close() - raise newException(DialFailedError, "Failed to finish outgoing upgrade") + slot.release() + raise newException(DialFailedError, exc.msg) + slot.trackMuxer(muxed) + if isNil(muxed): # None of the addresses connected + raise newException(DialFailedError, "Unable to establish outgoing link") + + try: + self.connManager.storeMuxer(muxed) + await self.peerStore.identify(muxed) + await self.connManager.triggerPeerEvents( + muxed.connection.peerId, + PeerEvent(kind: PeerEventKind.Identified, initiator: true), + ) return muxed - except TooManyConnectionsError as exc: - raise newException(DialFailedError, exc.msg) - finally: - if lock.locked(): - try: - lock.release() - except: - raiseAssert "checked with if" + except CancelledError as exc: + await muxed.close() + raise exc + except CatchableError as exc: + trace "Failed to finish outgoing upgrade", description = exc.msg + await muxed.close() + raise newException(DialFailedError, "Failed to finish outgoing upgrade") method connect*( self: Dialer, diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 7298b8e204..1c33f7c23b 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -55,11 +55,16 @@ proc sendResponseError( except LPStreamError as exc: trace "autonat failed to send response error", description = exc.msg, conn -proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = +proc sendResponseOk( + conn: Connection, ma: MultiAddress +) {.async: (raises: [CancelledError]).} = let pb = AutonatDialResponse( status: ResponseStatus.Ok, text: Opt.some("Ok"), ma: Opt.some(ma) ).encode() - await conn.writeLp(pb.buffer) + try: + await conn.writeLp(pb.buffer) + except LPStreamError as exc: + trace "autonat failed to send response ok", description = exc.msg, conn proc tryDial( autonat: Autonat, conn: Connection, addrs: seq[MultiAddress] @@ -80,8 +85,8 @@ proc tryDial( # tryDial is to bypass the global max connections limit futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) - let raceFut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) - let ma = await raceFut + let fut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) + let ma = await fut ma.withValue(maddr): await conn.sendResponseOk(maddr) else: @@ -94,9 +99,6 @@ proc tryDial( except AsyncTimeoutError as exc: debug "Dial timeout", addrs, description = exc.msg await conn.sendResponseError(DialError, "Dial timeout") - except CatchableError as exc: - debug "Unexpected error", addrs, description = exc.msg - await conn.sendResponseError(DialError, "Unexpected error") finally: autonat.sem.release() for f in futs: diff --git a/tests/testdialer.nim b/tests/testdialer.nim index 8716158946..8cfaa7e447 100644 --- a/tests/testdialer.nim +++ b/tests/testdialer.nim @@ -9,6 +9,7 @@ import std/options import chronos +import sequtils import unittest2 import ../libp2p/[builders, switch] import ./helpers @@ -34,3 +35,25 @@ suite "Dialer": check src.connManager.connCount(dst.peerInfo.peerId) == 2 await allFutures(src.stop(), dst.stop()) + + asyncTest "Max connections reached": + var switches: seq[Switch] + + let dst = newStandardSwitch(maxConnections = 2) + await dst.start() + switches.add(dst) + + for i in 1 ..< 3: + let src = newStandardSwitch() + switches.add(src) + await src.start() + await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs, true, false) + + let src = newStandardSwitch() + switches.add(src) + await src.start() + check not await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs).withTimeout( + 1000.millis + ) + + await allFuturesThrowing(allFutures(switches.mapIt(it.stop()))) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index eb645b14a6..5c0459b70e 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -15,6 +15,7 @@ import stew/byteutils import ../libp2p/[ errors, + dial, switch, multistream, builders, @@ -739,7 +740,7 @@ suite "Switch": 1000.millis ) - expect TooManyConnectionsError: + expect DialFailedError: await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs) switches.add(srcSwitch) @@ -792,7 +793,7 @@ suite "Switch": 1000.millis ) - expect TooManyConnectionsError: + expect DialFailedError: await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs) switches.add(srcSwitch) From 8207c448b77f6f0751471addc11a8216520cc069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 11:31:47 +0100 Subject: [PATCH 18/25] chore: utilize DialFailedError --- libp2p/protocols/connectivity/autonat/client.nim | 2 +- libp2p/protocols/connectivity/relay/relay.nim | 4 ++-- libp2p/protocols/pubsub/pubsub.nim | 2 +- tests/pubsub/utils.nim | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim index c20b724109..0347c6c8ea 100644 --- a/libp2p/protocols/connectivity/autonat/client.nim +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -47,7 +47,7 @@ method dialMe*( await switch.dial(pid, @[AutonatCodec]) else: await switch.dial(pid, addrs, AutonatCodec) - except CatchableError as err: + except DialFailedError as err: raise newException(AutonatError, "Unexpected error when dialling: " & err.msg, err) diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index fb5357b67c..dab355595a 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -166,7 +166,7 @@ proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} = await r.switch.dial(dst, RelayV2StopCodec) except CancelledError as exc: raise exc - except CatchableError as exc: + except DialFailedError as exc: trace "error opening relay stream", dst, description = exc.msg await sendHopStatus(connSrc, ConnectionFailed) return @@ -271,7 +271,7 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} = await r.switch.dial(dst.peerId, RelayV1Codec) except CancelledError as exc: raise exc - except CatchableError as exc: + except DialFailedError as exc: trace "error opening relay stream", dst, description = exc.msg await sendStatus(connSrc, StatusV1.HopCantDialDst) return diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e4a03b3c26..7cc99064ab 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -366,7 +366,7 @@ method getOrCreatePeer*( return await p.switch.dial(peerId, protosToDial) except CancelledError as exc: raise exc - except CatchableError as e: + except DialFailedError as e: raise (ref GetConnDialError)(parent: e) proc onEvent(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe.} = diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 95878418eb..c05b588d42 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -34,7 +34,7 @@ proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = return await p.switch.dial(peerId, GossipSubCodec_12) except CancelledError as exc: raise exc - except CatchableError as e: + except DialFailedError as e: raise (ref GetConnDialError)(parent: e) let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec_12, 1024 * 1024) From 7b65e32823178701dc9cdc2c653ab7c50dbb56ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 13:23:28 +0100 Subject: [PATCH 19/25] chore: msg fix --- libp2p/dialer.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 66ee8ef1f0..7fa15f46e8 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -311,7 +311,7 @@ method dial*( trace "Dial canceled" raise exc except CatchableError as exc: - trace "Error canceled", description = exc.msg + trace "Error dialing", description = exc.msg raise newException(DialFailedError, exc.msg) method dial*( From 2639d8d645deb7a4575c95acc89a1bb35f645521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 13:32:11 +0100 Subject: [PATCH 20/25] chore: utilize DialFailedError --- libp2p/protocols/pubsub/gossipsub.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index af40cc48b8..dfed01c9b4 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -833,7 +833,7 @@ proc maintainDirectPeer( except CancelledError as exc: trace "Direct peer dial canceled" raise exc - except CatchableError as exc: + except DialFailedError as exc: debug "Direct peer error dialing", description = exc.msg proc addDirectPeer*( From ca6fef33fdcf4f8daa6808021666db2c357c773b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 15:47:08 +0100 Subject: [PATCH 21/25] chore: cosmetics --- libp2p/utils/future.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/utils/future.nim b/libp2p/utils/future.nim index d94657b31c..8665702e13 100644 --- a/libp2p/utils/future.nim +++ b/libp2p/utils/future.nim @@ -54,6 +54,6 @@ proc anyCompletedCatchable*[T]( except CancelledError as exc: raise exc except CatchableError: - discard + continue requests.del(requests.find(raceFut)) From 1ef5a16750ade754c04fa996a3461bc23af2c67c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 15:47:22 +0100 Subject: [PATCH 22/25] fix: nil stream --- libp2p/dialer.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 7fa15f46e8..6a45251c2d 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -306,6 +306,8 @@ method dial*( try: let stream = await self.connManager.getStream(peerId) + if isNil(stream): + raise newException(DialFailedError, "stream is nil") return await self.negotiateStream(stream, protos) except CancelledError as exc: trace "Dial canceled" From 41f8f4abcdbc8a11124096e40fcad0fd4f808165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Fri, 21 Feb 2025 16:18:49 +0100 Subject: [PATCH 23/25] chore: tidy --- libp2p/dialer.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 6a45251c2d..1bb98a6729 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -306,8 +306,8 @@ method dial*( try: let stream = await self.connManager.getStream(peerId) - if isNil(stream): - raise newException(DialFailedError, "stream is nil") + if stream.isNil: + raise newException(DialFailedError, "Couldn't get muxed stream") return await self.negotiateStream(stream, protos) except CancelledError as exc: trace "Dial canceled" From 3f19f5042fc0e354c5ebccf22fa289441e62470d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Mon, 24 Feb 2025 09:53:51 +0100 Subject: [PATCH 24/25] chore: anyCompleted improvements --- .../protocols/connectivity/autonat/server.nim | 2 +- .../protocols/connectivity/dcutr/client.nim | 2 +- .../protocols/connectivity/dcutr/server.nim | 2 +- libp2p/utils/future.nim | 26 ++----------------- tests/testfuture.nim | 4 ++- 5 files changed, 8 insertions(+), 28 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 1c33f7c23b..f71ec6e2bd 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -85,7 +85,7 @@ proc tryDial( # tryDial is to bypass the global max connections limit futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) - let fut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) + let fut = await anyCompleted(futs).wait(autonat.dialTimeout) let ma = await fut ma.withValue(maddr): await conn.sendResponseOk(maddr) diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim index 7dd7efd684..19e0df4de6 100644 --- a/libp2p/protocols/connectivity/dcutr/client.nim +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -79,7 +79,7 @@ proc startSync*( ) ) try: - discard await anyCompletedCatchable(futs).wait(self.connectTimeout) + discard await anyCompleted(futs).wait(self.connectTimeout) debug "Dcutr initiator has directly connected to the remote peer." finally: for fut in futs: diff --git a/libp2p/protocols/connectivity/dcutr/server.nim b/libp2p/protocols/connectivity/dcutr/server.nim index f3bbdb0a31..00e8ddbc08 100644 --- a/libp2p/protocols/connectivity/dcutr/server.nim +++ b/libp2p/protocols/connectivity/dcutr/server.nim @@ -71,7 +71,7 @@ proc new*( ) ) try: - discard await anyCompletedCatchable(futs).wait(connectTimeout) + discard await anyCompleted(futs).wait(connectTimeout) debug "Dcutr receiver has directly connected to the remote peer." finally: for fut in futs: diff --git a/libp2p/utils/future.nim b/libp2p/utils/future.nim index 8665702e13..87831cd00d 100644 --- a/libp2p/utils/future.nim +++ b/libp2p/utils/future.nim @@ -14,26 +14,6 @@ import chronos type AllFuturesFailedError* = object of CatchableError proc anyCompleted*[T]( - futs: seq[Future[T]] -): Future[Future[T]] {.async: (raises: [AllFuturesFailedError, CancelledError]).} = - ## Returns a future that will complete with the first future that completes. - ## If all futures fail or futs is empty, the returned future will fail with AllFuturesFailedError. - - var requests = futs - - while true: - var raceFut: Future[T] - try: - raceFut = await one(requests) - if raceFut.completed: - return raceFut - except ValueError: - raise newException( - AllFuturesFailedError, "None of the futures completed successfully" - ) - requests.del(requests.find(raceFut)) - -proc anyCompletedCatchable*[T]( futs: seq[T] ): Future[T] {.async: (raises: [AllFuturesFailedError, CancelledError]).} = ## Returns a future that will complete with the first future that completes. @@ -42,11 +22,11 @@ proc anyCompletedCatchable*[T]( var requests = futs while true: - var raceFut: T try: - raceFut = await one(requests) + var raceFut = await one(requests) if raceFut.completed: return raceFut + requests.del(requests.find(raceFut)) except ValueError: raise newException( AllFuturesFailedError, "None of the futures completed successfully" @@ -55,5 +35,3 @@ proc anyCompletedCatchable*[T]( raise exc except CatchableError: continue - - requests.del(requests.find(raceFut)) diff --git a/tests/testfuture.nim b/tests/testfuture.nim index c9d4d37873..9f9a8cd4df 100644 --- a/tests/testfuture.nim +++ b/tests/testfuture.nim @@ -60,7 +60,9 @@ suite "Future": proc fut2() {.async.} = await sleepAsync(200.milliseconds) - proc fut3() {.async.} = + proc fut3() {.async: (raises: [ValueError]).} = + # fut3 intentionally specifies raised ValueError + # so that it's type is of InternalRaisesFuture raise newException(ValueError, "fut3") var f1 = fut1() From 95177db9477ea6bc5e59919554077cbb80e054de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Mon, 24 Feb 2025 10:21:38 +0100 Subject: [PATCH 25/25] chore: removed if --- libp2p/dialer.nim | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 1bb98a6729..b43ccbdd6c 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -181,11 +181,10 @@ proc internalConnect( let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) await lock.acquire() defer: - if lock.locked(): - try: - lock.release() - except: - raiseAssert "checked with if" + try: + lock.release() + except AsyncLockError: + raiseAssert "lock must have been acquired in line above" if reuseConnection: peerId.withValue(peerId):