Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dialer): list raised exceptions #1264

Merged
merged 31 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
caf9741
chore: add raising exceptions to services setup
vladopajic Feb 12, 2025
08fe20e
chore: add exceptions to stop
vladopajic Feb 13, 2025
d5b2b57
chore: add exceptions list to run method
vladopajic Feb 13, 2025
68e2866
fix: remove CatchableError
vladopajic Feb 13, 2025
66b4cf1
chore: more effort
vladopajic Feb 13, 2025
5a72a86
style: fix
vladopajic Feb 13, 2025
331ebab
chore: discard to raiseAssert
vladopajic Feb 13, 2025
3e999f6
chore: add if
vladopajic Feb 13, 2025
0b53b9d
style: fix
vladopajic Feb 13, 2025
b88e3ac
Merge branch 'master' into raises-servces
vladopajic Feb 13, 2025
49e88df
chore: change log level
vladopajic Feb 14, 2025
80484ed
Merge branch 'master' into raises-servces
vladopajic Feb 18, 2025
44ed79f
Merge branch 'master' into raises-servces
vladopajic Feb 19, 2025
27d5c0a
Merge branch 'master' into raises-servces
vladopajic Feb 19, 2025
bb07f60
chore: add raised errors to dialer
vladopajic Feb 20, 2025
a34b122
chore: add to switch
vladopajic Feb 20, 2025
cb06d1f
chore: autonat tryDial
vladopajic Feb 20, 2025
7562c30
chore: fix tests
vladopajic Feb 20, 2025
d406f6a
chore: more effort
vladopajic Feb 20, 2025
1726725
chore: style fix
vladopajic Feb 20, 2025
295a760
chore: improvement
vladopajic Feb 20, 2025
8207c44
chore: utilize DialFailedError
vladopajic Feb 21, 2025
7b65e32
chore: msg fix
vladopajic Feb 21, 2025
2639d8d
chore: utilize DialFailedError
vladopajic Feb 21, 2025
ca6fef3
chore: cosmetics
vladopajic Feb 21, 2025
1ef5a16
fix: nil stream
vladopajic Feb 21, 2025
41f8f4a
chore: tidy
vladopajic Feb 21, 2025
3f19f50
chore: anyCompleted improvements
vladopajic Feb 24, 2025
95177db
chore: removed if
vladopajic Feb 24, 2025
2e7d50c
Merge branch 'master' into raises-dialer
vladopajic Feb 24, 2025
328e603
Merge branch 'master' into raises-dialer
vladopajic Feb 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions libp2p/dial.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import peerid, stream/connection, transports/transport

export results

type Dial* = ref object of RootObj
type
Dial* = ref object of RootObj
DialFailedError* = object of LPError

method connect*(
self: Dial,
Expand All @@ -24,7 +26,7 @@ method connect*(
forceDial = false,
reuseConnection = true,
dir = Direction.Out,
) {.async, base.} =
) {.base, async: (raises: [DialFailedError, CancelledError]).} =
## connect remote peer without negotiating
## a protocol
##
Expand All @@ -33,14 +35,14 @@ method connect*(

method connect*(
self: Dial, address: MultiAddress, allowUnknownPeerId = false
): Future[PeerId] {.async, base.} =
): Future[PeerId] {.base, async: (raises: [DialFailedError, CancelledError]).} =
## Connects to a peer and retrieve its PeerId

doAssert(false, "Not implemented!")

method dial*(
self: Dial, peerId: PeerId, protos: seq[string]
): Future[Connection] {.async, base.} =
): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError]).} =
## create a protocol stream over an
## existing connection
##
Expand All @@ -53,7 +55,7 @@ method dial*(
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false,
): Future[Connection] {.async, base.} =
): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError]).} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
Expand All @@ -65,5 +67,7 @@ method addTransport*(self: Dial, transport: Transport) {.base.} =

method tryDial*(
self: Dial, peerId: PeerId, addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async, base.} =
): Future[Opt[MultiAddress]] {.
base, async: (raises: [DialFailedError, CancelledError])
.} =
doAssert(false, "Not implemented!")
139 changes: 81 additions & 58 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,21 @@ declareCounter(libp2p_total_dial_attempts, "total attempted dials")
declareCounter(libp2p_successful_dials, "dialed successful peers")
declareCounter(libp2p_failed_dials, "failed dials")

type
DialFailedError* = object of LPError

