From 295a760a5eaef07e63fd452a5ae842ac4e20c159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vlado=20Paji=C4=87?= Date: Thu, 20 Feb 2025 16:00:16 +0100 Subject: [PATCH] chore: improvement --- libp2p/dial.nim | 1 - libp2p/dialer.nim | 75 ++++++++++--------- .../protocols/connectivity/autonat/server.nim | 16 ++-- tests/testdialer.nim | 23 ++++++ tests/testswitch.nim | 5 +- 5 files changed, 76 insertions(+), 44 deletions(-) diff --git a/libp2p/dial.nim b/libp2p/dial.nim index 65b2f82239..45146ada48 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -17,7 +17,6 @@ export results type Dial* = ref object of RootObj - DialFailedError* = object of LPError method connect*( diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 2a7bbff32b..66ee8ef1f0 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -179,47 +179,54 @@ proc internalConnect( # Ensure there's only one in-flight attempt per peer let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock()) - try: - await lock.acquire() - - if reuseConnection: - peerId.withValue(peerId): - self.tryReusingConnection(peerId).withValue(mux): - return mux - - let slot = self.connManager.getOutgoingSlot(forceDial) - let muxed = + await lock.acquire() + defer: + if lock.locked(): try: - await self.dialAndUpgrade(peerId, addrs, dir) - except CatchableError as exc: - slot.release() - raise newException(DialFailedError, exc.msg) + lock.release() + except: + raiseAssert "checked with if" + + if reuseConnection: + peerId.withValue(peerId): + self.tryReusingConnection(peerId).withValue(mux): + return mux - slot.trackMuxer(muxed) - if isNil(muxed): # None of the addresses connected - raise newException(DialFailedError, "Unable to establish outgoing link") + let slot = + try: + self.connManager.getOutgoingSlot(forceDial) + except TooManyConnectionsError as exc: + raise newException(DialFailedError, exc.msg) + let muxed = try: - self.connManager.storeMuxer(muxed) - await self.peerStore.identify(muxed) - await self.connManager.triggerPeerEvents( - muxed.connection.peerId, - PeerEvent(kind: PeerEventKind.Identified, initiator: true), - ) + await self.dialAndUpgrade(peerId, addrs, dir) + except CancelledError as exc: + slot.release() + raise exc except CatchableError as exc: - trace "Failed to finish outgoing upgrade", description = exc.msg - await muxed.close() - raise newException(DialFailedError, "Failed to finish outgoing upgrade") + slot.release() + raise newException(DialFailedError, exc.msg) + slot.trackMuxer(muxed) + if isNil(muxed): # None of the addresses connected + raise newException(DialFailedError, "Unable to establish outgoing link") + + try: + self.connManager.storeMuxer(muxed) + await self.peerStore.identify(muxed) + await self.connManager.triggerPeerEvents( + muxed.connection.peerId, + PeerEvent(kind: PeerEventKind.Identified, initiator: true), + ) return muxed - except TooManyConnectionsError as exc: - raise newException(DialFailedError, exc.msg) - finally: - if lock.locked(): - try: - lock.release() - except: - raiseAssert "checked with if" + except CancelledError as exc: + await muxed.close() + raise exc + except CatchableError as exc: + trace "Failed to finish outgoing upgrade", description = exc.msg + await muxed.close() + raise newException(DialFailedError, "Failed to finish outgoing upgrade") method connect*( self: Dialer, diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 7298b8e204..1c33f7c23b 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -55,11 +55,16 @@ proc sendResponseError( except LPStreamError as exc: trace "autonat failed to send response error", description = exc.msg, conn -proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = +proc sendResponseOk( + conn: Connection, ma: MultiAddress +) {.async: (raises: [CancelledError]).} = let pb = AutonatDialResponse( status: ResponseStatus.Ok, text: Opt.some("Ok"), ma: Opt.some(ma) ).encode() - await conn.writeLp(pb.buffer) + try: + await conn.writeLp(pb.buffer) + except LPStreamError as exc: + trace "autonat failed to send response ok", description = exc.msg, conn proc tryDial( autonat: Autonat, conn: Connection, addrs: seq[MultiAddress] @@ -80,8 +85,8 @@ proc tryDial( # tryDial is to bypass the global max connections limit futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) - let raceFut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) - let ma = await raceFut + let fut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout) + let ma = await fut ma.withValue(maddr): await conn.sendResponseOk(maddr) else: @@ -94,9 +99,6 @@ proc tryDial( except AsyncTimeoutError as exc: debug "Dial timeout", addrs, description = exc.msg await conn.sendResponseError(DialError, "Dial timeout") - except CatchableError as exc: - debug "Unexpected error", addrs, description = exc.msg - await conn.sendResponseError(DialError, "Unexpected error") finally: autonat.sem.release() for f in futs: diff --git a/tests/testdialer.nim b/tests/testdialer.nim index 8716158946..8cfaa7e447 100644 --- a/tests/testdialer.nim +++ b/tests/testdialer.nim @@ -9,6 +9,7 @@ import std/options import chronos +import sequtils import unittest2 import ../libp2p/[builders, switch] import ./helpers @@ -34,3 +35,25 @@ suite "Dialer": check src.connManager.connCount(dst.peerInfo.peerId) == 2 await allFutures(src.stop(), dst.stop()) + + asyncTest "Max connections reached": + var switches: seq[Switch] + + let dst = newStandardSwitch(maxConnections = 2) + await dst.start() + switches.add(dst) + + for i in 1 ..< 3: + let src = newStandardSwitch() + switches.add(src) + await src.start() + await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs, true, false) + + let src = newStandardSwitch() + switches.add(src) + await src.start() + check not await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs).withTimeout( + 1000.millis + ) + + await allFuturesThrowing(allFutures(switches.mapIt(it.stop()))) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index eb645b14a6..5c0459b70e 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -15,6 +15,7 @@ import stew/byteutils import ../libp2p/[ errors, + dial, switch, multistream, builders, @@ -739,7 +740,7 @@ suite "Switch": 1000.millis ) - expect TooManyConnectionsError: + expect DialFailedError: await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs) switches.add(srcSwitch) @@ -792,7 +793,7 @@ suite "Switch": 1000.millis ) - expect TooManyConnectionsError: + expect DialFailedError: await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs) switches.add(srcSwitch)