Skip to content

Commit

Permalink
format libp2p/transports
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jun 6, 2024
1 parent 2327742 commit 6fc8d78
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 204 deletions.
2 changes: 1 addition & 1 deletion libp2p/transports/tcptransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,4 @@ method dial*(
method handles*(t: TcpTransport, address: MultiAddress): bool =
if procCall Transport(t).handles(address):
if address.protocols.isOk:
return TCP.match(address)
return TCP.match(address)
192 changes: 106 additions & 86 deletions libp2p/transports/tortransport.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import std/strformat
import chronos, chronicles, strutils
import stew/[byteutils, endians2, results, objects]
import ../multicodec
import transport,
tcptransport,
../switch,
../builders,
../stream/[lpstream, connection, chronosstream],
../multiaddress,
../upgrademngrs/upgrade
import
transport,
tcptransport,
../switch,
../builders,
../stream/[lpstream, connection, chronosstream],
../multiaddress,
../upgrademngrs/upgrade

const
IPTcp = mapAnd(IP, mapEq("tcp"))
Expand All @@ -44,16 +45,24 @@ type
NoAcceptableMethod = 0xff

Socks5RequestCommand* {.pure.} = enum
Connect = 1, Bind = 2, UdpAssoc = 3
Connect = 1
Bind = 2
UdpAssoc = 3

Socks5AddressType* {.pure.} = enum
IPv4 = 1, FQDN = 3, IPv6 = 4
IPv4 = 1
FQDN = 3
IPv6 = 4

Socks5ReplyType* {.pure.} = enum
Succeeded = (0, "Succeeded"), ServerFailure = (1, "Server Failure"),
ConnectionNotAllowed = (2, "Connection Not Allowed"), NetworkUnreachable = (3, "Network Unreachable"),
HostUnreachable = (4, "Host Unreachable"), ConnectionRefused = (5, "Connection Refused"),
TtlExpired = (6, "Ttl Expired"), CommandNotSupported = (7, "Command Not Supported"),
Succeeded = (0, "Succeeded")
ServerFailure = (1, "Server Failure")
ConnectionNotAllowed = (2, "Connection Not Allowed")
NetworkUnreachable = (3, "Network Unreachable")
HostUnreachable = (4, "Host Unreachable")
ConnectionRefused = (5, "Connection Refused")
TtlExpired = (6, "Ttl Expired")
CommandNotSupported = (7, "Command Not Supported")
AddressTypeNotSupported = (8, "Address Type Not Supported")

TransportStartError* = object of transport.TransportError
Expand All @@ -64,16 +73,18 @@ type
Socks5ServerReplyError* = object of Socks5Error

proc new*(
T: typedesc[TorTransport],
transportAddress: TransportAddress,
flags: set[ServerFlags] = {},
upgrade: Upgrade): T {.public.} =
T: typedesc[TorTransport],
transportAddress: TransportAddress,
flags: set[ServerFlags] = {},
upgrade: Upgrade,
): T {.public.} =
## Creates a Tor transport

T(
transportAddress: transportAddress,
upgrader: upgrade,
tcpTransport: TcpTransport.new(flags, upgrade))
tcpTransport: TcpTransport.new(flags, upgrade),
)

proc handlesDial(address: MultiAddress): bool {.gcsafe.} =
return Onion3.match(address) or TCP.match(address) or DNSANY.match(address)
Expand All @@ -82,10 +93,13 @@ proc handlesStart(address: MultiAddress): bool {.gcsafe.} =
return TcpOnion3.match(address)

proc connectToTorServer(
transportAddress: TransportAddress): Future[StreamTransport] {.async.} =
transportAddress: TransportAddress
): Future[StreamTransport] {.async.} =
let transp = await connect(transportAddress)
try:
discard await transp.write(@[Socks5ProtocolVersion, NMethods, Socks5AuthMethod.NoAuth.byte])
discard await transp.write(
@[Socks5ProtocolVersion, NMethods, Socks5AuthMethod.NoAuth.byte]
)
let
serverReply = await transp.read(2)
socks5ProtocolVersion = serverReply[0]
Expand Down Expand Up @@ -115,35 +129,40 @@ proc readServerReply(transp: StreamTransport) {.async.} =
if serverReply != Socks5ReplyType.Succeeded.byte:
var socks5ReplyType: Socks5ReplyType
if socks5ReplyType.checkedEnumAssign(serverReply):
raise newException(Socks5ServerReplyError, fmt"Server reply error: {socks5ReplyType}")
raise
newException(Socks5ServerReplyError, fmt"Server reply error: {socks5ReplyType}")
else:
raise newException(LPError, fmt"Unexpected server reply: {serverReply}")
let atyp = firstFourOctets[3]
case atyp:
of Socks5AddressType.IPv4.byte:
discard await transp.read(ipV4NumOctets + portNumOctets)
of Socks5AddressType.FQDN.byte:
let fqdnNumOctets = await transp.read(1)
discard await transp.read(int(uint8.fromBytes(fqdnNumOctets)) + portNumOctets)
of Socks5AddressType.IPv6.byte:
discard await transp.read(ipV6NumOctets + portNumOctets)
else:
raise newException(LPError, "Address not supported")
case atyp
of Socks5AddressType.IPv4.byte:
discard await transp.read(ipV4NumOctets + portNumOctets)
of Socks5AddressType.FQDN.byte:
let fqdnNumOctets = await transp.read(1)
discard await transp.read(int(uint8.fromBytes(fqdnNumOctets)) + portNumOctets)
of Socks5AddressType.IPv6.byte:
discard await transp.read(ipV6NumOctets + portNumOctets)
else:
raise newException(LPError, "Address not supported")

