Skip to content

Commit

Permalink
Reduce metadata in task descriptor (#72)
Browse files Browse the repository at this point in the history
* avoid torn reads in task user data

* Remove the batch field

* work-stealing deque: popping should set next to nil + delete unused proc and add 0xFACADE dummy

* We may have solved #51 - non-root task ending up in the barrier

* re-enable tests in parallel_for and use a smaller iteration domain for the strided nested loop

* Solve #51 for good
  • Loading branch information
mratsim authored Dec 27, 2019
1 parent 7f94752 commit 192cb15
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 158 deletions.
15 changes: 6 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@ matrix:
- CHANNEL=devel
compiler: clang

# OSX tests for Travis are very strange. A non-root task
# can end up in a root barrier even when there is no nested parallelism
# TODO: https://github.com/mratsim/weave/issues/51
# On OSX we only test against clang (gcc is mapped to clang by default)
# - os: osx
# arch: amd64
# env:
# - ARCH=amd64
# - CHANNEL=devel
# compiler: clang
- os: osx
arch: amd64
env:
- ARCH=amd64
- CHANNEL=devel
compiler: clang
fast_finish: true

before_install:
Expand Down
1 change: 0 additions & 1 deletion experiments/e04_channel_based_work_stealing/runtime.nim
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ proc async_action(fn: proc (_: pointer) {.nimcall.}, chan: Channel[Task]) =
profile(send_recv_task):
let dummy = task_alloc()
dummy.fn = fn
dummy.batch = 1
when defined(StealLastVictim):
dummy.victim = -1

Expand Down
6 changes: 2 additions & 4 deletions weave.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ task test, "Run Weave tests":
test "", "weave/memory/persistacks.nim"

test "", "weave/parallel_tasks.nim"
when defined(linux): # Need nestable barriers - https://github.com/mratsim/weave/issues/51
test "", "weave/parallel_for.nim"
test "", "weave/parallel_for.nim"
test "", "weave/parallel_for_staged.nim"
# test "", "weave/parallel_reduce.nim"

test "-d:WV_LazyFlowvar", "weave/parallel_tasks.nim"
when defined(linux):
test "-d:WV_LazyFlowvar", "weave/parallel_for.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_for.nim"
test "-d:WV_LazyFlowvar", "weave/parallel_for_staged.nim"
# test "-d:WV_LazyFlowvar", "weave/parallel_reduce.nim" # Experimental

Expand Down
5 changes: 2 additions & 3 deletions weave/await_fsm.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,10 @@ behavior(awaitFSA):
ascertain: not task.fn.isNil
debug: log("Worker %2d: forcefut 3 - stoled tasks\n", myID())

let loot = task.batch
if loot > 1:
if not task.next.isNil:
profile(enq_deq_task):
# Add everything
myWorker().deque.addListFirst(task, loot)
myWorker().deque.addListFirst(task)
# And then only use the last
task = myWorker().deque.popFirst()

Expand Down
1 change: 0 additions & 1 deletion weave/contexts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ proc newTaskFromCache*(): Task =
result.stop = 0
result.stride = 0
result.futures = nil
result.batch = 0
result.isLoop = false
result.hasFuture = false

Expand Down
114 changes: 29 additions & 85 deletions weave/datatypes/prell_deques.nim
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func popFirst*[T](dq: var PrellDeque[T]): T {.inline.} =
result = dq.head
dq.head = dq.head.next
dq.head.prev = nil
result.next = nil

dq.pendingTasks -= 1

Expand All @@ -101,7 +102,8 @@ func popFirst*[T](dq: var PrellDeque[T]): T {.inline.} =
proc initialize*[T: StealableTask](dq: var PrellDeque[T]) {.inline.} =
dq.head = dq.tail.addr
dq.pendingTasks = 0
# result.numSteals = 0
when compileOption("assertions"):
dq.tail.fn = cast[type dq.tail.fn](0xFACADE)

proc flush*[T: StealableTask](dq: var PrellDeque[T]): T {.inline.} =
## This returns all the StealableTasks left in the deque
Expand All @@ -116,7 +118,7 @@ proc flush*[T: StealableTask](dq: var PrellDeque[T]): T {.inline.} =
# Batch routines
# ---------------------------------------------------------------

func addListFirst[T](dq: var PrellDeque[T], head, tail: sink T, len: int32) =
func addListFirst[T](dq: var PrellDeque[T], head, tail: sink T, len: int32) {.inline.} =
# Add a list of tasks [head ... tail] of length len to the front of the deque
preCondition: not head.isNil and not tail.isNil
preCondition: len > 0
Expand All @@ -130,21 +132,17 @@ func addListFirst[T](dq: var PrellDeque[T], head, tail: sink T, len: int32) =
dq.head = head
dq.pendingTasks += len

func addListFirst*[T](dq: var PrellDeque[T], head: sink T, len: int32) =
func addListFirst*[T](dq: var PrellDeque[T], head: sink T) =
preCondition: not head.isNil
preCondition: len > 0

var tail = head
when compileOption("boundChecks"):
var index = 0'i32
var count = 1'i32
while not tail.next.isNil:
tail = tail.next
when compileOption("boundChecks"):
index += 1
count += 1
ascertain: cast[ByteAddress](tail.fn) != 0xFACADE

when compileOption("boundChecks"):
postCondition: index + 1 == len
dq.addListFirst(head, tail, len)
dq.addListFirst(head, tail, count)

# Task-specific routines
# ---------------------------------------------------------------
Expand Down Expand Up @@ -190,15 +188,14 @@ func steal*[T](dq: var PrellDeque[T]): T =
dq.tail.prev.next = dq.tail.addr # last task points to dummy

dq.pendingTasks -= 1
# dq.numSteals += 1
postCondition: not result.fn.isNil
postCondition: cast[ByteAddress](result.fn) != 0xFACADE

template multistealImpl[T](
func stealHalf*[T](
dq: var PrellDeque[T],
stolenHead: var T,
numStolen: var int32,
maxStmt: untyped,
tailAssignStmt: untyped
): untyped =
) =
## Implementation of stealing multiple tasks.
## All procs:
## - update the numStolen param with the number of tasks stolen
Expand All @@ -211,87 +208,31 @@ template multistealImpl[T](
if dq.isEmpty():
return

# Make sure to steal at least one task
numStolen = dq.pendingTasks shr 1 # half tasks
if numStolen == 0: numStolen = 1
maxStmt # <-- 1st statement "if numStolen > max: numStolen = max" injected here
if numStolen == 0:
# Only one task left
numStolen = 1
ascertain: dq.tail.prev == dq.head

stolenHead = dq.tail.addr # dummy node

tailAssignStmt # <-- 2nd statement "tail = dummy.prev" injected here

# Walk backwards from the dummy node
for i in 0 ..< numStolen:
stolenHead = stolenHead.prev
ascertain: cast[ByteAddress](stolenHead.fn) != 0xFACADE

dq.tail.prev.next = nil # Detach the true tail from the dummy
dq.tail.prev.next = nil # Detach the true tail from the dummy
dq.tail.prev = stolenHead.prev # Update the node the dummy points to
stolenHead.prev = nil # Detach the stolenHead head from the deque
if dq.tail.prev.isNil:
# Stealing the last task of the deque
ascertain: dq.head == stolenHead
# ascertain: dq.head == stolenHead
dq.head = dq.tail.addr # isEmpty() condition
else:
dq.tail.prev.next = dq.tail.addr # last task points to dummy

dq.pendingTasks -= numStolen
# dq.numSteals += 1

func stealMany*[T](dq: var PrellDeque[T],
maxSteals: int32, # should be range[1'i32 .. high(int32)]
head, tail: var T,
numStolen: var int32) =
## Steal up to half of the deque's tasks, but at most maxSteals tasks
## head will point to the first task in the returned list
## tail will point to the last task in the returned list
## numStolen will contain the number of transferred tasks
preCondition: maxSteals >= 1

multistealImpl(dq, head, numStolen):
if numStolen > maxSteals:
numStolen = maxSteals
do:
tail = dq.tail.prev

func stealMany*[T](dq: var PrellDeque[T],
maxSteals: int32, # should be range[1'i32 .. high(int32)]
head: var T,
numStolen: var int32) =
## Steal up to half of the deque's tasks, but at most maxSteals tasks
## head will point to the first task in the returned list
## numStolen will contain the number of transferred tasks
preCondition: maxSteals >= 1

multistealImpl(dq, head, numStolen):
if numStolen > maxSteals:
numStolen = maxSteals
do:
discard

func stealHalf*[T](dq: var PrellDeque[T],
head, tail: var T,
numStolen: var int32) =
## Steal half of the deque's tasks (minimum one)
## head will point to the first task in the returned list
## tail will point to the last task in the returned list
## numStolen will contain the number of transferred tasks

multistealImpl(dq, head, numStolen):
discard
do:
tail = dq.tail.prev

func stealHalf*[T](dq: var PrellDeque[T],
head: var T,
numStolen: var int32) =
## Steal half of the deque's tasks (minimum one)
## head will point to the first task in the returned list
## numStolen will contain the number of transferred tasks

multistealImpl(dq, head, numStolen):
discard
do:
discard
postCondition: cast[ByteAddress](stolenHead.fn) != 0xFACADE

# Unit tests
# ---------------------------------------------------------------
Expand All @@ -301,7 +242,6 @@ when isMainModule:

const
N = 1000000 # Number of tasks to push/pop/steal
M = 100 # Max number of tasks to steal in one swoop
TaskDataSize = 192 - 96

type
Expand All @@ -323,10 +263,13 @@ when isMainModule:
pool.initialize()

proc newTask(cache: var LookAsideList[Task]): Task =
var taskID{.global.} = 1
result = cache.pop()
if result.isNil:
result = pool.borrow(deref(Task))
zeroMem(result, sizeof(deref(Task)))
result.fn = cast[type result.fn](taskID)
taskID += 1

proc delete(task: Task) =
recycle(task)
Expand Down Expand Up @@ -397,19 +340,20 @@ when isMainModule:

var i, numStolen = 0'i32
while i < N:
var head, tail: Task
deq.stealMany(maxSteals = M, head, tail, numStolen)
var head: Task
let M = deq.pendingTasks
deq.stealHalf(head, numStolen)
check:
not head.isNil
1 <= numStolen and numStolen <= M
M div 2 <= numStolen and numStolen <= M div 2 + 1

# "Other thread"
var deq2: PrellDeque[Task]
deq2.initialize()
var cache2: LookAsideList[Task]
cache2.freeFn = recycle

deq2.addListFirst(head, tail, numStolen)
deq2.addListFirst(head)
check:
not deq2.isEmpty
deq2.pendingTasks == numStolen
Expand Down
8 changes: 3 additions & 5 deletions weave/datatypes/sync_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ type
stride*: int
# 64 bytes
futures*: pointer # LinkedList of futures required by the current task
batch*: int32 # TODO remove
isLoop*: bool
hasFuture*: bool # If a task is associated with a future, the future is stored at data[0]
futureSize*: uint8 # Size of the future result type if relevant
# 79 bytes
# User data - including the FlowVar channel to send back result.
data*: array[TaskDataSize, byte]

# TODO: support loops with steps

# It is very likely that User data contains a pointer (the Flowvar channel)
# We align to avoid torn reads/extra bookkeeping.
data*{.align:sizeof(int).}: array[TaskDataSize, byte]

# Steal requests
# ----------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 192cb15

Please sign in to comment.