Skip to content

Commit

Permalink
chore: improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic committed Feb 20, 2025
1 parent 1726725 commit 295a760
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 44 deletions.
1 change: 0 additions & 1 deletion libp2p/dial.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export results

type
Dial* = ref object of RootObj

DialFailedError* = object of LPError

method connect*(
Expand Down
75 changes: 41 additions & 34 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -179,47 +179,54 @@ proc internalConnect(

# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())
try:
await lock.acquire()

if reuseConnection:
peerId.withValue(peerId):
self.tryReusingConnection(peerId).withValue(mux):
return mux

let slot = self.connManager.getOutgoingSlot(forceDial)
let muxed =
await lock.acquire()
defer:
if lock.locked():
try:
await self.dialAndUpgrade(peerId, addrs, dir)
except CatchableError as exc:
slot.release()
raise newException(DialFailedError, exc.msg)
lock.release()
except:
raiseAssert "checked with if"

if reuseConnection:
peerId.withValue(peerId):
self.tryReusingConnection(peerId).withValue(mux):
return mux

slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
let slot =
try:
self.connManager.getOutgoingSlot(forceDial)
except TooManyConnectionsError as exc:
raise newException(DialFailedError, exc.msg)

let muxed =
try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
await self.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
)
await self.dialAndUpgrade(peerId, addrs, dir)
except CancelledError as exc:
slot.release()
raise exc
except CatchableError as exc:
trace "Failed to finish outgoing upgrade", description = exc.msg
await muxed.close()
raise newException(DialFailedError, "Failed to finish outgoing upgrade")
slot.release()
raise newException(DialFailedError, exc.msg)

slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")

try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
await self.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
)
return muxed
except TooManyConnectionsError as exc:
raise newException(DialFailedError, exc.msg)
finally:
if lock.locked():
try:
lock.release()
except:
raiseAssert "checked with if"
except CancelledError as exc:
await muxed.close()
raise exc
except CatchableError as exc:
trace "Failed to finish outgoing upgrade", description = exc.msg
await muxed.close()
raise newException(DialFailedError, "Failed to finish outgoing upgrade")

method connect*(
self: Dialer,
Expand Down
16 changes: 9 additions & 7 deletions libp2p/protocols/connectivity/autonat/server.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ proc sendResponseError(
except LPStreamError as exc:
trace "autonat failed to send response error", description = exc.msg, conn

proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
proc sendResponseOk(
conn: Connection, ma: MultiAddress
) {.async: (raises: [CancelledError]).} =
let pb = AutonatDialResponse(
status: ResponseStatus.Ok, text: Opt.some("Ok"), ma: Opt.some(ma)
).encode()
await conn.writeLp(pb.buffer)
try:
await conn.writeLp(pb.buffer)
except LPStreamError as exc:
trace "autonat failed to send response ok", description = exc.msg, conn

proc tryDial(
autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]
Expand All @@ -80,8 +85,8 @@ proc tryDial(

# tryDial is to bypass the global max connections limit
futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it]))
let raceFut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout)
let ma = await raceFut
let fut = await anyCompletedCatchable(futs).wait(autonat.dialTimeout)
let ma = await fut
ma.withValue(maddr):
await conn.sendResponseOk(maddr)
else:
Expand All @@ -94,9 +99,6 @@ proc tryDial(
except AsyncTimeoutError as exc:
debug "Dial timeout", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Dial timeout")
except CatchableError as exc:
debug "Unexpected error", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Unexpected error")
finally:
autonat.sem.release()
for f in futs:
Expand Down
23 changes: 23 additions & 0 deletions tests/testdialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import std/options
import chronos
import sequtils
import unittest2
import ../libp2p/[builders, switch]
import ./helpers
Expand All @@ -34,3 +35,25 @@ suite "Dialer":
check src.connManager.connCount(dst.peerInfo.peerId) == 2

await allFutures(src.stop(), dst.stop())

asyncTest "Max connections reached":
var switches: seq[Switch]

let dst = newStandardSwitch(maxConnections = 2)
await dst.start()
switches.add(dst)

for i in 1 ..< 3:
let src = newStandardSwitch()
switches.add(src)
await src.start()
await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs, true, false)

let src = newStandardSwitch()
switches.add(src)
await src.start()
check not await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs).withTimeout(
1000.millis
)

await allFuturesThrowing(allFutures(switches.mapIt(it.stop())))
5 changes: 3 additions & 2 deletions tests/testswitch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import stew/byteutils
import
../libp2p/[
errors,
dial,
switch,
multistream,
builders,
Expand Down Expand Up @@ -739,7 +740,7 @@ suite "Switch":
1000.millis
)

expect TooManyConnectionsError:
expect DialFailedError:
await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs)

switches.add(srcSwitch)
Expand Down Expand Up @@ -792,7 +793,7 @@ suite "Switch":
1000.millis
)

expect TooManyConnectionsError:
expect DialFailedError:
await srcSwitch.connect(dstSwitch.peerInfo.peerId, dstSwitch.peerInfo.addrs)

switches.add(srcSwitch)
Expand Down

0 comments on commit 295a760

Please sign in to comment.