Skip to content

Commit

Permalink
Executor fixes (#157)
Browse files Browse the repository at this point in the history
* Add more sanity checks

* workaround #155
  • Loading branch information
mratsim authored Jun 26, 2020
1 parent af869e4 commit 97b8f55
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 9 deletions.
7 changes: 7 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Weave tests

Testing multithreading runtime is difficult as bugs may only surface after particular thread patterns.

The main approach to testing Weave is therefore to have extensive internal invariant checking which is enabled in Nim debug mode or with -d:WV_Asserts.

The whole benchmarking suite is run with that flag across a wide range of threading strategies and intensive stress-testing.
2 changes: 1 addition & 1 deletion weave/datatypes/prell_deques.nim
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func steal*[T](dq: var PrellDeque[T]): T =

if dq.tail.prev.isNil:
# Stealing last task of the deque
# ascertain: dq.head == result # Concept are buggy with repr, TODO
ascertain: cast[pointer](dq.head) == cast[pointer](result)
dq.head = dq.tail.addr # isEmpty() condition
else:
dq.tail.prev.next = dq.tail.addr # last task points to dummy
Expand Down
10 changes: 8 additions & 2 deletions weave/executor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ proc processAllandTryPark*(_: typedesc[Weave]) =
## This `syncRoot` then put the Weave runtime to sleep
## if no job submission was received concurrently
##
## This is only valid on Weave root thread.
## Note that if you use `runInBackground`
## this is what the background manager thread does automaticallu
##
## This should be used if Weave root thread (that called init(Weave))
## is on a dedicated long-running thread
## in an event loop:
Expand All @@ -104,6 +108,7 @@ proc processAllandTryPark*(_: typedesc[Weave]) =
## park(Weave)
##
## New job submissions will automatically wakeup the runtime
preCondition: onWeaveThread() and myTask().isRootTask()

manager.jobNotifier[].prepareToPark()
syncRoot(Weave)
Expand Down Expand Up @@ -148,8 +153,8 @@ proc runInBackground*(
init(Weave)
Weave.runUntil(shutdown)
exit(Weave)
{.gcsafe.}: # Workaround regression - https://github.com/nim-lang/Nim/issues/14370
thr.createThread(eventLoop, signalShutdown)

thr.createThread(eventLoop, signalShutdown)

proc runInBackground*(thr: var Thread[void], _: typedesc[Weave]) =
## Start the Weave runtime on a background thread.
Expand All @@ -159,6 +164,7 @@ proc runInBackground*(thr: var Thread[void], _: typedesc[Weave]) =
proc eventLoop() {.thread.} =
init(Weave)
Weave.runForever()

thr.createThread(eventLoop)

proc submitJob*(job: sink Job) =
Expand Down
1 change: 1 addition & 0 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ proc setupWorker*() =
postCondition: myWorker().workSharingRequests.isEmpty()
postCondition: not ctx.signaledTerminate
postCondition: not myWorker().isWaiting
postCondition: localThreadKind == WorkerThread

# Thread-Local Profiling
# -----------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion weave/state_machines/sync_root.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ setPrologue(syncRootFSA):
Worker: return
debugTermination:
log(">>> Worker %2d enters barrier <<<\n", myID())
preCondition: myTask().isRootTask()
preCondition: onWeaveThread() and myTask().isRootTask()

debug: log("Worker %2d: syncRoot 1 - task from local deque\n", myID())
var task: Task
Expand Down
12 changes: 7 additions & 5 deletions weave/victims.nim
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,13 @@ proc distributeWork*(req: sink StealRequest, workSharing: bool): bool =
var firstJob, lastJob: Job
let count = managerJobQueue.tryRecvBatch(firstJob, lastJob)
if count != 0:
myWorker().deque.addListFirst(
cast[Task](firstJob),
cast[Task](lastJob),
count
)
# TODO: https://github.com/mratsim/weave/issues/155
# myWorker().deque.addListFirst(
# cast[Task](firstJob),
# cast[Task](lastJob),
# count
# )
myWorker().deque.addListFirst(cast[Task](lastJob))
req.dispatchElseDecline()
return true

Expand Down

0 comments on commit 97b8f55

Please sign in to comment.