-
Notifications
You must be signed in to change notification settings - Fork 2
/
Brokers.tex
132 lines (95 loc) · 5.45 KB
/
Brokers.tex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
\section{Network I/O with Brokers}
\label{broker}
When communicating to other services in the network, sometimes low-level socket I/O is inevitable.
For this reason, \lib provides \emph{brokers}.
A broker is an event-based actor running in the middleman that multiplexes socket I/O.
It can maintain any number of acceptors and connections.
Since the broker runs in the middleman, implementations should be careful to consume as little time as possible in message handlers.
Brokers should outsource any considerable amount of work by spawning new actors or maintaining worker actors.
\subsection{Spawning Brokers}
Brokers are implemented as functions and are spawned by calling on of the three following member functions of the middleman.
\begin{lstlisting}
template <spawn_options Os = no_spawn_options,
class F = std::function<void(broker*)>, class... Ts>
typename infer_handle_from_fun<F>::type
spawn_broker(F fun, Ts&&... xs);
template <spawn_options Os = no_spawn_options,
class F = std::function<void(broker*)>, class... Ts>
expected<typename infer_handle_from_fun<F>::type>
spawn_client(F fun, const std::string& host, uint16_t port, Ts&&... xs);
template <spawn_options Os = no_spawn_options,
class F = std::function<void(broker*)>, class... Ts>
expected<typename infer_handle_from_fun<F>::type>
spawn_server(F fun, uint16_t port, Ts&&... xs);
\end{lstlisting}
The function \lstinline^spawn_broker^ simply spawns a broker. The convenience function \lstinline^spawn_client^ tries to connect to given host and port and returns a broker managing this connection on success. Finally, \lstinline^spawn_server^ opens a local port and spawns a broker managing it on success.
\subsection{Class \texttt{broker}}
\label{broker-class}
\begin{lstlisting}
void configure_read(connection_handle hdl, receive_policy::config config)
\end{lstlisting}
Modifies the receive policy for the connection identified by \lstinline^hdl^. This will cause the middleman to enqueue the next \lstinline^new_data_msg^ according to the given \lstinline^config^ created by \lstinline^receive_policy::exactly(x)^, \lstinline^receive_policy::at_most(x)^, or \lstinline^receive_policy::at_least(x)^ (with \lstinline^x^ denoting the number of bytes).
\begin{lstlisting}
void write(connection_handle hdl, size_t num_bytes, const void* buf)
\end{lstlisting}
Writes data to the output buffer.
\begin{lstlisting}
void flush(connection_handle hdl)
\end{lstlisting}
Sends the data from the output buffer.
\clearpage
\begin{lstlisting}
template <class F, class... Ts>
actor fork(F fun, connection_handle hdl, Ts&&... xs)
\end{lstlisting}
Spawns a new broker that takes ownership of given connection.
\begin{lstlisting}
size_t num_connections()
\end{lstlisting}
Returns the number of open connections.
\begin{lstlisting}
void close(connection_handle hdl)
void close(accept_handle hdl)
\end{lstlisting}
Closes a connection or port.
\subsection{Broker-related Message Types}
Brokers receive system messages directly from the middleman for connection and acceptor events.
\textbf{Note:} brokers are \emph{required} to handle these messages immediately regardless of their current state. Not handling the system messages from the broker results in loss of data, because system messages are \emph{not} delivered through the mailbox and thus cannot be skipped.
\begin{lstlisting}
struct new_connection_msg {
accept_handle source;
connection_handle handle;
};
\end{lstlisting}
Indicates that \lstinline^source^ accepted a new (TCP) connection identified by \lstinline^handle^.
\begin{lstlisting}
struct new_data_msg {
connection_handle handle;
std::vector<char> buf;
};
\end{lstlisting}
Contains raw bytes received from \lstinline^handle^. The amount of data received per event is controlled with \lstinline^configure_read^ (see \ref{broker-class}).
It is worth mentioning that the buffer is re-used whenever possible.
\begin{lstlisting}
struct connection_closed_msg {
connection_handle handle;
};
struct acceptor_closed_msg {
accept_handle handle;
};
\end{lstlisting}
A \lstinline^connection_closed_msg^ or \lstinline^acceptor_closed_msg^ informs the broker that one of it handles is no longer valid.
\begin{lstlisting}
struct connection_passivated_msg {
connection_handle handle;
};
struct acceptor_passivated_msg {
accept_handle handle;
};
\end{lstlisting}
A \lstinline^connection_passivated_msg^ or \lstinline^acceptor_passivated_msg^ informs the broker that one of it handles entered passive mode and no longer accepts new data or connections \see{trigger}.
\subsection{Manually Triggering Events \experimental}
\label{trigger}
Brokers receive new events as \lstinline^new_connection_msg^ and \lstinline^new_data_msg^ as soon and as often as they occur, per default. This means a fast peer can overwhelm a broker by sending it data faster than the broker can process it. In particular if the broker outsources work items to other actors, because work items can accumulate in the mailboxes of the workers.
Calling \lstinline^self->trigger(x, y)^, where \lstinline^x^ is a connection or acceptor handle and \lstinline^y^ is a positive integer, allows brokers to halt activities after \lstinline^y^ additional events. Once a connection or acceptor stops accepting new data or connections, the broker receives a \lstinline^connection_passivated_msg^ or \lstinline^acceptor_passivated_msg^.
Brokers can stop activities unconditionally with \lstinline^self->halt(x)^ and resume activities unconditionally with \lstinline^self->trigger(x)^.