Skip to content

Commit

Permalink
chore(protocols): specify raised exceptions (part 2) (#1268)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic authored Feb 28, 2025
1 parent 65052d7 commit f7daad9
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 55 deletions.
4 changes: 2 additions & 2 deletions libp2p/connmanager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type
discard

ConnEventHandler* =
proc(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe, raises: [].}
proc(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe, async: (raises: []).}

PeerEventKind* {.pure.} = enum
Left
Expand All @@ -58,7 +58,7 @@ type
discard

PeerEventHandler* =
proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [].}
proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, async: (raises: []).}

ConnManager* = ref object of RootObj
maxConnsPerPeer: int
Expand Down
43 changes: 36 additions & 7 deletions libp2p/protocols/connectivity/autonat/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ logScope:

type AutonatClient* = ref object of RootObj

proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
proc sendDial(
conn: Connection, pid: PeerId, addrs: seq[MultiAddress]
) {.async: (raises: [CancelledError, LPStreamError]).} =
let pb = AutonatDial(
peerInfo: Opt.some(AutonatPeerInfo(id: Opt.some(pid), addrs: addrs))
).encode()
Expand All @@ -30,7 +32,9 @@ method dialMe*(
switch: Switch,
pid: PeerId,
addrs: seq[MultiAddress] = newSeq[MultiAddress](),
): Future[MultiAddress] {.base, async.} =
): Future[MultiAddress] {.
base, async: (raises: [AutonatError, AutonatUnreachableError, CancelledError])
.} =
proc getResponseOrRaise(
autonatMsg: Opt[AutonatMsg]
): AutonatDialResponse {.raises: [AutonatError].} =
Expand All @@ -47,6 +51,8 @@ method dialMe*(
await switch.dial(pid, @[AutonatCodec])
else:
await switch.dial(pid, addrs, AutonatCodec)
except CancelledError as err:
raise err
except DialFailedError as err:
raise
newException(AutonatError, "Unexpected error when dialling: " & err.msg, err)
Expand All @@ -61,14 +67,37 @@ method dialMe*(
incomingConnection.cancel()
# Safer to always try to cancel cause we aren't sure if the peer dialled us or not
if incomingConnection.completed():
await (await incomingConnection).connection.close()
trace "sending Dial", addrs = switch.peerInfo.addrs
await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs)
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
try:
await (await incomingConnection).connection.close()
except AlreadyExpectingConnectionError as e:
# this err is already handled above and could not happen later
error "Unexpected error", description = e.msg

try:
trace "sending Dial", addrs = switch.peerInfo.addrs
await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs)
except CancelledError as e:
raise e
except CatchableError as e:
raise newException(AutonatError, "Sending dial failed", e)

var respBytes: seq[byte]
try:
respBytes = await conn.readLp(1024)
except CancelledError as e:
raise e
except CatchableError as e:
raise newException(AutonatError, "read Dial response failed", e)

let response = getResponseOrRaise(AutonatMsg.decode(respBytes))

return
case response.status
of ResponseStatus.Ok:
response.ma.tryGet()
try:
response.ma.tryGet()
except:
raiseAssert("checked with if")
of ResponseStatus.DialError:
raise newException(
AutonatUnreachableError, "Peer could not dial us back: " & response.text.get("")
Expand Down
4 changes: 3 additions & 1 deletion libp2p/protocols/connectivity/autonat/server.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type Autonat* = ref object of LPProtocol
switch*: Switch
dialTimeout: Duration

proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
proc sendDial(
conn: Connection, pid: PeerId, addrs: seq[MultiAddress]
) {.async: (raises: [LPStreamError, CancelledError]).} =
let pb = AutonatDial(
peerInfo: Opt.some(AutonatPeerInfo(id: Opt.some(pid), addrs: addrs))
).encode()
Expand Down
10 changes: 8 additions & 2 deletions libp2p/protocols/connectivity/autonat/service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ proc askConnectedPeers(
if (await askPeer(self, switch, peer)) != Unknown:
answersFromPeers.inc()

proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} =
proc schedule(
service: AutonatService, switch: Switch, interval: Duration
) {.async: (raises: [CancelledError]).} =
heartbeat "Scheduling AutonatService run", interval:
await service.run(switch)

Expand Down Expand Up @@ -214,15 +216,19 @@ method setup*(
if self.askNewConnectedPeers:
self.newConnectedPeerHandler = proc(
peerId: PeerId, event: PeerEvent
): Future[void] {.async.} =
): Future[void] {.async: (raises: []).} =
discard askPeer(self, switch, peerId)

switch.connManager.addPeerEventHandler(
self.newConnectedPeerHandler, PeerEventKind.Joined
)

self.scheduleInterval.withValue(interval):
self.scheduleHandle = schedule(self, switch, interval)

if self.enableAddressMapper:
switch.peerInfo.addressMappers.add(self.addressMapper)

return hasBeenSetup

method run*(
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/dcutr/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ proc new*(

proc startSync*(
self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: seq[MultiAddress]
) {.async.} =
) {.async: (raises: [DcutrError, CancelledError]).} =
logScope:
peerId = switch.peerInfo.peerId

Expand Down
4 changes: 3 additions & 1 deletion libp2p/protocols/connectivity/dcutr/core.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ proc decode*(_: typedesc[DcutrMsg], buf: seq[byte]): DcutrMsg {.raises: [DcutrEr
raise newException(DcutrError, "Received malformed message")
return dcutrMsg

proc send*(conn: Connection, msgType: MsgType, addrs: seq[MultiAddress]) {.async.} =
proc send*(
conn: Connection, msgType: MsgType, addrs: seq[MultiAddress]
) {.async: (raises: [CancelledError, LPStreamError]).} =
let pb = DcutrMsg(msgType: msgType, addrs: addrs).encode()
await conn.writeLp(pb.buffer)

Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/connectivity/relay/relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ proc handleStreamV1(r: Relay, conn: Connection) {.async.} =
proc setup*(r: Relay, switch: Switch) =
r.switch = switch
r.switch.addPeerEventHandler(
proc(peerId: PeerId, event: PeerEvent) {.async.} =
proc(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
r.rsvp.del(peerId),
Left,
)
Expand Down
7 changes: 6 additions & 1 deletion libp2p/protocols/identify.nim
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ method init*(p: Identify) =

proc identify*(
self: Identify, conn: Connection, remotePeerId: PeerId
): Future[IdentifyInfo] {.async.} =
): Future[IdentifyInfo] {.
async: (
raises:
[IdentityInvalidMsgError, IdentityNoMatchError, LPStreamError, CancelledError]
)
.} =
trace "initiating identify", conn
var message = await conn.readLp(64 * 1024)
if len(message) == 0:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ proc init*[PubParams: object | bool](
topicsHigh: int.high,
)

proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
if event.kind == PeerEventKind.Joined:
pubsub.subscribePeer(peerId)
else:
Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/rendezvous.nim
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ proc unsubscribe*(rdv: RendezVous, ns: string) {.async.} =

proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch = switch
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} =
proc handlePeer(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
if event.kind == PeerEventKind.Joined:
rdv.peers.add(peerId)
elif event.kind == PeerEventKind.Left:
Expand Down
6 changes: 4 additions & 2 deletions libp2p/services/autorelayservice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ method setup*(

let hasBeenSetUp = await procCall Service(self).setup(switch)
if hasBeenSetUp:
proc handlePeerIdentified(peerId: PeerId, event: PeerEvent) {.async.} =
proc handlePeerIdentified(
peerId: PeerId, event: PeerEvent
) {.async: (raises: []).} =
trace "Peer Identified", peerId
if self.relayPeers.len < self.maxNumRelays:
self.peerAvailable.fire()

proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async.} =
proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
trace "Peer Left", peerId
self.relayPeers.withValue(peerId, future):
future[].cancel()
Expand Down
19 changes: 13 additions & 6 deletions libp2p/services/hpservice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ proc new*(

proc tryStartingDirectConn(
self: HPService, switch: Switch, peerId: PeerId
): Future[bool] {.async.} =
proc tryConnect(address: MultiAddress): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
proc tryConnect(
address: MultiAddress
): Future[bool] {.async: (raises: [DialFailedError, CancelledError]).} =
debug "Trying to create direct connection", peerId, address
await switch.connect(peerId, @[address], true, false)
debug "Direct connection created."
Expand All @@ -57,13 +59,13 @@ proc tryStartingDirectConn(
continue
return false

proc closeRelayConn(relayedConn: Connection) {.async.} =
proc closeRelayConn(relayedConn: Connection) {.async: (raises: [CancelledError]).} =
await sleepAsync(2000.milliseconds) # grace period before closing relayed connection
await relayedConn.close()

proc newConnectedPeerHandler(
self: HPService, switch: Switch, peerId: PeerId, event: PeerEvent
) {.async.} =
) {.async: (raises: []).} =
try:
# Get all connections to the peer. If there is at least one non-relayed connection, return.
let connections = switch.connManager.getConnections()[peerId].mapIt(it.connection)
Expand Down Expand Up @@ -102,8 +104,13 @@ method setup*(
except LPError as err:
error "Failed to mount Dcutr", err = err.msg

self.newConnectedPeerHandler = proc(peerId: PeerId, event: PeerEvent) {.async.} =
await newConnectedPeerHandler(self, switch, peerId, event)
self.newConnectedPeerHandler = proc(
peerId: PeerId, event: PeerEvent
) {.async: (raises: []).} =
try:
await newConnectedPeerHandler(self, switch, peerId, event)
except CancelledError:
trace "hole punching cancelled"

switch.connManager.addPeerEventHandler(
self.newConnectedPeerHandler, PeerEventKind.Joined
Expand Down
6 changes: 4 additions & 2 deletions tests/stubs/autonatclientstub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ method dialMe*(
switch: Switch,
pid: PeerId,
addrs: seq[MultiAddress] = newSeq[MultiAddress](),
): Future[MultiAddress] {.async.} =
): Future[MultiAddress] {.
async: (raises: [AutonatError, AutonatUnreachableError, CancelledError])
.} =
self.dials += 1

if self.dials == self.expectedDials:
self.finished.complete()
case self.answer
of Reachable:
return MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
return MultiAddress.init("/ip4/0.0.0.0/tcp/0").get()
of NotReachable:
raise newException(AutonatUnreachableError, "")
of Unknown:
Expand Down
60 changes: 33 additions & 27 deletions tests/testswitch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ suite "Switch":

var step = 0
var kinds: set[ConnEventKind]
proc hook(peerId: PeerId, event: ConnEvent) {.async.} =
proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} =
kinds = kinds + {event.kind}
case step
of 0:
Expand Down Expand Up @@ -356,7 +356,7 @@ suite "Switch":

var step = 0
var kinds: set[ConnEventKind]
proc hook(peerId: PeerId, event: ConnEvent) {.async.} =
proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} =
kinds = kinds + {event.kind}
case step
of 0:
Expand Down Expand Up @@ -404,7 +404,7 @@ suite "Switch":

var step = 0
var kinds: set[PeerEventKind]
proc handler(peerId: PeerId, event: PeerEvent) {.async.} =
proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
kinds = kinds + {event.kind}
case step
of 0:
Expand Down Expand Up @@ -451,7 +451,7 @@ suite "Switch":

var step = 0
var kinds: set[PeerEventKind]
proc handler(peerId: PeerId, event: PeerEvent) {.async.} =
proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
kinds = kinds + {event.kind}
case step
of 0:
Expand Down Expand Up @@ -504,7 +504,7 @@ suite "Switch":

var step = 0
var kinds: set[PeerEventKind]
proc handler(peerId: PeerId, event: PeerEvent) {.async.} =
proc handler(peerId: PeerId, event: PeerEvent) {.async: (raises: []).} =
kinds = kinds + {event.kind}
case step
of 0:
Expand Down Expand Up @@ -562,14 +562,17 @@ suite "Switch":
var switches: seq[Switch]
var done = newFuture[void]()
var onConnect: Future[void]
proc hook(peerId: PeerId, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
await onConnect
await switches[0].disconnect(peerInfo.peerId) # trigger disconnect
of ConnEventKind.Disconnected:
check not switches[0].isConnected(peerInfo.peerId)
done.complete()
proc hook(peerId: PeerId, event: ConnEvent) {.async: (raises: []).} =
try:
case event.kind
of ConnEventKind.Connected:
await onConnect
await switches[0].disconnect(peerInfo.peerId) # trigger disconnect
of ConnEventKind.Disconnected:
check not switches[0].isConnected(peerInfo.peerId)
done.complete()
except:
check false # should not get here

switches.add(newStandardSwitch(rng = rng))

Expand Down Expand Up @@ -597,20 +600,23 @@ suite "Switch":
var switches: seq[Switch]
var done = newFuture[void]()
var onConnect: Future[void]
proc hook(peerId2: PeerId, event: ConnEvent) {.async.} =
case event.kind
of ConnEventKind.Connected:
if conns == 5:
await onConnect
await switches[0].disconnect(peerInfo.peerId) # trigger disconnect
return

conns.inc
of ConnEventKind.Disconnected:
if conns == 1:
check not switches[0].isConnected(peerInfo.peerId)
done.complete()
conns.dec
proc hook(peerId2: PeerId, event: ConnEvent) {.async: (raises: []).} =
try:
case event.kind
of ConnEventKind.Connected:
if conns == 5:
await onConnect
await switches[0].disconnect(peerInfo.peerId) # trigger disconnect
return

conns.inc
of ConnEventKind.Disconnected:
if conns == 1:
check not switches[0].isConnected(peerInfo.peerId)
done.complete()
conns.dec
except:
check false # should not get here

switches.add(newStandardSwitch(maxConnsPerPeer = 10, rng = rng))

Expand Down

0 comments on commit f7daad9

Please sign in to comment.