Skip to content

Commit

Permalink
init channel count when debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
mratsim committed Mar 7, 2021
1 parent d0c6403 commit b2fb8dc
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions weave/cross_thread_com/channels_mpsc_unbounded_batch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,21 @@ type
# Implementation
# --------------------------------------------------------------

template whenCount(body): untyped =
when keepCount or defined(WV_DebugExecutor) or defined(WV_Debug):
body

proc initialize*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount]) {.inline.}=
chan.front.next.store(nil, moRelaxed)
chan.back.store(chan.front.addr, moRelaxed)
when keepCount:
whenCount:
chan.count.store(0, moRelaxed)

proc trySend*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], src: sink T): bool {.inline.}=
## Send an item to the back of the channel
## As the channel has unbounded capacity, this should never fail

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchAdd(1, moRelease)
ascertain: oldCount >= 0

Expand All @@ -103,7 +107,7 @@ proc trySendBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun
## They should be linked together by their next field
## As the channel has unbounded capacity this should never fail

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchAdd(int(count), moRelease)
ascertain: oldCount >= 0

Expand Down Expand Up @@ -139,7 +143,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d
# fence(moAcquire) # Sync "first.next.load(moRelaxed)"
dst = first

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
postCondition: oldCount >= 1 # The producers may overestimate the count
return true
Expand All @@ -158,7 +162,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d
prefetch(first)
dst = first

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
postCondition: oldCount >= 1 # The producers may overestimate the count
return true
Expand All @@ -180,7 +184,7 @@ proc tryRecv*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCount], d
# fence(moAcquire) # sync first.next.load(moRelaxed)
dst = first

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(1, moRelaxed)
postCondition: oldCount >= 1 # The producers may overestimate the count
return true
Expand Down Expand Up @@ -236,7 +240,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun
# We lose the competition, bail out
chan.front.next.store(front, moRelease)

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
postCondition: oldCount >= result # TODO: somehow it can be negative
return
Expand All @@ -249,7 +253,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun
result += 1
bLast = front

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
postCondition: oldCount >= result # TODO: somehow it can be negative
return
Expand All @@ -275,7 +279,7 @@ proc tryRecvBatch*[T, keepCount](chan: var ChannelMpscUnboundedBatch[T, keepCoun
# fence(moAcquire) # sync front.next.load(moRelaxed)
bLast = front

when keepCount:
whenCount:
let oldCount {.used.} = chan.count.fetchSub(result, moRelaxed)
postCondition: oldCount >= result # TODO: somehow it can be negative

Expand Down

0 comments on commit b2fb8dc

Please sign in to comment.