A bounded multi-producer multi-consumer concurrent queue written in C++11

rigtorp/MPMCQueue

MPMCQueue.h

A bounded multi-producer multi-consumer concurrent queue written in C++11.

It's battle-hardened and used daily in production.

It's been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

Example

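MPMCQueue<int> q(10);
// Two consumers, each blocking in pop() until an item is available.
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
// Push two items; which consumer receives which value is not deterministic.
q.push(1);
q.push(2);
t1.join();
t2.join();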

Usage

  • MPMCQueue<T>(size_t capacity);

    Constructs a new MPMCQueue holding items of type T with capacity capacity.

  • void emplace(Args &&... args);

    Enqueue an item using in-place construction. Blocks if the queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using in-place construction. Returns true on success and false if the queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if queue is full.

  • void pop(T &v);

    Dequeue an item by copying or moving the item into v. Blocks if queue is empty.

  • bool try_pop(T &v);

    Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.

  • ssize_t size();

    Returns the number of elements in the queue.

    The size can be negative when the queue is empty and there is at least one reader waiting. Since this is a concurrent queue, the size is only a best-effort guess until all reader and writer threads have been joined.

  • bool empty();

    Returns true if the queue is empty.

    Since this is a concurrent queue, this is only a best-effort guess until all reader and writer threads have been joined.
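
The overload constraint on the templated push and try_push described above can be illustrated with a small, self-contained sketch. ConstrainedSink, Path and demo are hypothetical names invented for this illustration and are not part of MPMCQueue; the sketch only shows how a std::is_nothrow_constructible check removes the forwarding overload from overload resolution, so that the const T & overload is used instead.

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical illustration type; not part of MPMCQueue.
enum class Path { Copy, Forward };

template <typename T> struct ConstrainedSink {
  // Always available: takes the argument by const reference (copy path).
  Path push(const T &) { return Path::Copy; }

  // Only participates in overload resolution when T can be constructed
  // from P&& without throwing.
  template <typename P,
            typename = typename std::enable_if<
                std::is_nothrow_constructible<T, P &&>::value>::type>
  Path push(P &&) {
    return Path::Forward;
  }
};

// Returns true if an lvalue std::string takes the copy path (its copy
// constructor may throw) and an rvalue takes the forwarding path (its move
// constructor is noexcept).
inline bool demo() {
  ConstrainedSink<std::string> sink;
  std::string s = "abc";
  return sink.push(s) == Path::Copy &&
         sink.push(std::move(s)) == Path::Forward;
}
```

In this sketch a string literal argument also falls back to the const T & overload, because constructing a std::string from a character array is not declared noexcept.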

All operations except construction and destruction are thread safe.
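
The note above that size() can be negative is easier to see with two ticket counters. The following is a minimal sketch, assuming the size is derived from a write-ticket counter and a read-ticket counter; the Tickets struct and its member names are assumptions made for this illustration and may not match the real implementation.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the queue's two ticket counters.
struct Tickets {
  std::atomic<std::size_t> head{0}; // write tickets handed out so far
  std::atomic<std::size_t> tail{0}; // read tickets handed out so far

  // Difference of the two counters as a signed value. A reader that is
  // waiting on an empty queue has already taken its read ticket, so tail
  // can be ahead of head and the result can be negative.
  std::ptrdiff_t size() const {
    return static_cast<std::ptrdiff_t>(head.load(std::memory_order_relaxed) -
                                       tail.load(std::memory_order_relaxed));
  }

  bool empty() const { return size() <= 0; }
};
```

Because the two loads are not taken atomically together, the result is only a snapshot that may already be stale when other threads are active, which is why the text calls it a best-effort guess.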

Implementation

Memory layout

Enqueue:

  1. Acquire next write ticket from head.
  2. Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the readers we are done writing.

Dequeue:

  1. Acquire next read ticket from tail.
  2. Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
  3. Set turn = turn + 1 to inform the writers we are done reading.
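
The enqueue and dequeue steps above can be sketched as a small, self-contained class. This is a simplified illustration and not the library's code: TicketQueue and its member names are hypothetical, waiting is done with a yield loop, slots hold default-constructed values that are copy-assigned (so T must be default constructible), and there is no cache-line padding or in-place construction.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical, simplified queue for illustration only.
template <typename T> class TicketQueue {
public:
  explicit TicketQueue(std::size_t capacity)
      : capacity_(capacity), slots_(capacity), head_(0), tail_(0) {
    assert(capacity > 0);
  }

  void push(const T &v) {
    // 1. Acquire the next write ticket from head.
    const std::size_t ticket = head_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // 2. Wait for our turn, 2 * (ticket / capacity), on that slot.
    const std::size_t turn = 2 * (ticket / capacity_);
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();
    }
    slot.value = v;
    // 3. Set turn = turn + 1 to tell the readers the slot is written.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

  void pop(T &v) {
    // 1. Acquire the next read ticket from tail.
    const std::size_t ticket = tail_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // 2. Wait for our turn, 2 * (ticket / capacity) + 1, on that slot.
    const std::size_t turn = 2 * (ticket / capacity_) + 1;
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      std::this_thread::yield();
    }
    v = slot.value;
    // 3. Set turn = turn + 1 to tell the writers the slot is free again.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

private:
  struct Slot {
    std::atomic<std::size_t> turn{0};
    T value{};
  };

  const std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<std::size_t> head_;
  std::atomic<std::size_t> tail_;
};
```

Even turn values mean a slot is ready to be written and odd values mean it is ready to be read. After a read, the turn becomes 2 * (ticket / capacity) + 2, which is exactly the write turn for the ticket that reaches the same slot one lap later.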

Testing

Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:

  • A single-threaded test that checks that the functionality works as intended, including that the element constructor and destructor are invoked correctly.
  • A multithreaded fuzz test that checks that all elements are enqueued and dequeued correctly under heavy contention.
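
A multithreaded check of this kind could look roughly like the sketch below. It is not the project's actual test: StandInQueue is a hypothetical mutex-based bounded queue that exists only so the sketch compiles on its own, and fuzz_once is an invented helper. In practice the queue under test would take the stand-in's place.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in with blocking push/pop; replace with the queue
// under test.
template <typename T> class StandInQueue {
public:
  explicit StandInQueue(std::size_t capacity) : capacity_(capacity) {}

  void push(const T &v) {
    std::unique_lock<std::mutex> lock(m_);
    not_full_.wait(lock, [this] { return items_.size() < capacity_; });
    items_.push_back(v);
    not_empty_.notify_one();
  }

  void pop(T &v) {
    std::unique_lock<std::mutex> lock(m_);
    not_empty_.wait(lock, [this] { return !items_.empty(); });
    v = items_.front();
    items_.pop_front();
    not_full_.notify_one();
  }

private:
  const std::size_t capacity_;
  std::mutex m_;
  std::condition_variable not_full_, not_empty_;
  std::deque<T> items_;
};

// Starts num_pairs producers and num_pairs consumers. Together the producers
// push each value in [0, n) exactly once, where n = num_pairs * items_each.
// Returns true if the sum of all popped values equals n * (n - 1) / 2.
// A matching sum is a necessary condition for correctness, not a proof.
inline bool fuzz_once(std::size_t capacity, int num_pairs, int items_each) {
  StandInQueue<long long> q(capacity);
  std::atomic<long long> sum{0};
  std::vector<std::thread> threads;
  for (int t = 0; t < num_pairs; ++t) {
    threads.emplace_back([&q, t, items_each] {
      for (int i = 0; i < items_each; ++i) {
        q.push(static_cast<long long>(t) * items_each + i);
      }
    });
    threads.emplace_back([&q, &sum, items_each] {
      long long local = 0;
      for (int i = 0; i < items_each; ++i) {
        long long v = 0;
        q.pop(v);
        local += v;
      }
      sum += local;
    });
  }
  for (auto &th : threads) {
    th.join();
  }
  const long long n = static_cast<long long>(num_pairs) * items_each;
  return sum.load() == n * (n - 1) / 2;
}
```

A small capacity relative to the number of threads keeps the queue frequently full or empty, which is one way to exercise the blocking paths under contention.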

TODO

  • Add allocator support so that the queue can be used with huge pages and shared memory
  • Add benchmarks and compare to boost::lockfree::queue and others
  • Use C++20 concepts instead of static_assert if available
  • Use std::hardware_destructive_interference_size if available
  • Add API for zero-copy dequeue and batch dequeue operations
  • Add [[nodiscard]] attributes

About

This project was created by Erik Rigtorp <[email protected]>.
