Skip to content

Commit

Permalink
format libp2p/utils
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jun 6, 2024
1 parent ebfa344 commit 67dc10a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 32 deletions.
7 changes: 4 additions & 3 deletions libp2p/utils/future.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

import chronos

type
AllFuturesFailedError* = object of CatchableError
type AllFuturesFailedError* = object of CatchableError

proc anyCompleted*[T](futs: seq[Future[T]]): Future[Future[T]] {.async.} =
## Returns a future that will complete with the first future that completes.
Expand All @@ -22,7 +21,9 @@ proc anyCompleted*[T](futs: seq[Future[T]]): Future[Future[T]] {.async.} =

while true:
if requests.len == 0:
raise newException(AllFuturesFailedError, "None of the futures completed successfully")
raise newException(
AllFuturesFailedError, "None of the futures completed successfully"
)

var raceFut = await one(requests)
if raceFut.completed:
Expand Down
7 changes: 3 additions & 4 deletions libp2p/utils/heartbeat.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
delay = now - nextHeartbeat
itv = interval
if delay > itv:
info "Missed multiple heartbeats", heartbeat = name,
delay = delay, hinterval = itv
info "Missed multiple heartbeats",
heartbeat = name, delay = delay, hinterval = itv
else:
debug "Missed heartbeat", heartbeat = name,
delay = delay, hinterval = itv
debug "Missed heartbeat", heartbeat = name, delay = delay, hinterval = itv
nextHeartbeat = now + itv
await sleepAsync(nextHeartbeat - now)
29 changes: 15 additions & 14 deletions libp2p/utils/offsettedseq.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,49 @@

import sequtils

type
OffsettedSeq*[T] = object
s*: seq[T]
offset*: int
type OffsettedSeq*[T] = object
s*: seq[T]
offset*: int

proc initOffsettedSeq*[T](offset: int = 0): OffsettedSeq[T] =
OffsettedSeq[T](s: newSeq[T](), offset: offset)

proc all*[T](o: OffsettedSeq[T], pred: proc (x: T): bool): bool =
proc all*[T](o: OffsettedSeq[T], pred: proc(x: T): bool): bool =
o.s.all(pred)

proc any*[T](o: OffsettedSeq[T], pred: proc (x: T): bool): bool =
proc any*[T](o: OffsettedSeq[T], pred: proc(x: T): bool): bool =
o.s.any(pred)

proc apply*[T](o: OffsettedSeq[T], op: proc (x: T)) =
proc apply*[T](o: OffsettedSeq[T], op: proc(x: T)) =
o.s.apply(pred)

proc apply*[T](o: OffsettedSeq[T], op: proc (x: T): T) =
proc apply*[T](o: OffsettedSeq[T], op: proc(x: T): T) =
o.s.apply(pred)

proc apply*[T](o: OffsettedSeq[T], op: proc (x: var T)) =
proc apply*[T](o: OffsettedSeq[T], op: proc(x: var T)) =
o.s.apply(pred)

func count*[T](o: OffsettedSeq[T], x: T): int =
o.s.count(x)

proc flushIf*[T](o: OffsettedSeq[T], pred: proc (x: T): bool) =
proc flushIf*[T](o: OffsettedSeq[T], pred: proc(x: T): bool) =
var i = 0
for e in o.s:
if not pred(e): break
if not pred(e):
break
i.inc()
if i > 0:
o.s.delete(0..<i)
o.s.delete(0 ..< i)
o.offset.inc(i)

template flushIfIt*(o, pred: untyped) =
var i = 0
for it {.inject.} in o.s:
if not pred: break
if not pred:
break
i.inc()
if i > 0:
o.s.delete(0..<i)
o.s.delete(0 ..< i)
o.offset.inc(i)

proc add*[T](o: var OffsettedSeq[T], v: T) =
Expand Down
20 changes: 9 additions & 11 deletions libp2p/utils/semaphore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ import chronos, chronicles
logScope:
topics = "libp2p semaphore"

type
AsyncSemaphore* = ref object of RootObj
size*: int
count: int
queue: seq[Future[void]]
type AsyncSemaphore* = ref object of RootObj
size*: int
count: int
queue: seq[Future[void]]

proc newAsyncSemaphore*(size: int): AsyncSemaphore =
AsyncSemaphore(size: size, count: size)

proc `count`*(s: AsyncSemaphore): int = s.count
proc `count`*(s: AsyncSemaphore): int =
s.count

proc tryAcquire*(s: AsyncSemaphore): bool =
## Attempts to acquire a resource, if successful
Expand All @@ -53,7 +53,7 @@ proc acquire*(s: AsyncSemaphore): Future[void] =
proc cancellation(udata: pointer) {.gcsafe.} =
fut.cancelCallback = nil
if not fut.finished:
s.queue.keepItIf( it != fut )
s.queue.keepItIf(it != fut)

fut.cancelCallback = cancellation

Expand All @@ -79,8 +79,7 @@ proc release*(s: AsyncSemaphore) =
doAssert(s.count <= s.size)

if s.count < s.size:
trace "Releasing slot", available = s.count,
queue = s.queue.len
trace "Releasing slot", available = s.count, queue = s.queue.len

s.count.inc
while s.queue.len > 0:
Expand All @@ -91,6 +90,5 @@ proc release*(s: AsyncSemaphore) =
fut.complete()
break

trace "Released slot", available = s.count,
queue = s.queue.len
trace "Released slot", available = s.count, queue = s.queue.len
return

0 comments on commit 67dc10a

Please sign in to comment.