Skip to content

Commit

Permalink
format libp2p/upgrademngrs
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jun 6, 2024
1 parent 6fc8d78 commit ebfa344
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 48 deletions.
48 changes: 20 additions & 28 deletions libp2p/upgrademngrs/muxedupgrade.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
import std/sequtils
import pkg/[chronos, chronicles, metrics]

import ../upgrademngrs/upgrade,
../muxers/muxer
import ../upgrademngrs/upgrade, ../muxers/muxer

export Upgrade

logScope:
topics = "libp2p muxedupgrade"

type
MuxedUpgrade* = ref object of Upgrade
muxers*: seq[MuxerProvider]
streamHandler*: StreamHandler
type MuxedUpgrade* = ref object of Upgrade
muxers*: seq[MuxerProvider]
streamHandler*: StreamHandler

func getMuxerByCodec(
self: MuxedUpgrade, muxerName: string): Opt[MuxerProvider] =
func getMuxerByCodec(self: MuxedUpgrade, muxerName: string): Opt[MuxerProvider] =
if muxerName.len == 0 or muxerName == "na":
return Opt.none(MuxerProvider)
for m in self.muxers:
Expand All @@ -35,10 +32,10 @@ func getMuxerByCodec(
Opt.none(MuxerProvider)

proc mux(
self: MuxedUpgrade,
conn: Connection
): Future[Opt[Muxer]] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =
self: MuxedUpgrade, conn: Connection
): Future[Opt[Muxer]] {.
async: (raises: [CancelledError, LPStreamError, MultiStreamError])
.} =
## mux connection
trace "Muxing connection", conn
if self.muxers.len == 0:
Expand Down Expand Up @@ -67,28 +64,25 @@ proc mux(
Opt.some(muxer)

method upgrade*(
self: MuxedUpgrade,
conn: Connection,
peerId: Opt[PeerId]
self: MuxedUpgrade, conn: Connection, peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
trace "Upgrading connection", conn, direction = conn.dir

let sconn = await self.secure(conn, peerId) # secure the connection
let sconn = await self.secure(conn, peerId) # secure the connection
if sconn == nil:
raise (ref UpgradeFailedError)(msg:
"unable to secure connection, stopping upgrade")
raise (ref UpgradeFailedError)(msg: "unable to secure connection, stopping upgrade")

let muxer = (await self.mux(sconn)).valueOr: # mux it if possible
raise (ref UpgradeFailedError)(msg:
"a muxer is required for outgoing connections")
let muxer = (await self.mux(sconn)).valueOr:
raise (ref UpgradeFailedError)(msg: "a muxer is required for outgoing connections")

when defined(libp2p_agents_metrics):
conn.shortAgent = muxer.connection.shortAgent

if sconn.closed():
await sconn.close()
raise (ref UpgradeFailedError)(msg:
"Connection closed or missing peer info, stopping upgrade")
raise (ref UpgradeFailedError)(
msg: "Connection closed or missing peer info, stopping upgrade"
)

trace "Upgraded connection", conn, sconn, direction = conn.dir
muxer
Expand All @@ -97,11 +91,9 @@ proc new*(
T: type MuxedUpgrade,
muxers: seq[MuxerProvider],
secureManagers: openArray[Secure] = [],
ms: MultistreamSelect): T =
let upgrader = T(
muxers: muxers,
secureManagers: @secureManagers,
ms: ms)
ms: MultistreamSelect,
): T =
let upgrader = T(muxers: muxers, secureManagers: @secureManagers, ms: ms)

upgrader.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
trace "Starting stream handler", conn
Expand Down
38 changes: 18 additions & 20 deletions libp2p/upgrademngrs/upgrade.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@
import std/[sequtils, strutils]
import pkg/[chronos, chronicles, metrics]

import ../stream/connection,
../protocols/secure/secure,
../protocols/identify,
../muxers/muxer,
../multistream,
../connmanager,
../errors,
../utility
import
../stream/connection,
../protocols/secure/secure,
../protocols/identify,
../muxers/muxer,
../multistream,
../connmanager,
../errors,
../utility

export connmanager, connection, identify, secure, multistream

declarePublicCounter(libp2p_failed_upgrades_incoming,
"incoming connections failed upgrades")
declarePublicCounter(libp2p_failed_upgrades_outgoing,
"outgoing connections failed upgrades")
declarePublicCounter(
libp2p_failed_upgrades_incoming, "incoming connections failed upgrades"
)
declarePublicCounter(
libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades"
)

logScope:
topics = "libp2p upgrade"
Expand All @@ -40,17 +43,12 @@ type
secureManagers*: seq[Secure]

method upgrade*(
self: Upgrade,
conn: Connection,
peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
self: Upgrade, conn: Connection, peerId: Opt[PeerId]
): Future[Muxer] {.async: (raises: [CancelledError, LPError], raw: true), base.} =
raiseAssert("Not implemented!")

proc secure*(
self: Upgrade,
conn: Connection,
peerId: Opt[PeerId]
self: Upgrade, conn: Connection, peerId: Opt[PeerId]
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
if self.secureManagers.len <= 0:
raise (ref UpgradeFailedError)(msg: "No secure managers registered!")
Expand Down

0 comments on commit ebfa344

Please sign in to comment.