Cleanup repo / update docs after #21
mratsim committed Nov 24, 2019
1 parent b796d1a commit e41c927
Showing 16 changed files with 105 additions and 51 deletions.
26 changes: 26 additions & 0 deletions unused/channels/README.md
@@ -0,0 +1,26 @@
# Unused channels implementations

This directory tracks channel implementations and documentation/research
that have been superseded by more specialized and/or more efficient channels.

Ultimately they should probably go in an external library.

## Bounded SPSC channel research

Thorough research was done to avoid wasting memory on very small queues and channels.
An innovative scheme that wraps the front and back indices at 2x capacity, instead of just 1x,
avoids allocating an extra always-empty slot to differentiate a full queue from an empty one.
The usual alternative is to allocate a power-of-2 number of slots and use
bit-masking, which is even more wasteful in the general case.
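The 2x-capacity trick above can be sketched as follows. This is an illustrative, single-threaded Python model with hypothetical names (the project itself is Nim): wrapping both indices at twice the capacity distinguishes full from empty without a sacrificial slot and without rounding the capacity up to a power of two.

```python
class BoundedQueue:
    """Ring buffer whose front/back indices wrap at 2*capacity."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buf = [None] * capacity
        self.front = 0  # consumer index, in [0, 2*capacity)
        self.back = 0   # producer index, in [0, 2*capacity)

    def __len__(self):
        # Number of enqueued items, always in [0, capacity].
        return (self.back - self.front) % (2 * self.capacity)

    def is_empty(self):
        return self.front == self.back

    def is_full(self):
        # Same slot, but the indices differ by exactly `capacity`.
        return len(self) == self.capacity

    def enqueue(self, item):
        assert not self.is_full()
        self.buf[self.back % self.capacity] = item
        self.back = (self.back + 1) % (2 * self.capacity)

    def dequeue(self):
        assert not self.is_empty()
        item = self.buf[self.front % self.capacity]
        self.front = (self.front + 1) % (2 * self.capacity)
        return item
```

Note that any capacity works, e.g. 3 slots back 3 slots of storage, where a bit-masked queue would round up to 4.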

Implementation is available in [channels_mpsc_bounded_lock.nim](./channels_mpsc_bounded_lock.nim)
and is also used in the main source at [bounded_queues.nim](../../weave/datatypes/bounded_queues.nim).

It turns out we only need single-slot SPSC channels for tasks and futures/flowvars.
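A single-slot SPSC channel degenerates into one slot plus one flag. A minimal single-threaded Python sketch with hypothetical names (in the real Nim code the flag would be an atomic with release/acquire ordering):

```python
class SpscSingle:
    """Single-slot SPSC channel: one producer, one consumer, capacity 1."""

    def __init__(self):
        self._slot = None
        self._full = False  # would be a release/acquire atomic in real code

    def try_send(self, item):
        # Producer side only.
        if self._full:
            return False
        self._slot = item
        self._full = True   # publish: release store in real code
        return True

    def try_recv(self):
        # Consumer side only; returns None when empty.
        if not self._full:  # acquire load in real code
            return None
        item = self._slot
        self._full = False
        return item
```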

## Bounded MPSC channels

Those were used for steal requests when steal requests were stack objects; however:
- using pointers (https://github.com/mratsim/weave/pull/13) reduced overhead by 20% combined with lazy futures (5% on normal futures)
- pointers enabled using intrusive list-based lockless channels for steal requests, which require no allocation
  and also improve overhead by about 5% with lazy futures (https://github.com/mratsim/weave/pull/21)
File renamed without changes.
3 changes: 2 additions & 1 deletion weave/channels/README.md
@@ -17,6 +17,7 @@ kind:

unbuf/bounded/unbounded:
- unbuf: unbuffered channel (also called rendezvous). Blocking.
- single: channel can only buffer a single element
- bounded: channel has a max pre-allocated capacity. Usually array-based.
- unbounded: channel has no max capacity. List-based.

@@ -34,7 +35,7 @@ between senders and receivers; however, it might be interesting to explore
endpoints with only the recv and send proc implemented.
Furthermore we can add a "can only be moved" restriction for single-producer or single-consumer endpoints.
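As a rough illustration of the endpoint idea (hypothetical API, sketched in Python rather than Nim): splitting a channel into a send-only and a receive-only half means each side can only misuse its own operations.

```python
from collections import deque

class Sender:
    """Send-only endpoint: exposes `send` and nothing else."""
    def __init__(self, buf):
        self._buf = buf
    def send(self, item):
        self._buf.append(item)

class Receiver:
    """Receive-only endpoint: exposes `recv` and nothing else."""
    def __init__(self, buf):
        self._buf = buf
    def recv(self):
        # Returns None when the channel is empty.
        return self._buf.popleft() if self._buf else None

def make_channel():
    # Both endpoints share one buffer but expose disjoint operations.
    buf = deque()
    return Sender(buf), Receiver(buf)
```

In a language with move semantics, each endpoint would additionally be move-only to enforce the single-producer/single-consumer property at compile time.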

It prevents misuse of channels; however, for Weave:
- We store all channels in a context/global, with the sender/receiver distinction
it will be twice the size (and potentially twice the cache misses).
- It would require tracing for the channel buffer to be destroyed
18 changes: 18 additions & 0 deletions weave/channels/channels_legacy.nim
@@ -607,6 +607,24 @@ proc channel_receive*[T](chan: ChannelLegacy[T], data: ptr T, size: int32): bool
## (Remove the first item)
recv_fn[chan.impl](chan, data, size)

# Weave API
# ----------------------------------------------------------------------------------

func trySend*[T](c: ChannelLegacy[T], src: sink T): bool {.inline.} =
channel_send(c, src, int32 sizeof(src))

func tryRecv*[T](c: ChannelLegacy[T], dst: var T): bool {.inline.} =
channel_receive(c, dst.addr, int32 sizeof(dst))

func peek*[T](c: ChannelLegacy[T]): int32 =
channel_peek(c)

proc initialize*[T](c: var ChannelLegacy[T], size: int32) =
c = channel_alloc(int32 sizeof(T), size, Mpsc)

proc delete*[T](c: var ChannelLegacy[T]) =
channel_free(c)

# Tests
# ----------------------------------------------------------------------------------

52 changes: 40 additions & 12 deletions weave/channels/channels_mpsc.md
@@ -14,43 +14,71 @@ Channels are used for the following types:
- SPSC channels
- unbuffered (rendezvous / blocking)
- StealRequest
- object of size 32 bytes or pointers to heap-allocated steal requests
- MPSC channels
- buffered - "MaxSteal * num_workers" or "2 * MaxSteal * num_workers" (Main thread)
- Memory Pool (pointer to memory blocks)
- pointer objects
- MPSC channel
- unbounded

## Requirements notes

There is a lot of literature regarding linked-list-based MPSC channels.
Given that we do not need a linearizability guarantee, the Vyukov MPSC queue would probably be enough:
- http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue (Intrusive)

Initially the MPSC channels were implemented following Microsoft's message-passing allocator [snmalloc](https://github.com/microsoft/snmalloc), whose implementation is itself heavily inspired by the Pony language's actor message queue.

However these queues:

- hold on to the last item of the queue: unsuitable for steal requests
- require memory management of a stub node: snmalloc overwrites it and never seems to reclaim its memory
- never update the "back" pointer of the queue when dequeueing, so the back pointer
  still points to an unowned item, requiring the caller to do an emptiness check
- offer no way for the consumer to estimate the number of enqueued items, which is necessary for both
  steal requests (adaptive stealing) and the memory pool (estimating free memory blocks)

So the unbounded queue has been heavily modified with inspiration from the intrusive Vyukov queue at:
- http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
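A simplified sketch of such an intrusive Vyukov-style queue, extended with the consumer-visible count mentioned above, written in Python for brevity (the real implementation is Nim with atomics; this sketch is single-threaded, omits the back-pointer fixup described earlier, and the operations marked in comments would need to be atomic):

```python
class Node:
    """Intrusive node: the `next` link lives inside the payload object."""
    __slots__ = ("next", "value")
    def __init__(self, value=None):
        self.next = None
        self.value = value

class IntrusiveMpscQueue:
    def __init__(self):
        self.stub = Node()      # dummy node owned by the queue
        self.front = self.stub  # consumer-only pointer
        self.back = self.stub   # producers swing this with an atomic XCHG
        self.count = 0          # lets the consumer estimate pending items

    def enqueue(self, node):
        node.next = None
        prev, self.back = self.back, node  # atomic exchange in real code
        prev.next = node                   # store-release in real code
        self.count += 1                    # atomic increment in real code

    def dequeue(self):
        node = self.front.next             # load-acquire in real code
        if node is None:
            return None                    # empty, or a producer mid-enqueue
        # The dequeued node becomes the new stub, so its memory cannot be
        # recycled until the *next* dequeue: the classic intrusive caveat.
        self.front = node
        self.count -= 1                    # atomic decrement in real code
        return node.value
```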

An alternative implementation using CAS is discussed here:
- https://groups.google.com/forum/#!topic/comp.programming.threads/M_ecdRRlgvM

It may be possible to use the required "count" atomic field for further optimization (other queues don't need it),
but the likelihood is low given that, unlike the "next" field, it is a variable contended
by all threads accessing the data structure.

A non-intrusive approach would require some memory reclamation of the node, which we don't want when implementing
a memory pool. The extra pointer indirection is probably also costly in terms of cache misses, while the
"next" field will be in the same cache line as fresh data for the producers or required data for the consumer.
- http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue (non-intrusive)
- Reviews:
- https://int08h.com/post/ode-to-a-vyukov-queue/
- http://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
- http://psy-lob-saw.blogspot.com/2015/04/porting-dvyukov-mpsc.html


However the MPSC queue will be used for StealRequest. Linked-list-based queues require passing data as a pointer, meaning allocation can happen in one thread and deallocation in another.
This requires a multithreaded alloc/free scheme; and as we probably want to implement pool allocation to reduce allocator pressure, it complicates the pool allocator design.
Lastly, something more complex but with wait-free and linearizability guarantees, like
the MultiList queue (https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/multilist-2017.pdf), probably has more latency and/or less throughput.

Another issue: assuming we pool StealRequests, they will trigger accesses to the same cache line from random threads, so it's probably better if StealRequests were thread-local.

Lastly, in channel-based work-stealing, the thread that has work to do must also handle steal requests (contrary to the shared-memory design, where the threads that have nothing to do handle the stealing). This means that the busy thread will also need to handle heap/pool deallocation.
Note that for steal requests passed as pointers, we can actually guarantee that the thread that allocated them
can destroy/recycle them when no other thread is reading them, making list-based MPSC channels suitable.
Moving a pointer instead of a 32-byte object resulted in a significant overhead reduction (25%) in https://github.com/mratsim/weave/pull/13.

## More notes

Arguably there might be cache thrashing between the producers when writing the tail index and the data. However, they don't have any useful work to do anyway; what should be prevented is interference with the consumer.

As the producers have nothing to do anyway, a lock-based solution only on the producer side should be suitable.

Furthermore each producer is only allowed a limited number of steal requests.
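A sketch of that producer-side-lock design (illustrative Python with hypothetical names): producers serialize on a lock since they are idle anyway, while the single consumer dequeues without touching it. Per-slot "ready" flags order each producer's write before the consumer's read; in compiled code they would be release/acquire atomics.

```python
import threading

class ProducerLockedMpsc:
    """Bounded MPSC ring: lock between producers only, lock-free consumer."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buf = [None] * capacity
        self.ready = [False] * capacity  # slot published and unread
        self.front = 0                   # touched by the consumer only
        self.back = 0                    # guarded by the producer lock
        self.lock = threading.Lock()

    def try_send(self, item):
        with self.lock:                  # contention only between producers
            slot = self.back % self.capacity
            if self.ready[slot]:
                return False             # queue full
            self.buf[slot] = item
            self.ready[slot] = True      # release store in real code
            self.back += 1
            return True

    def try_recv(self):
        slot = self.front % self.capacity
        if not self.ready[slot]:         # acquire load in real code
            return None                  # queue empty
        item = self.buf[slot]
        self.ready[slot] = False         # hand the slot back to producers
        self.front += 1
        return item
```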

Also, Dmitry Vyukov presents "Memory Queues" (https://groups.google.com/forum/#!topic/comp.programming.threads/4ILh6yY5fV4)
to be used with a thread-safe memory allocator such as: https://groups.google.com/forum/embed/#!topic/comp.programming.threads/gkX0tIWR_Gk

## References

There is a definite lack of papers on ring-buffer-based MPSC queues.

- https://github.com/dbittman/waitfree-mpsc-queue
- https://github.com/rmind/ringbuf
- https://github.com/cloudfoundry/go-diodes (can overwrite data)
- Disruptor: https://github.com/LMAX-Exchange/disruptor/wiki/Blogs-And-Articles
23 changes: 2 additions & 21 deletions weave/contexts.nim
@@ -7,12 +7,11 @@

import
./datatypes/[context_global, context_thread_local, sync_types],
./channels/[channels_spsc_single_ptr, channels_mpsc_bounded_lock, channels_mpsc_unbounded],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks],
./config,
system/ansi_c,
./primitives/barriers,
./instrumentation/[contracts, profilers, loggers]
./instrumentation/[profilers, loggers]

# Contexts
# ----------------------------------------------------------------------------------
@@ -47,24 +46,6 @@ proc newTaskFromCache*(): Task {.inline.} =
template myTodoBoxes*: Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]] =
globalCtx.com.tasks[localCtx.worker.ID]

# TODO: used to debug a recurrent deadlock on trySend with 5 workers
import ./channels/channels_legacy

func trySend*[T](c: ChannelLegacy[T], src: sink T): bool {.inline.} =
channel_send(c, src, int32 sizeof(src))

func tryRecv*[T](c: ChannelLegacy[T], dst: var T): bool {.inline.} =
channel_receive(c, dst.addr, int32 sizeof(dst))

func peek*[T](c: ChannelLegacy[T]): int32 =
channel_peek(c)

proc initialize*[T](c: var ChannelLegacy[T], size: int32) =
c = channel_alloc(int32 sizeof(T), size, Mpsc)

proc delete*[T](c: var ChannelLegacy[T]) =
channel_free(c)

template myThieves*: ChannelMpscUnbounded[StealRequest] =
globalCtx.com.thefts[localCtx.worker.ID]

8 changes: 3 additions & 5 deletions weave/datatypes/context_global.nim
@@ -6,7 +6,6 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
../channels/channels_mpsc_bounded_lock,
../channels/channels_mpsc_unbounded,
../channels/channels_spsc_single_ptr,
../memory/persistacks,
@@ -17,9 +16,6 @@ import
# Global / inter-thread communication channels
# ----------------------------------------------------------------------------------

# TODO: used to debug a recurrent deadlock on trySend with 5 workers
import ../channels/channels_legacy

type
ComChannels* = object
## Communication channels
@@ -36,7 +32,9 @@ type
# would work but then it requires a pointer indirection
# per channel
# and a known max number of workers

# Theft channels are bounded to "NumWorkers * WV_MaxConcurrentStealPerWorker"
thefts*: ptr UncheckedArray[ChannelMpscUnbounded[StealRequest]]
tasks*: ptr UncheckedArray[Persistack[WV_MaxConcurrentStealPerWorker, ChannelSpscSinglePtr[Task]]]

GlobalContext* = object
2 changes: 1 addition & 1 deletion weave/datatypes/flowvars.nim
@@ -9,7 +9,7 @@ import
../channels/[channels_spsc_single_ptr, channels_spsc_single_object],
../memory/allocs,
../instrumentation/contracts,
../contexts, ../config
../config

# TODO: for the Flowvar we critically need a caching scheme for the channels;
# we use the legacy channels in the meantime
2 changes: 1 addition & 1 deletion weave/loop_splitting.nim
@@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
./config, ./contexts,
./config,
./datatypes/sync_types,
./instrumentation/[contracts, loggers]

6 changes: 6 additions & 0 deletions weave/memory/multithreaded_memory_management.md
@@ -332,6 +332,12 @@ This article goes over the skeleton of a threadsafe growable pool allocator (but

https://www.qt.io/blog/a-fast-and-thread-safe-pool-allocator-for-qt-part-1

#### comp.programming.threads

Lock-free slab allocator by Chris Thomasson: https://groups.google.com/forum/embed/#!topic/comp.programming.threads/gkX0tIWR_Gk

Associated memory queue by Dmitry Vyukov: https://groups.google.com/forum/#!topic/comp.programming.threads/4ILh6yY5fV4

## False sharing

keywords: cache ping-pong, false sharing, cache thrashing, cache lines
5 changes: 1 addition & 4 deletions weave/runtime.nim
@@ -12,7 +12,7 @@ import
./instrumentation/[contracts, profilers, loggers],
./contexts, ./config,
./datatypes/[sync_types, prell_deques],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks, allocs],
./scheduler, ./signals, ./workers, ./thieves, ./victims,
# Low-level primitives
@@ -23,9 +23,6 @@

type Runtime* = object

# TODO: used to debug a recurrent deadlock on trySend with 5 workers
import ./channels/channels_legacy

proc init*(_: type Runtime) =
# TODO detect Hyper-Threading and NUMA domain

2 changes: 1 addition & 1 deletion weave/scheduler.nim
@@ -9,7 +9,7 @@ import
./instrumentation/[contracts, profilers, loggers],
./primitives/barriers,
./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object, channels_mpsc_unbounded],
./channels/[channels_legacy, channels_spsc_single_ptr, channels_mpsc_unbounded],
./memory/[persistacks, intrusive_stacks, allocs],
./contexts, ./config,
./victims, ./loop_splitting,
3 changes: 1 addition & 2 deletions weave/targets.nim
@@ -10,8 +10,7 @@ import
./contexts,
./random/rng,
./instrumentation/[contracts, loggers],
./config,
./memory/allocs
./config

# Victim selection
# ----------------------------------------------------------------------------------
2 changes: 1 addition & 1 deletion weave/victims.nim
@@ -10,7 +10,7 @@ import
sparsesets, prell_deques, flowvars],
./contexts, ./config,
./instrumentation/[contracts, profilers, loggers],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_mpsc_unbounded],
./channels/[channels_spsc_single_ptr, channels_mpsc_unbounded],
./thieves, ./loop_splitting

# Victims - Adaptative task splitting
4 changes: 2 additions & 2 deletions weave/workers.nim
@@ -6,10 +6,10 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
./datatypes/[sync_types, context_thread_local, bounded_queues],
./datatypes/[sync_types, context_thread_local],
./contexts,
./instrumentation/[contracts, profilers, loggers],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr],
./channels/channels_spsc_single_ptr,
./memory/persistacks,
./config,
./thieves
