Skip to content

Commit

Permalink
Victim selection (#74)
Browse files Browse the repository at this point in the history
* Add steal lastThief and layout steallastvictim

* Implement LastVictim policy (+300GFlops on matmul)

* LogSumExp is much better, still 2x slower than sequential but it was 10x slower before

* Histogram perf is very random, can change from 1.5x slower than OpenMP to 3x to 10x (see #35)

* Make lastVictim the default
  • Loading branch information
mratsim authored Dec 27, 2019
1 parent 192cb15 commit 2cebb3d
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 16 deletions.
3 changes: 2 additions & 1 deletion benchmarks/xxx_pathological/histogram_2D/weave_histogram.nim
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ proc main(matrixSize = 25000'i32, boxes = 1000'i32) =

reportConfig("Sequential", 1, matrixSize, boxes)
runBench(generateHistogramSerial, matrix, boxes, parallel = false)
reportConfig("Weave" & config, nthreads, matrixSize, boxes)
reportConfig("Weave - Parallel Reduce" & config, nthreads, matrixSize, boxes)
runBench(generateHistogramWeaveReduce, matrix, boxes)
reportConfig("Weave - Parallel For Staged" & config, nthreads, matrixSize, boxes)
runBench(generateHistogramWeaveStaged, matrix, boxes)

dispatch(main)
7 changes: 3 additions & 4 deletions benchmarks/xxx_pathological/logsumexp/weave_logsumexp.nim
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,9 @@ proc main(datasetSize = 20000'i64, batchSize = 256'i64, imageLabels = 1000'i64,
echo '\n'
wv_free(sanityM.buffer)

# reportConfig("Sequential", 1, datasetSize, batchSize, imageLabels, textVocabulary)

# block:
# runBench(logsumexpSerial, datasetSize, batchSize, imageLabels)
reportConfig("Sequential", 1, datasetSize, batchSize, imageLabels, textVocabulary)
block:
runBench(logsumexpSerial, datasetSize, batchSize, imageLabels)
# block:
# runBench(logsumexpSerial, datasetSize, batchSize, textVocabulary)

Expand Down
4 changes: 4 additions & 0 deletions weave/await_fsm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ behavior(awaitFSA):
transition:
ascertain: not task.fn.isNil
debug: log("Worker %2d: forcefut 3 - stoled tasks\n", myID())
TargetLastVictim:
if task.victim != Not_a_worker:
myThefts().lastVictim = task.victim
ascertain: myThefts().lastVictim != myID()

if not task.next.isNil:
profile(enq_deq_task):
Expand Down
19 changes: 17 additions & 2 deletions weave/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,29 @@ const WV_Backoff* {.booldefine.} = true
## steal requests queues when they sleep and there is latency to wake up.

type
StealKind* {.pure.}= enum
StealKind* {.pure.} = enum
one
half
adaptative

SplitKind* {.pure.}= enum
SplitKind* {.pure.} = enum
half
guided
adaptative

VictimSelection* = enum
Random # Always target a new victim randomly
LastVictim # Target the last victim if possible
LastThief # Target the last thief if possible

const
WV_Steal{.strdefine.} = "adaptative"
WV_Split{.strdefine.} = "adaptative"
WV_Target{.strdefine.} = "LastVictim"

StealStrategy* = parseEnum[StealKind](WV_Steal)
SplitStrategy* = parseEnum[SplitKind](WV_Split)
FirstVictim* = parseEnum[VictimSelection](WV_Target)

# Static scopes
# ----------------------------------------------------------------------------------
Expand Down Expand Up @@ -113,6 +120,14 @@ template Backoff*(body: untyped): untyped =
when WV_Backoff:
body

template TargetLastVictim*(body: untyped): untyped =
when FirstVictim == LastVictim:
body

template TargetLastThief*(body: untyped): untyped =
when FirstVictim == LastThief:
body

# Dynamic defines
# ----------------------------------------------------------------------------------

Expand Down
6 changes: 3 additions & 3 deletions weave/datatypes/context_thread_local.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ type
# Before a worker can become quiescent it has to drop MaxSteal - 1
# steal request and send the remaining one to its parent
dropped*: int32
# RRNG state to choose victims
# RNG state to choose victims
rng*: RngState
when defined(StealLastVictim):
when FirstVictim == LastVictim:
lastVictim*: WorkerID
when defined(StealLastThief):
when FirstVictim == LastThief:
lastThief*: WorkerID
# Adaptative theft
stealHalf*: bool
Expand Down
4 changes: 3 additions & 1 deletion weave/datatypes/sync_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ type
stop*: int
stride*: int
# 64 bytes
futures*: pointer # LinkedList of futures required by the current task
when FirstVictim == LastVictim:
victim*: WorkerID
isLoop*: bool
hasFuture*: bool # If a task is associated with a future, the future is stored at data[0]
futureSize*: uint8 # Size of the future result type if relevant
futures*: pointer # LinkedList of futures required by the current task
# 79 bytes
# User data - including the FlowVar channel to send back result.
# It is very likely that User data contains a pointer (the Flowvar channel)
Expand Down
4 changes: 4 additions & 0 deletions weave/runtime_fsm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ behavior(syncFSA):
# 3. We stole some task(s)
debug: log("Worker %2d: globalsync 3 - stoled tasks\n", myID())
ascertain: not task.fn.isNil
TargetLastVictim:
if task.victim != Not_a_worker:
myThefts().lastVictim = task.victim
ascertain: myThefts().lastVictim != myID()

if not task.next.isNil:
profile(enq_deq_task):
Expand Down
8 changes: 8 additions & 0 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ proc init*(ctx: var TLContext) {.gcsafe.} =
localCtx.stealCache.access(i).victims.allocate(capacity = workforce())

myThefts().rng.seed(myID())
TargetLastVictim:
myThefts().lastVictim = Not_a_worker
TargetLastThief:
myThefts().lastThief = Not_a_worker

# Debug
# -----------------------------------------------------------
Expand Down Expand Up @@ -194,6 +198,10 @@ proc schedulingLoop() =
# 3. We stole some task(s)
ascertain: not task.fn.isNil
debug: log("Worker %2d: schedloop 3 - stoled tasks\n", myID())
TargetLastVictim:
if task.victim != Not_a_worker:
myThefts().lastVictim = task.victim
ascertain: myThefts().lastVictim != myID()

if not task.next.isNil:
# Add everything
Expand Down
3 changes: 2 additions & 1 deletion weave/signals.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ proc asyncSignal(fn: proc (_: pointer) {.nimcall, gcsafe.}, chan: var ChannelSps
profile(send_recv_task):
let dummy = newTaskFromCache()
dummy.fn = fn
# TODO: StealLastVictim
TargetLastVictim:
dummy.victim = Not_a_worker

let signalSent = chan.trySend(dummy)
debugTermination: log("Worker %2d: sending asyncSignal\n", myID())
Expand Down
13 changes: 12 additions & 1 deletion weave/targets.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,18 @@ proc findVictim*(req: var StealRequest): WorkerID =
markIdle(req.victims, myWorker().right)

ascertain: myID() notin req.victims
result = randomVictim(req.victims, req.thiefID)
when FirstVictim == LastVictim:
if myThefts().lastVictim != Not_a_worker and myThefts().lastVictim in req.victims:
result = myThefts().lastVictim
else:
result = randomVictim(req.victims, req.thiefID)
elif FirstVictim == LastThief:
if myThefts().lastThief != Not_a_worker and myThefts().lastThief in req.victims:
result = myThefts().lastThief
else:
result = randomVictim(req.victims, req.thiefID)
else:
result = randomVictim(req.victims, req.thiefID)

if result == Not_a_worker:
# Couldn't find a victim. Return the steal request to the thief
Expand Down
2 changes: 0 additions & 2 deletions weave/thieves.nim
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ proc trySteal*(isOutOfTasks: bool) =
req.state = Stealing
else:
req.state = Working

# TODO LastVictim/LastThief
req.findVictimAndSteal()

proc forget*(req: sink StealRequest) {.gcsafe.} =
Expand Down
7 changes: 6 additions & 1 deletion weave/victims.nim
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ proc send(req: sink StealRequest, task: sink Task, numStolen: int32 = 1) {.inlin
debug: log("Worker %2d: sending %d tasks (task.fn 0x%.08x) to Worker %2d\n",
myID(), numStolen, task.fn, req.thiefID, req.thiefAddr)
let taskSent = req.thiefAddr[].trySend(task)
TargetLastThief:
myThefts().lastThief = req.thiefID

postCondition: taskSent # SPSC channel with only 1 slot

Expand All @@ -180,7 +182,8 @@ proc dispatchElseDecline*(req: sink StealRequest) {.gcsafe.}=
ascertain: not task.fn.isNil
ascertain: cast[ByteAddress](task.fn) != 0xFACADE
profile(send_recv_task):
# TODO LastVictim
TargetLastVictim:
task.victim = myID()
LazyFV:
batchConvertLazyFlowvar(task)
debug: log("Worker %2d: preparing %d task(s) for worker %2d with function address 0x%.08x\n",
Expand All @@ -199,6 +202,8 @@ proc splitAndSend*(task: Task, req: sink StealRequest) =

# Copy the current task
upperSplit[] = task[]
TargetLastVictim:
upperSplit.victim = myID()

# Split iteration range according to given strategy
# [start, stop) => [start, split) + [split, end)
Expand Down

0 comments on commit 2cebb3d

Please sign in to comment.