Skip to content

Commit

Permalink
chore: specify raised exceptions in miscellaneous places (#1269)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Feb 28, 2025
1 parent f7daad9 commit adf2345
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 37 deletions.
2 changes: 1 addition & 1 deletion libp2p/connmanager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer =
trace "connection not found", peerId
return mux

proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [LPError].} =
## store the connection and muxer
##

Expand Down
1 change: 1 addition & 0 deletions libp2p/daemon/daemonapi.nim
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ proc openStream*(
raise newException(DaemonLocalError, "Wrong message type!")

proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
# must not specify raised exceptions as this is StreamCallback from chronos
var api = getUserData[DaemonAPI](server)
var message = await transp.recvMessage()
var pb = initProtoBuffer(message)
Expand Down
24 changes: 17 additions & 7 deletions libp2p/discovery/discoverymngr.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,20 @@ type
advertisementUpdated*: AsyncEvent
advertiseLoop*: Future[void]

method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} =
DiscoveryError* = object of LPError
DiscoveryFinished* = object of LPError

method request*(
self: DiscoveryInterface, pa: PeerAttributes
) {.base, async: (raises: [DiscoveryError, CancelledError]).} =
doAssert(false, "Not implemented!")

method advertise*(self: DiscoveryInterface) {.async, base.} =
method advertise*(
self: DiscoveryInterface
) {.base, async: (raises: [CancelledError]).} =
doAssert(false, "Not implemented!")

type
DiscoveryError* = object of LPError
DiscoveryFinished* = object of LPError

DiscoveryQuery* = ref object
attr: PeerAttributes
peers: AsyncQueue[PeerAttributes]
Expand Down Expand Up @@ -137,7 +141,9 @@ template forEach*(query: DiscoveryQuery, code: untyped) =
## peer attritubtes are available through the variable
## `peer`

proc forEachInternal(q: DiscoveryQuery) {.async.} =
proc forEachInternal(
q: DiscoveryQuery
) {.async: (raises: [CancelledError, DiscoveryError]).} =
while true:
let peer {.inject.} =
try:
Expand All @@ -162,7 +168,11 @@ proc stop*(dm: DiscoveryManager) =
continue
i.advertiseLoop.cancel()

proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} =
proc getPeer*(
query: DiscoveryQuery
): Future[PeerAttributes] {.
async: (raises: [CancelledError, DiscoveryError, DiscoveryFinished])
.} =
let getter = query.peers.popFirst()

try:
Expand Down
6 changes: 4 additions & 2 deletions libp2p/discovery/rendezvousinterface.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type

proc `==`*(a, b: RdvNamespace): bool {.borrow.}

method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =
method request*(
self: RendezVousInterface, pa: PeerAttributes
) {.async: (raises: [DiscoveryError, CancelledError]).} =
var namespace = ""
for attr in pa:
if attr.ofType(RdvNamespace):
Expand All @@ -48,7 +50,7 @@ method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =

await sleepAsync(self.timeToRequest)

method advertise*(self: RendezVousInterface) {.async.} =
method advertise*(self: RendezVousInterface) {.async: (raises: [CancelledError]).} =
while true:
var toAdvertise: seq[string]
for attr in self.toAdvertise:
Expand Down
11 changes: 10 additions & 1 deletion libp2p/peerstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,16 @@ proc cleanup*(peerStore: PeerStore, peerId: PeerId) =
peerStore.del(peerStore.toClean[0])
peerStore.toClean.delete(0)

proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
proc identify*(
peerStore: PeerStore, muxer: Muxer
) {.
async: (
raises: [
CancelledError, IdentityNoMatchError, IdentityInvalidMsgError, MultiStreamError,
LPStreamError, MuxerError,
]
)
.} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/relay/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ proc handleRelayedConnect(

proc reserve*(
cl: RelayClient, peerId: PeerId, addrs: seq[MultiAddress] = @[]
): Future[Rsvp] {.async.} =
): Future[Rsvp] {.async: (raises: [ReservationError, DialFailedError, CancelledError]).} =
let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec)
defer:
await conn.close()
Expand Down
9 changes: 5 additions & 4 deletions libp2p/protocols/rendezvous.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import
../utils/heartbeat,
../stream/connection,
../utils/offsettedseq,
../utils/semaphore
../utils/semaphore,
../discovery/discoverymngr

export chronicles

Expand Down Expand Up @@ -295,7 +296,7 @@ proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
Opt.some(msg)

type
RendezVousError* = object of LPError
RendezVousError* = object of DiscoveryError
RegisteredData = object
expiration: Moment
peerId: PeerId
Expand Down Expand Up @@ -555,7 +556,7 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =

proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
): Future[seq[PeerRecord]] {.async.} =
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
Expand Down Expand Up @@ -634,7 +635,7 @@ proc request*(

proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
): Future[seq[PeerRecord]] {.async.} =
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
await rdv.request(ns, l, rdv.peers)

proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
Expand Down
8 changes: 6 additions & 2 deletions libp2p/services/autorelayservice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ proc addressMapper(

proc reserveAndUpdate(
self: AutoRelayService, relayPid: PeerId, switch: Switch
) {.async.} =
) {.async: (raises: [CatchableError]).} =
# CatchableError used to simplify raised errors here, as there could be
# many different errors raised but caller don't really care what is cause of error
while self.running:
let
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5))
Expand Down Expand Up @@ -86,7 +88,9 @@ method setup*(
await self.run(switch)
return hasBeenSetUp

proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} =
proc manageBackedOff(
self: AutoRelayService, pid: PeerId
) {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.seconds(5))
self.backingOff.keepItIf(it != pid)
self.peerAvailable.fire()
Expand Down
55 changes: 36 additions & 19 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import
protocols/secure/secure,
peerinfo,
utils/semaphore,
./muxers/muxer,
connmanager,
nameresolving/nameresolver,
peerid,
Expand Down Expand Up @@ -61,6 +62,8 @@ type
started: bool
services*: seq[Service]

UpgradeError* = object of LPError

Service* = ref object of RootObj
inUse: bool

Expand Down Expand Up @@ -216,30 +219,44 @@ proc mount*[T: LPProtocol](
s.ms.addHandler(proto.codecs, proto, matcher)
s.peerInfo.protocols.add(proto.codec)

proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
switch.connManager.storeMuxer(muxed)
await switch.peerStore.identify(muxed)
await switch.connManager.triggerPeerEvents(
muxed.connection.peerId, PeerEvent(kind: PeerEventKind.Identified, initiator: false)
)
trace "Connection upgrade succeeded"
proc upgrader(
switch: Switch, trans: Transport, conn: Connection
) {.async: (raises: [CancelledError, UpgradeError]).} =
try:
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
switch.connManager.storeMuxer(muxed)
await switch.peerStore.identify(muxed)
await switch.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: false),
)
except CancelledError as e:
raise e
except CatchableError as e:
raise newException(UpgradeError, e.msg, e)

proc upgradeMonitor(
switch: Switch, trans: Transport, conn: Connection, upgrades: AsyncSemaphore
) {.async.} =
) {.async: (raises: []).} =
var upgradeSuccessful = false
try:
await switch.upgrader(trans, conn).wait(30.seconds)
except CatchableError as exc:
if exc isnot CancelledError:
libp2p_failed_upgrades_incoming.inc()
if not isNil(conn):
await conn.close()
trace "Exception awaiting connection upgrade", description = exc.msg, conn
trace "Connection upgrade succeeded"
upgradeSuccessful = true
except CancelledError:
trace "Connection upgrade cancelled", conn
except AsyncTimeoutError:
trace "Connection upgrade timeout", conn
libp2p_failed_upgrades_incoming.inc()
except UpgradeError as e:
trace "Connection upgrade failed", description = e.msg, conn
libp2p_failed_upgrades_incoming.inc()
finally:
if (not upgradeSuccessful) and (not isNil(conn)):
await conn.close()
upgrades.release()

proc accept(s: Switch, transport: Transport) {.async.} = # noraises
proc accept(s: Switch, transport: Transport) {.async: (raises: []).} =
## switch accept loop, ran for every transport
##

Expand Down Expand Up @@ -286,7 +303,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
await conn.close()
return

proc stop*(s: Switch) {.async, public.} =
proc stop*(s: Switch) {.public, async: (raises: [CancelledError]).} =
## Stop listening on every transport, and
## close every active connections

Expand Down Expand Up @@ -318,7 +335,7 @@ proc stop*(s: Switch) {.async, public.} =

trace "Switch stopped"

proc start*(s: Switch) {.async, public.} =
proc start*(s: Switch) {.public, async: (raises: [CancelledError, LPError]).} =
## Start listening on every transport

if s.started:
Expand All @@ -340,7 +357,7 @@ proc start*(s: Switch) {.async, public.} =
for fut in startFuts:
if fut.failed:
await s.stop()
raise fut.error
raise newException(LPError, "starting transports failed", fut.error)

for t in s.transports: # for each transport
if t.addrs.len > 0 or t.running:
Expand Down

0 comments on commit adf2345

Please sign in to comment.