proc parseOnion3(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
proc parseOnion3(
address: MultiAddress
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
var addressArray = ($address).split('/')
if addressArray.len < 2: raise newException(LPError, fmt"Onion address not supported {address}")
if addressArray.len < 2:
raise newException(LPError, fmt"Onion address not supported {address}")
addressArray = addressArray[2].split(':')
if addressArray.len == 0: raise newException(LPError, fmt"Onion address not supported {address}")
if addressArray.len == 0:
raise newException(LPError, fmt"Onion address not supported {address}")
let
addressStr = addressArray[0] & ".onion"
dstAddr = @(uint8(addressStr.len).toBytes()) & addressStr.toBytes()
dstPort = address.data.buffer[37..38]
dstPort = address.data.buffer[37 .. 38]
return (Socks5AddressType.FQDN.byte, dstAddr, dstPort)

proc parseIpTcp(address: MultiAddress):
(byte, seq[byte], seq[byte])
{.raises: [LPError, ValueError].} =
proc parseIpTcp(
address: MultiAddress
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
let (codec, atyp) =
if IPv4Tcp.match(address):
(multiCodec("ip4"), Socks5AddressType.IPv4.byte)
Expand All @@ -156,17 +175,16 @@ proc parseIpTcp(address: MultiAddress):
dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet()
(atyp, dstAddr, dstPort)

proc parseDnsTcp(address: MultiAddress):
(byte, seq[byte], seq[byte])
{.raises: [LPError, ValueError].} =
proc parseDnsTcp(
address: MultiAddress
): (byte, seq[byte], seq[byte]) {.raises: [LPError, ValueError].} =
let
dnsAddress = address[multiCodec("dns")].tryGet().protoArgument().tryGet()
dstAddr = @(uint8(dnsAddress.len).toBytes()) & dnsAddress
dstPort = address[multiCodec("tcp")].tryGet().protoArgument().tryGet()
(Socks5AddressType.FQDN.byte, dstAddr, dstPort)

proc dialPeer(
transp: StreamTransport, address: MultiAddress) {.async.} =
proc dialPeer(transp: StreamTransport, address: MultiAddress) {.async.} =
let (atyp, dstAddr, dstPort) =
if Onion3.match(address):
parseOnion3(address)
Expand All @@ -178,19 +196,18 @@ proc dialPeer(
raise newException(LPError, fmt"Address not supported: {address}")

let reserved = byte(0)
let request = @[
Socks5ProtocolVersion,
Socks5RequestCommand.Connect.byte,
reserved,
atyp] & dstAddr & dstPort
let request =
@[Socks5ProtocolVersion, Socks5RequestCommand.Connect.byte, reserved, atyp] & dstAddr &
dstPort
discard await transp.write(request)
await readServerReply(transp)

method dial*(
self: TorTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async.} =
self: TorTransport,
hostname: string,
address: MultiAddress,
peerId: Opt[PeerId] = Opt.none(PeerId),
): Future[Connection] {.async.} =
## dial a peer
##
if not handlesDial(address):
Expand All @@ -205,20 +222,18 @@ method dial*(
await transp.closeWait()
raise err

method start*(
self: TorTransport,
addrs: seq[MultiAddress]) {.async.} =
method start*(self: TorTransport, addrs: seq[MultiAddress]) {.async.} =
## listen on the transport
##

var listenAddrs: seq[MultiAddress]
var onion3Addrs: seq[MultiAddress]
for i, ma in addrs:
if not handlesStart(ma):
warn "Invalid address detected, skipping!", address = ma
continue
warn "Invalid address detected, skipping!", address = ma
continue

let listenAddress = ma[0..1].tryGet()
let listenAddress = ma[0 .. 1].tryGet()
listenAddrs.add(listenAddress)
let onion3 = ma[multiCodec("onion3")].tryGet()
onion3Addrs.add(onion3)
Expand All @@ -227,7 +242,10 @@ method start*(
await procCall Transport(self).start(onion3Addrs)
await self.tcpTransport.start(listenAddrs)
else:
raise newException(TransportStartError, "Tor Transport couldn't start, no supported addr was provided.")
raise newException(
TransportStartError,
"Tor Transport couldn't start, no supported addr was provided.",
)

method accept*(self: TorTransport): Future[Connection] {.async.} =
## accept a new Tor connection
Expand All @@ -246,35 +264,37 @@ method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
return handlesDial(address) or handlesStart(address)

type
TorSwitch* = ref object of Switch
type TorSwitch* = ref object of Switch

proc new*(
T: typedesc[TorSwitch],
torServer: TransportAddress,
rng: ref HmacDrbgContext,
addresses: seq[MultiAddress] = @[],
flags: set[ServerFlags] = {}): TorSwitch
{.raises: [LPError], public.} =
var builder = SwitchBuilder.new()
.withRng(rng)
.withTransport(proc(upgr: Upgrade): Transport = TorTransport.new(torServer, flags, upgr))
if addresses.len != 0:
builder = builder.withAddresses(addresses)
let switch = builder.withMplex()
.withNoise()
.build()
let torSwitch = T(
peerInfo: switch.peerInfo,
ms: switch.ms,
transports: switch.transports,
connManager: switch.connManager,
peerStore: switch.peerStore,
dialer: Dialer.new(switch.peerInfo.peerId, switch.connManager, switch.peerStore, switch.transports, nil),
nameResolver: nil)

torSwitch.connManager.peerStore = switch.peerStore
return torSwitch
T: typedesc[TorSwitch],
torServer: TransportAddress,
rng: ref HmacDrbgContext,
addresses: seq[MultiAddress] = @[],
flags: set[ServerFlags] = {},
): TorSwitch {.raises: [LPError], public.} =
var builder = SwitchBuilder.new().withRng(rng).withTransport(
proc(upgr: Upgrade): Transport =
TorTransport.new(torServer, flags, upgr)
)
if addresses.len != 0:
builder = builder.withAddresses(addresses)
let switch = builder.withMplex().withNoise().build()
let torSwitch = T(
peerInfo: switch.peerInfo,
ms: switch.ms,
transports: switch.transports,
connManager: switch.connManager,
peerStore: switch.peerStore,
dialer: Dialer.new(
switch.peerInfo.peerId, switch.connManager, switch.peerStore, switch.transports,
nil,
),
nameResolver: nil,
)

torSwitch.connManager.peerStore = switch.peerStore
return torSwitch

method addTransport*(s: TorSwitch, t: Transport) =
doAssert(false, "not implemented!")
Expand Down
Loading

0 comments on commit 6fc8d78

Please sign in to comment.