diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index 3ddeb991d3..15f0334457 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -11,14 +11,15 @@ import times import chronos, chronicles -import ./relay, - ./messages, - ./rconn, - ./utils, - ../../../peerinfo, - ../../../switch, - ../../../multiaddress, - ../../../stream/connection +import + ./relay, + ./messages, + ./rconn, + ./utils, + ../../../peerinfo, + ../../../switch, + ../../../multiaddress, + ../../../stream/connection logScope: topics = "libp2p relay relay-client" @@ -30,26 +31,28 @@ type ReservationError* = object of RelayClientError RelayV1DialError* = object of RelayClientError RelayV2DialError* = object of RelayClientError - RelayClientAddConn* = proc(conn: Connection, - duration: uint32, - data: uint64): Future[void] {.gcsafe, raises: [].} + RelayClientAddConn* = proc( + conn: Connection, duration: uint32, data: uint64 + ): Future[void] {.gcsafe, raises: [].} RelayClient* = ref object of Relay onNewConnection*: RelayClientAddConn canHop: bool Rsvp* = object - expire*: uint64 # required, Unix expiration time (UTC) + expire*: uint64 # required, Unix expiration time (UTC) addrs*: seq[MultiAddress] # relay address for reserving peer - voucher*: Opt[Voucher] # optional, reservation voucher - limitDuration*: uint32 # seconds - limitData*: uint64 # bytes + voucher*: Opt[Voucher] # optional, reservation voucher + limitDuration*: uint32 # seconds + limitData*: uint64 # bytes proc sendStopError(conn: Connection, code: StatusV2) {.async.} = trace "send stop status", status = $code & " (" & $ord(code) & ")" let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code)) await conn.writeLp(encode(msg).buffer) -proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {.async.} = +proc handleRelayedConnect( + cl: RelayClient, conn: Connection, msg: StopMessage +) {.async.} = let # TODO: check the go version to see in which way this could fail # it's unclear in the spec @@ -58,9 +61,7 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) { return limitDuration = msg.limit.duration limitData = msg.limit.data - msg = StopMessage( - msgType: StopMessageType.Status, - status: Opt.some(Ok)) + msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(Ok)) pb = encode(msg) trace "incoming relay connection", src @@ -72,24 +73,28 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) { await conn.writeLp(pb.buffer) # This sound redundant but the callback could, in theory, be set to nil during # conn.writeLp so it's safer to double check - if cl.onNewConnection != nil: await cl.onNewConnection(conn, limitDuration, limitData) - else: await conn.close() + if cl.onNewConnection != nil: + await cl.onNewConnection(conn, limitDuration, limitData) + else: + await conn.close() -proc reserve*(cl: RelayClient, - peerId: PeerId, - addrs: seq[MultiAddress] = @[]): Future[Rsvp] {.async.} = +proc reserve*( + cl: RelayClient, peerId: PeerId, addrs: seq[MultiAddress] = @[] +): Future[Rsvp] {.async.} = let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec) - defer: await conn.close() + defer: + await conn.close() let pb = encode(HopMessage(msgType: HopMessageType.Reserve)) - msg = try: - await conn.writeLp(pb.buffer) - HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error writing or reading reservation message", exc=exc.msg - raise newException(ReservationError, exc.msg) + msg = + try: + await conn.writeLp(pb.buffer) + HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error writing or reading reservation message", exc = exc.msg + raise newException(ReservationError, exc.msg) if msg.msgType != HopMessageType.Status: raise newException(ReservationError, "Unexpected relay response type") @@ -99,7 +104,7 @@ proc reserve*(cl: RelayClient, let reservation = msg.reservation.valueOr: raise newException(ReservationError, "Missing reservation information") if reservation.expire > int64.high().uint64 or - now().utc > reservation.expire.int64.fromUnix.utc: + now().utc > reservation.expire.int64.fromUnix.utc: raise newException(ReservationError, "Bad expiration date") result.expire = reservation.expire result.addrs = reservation.addrs @@ -115,43 +120,49 @@ proc reserve*(cl: RelayClient, result.limitData = msg.limit.data proc dialPeerV1*( - cl: RelayClient, - conn: Connection, - dstPeerId: PeerId, - dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = + cl: RelayClient, conn: Connection, dstPeerId: PeerId, dstAddrs: seq[MultiAddress] +): Future[Connection] {.async.} = var msg = RelayMessage( msgType: Opt.some(RelayType.Hop), - srcPeer: Opt.some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)), - dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs))) + srcPeer: Opt.some( + RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs) + ), + dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs)), + ) pb = encode(msg) - trace "Dial peer", msgSend=msg + trace "Dial peer", msgSend = msg try: await conn.writeLp(pb.buffer) except CancelledError as exc: raise exc except CatchableError as exc: - trace "error writing hop request", exc=exc.msg + trace "error writing hop request", exc = exc.msg raise exc - let msgRcvFromRelayOpt = try: - RelayMessage.decode(await conn.readLp(RelayClientMsgSize)) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error reading stop response", exc=exc.msg - await sendStatus(conn, StatusV1.HopCantOpenDstStream) - raise exc + let msgRcvFromRelayOpt = + try: + RelayMessage.decode(await conn.readLp(RelayClientMsgSize)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error reading stop response", exc = exc.msg + await sendStatus(conn, StatusV1.HopCantOpenDstStream) + raise exc try: let msgRcvFromRelay = msgRcvFromRelayOpt.valueOr: raise newException(RelayV1DialError, "Hop can't open destination stream") if msgRcvFromRelay.msgType.tryGet() != RelayType.Status: - raise newException(RelayV1DialError, "Hop can't open destination stream: wrong message type") + raise newException( + RelayV1DialError, "Hop can't open destination stream: wrong message type" + ) if msgRcvFromRelay.status.tryGet() != StatusV1.Success: - raise newException(RelayV1DialError, "Hop can't open destination stream: status failed") + raise newException( + RelayV1DialError, "Hop can't open destination stream: status failed" + ) except RelayV1DialError as exc: await sendStatus(conn, StatusV1.HopCantOpenDstStream) raise exc @@ -164,21 +175,23 @@ proc dialPeerV2*( cl: RelayClient, conn: RelayConnection, dstPeerId: PeerId, - dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} = + dstAddrs: seq[MultiAddress], +): Future[Connection] {.async.} = let p = Peer(peerId: dstPeerId, addrs: dstAddrs) pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: Opt.some(p))) trace "Dial peer", p - let msgRcvFromRelay = try: - await conn.writeLp(pb.buffer) - HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error reading stop response", exc=exc.msg - raise newException(RelayV2DialError, exc.msg) + let msgRcvFromRelay = + try: + await conn.writeLp(pb.buffer) + HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet() + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error reading stop response", exc = exc.msg + raise newException(RelayV2DialError, exc.msg) if msgRcvFromRelay.msgType != HopMessageType.Status: raise newException(RelayV2DialError, "Unexpected stop response") @@ -198,7 +211,7 @@ proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async.} = if msg.msgType == StopMessageType.Connect: await cl.handleRelayedConnect(conn, msg) else: - trace "Unexpected client / relayv2 handshake", msgType=msg.msgType + trace "Unexpected client / relayv2 handshake", msgType = msg.msgType await sendStopError(conn, MalformedMessage) proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async.} = @@ -223,8 +236,10 @@ proc handleStop(cl: RelayClient, conn: Connection, msg: RelayMessage) {.async.} await sendStatus(conn, StatusV1.Success) # This sound redundant but the callback could, in theory, be set to nil during # sendStatus(Success) so it's safer to double check - if cl.onNewConnection != nil: await cl.onNewConnection(conn, 0, 0) - else: await conn.close() + if cl.onNewConnection != nil: + await cl.onNewConnection(conn, 0, 0) + else: + await conn.close() proc handleStreamV1(cl: RelayClient, conn: Connection) {.async.} = let msg = RelayMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr: @@ -236,42 +251,54 @@ proc handleStreamV1(cl: RelayClient, conn: Connection) {.async.} = trace "Message type not set" await sendStatus(conn, StatusV1.MalformedMessage) return - case typ: - of RelayType.Hop: - if cl.canHop: await cl.handleHop(conn, msg) - else: await sendStatus(conn, StatusV1.HopCantSpeakRelay) - of RelayType.Stop: await cl.handleStop(conn, msg) - of RelayType.CanHop: - if cl.canHop: await sendStatus(conn, StatusV1.Success) - else: await sendStatus(conn, StatusV1.HopCantSpeakRelay) + case typ + of RelayType.Hop: + if cl.canHop: + await cl.handleHop(conn, msg) + else: + await sendStatus(conn, StatusV1.HopCantSpeakRelay) + of RelayType.Stop: + await cl.handleStop(conn, msg) + of RelayType.CanHop: + if cl.canHop: + await sendStatus(conn, StatusV1.Success) else: - trace "Unexpected relay handshake", msgType=msg.msgType - await sendStatus(conn, StatusV1.MalformedMessage) - -proc new*(T: typedesc[RelayClient], canHop: bool = false, - reservationTTL: times.Duration = DefaultReservationTTL, - limitDuration: uint32 = DefaultLimitDuration, - limitData: uint64 = DefaultLimitData, - heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime, - maxCircuit: int = MaxCircuit, - maxCircuitPerPeer: int = MaxCircuitPerPeer, - msgSize: int = RelayClientMsgSize, - circuitRelayV1: bool = false): T = - - let cl = T(canHop: canHop, - reservationTTL: reservationTTL, - limit: Limit(duration: limitDuration, data: limitData), - heartbeatSleepTime: heartbeatSleepTime, - maxCircuit: maxCircuit, - maxCircuitPerPeer: maxCircuitPerPeer, - msgSize: msgSize, - isCircuitRelayV1: circuitRelayV1) + await sendStatus(conn, StatusV1.HopCantSpeakRelay) + else: + trace "Unexpected relay handshake", msgType = msg.msgType + await sendStatus(conn, StatusV1.MalformedMessage) + +proc new*( + T: typedesc[RelayClient], + canHop: bool = false, + reservationTTL: times.Duration = DefaultReservationTTL, + limitDuration: uint32 = DefaultLimitDuration, + limitData: uint64 = DefaultLimitData, + heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime, + maxCircuit: int = MaxCircuit, + maxCircuitPerPeer: int = MaxCircuitPerPeer, + msgSize: int = RelayClientMsgSize, + circuitRelayV1: bool = false, +): T = + let cl = T( + canHop: canHop, + reservationTTL: reservationTTL, + limit: Limit(duration: limitDuration, data: limitData), + heartbeatSleepTime: heartbeatSleepTime, + maxCircuit: maxCircuit, + maxCircuitPerPeer: maxCircuitPerPeer, + msgSize: msgSize, + isCircuitRelayV1: circuitRelayV1, + ) proc handleStream(conn: Connection, proto: string) {.async.} = try: - case proto: - of RelayV1Codec: await cl.handleStreamV1(conn) - of RelayV2StopCodec: await cl.handleStopStreamV2(conn) - of RelayV2HopCodec: await cl.handleHopStreamV2(conn) + case proto + of RelayV1Codec: + await cl.handleStreamV1(conn) + of RelayV2StopCodec: + await cl.handleStopStreamV2(conn) + of RelayV2HopCodec: + await cl.handleHopStreamV2(conn) except CancelledError as exc: raise exc except CatchableError as exc: @@ -281,8 +308,9 @@ proc new*(T: typedesc[RelayClient], canHop: bool = false, await conn.close() cl.handler = handleStream - cl.codecs = if cl.canHop: - @[RelayV1Codec, RelayV2HopCodec, RelayV2StopCodec] - else: - @[RelayV1Codec, RelayV2StopCodec] + cl.codecs = + if cl.canHop: + @[RelayV1Codec, RelayV2HopCodec, RelayV2StopCodec] + else: + @[RelayV1Codec, RelayV2StopCodec] cl diff --git a/libp2p/protocols/connectivity/relay/messages.nim b/libp2p/protocols/connectivity/relay/messages.nim index 8cb2bfa654..f3d92f4611 100644 --- a/libp2p/protocols/connectivity/relay/messages.nim +++ b/libp2p/protocols/connectivity/relay/messages.nim @@ -11,8 +11,7 @@ import macros import stew/[objects, results] -import ../../../peerinfo, - ../../../signed_envelope +import ../../../peerinfo, ../../../signed_envelope # Circuit Relay V1 Message @@ -87,22 +86,22 @@ proc decode*(_: typedesc[RelayMessage], buf: seq[byte]): Opt[RelayMessage] = let pb = initProtoBuffer(buf) - if ? pb.getField(1, msgTypeOrd).toOpt(): + if ?pb.getField(1, msgTypeOrd).toOpt(): if msgTypeOrd.int notin RelayType: return Opt.none(RelayMessage) rMsg.msgType = Opt.some(RelayType(msgTypeOrd)) - if ? pb.getField(2, pbSrc).toOpt(): - discard ? pbSrc.getField(1, src.peerId).toOpt() - discard ? pbSrc.getRepeatedField(2, src.addrs).toOpt() + if ?pb.getField(2, pbSrc).toOpt(): + discard ?pbSrc.getField(1, src.peerId).toOpt() + discard ?pbSrc.getRepeatedField(2, src.addrs).toOpt() rMsg.srcPeer = Opt.some(src) - if ? pb.getField(3, pbDst).toOpt(): - discard ? pbDst.getField(1, dst.peerId).toOpt() - discard ? pbDst.getRepeatedField(2, dst.addrs).toOpt() + if ?pb.getField(3, pbDst).toOpt(): + discard ?pbDst.getField(1, dst.peerId).toOpt() + discard ?pbDst.getRepeatedField(2, dst.addrs).toOpt() rMsg.dstPeer = Opt.some(dst) - if ? pb.getField(4, statusOrd).toOpt(): + if ?pb.getField(4, statusOrd).toOpt(): var status: StatusV1 if not checkedEnumAssign(status, statusOrd): return Opt.none(RelayMessage) @@ -111,19 +110,18 @@ proc decode*(_: typedesc[RelayMessage], buf: seq[byte]): Opt[RelayMessage] = # Voucher -type - Voucher* = object - relayPeerId*: PeerId # peer ID of the relay - reservingPeerId*: PeerId # peer ID of the reserving peer - expiration*: uint64 # UNIX UTC expiration time for the reservation +type Voucher* = object + relayPeerId*: PeerId # peer ID of the relay + reservingPeerId*: PeerId # peer ID of the reserving peer + expiration*: uint64 # UNIX UTC expiration time for the reservation proc decode*(_: typedesc[Voucher], buf: seq[byte]): Result[Voucher, ProtoError] = let pb = initProtoBuffer(buf) var v = Voucher() - ? pb.getRequiredField(1, v.relayPeerId) - ? pb.getRequiredField(2, v.reservingPeerId) - ? pb.getRequiredField(3, v.expiration) + ?pb.getRequiredField(1, v.relayPeerId) + ?pb.getRequiredField(2, v.reservingPeerId) + ?pb.getRequiredField(3, v.expiration) ok(v) @@ -137,20 +135,23 @@ proc encode*(v: Voucher): seq[byte] = pb.finish() pb.buffer -proc init*(T: typedesc[Voucher], - relayPeerId: PeerId, - reservingPeerId: PeerId, - expiration: uint64): T = +proc init*( + T: typedesc[Voucher], + relayPeerId: PeerId, + reservingPeerId: PeerId, + expiration: uint64, +): T = T( - relayPeerId = relayPeerId, - reservingPeerId = reservingPeerId, - expiration: expiration + relayPeerId = relayPeerId, reservingPeerId = reservingPeerId, expiration: expiration ) type SignedVoucher* = SignedPayload[Voucher] -proc payloadDomain*(_: typedesc[Voucher]): string = "libp2p-relay-rsvp" -proc payloadType*(_: typedesc[Voucher]): seq[byte] = @[ (byte)0x03, (byte)0x02 ] +proc payloadDomain*(_: typedesc[Voucher]): string = + "libp2p-relay-rsvp" + +proc payloadType*(_: typedesc[Voucher]): seq[byte] = + @[(byte) 0x03, (byte) 0x02] proc checkValid*(spr: SignedVoucher): Result[void, EnvelopeError] = if not spr.data.relayPeerId.match(spr.envelope.publicKey): @@ -164,13 +165,15 @@ type Peer* = object peerId*: PeerId addrs*: seq[MultiAddress] + Reservation* = object - expire*: uint64 # required, Unix expiration time (UTC) - addrs*: seq[MultiAddress] # relay address for reserving peer - svoucher*: Opt[seq[byte]] # optional, reservation voucher + expire*: uint64 # required, Unix expiration time (UTC) + addrs*: seq[MultiAddress] # relay address for reserving peer + svoucher*: Opt[seq[byte]] # optional, reservation voucher + Limit* = object - duration*: uint32 # seconds - data*: uint64 # bytes + duration*: uint32 # seconds + data*: uint64 # bytes StatusV2* = enum Ok = 100 @@ -181,10 +184,12 @@ type NoReservation = 204 MalformedMessage = 400 UnexpectedMessage = 401 + HopMessageType* {.pure.} = enum Reserve = 0 Connect = 1 Status = 2 + HopMessage* = object msgType*: HopMessageType peer*: Opt[Peer] @@ -214,8 +219,10 @@ proc encode*(msg: HopMessage): ProtoBuffer = pb.write(3, rpb.buffer) if msg.limit.duration > 0 or msg.limit.data > 0: var lpb = initProtoBuffer() - if msg.limit.duration > 0: lpb.write(1, msg.limit.duration) - if msg.limit.data > 0: lpb.write(2, msg.limit.data) + if msg.limit.duration > 0: + lpb.write(1, msg.limit.duration) + if msg.limit.data > 0: + lpb.write(2, msg.limit.data) lpb.finish() pb.write(4, lpb.buffer) msg.status.withValue(status): @@ -229,35 +236,35 @@ proc decode*(_: typedesc[HopMessage], buf: seq[byte]): Opt[HopMessage] = let pb = initProtoBuffer(buf) var msgTypeOrd: uint32 - ? pb.getRequiredField(1, msgTypeOrd).toOpt() + ?pb.getRequiredField(1, msgTypeOrd).toOpt() if not checkedEnumAssign(msg.msgType, msgTypeOrd): return Opt.none(HopMessage) var pbPeer: ProtoBuffer - if ? pb.getField(2, pbPeer).toOpt(): + if ?pb.getField(2, pbPeer).toOpt(): var peer: Peer - ? pbPeer.getRequiredField(1, peer.peerId).toOpt() - discard ? pbPeer.getRepeatedField(2, peer.addrs).toOpt() + ?pbPeer.getRequiredField(1, peer.peerId).toOpt() + discard ?pbPeer.getRepeatedField(2, peer.addrs).toOpt() msg.peer = Opt.some(peer) var pbReservation: ProtoBuffer - if ? pb.getField(3, pbReservation).toOpt(): + if ?pb.getField(3, pbReservation).toOpt(): var svoucher: seq[byte] reservation: Reservation - if ? pbReservation.getField(3, svoucher).toOpt(): + if ?pbReservation.getField(3, svoucher).toOpt(): reservation.svoucher = Opt.some(svoucher) - ? pbReservation.getRequiredField(1, reservation.expire).toOpt() - discard ? pbReservation.getRepeatedField(2, reservation.addrs).toOpt() + ?pbReservation.getRequiredField(1, reservation.expire).toOpt() + discard ?pbReservation.getRepeatedField(2, reservation.addrs).toOpt() msg.reservation = Opt.some(reservation) var pbLimit: ProtoBuffer - if ? pb.getField(4, pbLimit).toOpt(): - discard ? pbLimit.getField(1, msg.limit.duration).toOpt() - discard ? pbLimit.getField(2, msg.limit.data).toOpt() + if ?pb.getField(4, pbLimit).toOpt(): + discard ?pbLimit.getField(1, msg.limit.duration).toOpt() + discard ?pbLimit.getField(2, msg.limit.data).toOpt() var statusOrd: uint32 - if ? pb.getField(5, statusOrd).toOpt(): + if ?pb.getField(5, statusOrd).toOpt(): var status: StatusV2 if not checkedEnumAssign(status, statusOrd): return Opt.none(HopMessage) @@ -270,13 +277,13 @@ type StopMessageType* {.pure.} = enum Connect = 0 Status = 1 + StopMessage* = object msgType*: StopMessageType peer*: Opt[Peer] limit*: Limit status*: Opt[StatusV2] - proc encode*(msg: StopMessage): ProtoBuffer = var pb = initProtoBuffer() @@ -290,8 +297,10 @@ proc encode*(msg: StopMessage): ProtoBuffer = pb.write(2, ppb.buffer) if msg.limit.duration > 0 or msg.limit.data > 0: var lpb = initProtoBuffer() - if msg.limit.duration > 0: lpb.write(1, msg.limit.duration) - if msg.limit.data > 0: lpb.write(2, msg.limit.data) + if msg.limit.duration > 0: + lpb.write(1, msg.limit.duration) + if msg.limit.data > 0: + lpb.write(2, msg.limit.data) lpb.finish() pb.write(3, lpb.buffer) msg.status.withValue(status): @@ -306,26 +315,25 @@ proc decode*(_: typedesc[StopMessage], buf: seq[byte]): Opt[StopMessage] = let pb = initProtoBuffer(buf) var msgTypeOrd: uint32 - ? pb.getRequiredField(1, msgTypeOrd).toOpt() + ?pb.getRequiredField(1, msgTypeOrd).toOpt() if msgTypeOrd.int notin StopMessageType: return Opt.none(StopMessage) msg.msgType = StopMessageType(msgTypeOrd) - var pbPeer: ProtoBuffer - if ? pb.getField(2, pbPeer).toOpt(): + if ?pb.getField(2, pbPeer).toOpt(): var peer: Peer - ? pbPeer.getRequiredField(1, peer.peerId).toOpt() - discard ? pbPeer.getRepeatedField(2, peer.addrs).toOpt() + ?pbPeer.getRequiredField(1, peer.peerId).toOpt() + discard ?pbPeer.getRepeatedField(2, peer.addrs).toOpt() msg.peer = Opt.some(peer) var pbLimit: ProtoBuffer - if ? pb.getField(3, pbLimit).toOpt(): - discard ? pbLimit.getField(1, msg.limit.duration).toOpt() - discard ? pbLimit.getField(2, msg.limit.data).toOpt() + if ?pb.getField(3, pbLimit).toOpt(): + discard ?pbLimit.getField(1, msg.limit.duration).toOpt() + discard ?pbLimit.getField(2, msg.limit.data).toOpt() var statusOrd: uint32 - if ? pb.getField(4, statusOrd).toOpt(): + if ?pb.getField(4, statusOrd).toOpt(): var status: StatusV2 if not checkedEnumAssign(status, statusOrd): return Opt.none(StopMessage) diff --git a/libp2p/protocols/connectivity/relay/rconn.nim b/libp2p/protocols/connectivity/relay/rconn.nim index a4699a0773..cb416ea5e1 100644 --- a/libp2p/protocols/connectivity/relay/rconn.nim +++ b/libp2p/protocols/connectivity/relay/rconn.nim @@ -13,24 +13,20 @@ import chronos import ../../../stream/connection -type - RelayConnection* = ref object of Connection - conn*: Connection - limitDuration*: uint32 - limitData*: uint64 - dataSent*: uint64 +type RelayConnection* = ref object of Connection + conn*: Connection + limitDuration*: uint32 + limitData*: uint64 + dataSent*: uint64 method readOnce*( - self: RelayConnection, - pbytes: pointer, - nbytes: int + self: RelayConnection, pbytes: pointer, nbytes: int ): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} = self.activity = true self.conn.readOnce(pbytes, nbytes) method write*( - self: RelayConnection, - msg: seq[byte] + self: RelayConnection, msg: seq[byte] ): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} = self.dataSent.inc(msg.len) if self.limitData != 0 and self.dataSent > self.limitData: @@ -43,13 +39,15 @@ method closeImpl*(self: RelayConnection): Future[void] {.async: (raises: []).} = await self.conn.close() await procCall Connection(self).closeImpl() -method getWrapped*(self: RelayConnection): Connection = self.conn +method getWrapped*(self: RelayConnection): Connection = + self.conn proc new*( T: typedesc[RelayConnection], conn: Connection, limitDuration: uint32, - limitData: uint64): T = + limitData: uint64, +): T = let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData) rc.dir = conn.dir rc.initStream() @@ -59,5 +57,6 @@ proc new*( await noCancel conn.join().wait(limitDuration.seconds()) except AsyncTimeoutError: await conn.close() + asyncSpawn checkDurationConnection() return rc diff --git a/libp2p/protocols/connectivity/relay/relay.nim b/libp2p/protocols/connectivity/relay/relay.nim index 6fa9894f8d..a6c76125a2 100644 --- a/libp2p/protocols/connectivity/relay/relay.nim +++ b/libp2p/protocols/connectivity/relay/relay.nim @@ -13,18 +13,19 @@ import sequtils, tables import chronos, chronicles -import ./messages, - ./rconn, - ./utils, - ../../../peerinfo, - ../../../switch, - ../../../multiaddress, - ../../../multicodec, - ../../../stream/connection, - ../../../protocols/protocol, - ../../../errors, - ../../../utils/heartbeat, - ../../../signed_envelope +import + ./messages, + ./rconn, + ./utils, + ../../../peerinfo, + ../../../switch, + ../../../multiaddress, + ../../../multicodec, + ../../../stream/connection, + ../../../protocols/protocol, + ../../../errors, + ../../../utils/heartbeat, + ../../../signed_envelope # TODO: # * Eventually replace std/times by chronos/timer. Currently chronos/timer @@ -54,47 +55,53 @@ type # Relay Side -type - Relay* = ref object of LPProtocol - switch*: Switch - peerCount: CountTable[PeerId] - - # number of reservation (relayv2) + number of connection (relayv1) - maxCircuit*: int - - maxCircuitPerPeer*: int - msgSize*: int - # RelayV1 - isCircuitRelayV1*: bool - streamCount: int - # RelayV2 - rsvp: Table[PeerId, DateTime] - reservationLoop: Future[void] - reservationTTL*: times.Duration - heartbeatSleepTime*: uint32 - limit*: Limit +type Relay* = ref object of LPProtocol + switch*: Switch + peerCount: CountTable[PeerId] + + # number of reservation (relayv2) + number of connection (relayv1) + maxCircuit*: int + + maxCircuitPerPeer*: int + msgSize*: int + # RelayV1 + isCircuitRelayV1*: bool + streamCount: int + # RelayV2 + rsvp: Table[PeerId, DateTime] + reservationLoop: Future[void] + reservationTTL*: times.Duration + heartbeatSleepTime*: uint32 + limit*: Limit # Relay V2 proc createReserveResponse( - r: Relay, - pid: PeerId, - expire: DateTime): Result[HopMessage, CryptoError] = + r: Relay, pid: PeerId, expire: DateTime +): Result[HopMessage, CryptoError] = let expireUnix = expire.toTime.toUnix.uint64 - v = Voucher(relayPeerId: r.switch.peerInfo.peerId, - reservingPeerId: pid, - expiration: expireUnix) - sv = ? SignedVoucher.init(r.switch.peerInfo.privateKey, v) - ma = ? MultiAddress.init("/p2p/" & $r.switch.peerInfo.peerId).orErr(CryptoError.KeyError) - rsrv = Reservation(expire: expireUnix, - addrs: r.switch.peerInfo.addrs.mapIt( - ? it.concat(ma).orErr(CryptoError.KeyError)), - svoucher: Opt.some(? sv.encode)) - msg = HopMessage(msgType: HopMessageType.Status, - reservation: Opt.some(rsrv), - limit: r.limit, - status: Opt.some(Ok)) + v = Voucher( + relayPeerId: r.switch.peerInfo.peerId, + reservingPeerId: pid, + expiration: expireUnix, + ) + sv = ?SignedVoucher.init(r.switch.peerInfo.privateKey, v) + ma = + ?MultiAddress.init("/p2p/" & $r.switch.peerInfo.peerId).orErr( + CryptoError.KeyError + ) + rsrv = Reservation( + expire: expireUnix, + addrs: r.switch.peerInfo.addrs.mapIt(?it.concat(ma).orErr(CryptoError.KeyError)), + svoucher: Opt.some(?sv.encode), + ) + msg = HopMessage( + msgType: HopMessageType.Status, + reservation: Opt.some(rsrv), + limit: r.limit, + status: Opt.some(Ok), + ) return ok(msg) proc isRelayed*(conn: Connection): bool = @@ -126,9 +133,7 @@ proc handleReserve(r: Relay, conn: Connection) {.async.} = r.rsvp[pid] = expire await conn.writeLp(encode(msg).buffer) -proc handleConnect(r: Relay, - connSrc: Connection, - msg: HopMessage) {.async.} = +proc handleConnect(r: Relay, connSrc: Connection, msg: HopMessage) {.async.} = if connSrc.isRelayed(): trace "connection attempt over relay connection" await sendHopStatus(connSrc, PermissionDenied) @@ -150,38 +155,42 @@ proc handleConnect(r: Relay, r.peerCount.inc(src, -1) r.peerCount.inc(dst, -1) - if r.peerCount[src] > r.maxCircuitPerPeer or - r.peerCount[dst] > r.maxCircuitPerPeer: - trace "too many connections", src = r.peerCount[src], - dst = r.peerCount[dst], - max = r.maxCircuitPerPeer + if r.peerCount[src] > r.maxCircuitPerPeer or r.peerCount[dst] > r.maxCircuitPerPeer: + trace "too many connections", + src = r.peerCount[src], dst = r.peerCount[dst], max = r.maxCircuitPerPeer await sendHopStatus(connSrc, ResourceLimitExceeded) return - let connDst = try: - await r.switch.dial(dst, RelayV2StopCodec) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error opening relay stream", dst, exc=exc.msg - await sendHopStatus(connSrc, ConnectionFailed) - return + let connDst = + try: + await r.switch.dial(dst, RelayV2StopCodec) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error opening relay stream", dst, exc = exc.msg + await sendHopStatus(connSrc, ConnectionFailed) + return defer: await connDst.close() proc sendStopMsg() {.async.} = - let stopMsg = StopMessage(msgType: StopMessageType.Connect, - peer: Opt.some(Peer(peerId: src, addrs: @[])), - limit: r.limit) + let stopMsg = StopMessage( + msgType: StopMessageType.Connect, + peer: Opt.some(Peer(peerId: src, addrs: @[])), + limit: r.limit, + ) await connDst.writeLp(encode(stopMsg).buffer) let msg = StopMessage.decode(await connDst.readLp(r.msgSize)).valueOr: raise newException(SendStopError, "Malformed message") if msg.msgType != StopMessageType.Status: - raise newException(SendStopError, "Unexpected stop response, not a status message") + raise + newException(SendStopError, "Unexpected stop response, not a status message") if msg.status.get(UnexpectedMessage) != Ok: raise newException(SendStopError, "Relay stop failure") - await connSrc.writeLp(encode(HopMessage(msgType: HopMessageType.Status, - status: Opt.some(Ok))).buffer) + await connSrc.writeLp( + encode(HopMessage(msgType: HopMessageType.Status, status: Opt.some(Ok))).buffer + ) + try: await sendStopMsg() except CancelledError as exc: @@ -205,20 +214,24 @@ proc handleHopStreamV2*(r: Relay, conn: Connection) {.async.} = await sendHopStatus(conn, MalformedMessage) return trace "relayv2 handle stream", msg = msg - case msg.msgType: - of HopMessageType.Reserve: await r.handleReserve(conn) - of HopMessageType.Connect: await r.handleConnect(conn, msg) + case msg.msgType + of HopMessageType.Reserve: + await r.handleReserve(conn) + of HopMessageType.Connect: + await r.handleConnect(conn, msg) else: - trace "Unexpected relayv2 handshake", msgType=msg.msgType + trace "Unexpected relayv2 handshake", msgType = msg.msgType await sendHopStatus(conn, MalformedMessage) # Relay V1 proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} = r.streamCount.inc() - defer: r.streamCount.dec() + defer: + r.streamCount.dec() if r.streamCount + r.rsvp.len() >= r.maxCircuit: - trace "refusing connection; too many active circuit", streamCount = r.streamCount, rsvp = r.rsvp.len() + trace "refusing connection; too many active circuit", + streamCount = r.streamCount, rsvp = r.rsvp.len() await sendStatus(connSrc, StatusV1.HopCantSpeakRelay) return @@ -236,13 +249,14 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} = trace "relay not connected to dst", dst return err(StatusV1.HopNoConnToDst) ok(msg) + let check = checkMsg() if check.isErr: await sendStatus(connSrc, check.error()) return if r.peerCount[src.peerId] >= r.maxCircuitPerPeer or - r.peerCount[dst.peerId] >= r.maxCircuitPerPeer: + r.peerCount[dst.peerId] >= r.maxCircuitPerPeer: trace "refusing connection; too many connection from src or to dst", src, dst await sendStatus(connSrc, StatusV1.HopCantSpeakRelay) return @@ -252,31 +266,32 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} = r.peerCount.inc(src.peerId, -1) r.peerCount.inc(dst.peerId, -1) - let connDst = try: - await r.switch.dial(dst.peerId, RelayV1Codec) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error opening relay stream", dst, exc=exc.msg - await sendStatus(connSrc, StatusV1.HopCantDialDst) - return + let connDst = + try: + await r.switch.dial(dst.peerId, RelayV1Codec) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error opening relay stream", dst, exc = exc.msg + await sendStatus(connSrc, StatusV1.HopCantDialDst) + return defer: await connDst.close() let msgToSend = RelayMessage( - msgType: Opt.some(RelayType.Stop), - srcPeer: Opt.some(src), - dstPeer: Opt.some(dst)) + msgType: Opt.some(RelayType.Stop), srcPeer: Opt.some(src), dstPeer: Opt.some(dst) + ) - let msgRcvFromDstOpt = try: - await connDst.writeLp(encode(msgToSend).buffer) - RelayMessage.decode(await connDst.readLp(r.msgSize)) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error writing stop handshake or reading stop response", exc=exc.msg - await sendStatus(connSrc, StatusV1.HopCantOpenDstStream) - return + let msgRcvFromDstOpt = + try: + await connDst.writeLp(encode(msgToSend).buffer) + RelayMessage.decode(await connDst.readLp(r.msgSize)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "error writing stop handshake or reading stop response", exc = exc.msg + await sendStatus(connSrc, StatusV1.HopCantOpenDstStream) + return let msgRcvFromDst = msgRcvFromDstOpt.valueOr: trace "error reading stop response", msg = msgRcvFromDstOpt @@ -284,7 +299,7 @@ proc handleHop*(r: Relay, connSrc: Connection, msg: RelayMessage) {.async.} = return if msgRcvFromDst.msgType.get(RelayType.Stop) != RelayType.Status or - msgRcvFromDst.status.get(StatusV1.StopRelayRefused) != StatusV1.Success: + msgRcvFromDst.status.get(StatusV1.StopRelayRefused) != StatusV1.Success: trace "unexcepted relay stop response", msgRcvFromDst await sendStatus(connSrc, StatusV1.HopCantOpenDstStream) return @@ -303,44 +318,54 @@ proc handleStreamV1(r: Relay, conn: Connection) {.async.} = trace "Message type not set" await sendStatus(conn, StatusV1.MalformedMessage) return - case typ: - of RelayType.Hop: await r.handleHop(conn, msg) - of RelayType.Stop: await sendStatus(conn, StatusV1.StopRelayRefused) - of RelayType.CanHop: await sendStatus(conn, StatusV1.Success) - else: - trace "Unexpected relay handshake", msgType=msg.msgType - await sendStatus(conn, StatusV1.MalformedMessage) + case typ + of RelayType.Hop: + await r.handleHop(conn, msg) + of RelayType.Stop: + await sendStatus(conn, StatusV1.StopRelayRefused) + of RelayType.CanHop: + await sendStatus(conn, StatusV1.Success) + else: + trace "Unexpected relay handshake", msgType = msg.msgType + await sendStatus(conn, StatusV1.MalformedMessage) proc setup*(r: Relay, switch: Switch) = r.switch = switch r.switch.addPeerEventHandler( - proc (peerId: PeerId, event: PeerEvent) {.async.} = - r.rsvp.del(peerId), - Left) - -proc new*(T: typedesc[Relay], - reservationTTL: times.Duration = DefaultReservationTTL, - limitDuration: uint32 = DefaultLimitDuration, - limitData: uint64 = DefaultLimitData, - heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime, - maxCircuit: int = MaxCircuit, - maxCircuitPerPeer: int = MaxCircuitPerPeer, - msgSize: int = RelayMsgSize, - circuitRelayV1: bool = false): T = - - let r = T(reservationTTL: reservationTTL, - limit: Limit(duration: limitDuration, data: limitData), - heartbeatSleepTime: heartbeatSleepTime, - maxCircuit: maxCircuit, - maxCircuitPerPeer: maxCircuitPerPeer, - msgSize: msgSize, - isCircuitRelayV1: circuitRelayV1) + proc(peerId: PeerId, event: PeerEvent) {.async.} = + r.rsvp.del(peerId) + , + Left, + ) + +proc new*( + T: typedesc[Relay], + reservationTTL: times.Duration = DefaultReservationTTL, + limitDuration: uint32 = DefaultLimitDuration, + limitData: uint64 = DefaultLimitData, + heartbeatSleepTime: uint32 = DefaultHeartbeatSleepTime, + maxCircuit: int = MaxCircuit, + maxCircuitPerPeer: int = MaxCircuitPerPeer, + msgSize: int = RelayMsgSize, + circuitRelayV1: bool = false, +): T = + let r = T( + reservationTTL: reservationTTL, + limit: Limit(duration: limitDuration, data: limitData), + heartbeatSleepTime: heartbeatSleepTime, + maxCircuit: maxCircuit, + maxCircuitPerPeer: maxCircuitPerPeer, + msgSize: msgSize, + isCircuitRelayV1: circuitRelayV1, + ) proc handleStream(conn: Connection, proto: string) {.async.} = try: - case proto: - of RelayV2HopCodec: await r.handleHopStreamV2(conn) - of RelayV1Codec: await r.handleStreamV1(conn) + case proto + of RelayV2HopCodec: + await r.handleHopStreamV2(conn) + of RelayV1Codec: + await r.handleStreamV1(conn) except CancelledError as exc: raise exc except CatchableError as exc: @@ -349,8 +374,11 @@ proc new*(T: typedesc[Relay], trace "exiting relayv2 handler", conn await conn.close() - r.codecs = if r.isCircuitRelayV1: @[RelayV1Codec] - else: @[RelayV2HopCodec, RelayV1Codec] + r.codecs = + if r.isCircuitRelayV1: + @[RelayV1Codec] + else: + @[RelayV2HopCodec, RelayV1Codec] r.handler = handleStream r @@ -361,9 +389,7 @@ proc deletesReservation(r: Relay) {.async.} = if n > r.rsvp[k]: r.rsvp.del(k) -method start*( - r: Relay -): Future[void] {.async: (raises: [CancelledError], raw: true).} = +method start*(r: Relay): Future[void] {.async: (raises: [CancelledError], raw: true).} = let fut = newFuture[void]() fut.complete() if not r.reservationLoop.isNil: diff --git a/libp2p/protocols/connectivity/relay/rtransport.nim b/libp2p/protocols/connectivity/relay/rtransport.nim index fac56d904b..007d462267 100644 --- a/libp2p/protocols/connectivity/relay/rtransport.nim +++ b/libp2p/protocols/connectivity/relay/rtransport.nim @@ -13,21 +13,21 @@ import sequtils, strutils import chronos, chronicles -import ./client, - ./rconn, - ./utils, - ../../../switch, - ../../../stream/connection, - ../../../transports/transport +import + ./client, + ./rconn, + ./utils, + ../../../switch, + ../../../stream/connection, + ../../../transports/transport logScope: topics = "libp2p relay relay-transport" -type - RelayTransport* = ref object of Transport - client*: RelayClient - queue: AsyncQueue[Connection] - selfRunning: bool +type RelayTransport* = ref object of Transport + client*: RelayClient + queue: AsyncQueue[Connection] + selfRunning: bool method start*(self: RelayTransport, ma: seq[MultiAddress]) {.async.} = if self.selfRunning: @@ -35,11 +35,10 @@ method start*(self: RelayTransport, ma: seq[MultiAddress]) {.async.} = return self.client.onNewConnection = proc( - conn: Connection, - duration: uint32 = 0, - data: uint64 = 0) {.async.} = - await self.queue.addLast(RelayConnection.new(conn, duration, data)) - await conn.join() + conn: Connection, duration: uint32 = 0, data: uint64 = 0 + ) {.async.} = + await self.queue.addLast(RelayConnection.new(conn, duration, data)) + await conn.join() self.selfRunning = true await procCall Transport(self).start(ma) trace "Starting Relay transport" @@ -57,7 +56,7 @@ method accept*(self: RelayTransport): Future[Connection] {.async.} = proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async.} = let sma = toSeq(ma.items()) - relayAddrs = sma[0..sma.len-4].mapIt(it.tryGet()).foldl(a & b) + relayAddrs = sma[0 .. sma.len - 4].mapIt(it.tryGet()).foldl(a & b) var relayPeerId: PeerId dstPeerId: PeerId @@ -68,13 +67,12 @@ proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async.} trace "Dial", relayPeerId, dstPeerId let conn = await self.client.switch.dial( - relayPeerId, - @[ relayAddrs ], - @[ RelayV2HopCodec, RelayV1Codec ]) + relayPeerId, @[relayAddrs], @[RelayV2HopCodec, RelayV1Codec] + ) conn.dir = Direction.Out var rc: RelayConnection try: - case conn.protocol: + case conn.protocol of RelayV1Codec: return await self.client.dialPeerV1(conn, dstPeerId, @[]) of RelayV2HopCodec: @@ -83,14 +81,16 @@ proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async.} except CancelledError as exc: raise exc except CatchableError as exc: - if not rc.isNil: await rc.close() + if not rc.isNil: + await rc.close() raise exc method dial*( - self: RelayTransport, - hostname: string, - ma: MultiAddress, - peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async.} = + self: RelayTransport, + hostname: string, + ma: MultiAddress, + peerId: Opt[PeerId] = Opt.none(PeerId), +): Future[Connection] {.async.} = peerId.withValue(pid): let address = MultiAddress.init($ma & "/p2p/" & $pid).tryGet() result = await self.dial(address) diff --git a/libp2p/protocols/connectivity/relay/utils.nim b/libp2p/protocols/connectivity/relay/utils.nim index 46ba9cf795..1440f73b7e 100644 --- a/libp2p/protocols/connectivity/relay/utils.nim +++ b/libp2p/protocols/connectivity/relay/utils.nim @@ -10,8 +10,7 @@ {.push raises: [].} import chronos, chronicles -import ./messages, - ../../../stream/connection +import ./messages, ../../../stream/connection logScope: topics = "libp2p relay relay-utils" @@ -22,19 +21,16 @@ const RelayV2StopCodec* = "/libp2p/circuit/relay/0.2.0/stop" proc sendStatus*( - conn: Connection, - code: StatusV1 + conn: Connection, code: StatusV1 ) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")" let - msg = RelayMessage( - msgType: Opt.some(RelayType.Status), status: Opt.some(code)) + msg = RelayMessage(msgType: Opt.some(RelayType.Status), status: Opt.some(code)) pb = encode(msg) conn.writeLp(pb.buffer) proc sendHopStatus*( - conn: Connection, - code: StatusV2 + conn: Connection, code: StatusV2 ) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")" let @@ -43,8 +39,7 @@ proc sendHopStatus*( conn.writeLp(pb.buffer) proc sendStopStatus*( - conn: Connection, - code: StatusV2 + conn: Connection, code: StatusV2 ) {.async: (raises: [CancelledError, LPStreamError], raw: true).} = trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")" let @@ -53,8 +48,8 @@ proc sendStopStatus*( conn.writeLp(pb.buffer) proc bridge*( - connSrc: Connection, - connDst: Connection) {.async: (raises: [CancelledError]).} = + connSrc: Connection, connDst: Connection +) {.async: (raises: [CancelledError]).} = const bufferSize = 4096 var bufSrcToDst: array[bufferSize, byte] @@ -67,9 +62,10 @@ proc bridge*( try: while not connSrc.closed() and not connDst.closed(): - try: # https://github.com/status-im/nim-chronos/issues/516 + try: # https://github.com/status-im/nim-chronos/issues/516 discard await race(futSrc, futDst) - except ValueError: raiseAssert("Futures list is not empty") + except ValueError: + raiseAssert("Futures list is not empty") if futSrc.finished(): bufRead = await futSrc if bufRead > 0: @@ -91,7 +87,7 @@ proc bridge*( trace "relay src closed connection", src = connSrc.peerId if connDst.closed() or connDst.atEof(): trace "relay dst closed connection", dst = connDst.peerId - trace "relay error", exc=exc.msg + trace "relay error", exc = exc.msg trace "end relaying", bytesSentFromSrcToDst, bytesSentFromDstToSrc await futSrc.cancelAndWait() await futDst.cancelAndWait()