Project Heracles

Overview

Project Heracles is an overhaul of the Cerberus MapReduce Framework. It is built with redundancy and reliability in mind.

The main difference between Cerberus and Heracles is the overall architecture, which concentrates on micro-services and simplicity.

Objectives

  • Maintain an API compatibility layer with Cerberus
    • Jobs scheduled for Cerberus can be scheduled on Heracles
  • Maintain compatibility with libcerberus

There is, however, no promise to maintain the following compatibility:

  • Internal communication between individual components of Heracles and Cerberus

Architecture

    +------+                                     +------------+
    | User |------------------------------------>| Data Store |<-+
    +------+                                     +------------+  |
       |                                                |        |
       v                                                v        |
  +-----------+            +--------+              +--------+    |
+-| scheduler |+---------->| Broker |------------->| Worker |    |
| +-----------+|+          +--------+              +--------+    |
|  +-----------+|                                   |    |       |
|   +-----------+               +-------------------+    |       |
|                               v                   |    v       |
|                            +------+              +--------+    |
+----------------------------| etcd |------------->| Worker |----+
                             +------+              +--------+

Data flow

The life of a MapReduce job begins when the user uploads the binary, with libheracle/libcerberus embedded, and the input data to the Data Store, where the data is distributed (outside the scope of this project). The user then uses the CLI or dashboard to create a MapReduce job. The request is sent over gRPC to a scheduler, which splits the job into tasks and sends those tasks on to the broker, which queues them until a heraclid (worker) comes in and takes a task. The worker marks its current task in etcd, along with its health data. It then takes the allocated input data and the binary from the Data Store. When done, it sends the address of the produced intermediate data to etcd. When all map tasks are complete, the produced intermediate data is scheduled to be processed by the reducers. When that is done, the data is sent on to the file system as the final output.

Bold indicates components produced by us; italics indicate third-party components. For all of the components, adaptors will be created in a modular system to allow exchanging components in the future.
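
To make the flow concrete, below is a rough sketch in Rust of what a job and its tasks could look like. The type and field names are assumptions for illustration only; the real message types are defined by the gRPC protos and libheracle/libcerberus.

// Illustrative only: possible shapes for a job and the tasks it is split into.

/// A MapReduce job as submitted through the CLI or dashboard.
struct Job {
    id: String,
    binary_path: String, // user binary (with libcerberus embedded) in the Data Store
    input_path: String,  // input data in the Data Store
    output_path: String, // where the final reduce output should land
}

/// A single unit of work queued on the broker and taken by a heraclid.
enum TaskKind {
    Map { input_chunk: String },
    Reduce { intermediate_locations: Vec<String> },
}

struct Task {
    job_id: String,
    task_id: String,
    kind: TaskKind,
}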

Bootstrapping

To bootstrap the cluster, data would be placed in etcd concerning the location of the schedulers and brokers. When the user tries to connect to the cluster, the client would pick a scheduler from the list in etcd and send it the job. The workers would also be given the location of etcd to find the address of the broker, and would periodically send health messages so that the schedulers know how many workers are healthy.
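
One possible etcd layout for this bootstrap and health data is sketched below; the exact key names and structure are assumptions for illustration, not a final schema.

/heracles/schedulers/<id>       ->  host:port of a scheduler
/heracles/brokers/<id>          ->  host:port of a broker
/heracles/workers/<id>/health   ->  last heartbeat, written with a TTL/lease
/heracles/workers/<id>/task     ->  id of the task the worker is currently running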

Scheduler

The new master is a lot smaller than the old one. Since it no longer has to pick which workers to run jobs on, it can focus solely on splitting jobs into tasks and sending them off to the queue, meaning it can be regarded simply as a scheduler.

+-------------+      +----------------+      +-------------------+
| gRPC Server |----->| Authentication |      | Data Store Reader |
+-------------+      +----------------+      +-------------------+
       |                                               |
       v                                               v
+-------------+                              +-------------------+
|  Scheduler  |<-------------+-------------->|   Task Splitter   |
+-------------+              |               +-------------------+
       |                     |
       v                     v
+-------------+      +---------------+
| etcd interf |      | Broker interf |
+-------------+      +---------------+

When a request comes in and has been checked by the authentication system, it is sent to the scheduler, which then calls the task splitter. The task splitter reads chunks from the file system and creates tasks from them. The created tasks are then sent to the broker service and saved in etcd. When a scheduler picks up a dead scheduler's job, it checks whether it needs to redo any tasks or start a new set (a reduce after a map). If a map task's intermediate data is not available, the old map task is read from etcd and rescheduled.
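
A minimal sketch of the splitting step follows, assuming chunking by a fixed byte size; the chunk size, type, and function names are illustrative only, not the actual scheduler code.

const CHUNK_SIZE: u64 = 64 * 1024 * 1024; // assumed 64 MiB chunks

/// One map task covering a byte range of the input.
struct MapTask {
    job_id: String,
    input_path: String,
    offset: u64,
    length: u64,
}

/// Split a job's input into fixed-size map tasks.
fn split_into_map_tasks(job_id: &str, input_path: &str, input_len: u64) -> Vec<MapTask> {
    let mut tasks = Vec::new();
    let mut offset = 0;
    while offset < input_len {
        let length = CHUNK_SIZE.min(input_len - offset);
        tasks.push(MapTask {
            job_id: job_id.to_string(),
            input_path: input_path.to_string(),
            offset,
            length,
        });
        offset += length;
    }
    tasks
}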

Heraclid (worker)

The new worker is designed to be multicore from the beginning. Once a task arrives, the worker starts a new thread which runs the task. It reads the data outlined in the task and sends it to the binary via STDIN, then listens for results on STDOUT. Once the task is complete, it dumps the data to disk, tells the broker its work is done, and writes the output file location to etcd. If the map data has to be served, the intermediate data server handles the request.

                             +---------------+       +---------------+
                             | Broker interf |       | Interm server |
                             +---------------+       +---------------+
+---------------+                    |                       |
| Other Workers |-------_            |                       v
+---------------+         \_         |                    +------+
                            \_       v                    | disk |
+----------------+            \ +--------+--------------->+------+
| Shared Storage |------------->| Runner |
+----------------+              +--------+--------------->+-------------+
                                     ^                    | etcd interf |
                                     |                    +-------------+
                        +------------+-----------+
                        |            |           |
                        v            v           v
                   +--------+   +--------+   +--------+
                   | binary |   | binary |   | binary |
                   +--------+   +--------+   +--------+
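
The runner step described above could look roughly like the sketch below, assuming the user binary reads its input on STDIN and writes its results to STDOUT. The "map" argument and the function name are assumptions for illustration.

use std::io::Write;
use std::process::{Command, Stdio};

/// Run the user binary for one task, feeding it the task's input on STDIN
/// and collecting its results from STDOUT.
fn run_task(binary_path: &str, input: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut child = Command::new(binary_path)
        .arg("map") // hypothetical argument telling the binary which phase to run
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    // Write the input data, then drop the handle so STDIN is closed and the
    // binary sees end-of-input.
    child
        .stdin
        .take()
        .expect("stdin was requested as piped")
        .write_all(input)?;

    // Wait for the binary to exit and collect everything it wrote to STDOUT.
    let output = child.wait_with_output()?;
    Ok(output.stdout)
}

For large inputs, the write to STDIN and the read from STDOUT would need to happen concurrently to avoid blocking on full pipes; that is omitted here for brevity.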

Future Potential Improvements

  • Getting the scheduled job into a queue to be processed by any scheduler
  • Pre-chunking input data (cli prepare /input/directory)
  • Configuration files
  • Job Chaining