diff --git a/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim b/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim index 3cc82eb..52be5fa 100644 --- a/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim +++ b/weave/cross_thread_com/channels_mpsc_unbounded_batch.nim @@ -76,17 +76,21 @@ type # Implementation # -------------------------------------------------------------- +template whenCount(body): untyped = + when keepCount or defined(WV_DebugExecutor) or defined(WV_Debug): + body + proc initialize*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount]) {.inline.}= chan.front.next.store(nil, moRelaxed) chan.back.store(chan.front.addr, moRelaxed) - when keepCount: + whenCount: chan.count.store(0, moRelaxed) proc trySend*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], src: sink T): bool {.inline.}= ## Send an item to the back of the channel ## As the channel has unbounded capacity, this should never fail - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchAdd(1, moRelease) ascertain: oldCount >= 0 @@ -103,7 +107,7 @@ proc trySendBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun ## They should be linked together by their next field ## As the channel has unbounded capacity this should never fail - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchAdd(int(count), moRelease) ascertain: oldCount >= 0 @@ -139,7 +143,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d # fence(moAcquire) # Sync "first.next.load(moRelaxed)" dst = first - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed) postCondition: oldCount >= 1 # The producers may overestimate the count return true @@ -158,7 +162,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d prefetch(first) dst = first - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed) postCondition: oldCount >= 1 # The producers may overestimate the count return true @@ -180,7 +184,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d # fence(moAcquire) # sync first.next.load(moRelaxed) dst = first - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed) postCondition: oldCount >= 1 # The producers may overestimate the count return true @@ -236,7 +240,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun # We lose the competition, bail out chan.front.next.store(front, moRelease) - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed) postCondition: oldCount >= result # TODO: somehow it can be negative return @@ -249,7 +253,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun result += 1 bLast = front - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed) postCondition: oldCount >= result # TODO: somehow it can be negative return @@ -275,7 +279,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun # fence(moAcquire) # sync front.next.load(moRelaxed) bLast = front - when keepCount: + whenCount: let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed) postCondition: oldCount >= result # TODO: somehow it can be negative