Skip to content


format libp2p/protocols/connectivity/relay folder
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jun 6, 2024
1 parent 5e4e51c commit 207b884
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 346 deletions.
230 changes: 129 additions & 101 deletions libp2p/protocols/connectivity/relay/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

import times
import chronos, chronicles
import ./relay,

topics = "libp2p relay relay-client"
Expand All @@ -30,26 +31,28 @@ type
ReservationError* = object of RelayClientError
RelayV1DialError* = object of RelayClientError
RelayV2DialError* = object of RelayClientError
RelayClientAddConn* = proc(conn: Connection,
duration: uint32,
data: uint64): Future[void] {.gcsafe, raises: [].}
RelayClientAddConn* = proc(
conn: Connection, duration: uint32, data: uint64
): Future[void] {.gcsafe, raises: [].}
RelayClient* = ref object of Relay
onNewConnection*: RelayClientAddConn
canHop: bool

Rsvp* = object
expire*: uint64 # required, Unix expiration time (UTC)
expire*: uint64 # required, Unix expiration time (UTC)
addrs*: seq[MultiAddress] # relay address for reserving peer
voucher*: Opt[Voucher] # optional, reservation voucher
limitDuration*: uint32 # seconds
limitData*: uint64 # bytes
voucher*: Opt[Voucher] # optional, reservation voucher
limitDuration*: uint32 # seconds
limitData*: uint64 # bytes

proc sendStopError(conn: Connection, code: StatusV2) {.async.} =
trace "send stop status", status = $code & " (" & $ord(code) & ")"
let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code))
await conn.writeLp(encode(msg).buffer)

proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {.async.} =
proc handleRelayedConnect(
cl: RelayClient, conn: Connection, msg: StopMessage
) {.async.} =
# TODO: check the go version to see in which way this could fail
# it's unclear in the spec
Expand All @@ -58,9 +61,7 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {
limitDuration = msg.limit.duration
limitData =
msg = StopMessage(
msgType: StopMessageType.Status,
status: Opt.some(Ok))
msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(Ok))
pb = encode(msg)

trace "incoming relay connection", src
Expand All @@ -72,24 +73,28 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {
await conn.writeLp(pb.buffer)
# This sound redundant but the callback could, in theory, be set to nil during
# conn.writeLp so it's safer to double check
if cl.onNewConnection != nil: await cl.onNewConnection(conn, limitDuration, limitData)
else: await conn.close()
if cl.onNewConnection != nil:
await cl.onNewConnection(conn, limitDuration, limitData)
await conn.close()

proc reserve*(cl: RelayClient,
peerId: PeerId,
addrs: seq[MultiAddress] = @[]): Future[Rsvp] {.async.} =
proc reserve*(
cl: RelayClient, peerId: PeerId, addrs: seq[MultiAddress] = @[]
): Future[Rsvp] {.async.} =
let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec)
defer: await conn.close()
await conn.close()
pb = encode(HopMessage(msgType: HopMessageType.Reserve))
msg = try:
await conn.writeLp(pb.buffer)
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing or reading reservation message", exc=exc.msg
raise newException(ReservationError, exc.msg)
msg =
await conn.writeLp(pb.buffer)
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing or reading reservation message", exc = exc.msg
raise newException(ReservationError, exc.msg)

if msg.msgType != HopMessageType.Status:
raise newException(ReservationError, "Unexpected relay response type")
Expand All @@ -99,7 +104,7 @@ proc reserve*(cl: RelayClient,
let reservation = msg.reservation.valueOr:
raise newException(ReservationError, "Missing reservation information")
if reservation.expire > int64.high().uint64 or
now().utc > reservation.expire.int64.fromUnix.utc:
now().utc > reservation.expire.int64.fromUnix.utc:
raise newException(ReservationError, "Bad expiration date")
result.expire = reservation.expire
result.addrs = reservation.addrs
Expand All @@ -115,43 +120,49 @@ proc reserve*(cl: RelayClient,
result.limitData =

proc dialPeerV1*(
cl: RelayClient,
conn: Connection,
dstPeerId: PeerId,
dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} =
cl: RelayClient, conn: Connection, dstPeerId: PeerId, dstAddrs: seq[MultiAddress]
): Future[Connection] {.async.} =
msg = RelayMessage(
msgType: Opt.some(RelayType.Hop),
srcPeer: Opt.some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)),
dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs)))
srcPeer: Opt.some(
RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)
dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs)),
pb = encode(msg)

trace "Dial peer", msgSend=msg
trace "Dial peer", msgSend = msg

await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing hop request", exc=exc.msg
trace "error writing hop request", exc = exc.msg
raise exc

let msgRcvFromRelayOpt = try:
RelayMessage.decode(await conn.readLp(RelayClientMsgSize))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc=exc.msg
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc
let msgRcvFromRelayOpt =
RelayMessage.decode(await conn.readLp(RelayClientMsgSize))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc = exc.msg
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc

let msgRcvFromRelay = msgRcvFromRelayOpt.valueOr:
raise newException(RelayV1DialError, "Hop can't open destination stream")
if msgRcvFromRelay.msgType.tryGet() != RelayType.Status:
raise newException(RelayV1DialError, "Hop can't open destination stream: wrong message type")
raise newException(
RelayV1DialError, "Hop can't open destination stream: wrong message type"
if msgRcvFromRelay.status.tryGet() != StatusV1.Success:
raise newException(RelayV1DialError, "Hop can't open destination stream: status failed")
raise newException(
RelayV1DialError, "Hop can't open destination stream: status failed"
except RelayV1DialError as exc:
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc
Expand All @@ -164,21 +175,23 @@ proc dialPeerV2*(
cl: RelayClient,
conn: RelayConnection,
dstPeerId: PeerId,
dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} =
dstAddrs: seq[MultiAddress],
): Future[Connection] {.async.} =
p = Peer(peerId: dstPeerId, addrs: dstAddrs)
pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: Opt.some(p)))

