diff --git a/weave/channels/channels_mpsc_unbounded.nim b/weave/channels/channels_mpsc_unbounded.nim index 83a6c50..f7add05 100644 --- a/weave/channels/channels_mpsc_unbounded.nim +++ b/weave/channels/channels_mpsc_unbounded.nim @@ -1,12 +1,14 @@ import std/atomics, - ../config, # TODO: for CacheLineSize - ../primitives/compiler_optimization_hints # for prefetch + ../config, + ../primitives/compiler_optimization_hints, # for prefetch + ../instrumentation/[contracts, loggers] type Enqueueable = concept x, type T x is ptr - x.next is Atomic[T] + x.next is Atomic[pointer] + # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 ChannelMpscUnbounded*[T: Enqueueable] = object ## Lockless multi-producer single-consumer channel @@ -16,72 +18,129 @@ type ## - Lock-free (?): Progress guarantees to determine ## - Unbounded ## - Intrusive List based + ## - Keep an approximate count on enqueued # TODO: pass this through Relacy and Valgrind/Helgrind # to make sure there are no bugs # on arch with relaxed memory models + count: Atomic[int] + dummy: typeof(default(T)[]) # Deref the pointer type + pad0: array[WV_CacheLineSize - sizeof(pointer), byte] front: T - # TODO: align - back: Atomic[T] - -proc initialize*[T](chan: var ChannelMpscUnbounded[T], dummy: T) = - ## This queue is designed for use within a thread-safe allocator - ## It requires an allocated dummy node for initialization - ## but cannot rely on an allocator. - assert not dummy.isNil - dummy.next.store(nil, moRelaxed) - chan.front = dummy - chan.back.store(dummy, moRelaxed) - - assert not(chan.front.isNil) - assert not(chan.back.load(moRelaxed).isNil) - -proc removeDummy*[T](chan: var ChannelMpscUnbounded[T]): T = - ## Remove dummy for its deallocation - ## The queue should be testroyed afterwards - assert not(chan.front.isNil) - assert not(chan.back.load(moRelaxed).isNil) - # Only the dummy should be left - assert chan.front == chan.back.load(moRelease) - assert chan.front.next.load(moRelease).isNil - - result = chan.front - chan.front = nil - chan.back.store(nil, moRelaxed) - -proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool = + pad1: array[WV_CacheLineSize - sizeof(int), byte] + back: Atomic[pointer] # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 + +template checkInvariants(): untyped = + ascertain: not(chan.front.isNil) + ascertain: not(chan.back.load(moRelaxed).isNil) + +proc initialize*[T](chan: var ChannelMpscUnbounded[T]) = + # We keep a dummy node within the queue itself + # it doesn't need any dynamic allocation which simplify + # its use in an allocator + chan.dummy.reset() + chan.front = chan.dummy.addr + chan.back.store(chan.dummy.addr, moRelaxed) + +proc trySendImpl[T](chan: var ChannelMpscUnbounded[T], src: sink T, count: static bool): bool {.inline.}= ## Send an item to the back of the channel ## As the channel as unbounded capacity, this should never fail - assert not(chan.front.isNil) - assert not(chan.back.load(moRelaxed).isNil) + checkInvariants() src.next.store(nil, moRelaxed) fence(moRelease) let oldBack = chan.back.exchange(src, moRelaxed) - oldBack.next.store(src, moRelaxed) + cast[T](oldBack).next.store(src, moRelaxed) # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 + when count: + discard chan.count.fetchAdd(1, moRelaxed) return true +proc trySend*[T](chan: var ChannelMpscUnbounded[T], src: sink T): bool = + # log("Channel 0x%.08x trySend - front: 0x%.08x (%d), second: 0x%.08x, back: 0x%.08x\n", chan.addr, chan.front, chan.front.val, chan.front.next, chan.back) + chan.trySendImpl(src, count = true) + +proc reenqueueDummy[T](chan: var ChannelMpscUnbounded[T]) = + # log("Channel 0x%.08x reenqueing dummy\n") + discard chan.trySendImpl(chan.dummy.addr, count = false) + proc tryRecv*[T](chan: var ChannelMpscUnbounded[T], dst: var T): bool = ## Try receiving the next item buffered in the channel ## Returns true if successful (channel was not empty) + ## This can fail spuriously on the last element if producer + ## enqueues a new element while the consumer was dequeing it assert not(chan.front.isNil) assert not(chan.back.load(moRelaxed).isNil) - let first = chan.front # dummy - let next = first.next.load(moRelaxed) + var first = chan.front + # Workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 + var next = cast[T](first.next.load(moRelaxed)) - if not next.isNil: + # log("Channel 0x%.08x tryRecv - first: 0x%.08x (%d), next: 0x%.08x (%d), last: 0x%.08x\n", + # chan.addr, first, first.val, next, if not next.isNil: next.val else: 0, chan.back) + + if first == chan.dummy.addr: + # First node is the dummy + if next.isNil: + # Dummy has no next node + return false + # Overwrite the dummy, with the real first element chan.front = next + first = next + next = cast[T](next.next.load(moRelaxed)) + + # Fast-path + if not next.isNil: + # second element exist, setup the queue, only consumer touches the front + chan.front = next # switch the front prefetch(first.next.load(moRelaxed)) + # Publish the changes fence(moAcquire) - dst = next + dst = first + discard chan.count.fetchSub(1, moRelaxed) return true + # End fast-path + + # No second element, but we really need something to take + # the place of the first, have a look on the producer side + fence(moAcquire) + let last = chan.back.load(moRelaxed) + if first != last: + # A producer got ahead of us, spurious failure + return false + + # Reenqueue dummy, it is now in the second slot or later + chan.reenqueueDummy() + # Reload the second item + next = cast[T](first.next.load(moRelaxed)) - dst = nil + if not next.isNil: + # second element exist, setup the queue, only consumer touches the front + chan.front = next # switch the front + prefetch(first.next.load(moRelaxed)) + # Publish the changes + fence(moAcquire) + dst = first + discard chan.count.fetchSub(1, moRelaxed) + return true + + # No empty element?! There was a race in enqueueing + # and the new "next" still isn't published + # spurious failure return false +func peek*(chan: var ChannelMpscUnbounded): int32 {.inline.} = + ## Estimates the number of items pending in the channel + ## - If called by the consumer the true number might be more + ## due to producers adding items concurrently. + ## - If called by a producer the true number is undefined + ## as other producers also add items concurrently and + ## the consumer removes them concurrently. + ## + ## This is a non-locking operation. + result = int32 chan.count.load(moRelaxed) + # Sanity checks # ------------------------------------------------------------------------------ when isMainModule: @@ -110,7 +169,7 @@ when isMainModule: while not chan.tryRecv(data): body - const NumVals = 100000 + const NumVals = 1000000 const Padding = 10 * NumVals # Pad with a 0 so that iteration 10 of thread 3 is 3010 with 99 max iters type @@ -134,7 +193,7 @@ when isMainModule: Val = ptr ValObj ValObj = object - next: Atomic[Val] + next: Atomic[pointer] val: int ThreadArgs = object @@ -185,7 +244,7 @@ when isMainModule: args.chan[].recvLoop(val): # Busy loop, in prod we might want to yield the core/thread timeslice discard - # echo "Receiver got: ", val.val, " at address 0x", toLowerASCII toHex cast[ByteAddress](val) + # log("Receiver got: %d at address 0x%.08x\n", val.val, val) let sender = WorkerKind(val.val div Padding) doAssert val.val == counts[sender] + ord(sender) * Padding, "Incorrect value: " & $val.val inc counts[sender] @@ -209,8 +268,7 @@ when isMainModule: echo "------------------------------------------------------------------------" var threads: array[WorkerKind, Thread[ThreadArgs]] let chan = createSharedU(ChannelMpscUnbounded[Val]) # CreateU is not zero-init - let dummy = valAlloc() - chan[].initialize(dummy) + chan[].initialize() createThread(threads[Receiver], thread_func, ThreadArgs(ID: Receiver, chan: chan)) for sender in Sender1..Sender15: @@ -219,7 +277,6 @@ when isMainModule: for worker in WorkerKind: joinThread(threads[worker]) - chan[].removeDummy.valFree() deallocShared(chan) echo "------------------------------------------------------------------------" echo "Success" diff --git a/weave/contexts.nim b/weave/contexts.nim index 9987779..c5a1b48 100644 --- a/weave/contexts.nim +++ b/weave/contexts.nim @@ -7,7 +7,7 @@ import ./datatypes/[context_global, context_thread_local, sync_types], - ./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock], + ./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock, channels_mpsc_unbounded], ./memory/[persistacks, intrusive_stacks], ./config, system/ansi_c, @@ -65,7 +65,7 @@ proc initialize*[T](c: var ChannelLegacy[T], size: int32) = proc delete*[T](c: var ChannelLegacy[T]) = channel_free(c) -template myThieves*: ChannelLegacy[StealRequest] = +template myThieves*: ChannelMpscUnbounded[StealRequest] = globalCtx.com.thefts[localCtx.worker.ID] template workforce*: int32 = diff --git a/weave/datatypes/context_global.nim b/weave/datatypes/context_global.nim index 6d2bef5..3b50ff7 100644 --- a/weave/datatypes/context_global.nim +++ b/weave/datatypes/context_global.nim @@ -7,6 +7,7 @@ import ../channels/channels_mpsc_bounded_lock, + ../channels/channels_mpsc_unbounded, ../channels/channels_spsc_single_ptr, ../memory/persistacks, ../config, @@ -35,7 +36,7 @@ type # would work but then it requires a pointer indirection # per channel # and a known max number of workers - thefts*: ptr UncheckedArray[ChannelLegacy[StealRequest]] + thefts*: ptr UncheckedArray[ChannelMPSCunbounded[StealRequest]] tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]] GlobalContext* = object diff --git a/weave/datatypes/sparsesets.nim b/weave/datatypes/sparsesets.nim index be299a8..3cf4412 100644 --- a/weave/datatypes/sparsesets.nim +++ b/weave/datatypes/sparsesets.nim @@ -49,9 +49,6 @@ type func allocate*(s: var SparseSet, capacity: SomeInteger) = preCondition: capacity <= WV_MaxWorkers - preCondition: s.indices.isNil - preCondition: s.values.isNil - preCondition: s.rawBuffer.isNil s.capacity = Setuint capacity s.rawBuffer = wv_alloc(Setuint, 2*capacity) diff --git a/weave/datatypes/sync_types.nim b/weave/datatypes/sync_types.nim index 71f58c8..307a3fb 100644 --- a/weave/datatypes/sync_types.nim +++ b/weave/datatypes/sync_types.nim @@ -10,7 +10,8 @@ import ../config, ../channels/channels_spsc_single_ptr, ../instrumentation/contracts, - ../memory/allocs + ../memory/allocs, + std/atomics # Inter-thread synchronization types # ---------------------------------------------------------------------------------- @@ -72,7 +73,10 @@ type # Padding shouldn't be needed as steal requests are used as value types # and deep-copied between threads StealRequest* = ptr object - thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief + # TODO: padding to cache line + # TODO: Remove workaround generic atomics bug: https://github.com/nim-lang/Nim/issues/12695 + next*: Atomic[pointer] # For intrusive lists and queues + thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief thiefID*: WorkerID retry*: int32 # 0 <= retry <= num_workers victims*: SparseSet # set of potential victims diff --git a/weave/primitives/compiler_optimization_hints.nim b/weave/primitives/compiler_optimization_hints.nim index 1dd3066..175acbb 100644 --- a/weave/primitives/compiler_optimization_hints.nim +++ b/weave/primitives/compiler_optimization_hints.nim @@ -18,8 +18,8 @@ const withBuiltins = defined(gcc) or defined(clang) or defined(icc) when withBuiltins: proc builtin_prefetch(data: pointer, rw: PrefetchRW, locality: PrefetchLocality) {.importc: "__builtin_prefetch", noDecl.} -template prefetch*[T]( - data: ptr (T or UncheckedArray[T]), +template prefetch*( + data: pointer, rw: static PrefetchRW = Read, locality: static PrefetchLocality = HighTemporalLocality) = ## Prefetch examples: diff --git a/weave/runtime.nim b/weave/runtime.nim index 2921bb7..8cc4339 100644 --- a/weave/runtime.nim +++ b/weave/runtime.nim @@ -12,7 +12,7 @@ import ./instrumentation/[contracts, profilers, loggers], ./contexts, ./config, ./datatypes/[sync_types, prell_deques], - ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr], + ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded], ./memory/[persistacks, intrusive_stacks, allocs], ./scheduler, ./signals, ./workers, ./thieves, ./victims, # Low-level primitives @@ -40,7 +40,7 @@ proc init*(_: type Runtime) = ## Allocation of the global context. globalCtx.threadpool = wv_alloc(Thread[WorkerID], workforce()) - globalCtx.com.thefts = wv_alloc(ChannelLegacy[StealRequest], workforce()) + globalCtx.com.thefts = wv_alloc(ChannelMpscUnbounded[StealRequest], workforce()) globalCtx.com.tasks = wv_alloc(Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]], workforce()) discard pthread_barrier_init(globalCtx.barrier, nil, workforce()) diff --git a/weave/scheduler.nim b/weave/scheduler.nim index f9778f5..ed3168d 100644 --- a/weave/scheduler.nim +++ b/weave/scheduler.nim @@ -9,8 +9,8 @@ import ./instrumentation/[contracts, profilers, loggers], ./primitives/barriers, ./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets], - ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object], - ./memory/[persistacks, intrusive_stacks], + ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object, channels_mpsc_unbounded], + ./memory/[persistacks, intrusive_stacks, allocs], ./contexts, ./config, ./victims, ./loop_splitting, ./thieves, ./workers, @@ -28,9 +28,9 @@ proc init*(ctx: var TLContext) = ## Initialize the thread-local context of a worker (including the lead worker) myWorker().deque = newPrellDeque(Task) - myThieves().initialize(WV_MaxConcurrentStealPerWorker * workforce()) myTodoBoxes().initialize() myWorker().initialize(maxID = workforce() - 1) + myThieves().initialize() localCtx.stealCache.initialize() for i in 0 ..< localCtx.stealCache.len: @@ -38,7 +38,7 @@ proc init*(ctx: var TLContext) = ascertain: myTodoBoxes().len == WV_MaxConcurrentStealPerWorker - # Workers see their RNG with their myID() + # Workers seed their RNG with their myID() myThefts().rng.seed(myID()) # Thread-Local Profiling @@ -148,7 +148,6 @@ proc schedulingLoop() = proc threadLocalCleanup*() = myWorker().deque.delete() - myThieves().delete() for i in 0 ..< WV_MaxConcurrentStealPerWorker: # No tasks left diff --git a/weave/thieves.nim b/weave/thieves.nim index c01ad55..ba9e2c2 100644 --- a/weave/thieves.nim +++ b/weave/thieves.nim @@ -9,9 +9,10 @@ import ./datatypes/[sparsesets, sync_types, context_thread_local], ./contexts, ./targets, ./instrumentation/[contracts, profilers, loggers], - ./channels/channels_mpsc_bounded_lock, + ./channels/channels_mpsc_unbounded, ./memory/persistacks, - ./config, ./signals + ./config, ./signals, + std/atomics # Thief # ---------------------------------------------------------------------------------- @@ -22,6 +23,7 @@ proc newStealRequest(): StealRequest {.inline.} = result = localCtx.stealCache.borrow() ascertain: result.victims.capacity.int32 == workforce() + result.next.store(nil, moRelaxed) result.thiefAddr = myTodoBoxes.borrow() result.thiefID = myID() result.retry = 0 @@ -36,6 +38,8 @@ proc newStealRequest(): StealRequest {.inline.} = proc rawSend(victimID: WorkerID, req: sink StealRequest) {.inline.}= ## Send a steal or work sharing request # TODO: check for race condition on runtime exit + # log("Worker %d: sending request 0x%.08x to %d (Channel: 0x%.08x)\n", + # myID(), cast[ByteAddress](req), victimID, globalCtx.com.thefts[victimID].addr) let stealRequestSent = globalCtx.com .thefts[victimID] .trySend(req) diff --git a/weave/victims.nim b/weave/victims.nim index f200c4a..001b2e9 100644 --- a/weave/victims.nim +++ b/weave/victims.nim @@ -10,7 +10,7 @@ import sparsesets, prell_deques, flowvars], ./contexts, ./config, ./instrumentation/[contracts, profilers, loggers], - ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr], + ./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded], ./thieves, ./loop_splitting # Victims - Adaptative task splitting @@ -38,6 +38,11 @@ proc recv*(req: var StealRequest): bool {.inline.} = profile(send_recv_req): result = myThieves().tryRecv(req) + debug: + if result: + log("Worker %d: receives request 0x%.08x from %d with %d potential victims. (Channel: 0x%.08x)\n", + myID(), cast[ByteAddress](req), req.thiefID, req.victims.len, myThieves().addr) + # We treat specially the case where children fail to steal # and defer to the current worker (their parent) while result and req.state == Waiting: