Skip to content

Presentation of Reactor

Antony MECHIN edited this page Apr 4, 2017 · 2 revisions

Reactor, the Elle's asynchronous coroutine-based framework.

Motivation

C++ standard offers a native support for threads, offering access to parallelism with its pros and cons. However, the standard still lacks the support for non-preemptive multitasking. Coroutines, "a generalization of subroutines for non-preemptive multitasking", are one of the hot topics in the world of C++. Dozens of languages already benefits from a native support for coroutines (Go, Haskell, Lua, Ruby, etc.). In C++, Boost.Coroutine and Boost.Coroutine2, amongs with other few libraries, provide templates for generatilized subroutines for C++.

Elle's reactor is one of them, but also try to be more than a simple support. The reactor uses Boost.Context (or libcoroutine for Windows) to implement it's own coroutines and provides:

  • Synchronization mechanisms (mutexes, barriers, signals, etc.) designed for coroutines
  • Sockets (based on boost::asio) for different protocols
  • Support for HTTP requests
  • Finite state machines
  • Asynchronous filesystem
  • etc.

Structure of the reactor

The scheduler

The reactor's entry point is called the Scheduler (elle::reactor::Scheduler). The coroutines (elle::reactor::Thread, see couroutines) are managed by the scheduler, which is in charge of scheduling execution, managing locks, polling the boost::asio::io_service, etc.

Example

#include <elle/reactor/scheduler.hh>

int
main()
{
  elle::reactor::Scheduler s;
  s.run();
  return 0;
}

In this case, with no coroutines to manage, elle::reactor::Scheduler::run will return instantly. However, if the scheduler has coroutines to manage, run will block until all coroutines are over, or an exception escapes from one of them.

N.B.: The scheduler is not thread-safe (std::thread safe), however, it provides a thread-safe API, elle::reactor::Scheduler::mt_run. Consult the documentation for more details.

The couroutines

N.B.: I'll use thread to represent a system thread and Thread for an elle::reactor::Thread.

The coroutines in the reactor are named elle::reactor::Thread. A Thread body is a simple std::function with no special restrictions: a Thread can instantiate another Thread, throw exceptions, etc. The Threads can yield or wait on waitables (see Waitables, giving other Threads room to execute. You have to understand that blocking operations (like std::this_thread::sleep_for(2s);) will block the entire scheduler (coroutines are non-preemptive, they chose when to be stoped). Collaborative multi-tasking requires you to understand when you are going to yield or wait, how to synchronize coroutines, etc.

Example

#include <elle/reactor/scheduler.hh>
#include <elle/reactor/thread.hh>

int
main()
{
  elle::reactor::Scheduler s;                  // 1.
  int i = 0;
  elle::reactor::Thread t{                     // 2.
      s, "<name of the thread>",
      [&i]
      {
        assert(i == 1);                        // 5
        elle::reactor::Thread other_thread{    // 6
          // No need to specify the Scheduler.
          "do something",
          [&i]
          {
            i = 2;                             // 9.
          }};
        assert(i == 1);                        // 7.
        // Following line can be replaced by:
        // elle::reactor::Scheduler::scheduler()->current()->wait(other_thread);
        elle::reactor::wait(other_thread);     // 8.
        assert(i == 2);                        // 10.
        i = 3;                                 // 11.
      }};
  i = 1;                                       // 3
  s.run();                                     // 4.
  assert(i == 3);                              // 12.
  return 0;
}

What's going on in this example:

  1. Create the scheduler.
  2. Create a Thread, named t, managed by the scheduler. This Thread captures i by reference, whose value is 0.
  3. Before running the Scheduler, increment i by one.
  4. Run the Scheduler, which will run the Thread named t (currently the only Thread managed by the Scheduler).
  5. Assert i is still equal to 1.
  6. Create a new Thread, named other_thread, managed by the same Scheduler. (This Thread is still inactive.)
  7. Assert i is still equal to 1 (other_thread hasn't ran yet).
  8. Make current Thread t wait for other_thread to terminate (t goes sleeping).
  9. Run other_thread, which will set i to 2 (and terminate).
  10. After waiting other_thread wa can assert its body has been executed, i must be equal to 2.
  11. Set i to 4 and terminate.
  12. Because all Threads are over, run has returned and we can assert i is now equal to 3.

The waitables

The waitables (elle::reactor::Waitable) are objects Threads can wait for. Depending on their implementation, waitable can produce a wild variety of synchronization mechanisms. For example, Threads are waitables that will wakes waiters to wait until its termination.

The different waitables

  • Thread (elle::reactor::Thread): Threads waiting on a Thread will wait until the Thread terminates.
  • Signal (elle::reactor::Signal): Threads waiting on a signal will wait until Signal::signal or Signal::signal_one are called.
  • Barrier (elle::reactor::Signal): Threads waiting on a barrier will wait until the brrier is open, or just pass if the brrier is already opened.
  • Timer (elle::reactor::Timer): Threads waiting on a timer will wait until the timer's duration is elapsed.
  • Semaphore (elle::reactor::Semaphore): Threads trying to lock a semaphore will wait until a resource is available.
  • etc.

Example

tbd.

Networking

Sockets