Skip to content

Commit

Permalink
chore: specifing raising exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic committed Feb 11, 2025
1 parent 78a4344 commit 0740837
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 33 deletions.
36 changes: 22 additions & 14 deletions libp2p/connmanager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ proc removeConnEventHandler*(
) =
c.connEvents[kind].excl(handler)

proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async.} =
proc triggerConnEvent*(
c: ConnManager, peerId: PeerId, event: ConnEvent
) {.async: (raises: [CancelledError]).} =
try:
trace "About to trigger connection events", peer = peerId
if c.connEvents[event.kind].len() > 0:
Expand Down Expand Up @@ -154,7 +156,9 @@ proc removePeerEventHandler*(
) =
c.peerEvents[kind].excl(handler)

proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.async.} =
proc triggerPeerEvents*(
c: ConnManager, peerId: PeerId, event: PeerEvent
) {.async: (raises: [CancelledError]).} =
trace "About to trigger peer events", peer = peerId
if c.peerEvents[event.kind].len == 0:
return
Expand All @@ -174,7 +178,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn

proc expectConnection*(
c: ConnManager, p: PeerId, dir: Direction
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [AlreadyExpectingConnectionError, CatchableError]).} =
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
let key = (p, dir)
if key in c.expectedConnectionsOverLimit:
Expand Down Expand Up @@ -205,7 +209,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
let conn = muxer.connection
return muxer in c.muxed.getOrDefault(conn.peerId)

proc closeMuxer(muxer: Muxer) {.async.} =
proc closeMuxer(muxer: Muxer) {.async: (raises:[]).} =
trace "Cleaning up muxer", m = muxer

await muxer.close()
Expand All @@ -216,7 +220,7 @@ proc closeMuxer(muxer: Muxer) {.async.} =
trace "Exception in close muxer handler", description = exc.msg
trace "Cleaned up muxer", m = muxer

proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
proc muxCleanup(c: ConnManager, mux: Muxer) {.async: (raises:[]).} =
try:
trace "Triggering disconnect events", mux
let peerId = mux.connection.peerId
Expand All @@ -238,7 +242,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
# do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception peer cleanup handler", mux, description = exc.msg

proc onClose(c: ConnManager, mux: Muxer) {.async.} =
proc onClose(c: ConnManager, mux: Muxer) {.async: (raises:[]).} =
## connection close even handler
##
## triggers the connections resource cleanup
Expand Down Expand Up @@ -272,7 +276,7 @@ proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer =
trace "connection not found", peerId
return mux

proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [LPError].} =
## store the connection and muxer
##

Expand Down Expand Up @@ -324,7 +328,7 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =

trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len

proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} =
proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async: (raises:[CatchableError]).} =
await c.inSema.acquire()
return ConnectionSlot(connManager: c, direction: In)

Expand Down Expand Up @@ -357,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
cs.release()
return

proc semaphoreMonitor() {.async.} =
proc semaphoreMonitor() {.async: (raises: []).} =
try:
await conn.join()
except CatchableError as exc:
Expand All @@ -373,28 +377,32 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
return
cs.trackConnection(mux.connection)

proc getStream*(c: ConnManager, muxer: Muxer): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager, muxer: Muxer
): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
## get a muxed stream for the passed muxer
##

if not (isNil(muxer)):
return await muxer.newStream()

proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager, peerId: PeerId
): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
## get a muxed stream for the passed peer from any connection
##

return await c.getStream(c.selectMuxer(peerId))

proc getStream*(
c: ConnManager, peerId: PeerId, dir: Direction
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
## get a muxed stream for the passed peer from a connection with `dir`
##

return await c.getStream(c.selectMuxer(peerId, dir))

proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async: (raises:[]).} =
## drop connections and cleanup resources for peer
##
trace "Dropping peer", peerId
Expand All @@ -405,7 +413,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =

trace "Peer dropped", peerId

proc close*(c: ConnManager) {.async.} =
proc close*(c: ConnManager) {.async: (raises:[]).} =
## cleanup resources for the connection
## manager
##
Expand Down
22 changes: 11 additions & 11 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ proc dialAndUpgrade(
hostname: string,
address: MultiAddress,
dir = Direction.Out,
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises:[CancelledError]).} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname
Expand Down Expand Up @@ -105,7 +105,7 @@ proc dialAndUpgrade(

proc expandDnsAddr(
self: Dialer, peerId: Opt[PeerId], address: MultiAddress
): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} =
): Future[seq[(MultiAddress, Opt[PeerId])]] {.async: (raises: [CancelledError, MaError, TransportAddressError, LPError, CatchableError ]).} =
if not DNSADDR.matchPartial(address):
return @[(address, peerId)]
if isNil(self.nameResolver):
Expand All @@ -132,7 +132,7 @@ proc expandDnsAddr(

proc dialAndUpgrade(
self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress], dir = Direction.Out
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [CancelledError, MaError, TransportAddressError, LPError, CatchableError ]).} =
debug "Dialing peer", peerId = peerId.get(default(PeerId)), addrs

for rawAddress in addrs:
Expand Down Expand Up @@ -169,7 +169,7 @@ proc internalConnect(
forceDial: bool,
reuseConnection = true,
dir = Direction.Out,
): Future[Muxer] {.async.} =
): Future[Muxer] {.async: (raises: [CatchableError]).} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")