trace "Dial peer", p

let msgRcvFromRelay = try:
await conn.writeLp(pb.buffer)
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc=exc.msg
raise newException(RelayV2DialError, exc.msg)
let msgRcvFromRelay =
await conn.writeLp(pb.buffer)
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc = exc.msg
raise newException(RelayV2DialError, exc.msg)

if msgRcvFromRelay.msgType != HopMessageType.Status:
raise newException(RelayV2DialError, "Unexpected stop response")
Expand All @@ -198,7 +211,7 @@ proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async.} =
if msg.msgType == StopMessageType.Connect:
await cl.handleRelayedConnect(conn, msg)
trace "Unexpected client / relayv2 handshake", msgType=msg.msgType
trace "Unexpected client / relayv2 handshake", msgType = msg.msgType
await sendStopError(conn, MalformedMessage)

proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async.} =
Expand All @@ -223,8 +236,10 @@ proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async.}
await sendStatus(conn, StatusV1.Success)
# This sound redundant but the callback could, in theory, be set to nil during
# sendStatus(Success) so it's safer to double check
if cl.onNewConnection != nil: await cl.onNewConnection(conn, 0, 0)
else: await conn.close()
if cl.onNewConnection != nil:
await cl.onNewConnection(conn, 0, 0)
await conn.close()

proc handleStreamV1(cl: RelayClient, conn: Connection) {.async.} =
let msg = RelayMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr:
Expand All @@ -236,42 +251,54 @@ proc handleStreamV1(cl: RelayClient, conn: Connection) {.async.} =
trace "Message type not set"
await sendStatus(conn, StatusV1.MalformedMessage)
case typ:
of RelayType.Hop:
if cl.canHop: await cl.handleHop(conn, msg)
else: await sendStatus(conn, StatusV1.HopCantSpeakRelay)
of RelayType.Stop: await cl.handleStop(conn, msg)
of RelayType.CanHop:
if cl.canHop: await sendStatus(conn, StatusV1.Success)
else: await sendStatus(conn, StatusV1.HopCantSpeakRelay)
case typ
of RelayType.Hop:
if cl.canHop:
await cl.handleHop(conn, msg)
await sendStatus(conn, StatusV1.HopCantSpeakRelay)
of RelayType.Stop:
await cl.handleStop(conn, msg)
of RelayType.CanHop:
if cl.canHop:
await sendStatus(conn, StatusV1.Success)
trace "Unexpected relay handshake", msgType=msg.msgType
await sendStatus(conn, StatusV1.MalformedMessage)

proc new*(T: typedesc[RelayClient], canHop: bool = false,
reservationTTL: times.Duration = DefaultReservationTTL,
limitDuration: uint32 = DefaultLimitDuration,
limitData: uint64 = DefaultLimitData,
heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime,
maxCircuit: int = MaxCircuit,
maxCircuitPerPeer: int = MaxCircuitPerPeer,
msgSize: int = RelayClientMsgSize,
circuitRelayV1: bool = false): T =

let cl = T(canHop: canHop,
reservationTTL: reservationTTL,
limit: Limit(duration: limitDuration, data: limitData),
heartbeatSleepTime: heartbeatSleepTime,
maxCircuit: maxCircuit,
maxCircuitPerPeer: maxCircuitPerPeer,
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1)
await sendStatus(conn, StatusV1.HopCantSpeakRelay)
trace "Unexpected relay handshake", msgType = msg.msgType
await sendStatus(conn, StatusV1.MalformedMessage)

proc new*(
T: typedesc[RelayClient],
canHop: bool = false,
reservationTTL: times.Duration = DefaultReservationTTL,
limitDuration: uint32 = DefaultLimitDuration,
limitData: uint64 = DefaultLimitData,
heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime,
maxCircuit: int = MaxCircuit,
maxCircuitPerPeer: int = MaxCircuitPerPeer,
msgSize: int = RelayClientMsgSize,
circuitRelayV1: bool = false,
): T =
let cl = T(
canHop: canHop,
reservationTTL: reservationTTL,
limit: Limit(duration: limitDuration, data: limitData),
heartbeatSleepTime: heartbeatSleepTime,
maxCircuit: maxCircuit,
maxCircuitPerPeer: maxCircuitPerPeer,
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1,
proc handleStream(conn: Connection, proto: string) {.async.} =
case proto:
of RelayV1Codec: await cl.handleStreamV1(conn)
of RelayV2StopCodec: await cl.handleStopStreamV2(conn)
of RelayV2HopCodec: await cl.handleHopStreamV2(conn)
case proto
of RelayV1Codec:
await cl.handleStreamV1(conn)
of RelayV2StopCodec:
await cl.handleStopStreamV2(conn)
of RelayV2HopCodec:
await cl.handleHopStreamV2(conn)
except CancelledError as exc:
raise exc
except CatchableError as exc:
Expand All @@ -281,8 +308,9 @@ proc new*(T: typedesc[RelayClient], canHop: bool = false,
await conn.close()

cl.handler = handleStream
cl.codecs = if cl.canHop:
@[RelayV1Codec, RelayV2HopCodec, RelayV2StopCodec]
@[RelayV1Codec, RelayV2StopCodec]
cl.codecs =
if cl.canHop:
@[RelayV1Codec, RelayV2HopCodec, RelayV2StopCodec]
@[RelayV1Codec, RelayV2StopCodec]

0 comments on commit 207b884

Please sign in to comment.