Dialer* = ref object of Dial
localPeerId*: PeerId
connManager: ConnManager
dialLock: Table[PeerId, AsyncLock]
transports: seq[Transport]
peerStore: PeerStore
nameResolver: NameResolver
type Dialer* = ref object of Dial
localPeerId*: PeerId
connManager: ConnManager
dialLock: Table[PeerId, AsyncLock]
transports: seq[Transport]
peerStore: PeerStore
nameResolver: NameResolver

proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
dir = Direction.Out,
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [CancelledError]).} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname
Expand Down Expand Up @@ -105,7 +102,9 @@ proc dialAndUpgrade(

proc expandDnsAddr(
self: Dialer, peerId: Opt[PeerId], address: MultiAddress
): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} =
): Future[seq[(MultiAddress, Opt[PeerId])]] {.
async: (raises: [CancelledError, MaError, TransportAddressError, LPError])
.} =
if not DNSADDR.matchPartial(address):
return @[(address, peerId)]
if isNil(self.nameResolver):
Expand All @@ -115,7 +114,10 @@ proc expandDnsAddr(
let
toResolve =
if peerId.isSome:
address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet()
try:
address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet()
except ResultError[void]:
raiseAssert "checked with if"
else:
address
resolved = await self.nameResolver.resolveDnsAddr(toResolve)
Expand All @@ -132,7 +134,9 @@ proc expandDnsAddr(

proc dialAndUpgrade(
self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress], dir = Direction.Out
): Future[Muxer] {.async.} =
): Future[Muxer] {.
async: (raises: [CancelledError, MaError, TransportAddressError, LPError])
.} =
debug "Dialing peer", peerId = peerId.get(default(PeerId)), addrs

for rawAddress in addrs:
Expand Down Expand Up @@ -169,47 +173,59 @@ proc internalConnect(
forceDial: bool,
reuseConnection = true,
dir = Direction.Out,
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [DialFailedError, CancelledError]).} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
raise newException(DialFailedError, "can't dial self!")

# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())
try:
await lock.acquire()

if reuseConnection:
peerId.withValue(peerId):
self.tryReusingConnection(peerId).withValue(mux):
return mux

let slot = self.connManager.getOutgoingSlot(forceDial)
let muxed =
try:
await self.dialAndUpgrade(peerId, addrs, dir)
except CatchableError as exc:
slot.release()
raise exc
slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
await lock.acquire()
defer:
try:
lock.release()
except AsyncLockError:
raiseAssert "lock must have been acquired in line above"

if reuseConnection:
peerId.withValue(peerId):
self.tryReusingConnection(peerId).withValue(mux):
return mux

let slot =
try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
await self.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
)
except CatchableError as exc:
trace "Failed to finish outgoung upgrade", description = exc.msg
await muxed.close()
self.connManager.getOutgoingSlot(forceDial)
except TooManyConnectionsError as exc:
raise newException(DialFailedError, exc.msg)

let muxed =
try:
await self.dialAndUpgrade(peerId, addrs, dir)
except CancelledError as exc:
slot.release()
raise exc
except CatchableError as exc:
slot.release()
raise newException(DialFailedError, exc.msg)

slot.trackMuxer(muxed)
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")

try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
await self.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: true),
)
return muxed
finally:
if lock.locked():
lock.release()
except CancelledError as exc:
await muxed.close()
raise exc
except CatchableError as exc:
trace "Failed to finish outgoing upgrade", description = exc.msg
await muxed.close()
raise newException(DialFailedError, "Failed to finish outgoing upgrade")

method connect*(
self: Dialer,
Expand All @@ -218,7 +234,7 @@ method connect*(
forceDial = false,
reuseConnection = true,
dir = Direction.Out,
) {.async.} =
) {.async: (raises: [DialFailedError, CancelledError]).} =
## connect remote peer without negotiating
## a protocol
##
Expand All @@ -231,7 +247,7 @@ method connect*(

method connect*(
self: Dialer, address: MultiAddress, allowUnknownPeerId = false
): Future[PeerId] {.async.} =
): Future[PeerId] {.async: (raises: [DialFailedError, CancelledError]).} =
## Connects to a peer and retrieve its PeerId