Expand Down Expand Up @@ -218,7 +218,7 @@ method connect*(
forceDial = false,
reuseConnection = true,
dir = Direction.Out,
) {.async.} =
) {.async: (rases: []).} =
## connect remote peer without negotiating
## a protocol
##
Expand All @@ -231,7 +231,7 @@ method connect*(

method connect*(
self: Dialer, address: MultiAddress, allowUnknownPeerId = false
): Future[PeerId] {.async.} =
): Future[PeerId] {.async: (rases: []).} =
## Connects to a peer and retrieve its PeerId

parseFullAddress(address).toOpt().withValue(fullAddress):
Expand All @@ -249,7 +249,7 @@ method connect*(

proc negotiateStream(
self: Dialer, conn: Connection, protos: seq[string]
): Future[Connection] {.async.} =
): Future[Connection] {.async: (raises: [DialFailedError, CancelledError, LPStreamError, MultiStreamError])} =
trace "Negotiating stream", conn, protos
let selected = await MultistreamSelect.select(conn, protos)
if not protos.contains(selected):
Expand All @@ -260,7 +260,7 @@ proc negotiateStream(

method tryDial*(
self: Dialer, peerId: PeerId, addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async.} =
): Future[Opt[MultiAddress]] {.base, async: (raises: [DialFailedError, CancelledError]).} =
## Create a protocol stream in order to check
## if a connection is possible.
## Doesn't use the Connection Manager to save it.
Expand All @@ -280,7 +280,7 @@ method tryDial*(

method dial*(
self: Dialer, peerId: PeerId, protos: seq[string]
): Future[Connection] {.async.} =
): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError, LPStreamError, MultiStreamError, MuxerError])} =
## create a protocol stream over an
## existing connection
##
Expand All @@ -298,7 +298,7 @@ method dial*(
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false,
): Future[Connection] {.async.} =
): Future[Connection] {.base, async: (raises: [DialFailedError, CancelledError, CatchableError]).} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
Expand All @@ -307,7 +307,7 @@ method dial*(
conn: Muxer
stream: Connection

proc cleanup() {.async.} =
proc cleanup() {.async: (raises: []).} =
if not (isNil(stream)):
await stream.closeWithEOF()

Expand Down
2 changes: 1 addition & 1 deletion libp2p/peerinfo.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func shortLog*(p: PeerInfo): auto =
chronicles.formatIt(PeerInfo):
shortLog(it)

proc update*(p: PeerInfo) {.async.} =
proc update*(p: PeerInfo) {.async: (raises: [CatchableError]).} =
# p.addrs.len == 0 overrides addrs only if it is the first time update is being executed or if the field is empty.
# p.addressMappers.len == 0 is for when all addressMappers have been removed,
# and we wish to have addrs in its initial state, i.e., a copy of listenAddrs.
Expand Down
4 changes: 3 additions & 1 deletion libp2p/peerstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ proc cleanup*(peerStore: PeerStore, peerId: PeerId) =
peerStore.del(peerStore.toClean[0])
peerStore.toClean.delete(0)

proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
proc identify*(
peerStore: PeerStore, muxer: Muxer
) {.async: (raises: [CancelledError, LPStreamError, MultiStreamError, MuxerError, IdentityInvalidMsgError, IdentityNoMatchError]).} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/identify.nim
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ method init*(p: Identify) =

proc identify*(
self: Identify, conn: Connection, remotePeerId: PeerId
): Future[IdentifyInfo] {.async.} =
): Future[IdentifyInfo] {.async: (raises: [CancelledError, LPStreamError, IdentityInvalidMsgError, IdentityNoMatchError]).} =
trace "initiating identify", conn
var message = await conn.readLp(64 * 1024)
if len(message) == 0:
Expand Down
11 changes: 7 additions & 4 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import
peerstore,
errors,
utility,
dialer
dialer,
./muxers/muxer

export connmanager, upgrade, dialer, peerstore

Expand Down Expand Up @@ -200,7 +201,9 @@ proc mount*[T: LPProtocol](
s.ms.addHandler(proto.codecs, proto, matcher)
s.peerInfo.protocols.add(proto.codec)

proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
proc upgrader(
switch: Switch, trans: Transport, conn: Connection
) {.async: (raises: [CancelledError, LPError, LPStreamError, MultiStreamError, MuxerError, IdentityInvalidMsgError, IdentityNoMatchError]).} =
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
switch.connManager.storeMuxer(muxed)
await switch.peerStore.identify(muxed)
Expand All @@ -211,7 +214,7 @@ proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =

proc upgradeMonitor(
switch: Switch, trans: Transport, conn: Connection, upgrades: AsyncSemaphore
) {.async.} =
) {.async: (raises: []).} =
try:
await switch.upgrader(trans, conn).wait(30.seconds)
except CatchableError as exc:
Expand All @@ -223,7 +226,7 @@ proc upgradeMonitor(
finally:
upgrades.release()

proc accept(s: Switch, transport: Transport) {.async.} = # noraises
proc accept(s: Switch, transport: Transport) {.async: (raises: []).} =
## switch accept loop, ran for every transport
##

Expand Down
2 changes: 1 addition & 1 deletion libp2p/wire.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ proc connect*(
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress),
): Future[StreamTransport] {.async.} =
): Future[StreamTransport] {.async: (raises:[MaInvalidAddress, CancelledError, TransportError, LPError]).} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
Expand Down

0 comments on commit 0740837

Please sign in to comment.