parseFullAddress(address).toOpt().withValue(fullAddress):
Expand All @@ -249,7 +265,7 @@ method connect*(

proc negotiateStream(
self: Dialer, conn: Connection, protos: seq[string]
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [CatchableError]).} =
trace "Negotiating stream", conn, protos
let selected = await MultistreamSelect.select(conn, protos)
if not protos.contains(selected):
Expand All @@ -260,7 +276,7 @@ proc negotiateStream(

method tryDial*(
self: Dialer, peerId: PeerId, addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async.} =
): Future[Opt[MultiAddress]] {.async: (raises: [DialFailedError, CancelledError]).} =
## Create a protocol stream in order to check
## if a connection is possible.
## Doesn't use the Connection Manager to save it.
Expand All @@ -280,25 +296,32 @@ method tryDial*(

method dial*(
self: Dialer, peerId: PeerId, protos: seq[string]
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [DialFailedError, CancelledError]).} =
## create a protocol stream over an
## existing connection
##

trace "Dialing (existing)", peerId, protos
let stream = await self.connManager.getStream(peerId)
if stream.isNil:
raise newException(DialFailedError, "Couldn't get muxed stream")

return await self.negotiateStream(stream, protos)
try:
let stream = await self.connManager.getStream(peerId)
if stream.isNil:
raise newException(DialFailedError, "Couldn't get muxed stream")
return await self.negotiateStream(stream, protos)
except CancelledError as exc:
trace "Dial canceled"
raise exc
except CatchableError as exc:
trace "Error dialing", description = exc.msg
raise newException(DialFailedError, exc.msg)

method dial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false,
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [DialFailedError, CancelledError]).} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
Expand All @@ -307,7 +330,7 @@ method dial*(
conn: Muxer
stream: Connection

proc cleanup() {.async.} =
proc cleanup() {.async: (raises: []).} =
if not (isNil(stream)):
await stream.closeWithEOF()

Expand All @@ -331,7 +354,7 @@ method dial*(
except CatchableError as exc:
debug "Error dialing", conn, description = exc.msg
await cleanup()
raise exc
raise newException(DialFailedError, exc.msg)

method addTransport*(self: Dialer, t: Transport) =
self.transports &= t
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/autonat/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ method dialMe*(
await switch.dial(pid, @[AutonatCodec])
else:
await switch.dial(pid, addrs, AutonatCodec)
except CatchableError as err:
except DialFailedError as err:
raise
newException(AutonatError, "Unexpected error when dialling: " & err.msg, err)

Expand Down
30 changes: 18 additions & 12 deletions libp2p/protocols/connectivity/autonat/server.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.}

proc sendResponseError(
conn: Connection, status: ResponseStatus, text: string = ""
) {.async.} =
) {.async: (raises: [CancelledError]).} =
let pb = AutonatDialResponse(
status: status,
text:
Expand All @@ -50,17 +50,27 @@ proc sendResponseError(
Opt.some(text),
ma: Opt.none(MultiAddress),
).encode()
await conn.writeLp(pb.buffer)
try:
await conn.writeLp(pb.buffer)
except LPStreamError as exc:
trace "autonat failed to send response error", description = exc.msg, conn

proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
proc sendResponseOk(
conn: Connection, ma: MultiAddress
) {.async: (raises: [CancelledError]).} =
let pb = AutonatDialResponse(
status: ResponseStatus.Ok, text: Opt.some("Ok"), ma: Opt.some(ma)
).encode()
await conn.writeLp(pb.buffer)
try:
await conn.writeLp(pb.buffer)
except LPStreamError as exc:
trace "autonat failed to send response ok", description = exc.msg, conn

proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
proc tryDial(
autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]
) {.async: (raises: [DialFailedError, CancelledError]).} =
await autonat.sem.acquire()
var futs: seq[Future[Opt[MultiAddress]]]
var futs: seq[Future[Opt[MultiAddress]].Raising([DialFailedError, CancelledError])]
try:
# This is to bypass the per peer max connections limit
let outgoingConnection =
Expand All @@ -71,7 +81,8 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy
return
# Safer to always try to cancel cause we aren't sure if the connection was established
defer:
outgoingConnection.cancel()
outgoingConnection.cancelSoon()

# tryDial is to bypass the global max connections limit
futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it]))
let fut = await anyCompleted(futs).wait(autonat.dialTimeout)
Expand All @@ -88,9 +99,6 @@ proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.asy
except AsyncTimeoutError as exc:
debug "Dial timeout", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Dial timeout")
except CatchableError as exc:
debug "Unexpected error", addrs, description = exc.msg
await conn.sendResponseError(DialError, "Unexpected error")
finally:
autonat.sem.release()
for f in futs:
Expand Down Expand Up @@ -163,8 +171,6 @@ proc new*(
await autonat.handleDial(conn, msg)
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "exception in autonat handler", description = exc.msg, conn
finally:
trace "exiting autonat handler", conn
await conn.close()
Expand Down
Loading
Loading