
Protocol draft - for discussion only #4

Open: wants to merge 12 commits into base: master
117 changes: 116 additions & 1 deletion NETWORK_PROTOCOL.md
@@ -1 +1,116 @@
Placeholder for network protocol.
Status: this is a rough draft, intended to get some ideas out there.

Missing things which might be added:
- How to run on top of HTTP/2 (but see #6).
- Possibly the structure or semantics of publisher name strings.
- Need to choose serialization method (protobuf, msgpack, other?)
- Support for resuming a broken connection without losing subscription state

Possibly not needed and could be removed: extension support.

## Transport assumptions

A supporting transport must be similar to a streaming socket:

1. Bidirectional and full-duplex
2. An octet stream, i.e. all octet values may be sent unencoded
3. Ordered delivery: an implementation may map protocol messages to some features of the underlying transport (e.g. [Ømq](http://zeromq.org/) messages), but the messages must arrive in the same order as they were sent

Definitely supported transports include TCP, TLS (over TCP), WebSockets, and most socket-like objects (e.g. pipes). HTTP/2 will be supported, but may require a dedicated specification for implementing this protocol.

## Message framing

An existing serialization format should be used. Current candidates are [Protocol Buffers](https://github.com/google/protobuf/) (which is slightly less space efficient), [MessagePack](http://msgpack.org/) (whose Scala implementation may not be as good as the others), and possibly [Thrift](https://thrift.apache.org/) (with which I'm less familiar).
Contributor:

Is requiring a specific serialization format the appropriate solution? It would definitely simplify things from a spec and implementation perspective, but is it okay for broad adoption?

Does it impede usage in environments such as Node.js?

Author:

Protobuf, msgpack and Thrift all have implementations for Javasript / Node.js.

If we don't specify a serialization format, I think there will be a lot of confusion and incompatibility out there. My goal in writing this wasn't to specify a meta-protocol that users then have to further adapt to their needs, but to specify a complete network protocol such that if someone advertises RS.io on TCP port 12345, you can pick any client implementation and expect it to work.


If you decide to mandate one, please pick one that has the potential to be decoded with zero copying. Without prejudice, other options are Cap'n Proto, Simple Binary Encoding.. Just ask Todd :)

Author:

I have no special experience or expertise with serialization formats, and no special preference for any particular one. I picked these three for being well-known, compact, and very widely- (and I hope well-) implemented.

I've never used Cap'n Proto or SBE before. Looking at them just now, SBE only has implementations for C++, Java and C# - not even JS, which would be a real problem for this specification to mandate. Cap'n Proto is better in that regard, but still nowhere near the number of implementations of msgpack or protobuf.

However, CP is also much more complex, precisely because it's a direct representation of usable memory layout and so deals with pointers and alignment and manual packing and so on. I could write a protobuf encoder in a couple of hours, but CP is a whole other story. This also makes me worry about its space efficiency - CP recommends using compression. So maybe CP has advantages, but it's sufficiently complex that the tradeoff isn't obvious to me, FWIW.


Compression should definitely be an option, and I wouldn't want to foreclose the possibility of implementing hot observables over reliable multicast either.

Author:

@tmontgomery I suggest the protocol should only specify a particular serialization format for the framing and protocol messages. And that format should be simple enough, or the part of it used should be small enough, that it would be easy to implement from scratch if needed for some reason as a hardcoded part of an RS.io implementation. Then the implementation or its user could use an unrelated serialization format for the message content.

If we simplify this spec a bit, e.g. say the publisher name is a byte array to avoid getting into specifying string semantics, then the only types used are ints and byte arrays. Even with some varint representation, that's small and simple enough that I think we shouldn't stress as much as we are over the selection of the serialization format.


@danarmak that makes sense to me. varints have me a little concerned, since they add branching checks and also "elongate" types past offset boundaries, i.e. they don't match up on nice word boundaries and are slower to handle.

Author:

Is it better to use int8/16/32/64 and pay for greater per-frame overhead? Transport compression would negate some of the cost, since the extra bytes are zeros.


Better for the CPU and striding usually, yes. So, more efficient.

Author:

Will word boundaries be a significant problem? The frame header sizes probably won't be word multiples unless we pad.


The serialization format should be fast and space-efficient. It does not need to be self-describing, since the message types and their structures are fully specified in the protocol. It needs to have the types boolean, byte, string / byte array (length-prefixed), and varint (an integer encoded using 1 or more bytes depending on its value).

The type alias `Id` used below is a varint which serves to uniquely identify something.

The full complexity of these formats may not be needed today, but future protocol extensions might benefit. Also, an implementation might encode the published elements using the same format and decode both the framing and the messages using the same parser.

Each message (frame) begins with a message type, which is a single byte, followed by its contents. Messages are self-delimiting, because their structure is known from their type, and all fields are either of fixed size, self-delimited varints, or length-prefixed strings or byte arrays.
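The framing rules above can be sketched in code. This is an illustrative Python sketch only, not a normative encoding: the serialization format is still undecided, and the `ON_NEXT` type code is a hypothetical value chosen for the example.

```python
def encode_varint(n: int) -> bytes:
    """Protobuf-style base-128 varint: 7 bits per byte, MSB = continuation."""
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def decode_varint(buf: bytes, pos: int = 0) -> tuple[int, int]:
    """Return (value, next_position); varints are self-delimiting."""
    value = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        value |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):
            return value, pos

ON_NEXT = 0x05  # hypothetical message-type code, for illustration only

def encode_on_next(subscriber_id: int, element: bytes) -> bytes:
    # One frame: type byte + subscriber Id (varint) + length-prefixed payload.
    # All fields are self-delimiting, so no outer length prefix is needed.
    return (bytes([ON_NEXT])
            + encode_varint(subscriber_id)
            + encode_varint(len(element))
            + element)
```

For a one-byte element and a small subscriber Id, the whole frame is 4 bytes, matching the per-message overhead estimate given later in the draft.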

## Protocol negotiation

The protocol is versioned and supports future extensions. The client (i.e. the side that opened the connection) and the server do a loose handshake:

--> clientHello(version: byte, extensions: Array[Id])
<-- serverHello(version: byte, extensions: Array[Id])

An `Id`, as noted above, is a varint. An Array is length-prefixed by a varint.

This is a 'loose' handshake because the server doesn't have to wait for the `clientHello` before sending its `serverHello`.

### The protocol version

The protocol version is currently version 0. If either side receives a hello message with a version it doesn't support, it MUST send a `goodbye` message (defined below) and close the connection. When future versions of the protocol introduce incompatible changes and increment the version number, transports SHOULD indicate the incompatibility where suitable (e.g. by changing the HTTP Content-Type or the TCP port number).

The client can optimistically send more messages after the `clientHello` without waiting for the `serverHello`. If it eventually receives a `serverHello` with a different protocol version, it must consider that its messages were discarded. Future protocol versions will not be backward-compatible with version 0, in the sense that if a server supports multiple versions (e.g. both version 0 and some future version 1), it must wait for the `clientHello` and then send a `serverHello` with a version number matching the client's.


The negotiation looks fine. It might be good to mention that the union of extensions is what is chosen. I.e. both ends must support it and agree to use it. Also, ordering of extensions might be necessary to specify. Just some wording to be clear.

It might be good to think of most operation as extensions. Such as serialization, compression, encryption, etc. Might be cleaner way to specify these changing needs. If so, we might just borrow some HTTP semantics here. Lots of good stuff that can be leveraged.

Author:

I pushed an update that clarifies extension negotiation.

What is chosen is not the union but the intersection of extensions. I suspect this is what you mean too, since I don't see how the union could work; it would include extensions not supported by one of the two parties.

### Protocol extensions

Extensions allow for the protocol to be extended in the future in backward-compatible ways, without changing the protocol version.

1. The set of extensions in use, or available for use (for extensions that define optional behaviors), is the intersection of the extensions listed in both `hello` messages.
2. Extensions MAY define new message types with new semantics. The client MUST NOT send messages of a new message type defined in an extension until it receives the `ServerHello` and confirms that the server supports the extension.
3. Extensions MAY change the semantics of existing message types (e.g. to add transparent compression to payloads). Such modified behavior MUST be negotiated by one of the parties sending, and the other acknowledging, a message (defined by the extension being discussed) that declares the new behavior as active. A party supporting such an extension SHOULD NOT send messages whose semantics are modified by it before this secondary negotiation is completed, due to potential for confusion as to whether or not the modified semantics are in effect.
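The negotiation in points 1 and 2 reduces to a set intersection plus a version check. A minimal sketch, assuming extension Ids are exchanged as integer arrays (the function names are illustrative, not part of the spec):

```python
SUPPORTED_VERSIONS = {0}  # only version 0 exists at present

def check_hello_version(version: int) -> bool:
    # If the peer's version is unsupported, the receiver must send
    # a `goodbye` message and close the connection.
    return version in SUPPORTED_VERSIONS

def negotiate_extensions(client_exts: list[int], server_exts: list[int]) -> set[int]:
    """Extensions in effect (or available for use) are the intersection
    of the extension Ids listed in the two hello messages."""
    return set(client_exts) & set(server_exts)
```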

## The Reactive Streams core protocol

The basic RS signalling is:
Contributor:

This list of signals looks good to me. I'm going to go play with a basic implementation to try use cases, but it's what I'd expect.


This looks relatively complete at first glance. It also gives us some good data points for how we might need to lay this out. Id is a common element.

What is the need for varint? Could it be bounded to 64-bits?

Author:

@tmontgomery The biggest varint values used are message size prefixes, and they can definitely be bounded to 64bit or less. I used the varints only in an attempt to save space. The greatest need to reduce frame overhead is when there are many small frames, and that is also when the varints have the smallest values, fitting in one or two bytes. I had in mind the usecase of a stream of integers.


varints can be a little slow to handle on modern CPUs compared to static fields, that is. For a stream of ints, you are absolutely correct: varints are a good tradeoff of space vs. CPU cycles to parse. But what about limiting varints to those cases only? For framing and control, it might be better to consider the sizes needed and make them static if we can. Not sure we can, but worth a shot.


--> subscribe(publisher: String, subscriber: Id, initialDemand: Long = 0)
--> request(subscriber: Id, demand: Long)
--> cancel(subscriber: Id)
<-- onSubscribe(subscriber: Id, elementSize: varint = 0) // For elementSize != 0, see the next section
<-- onNext(subscriber: Id, element: bytes)
<-- onComplete(subscriber: Id)
<-- onError(subscriber: Id, error: String)

Contributor:

How are you envisioning these signals being encoded over the wire? I'm assuming binary encoding would represent each of these signals as a byte rather than string of text? Is this the varint you reference in the message frame above?

Author:

These message definitions can be translated into formal definitions once we choose a serialization format. But like I said above, each message looks like a message type + payload, and the message type is a varint. The varint idea is from protobuf; msgpack has a slightly different solution, but in both cases small (7-bit) integers take 1 byte, bigger integers take 2 bytes, then 3, and so on. So the message type code would take 1 byte.

The protocol is fully bidirectional; either party can act in the `-->` direction. The semantics for ordering and asynchronous delivery are the same as in the Reactive Streams specification.

Unlike in RS, there is no separate Subscription object; the subscriber Id identifies the recipient in all messages going in the `<--` direction. This Id is generated by the subscriber and sent in the `subscribe` message.

The publisher String needs to be parsed by the recipient; it is not described by this specification. [Could be added?]

The field `onSubscribe.elementSize`, if nonzero, indicates the fixed size of the elements that will be published in this stream. In fixed-size mode, the `onNext.element` field is not length-prefixed. This saves space when the messages are very small, such as individual ints.

After a subscription is closed, its Id can be reused, to prevent Ids from growing without limit. The subscriber MAY reuse an Id in a `subscribe` message after it has sent `cancel` or received `onComplete` or `onError` for that Id. If it does so, it MUST guarantee that the publisher will not receive messages meant for the previous subscription with that Id after it receives the second `subscribe` message.
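The Id-reuse rule above could be implemented on the subscriber side with a small free pool. An illustrative sketch, not part of the spec; reusing the smallest closed Id keeps the varint encoding of Ids short:

```python
import heapq

class SubscriberIdPool:
    """Allocate subscriber Ids, reusing closed Ids so they don't grow
    without limit (per the reuse rule above)."""

    def __init__(self) -> None:
        self._next = 0
        self._free: list[int] = []  # Ids whose subscriptions have fully closed

    def acquire(self) -> int:
        if self._free:
            return heapq.heappop(self._free)  # reuse the smallest closed Id
        n = self._next
        self._next += 1
        return n

    def release(self, sub_id: int) -> None:
        # Call only after `cancel` was sent or `onComplete`/`onError` was
        # received for this Id, and the subscriber can guarantee no stale
        # messages for the old subscription will reach the publisher late.
        heapq.heappush(self._free, sub_id)
```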

## Packed messaging
Contributor:

This is nice. I hadn't considered this.


In typical use, the most common messages by far are `onNext`. The overhead per message is typically 1 byte (message code) + 1-2 bytes (subscriber id) + 1-3 bytes (payload length) = 3-6 bytes total. When the elements are very small (e.g. single ints), the overhead can be 100% or more.

To reduce the overhead, the publisher can optionally declare that all stream elements will have a fixed size by setting the `onSubscribe.elementSize` field to a value greater than zero:

<-- onSubscribe(subscriber: Id, elementSize: varint)

The publisher can then send not just `onNext` messages but also `onNextPacked` messages:

<-- onNextPacked(subscriber: Id, count: varint, messages: count * elementSize bytes)

Packing does not help if new data becomes available very frequently and must not be delayed before being sent. A typical example is a ticker source. It also can't be done if the client doesn't provide enough demand.
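With a fixed `elementSize`, packing and unpacking reduce to concatenation and slicing. A sketch of building and splitting the `(count, messages)` payload of an `onNextPacked` frame (function names are illustrative):

```python
def pack_elements(elements: list[bytes], element_size: int) -> tuple[int, bytes]:
    """Build the (count, messages) payload of an onNextPacked frame.
    Every element must have the negotiated fixed size."""
    assert all(len(e) == element_size for e in elements)
    return len(elements), b"".join(elements)

def unpack_elements(count: int, data: bytes, element_size: int) -> list[bytes]:
    """Split a packed payload back into `count` fixed-size elements.
    No per-element length prefixes are needed."""
    assert len(data) == count * element_size
    return [data[i * element_size:(i + 1) * element_size] for i in range(count)]
```

Note that the per-element framing overhead disappears entirely: a packed frame of 1000 4-byte ints costs a few bytes of header for 4000 bytes of payload.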

## Split messaging


Might consider using the terms "fragmentation" and "reassembly".


The max size of a "fragment" will need to be defined. I would suggest no larger than 64K minus some overhead. That way it can be kept to a single 2 byte length.

Author:

Makes sense: then the overhead from not using varints won't be so large. Unfragmented messages will also need to be limited to 64K.


A very large `onNext` message might significantly delay messages from other streams. Therefore, large stream elements can be optionally split across multiple messages. Publishers MAY split elements; subscribers MUST support this.

When an element is split, the publisher will send one or more `onNextPart` messages, followed by a single `onNextLastPart`:

<-- onNextPart(subscriber: Id, element: Id, data: bytes)
<-- onNextLastPart(subscriber: Id, element: Id, data: bytes)

`element` is an Id assigned by the Publisher; messages with the same `element` value, in the same stream, will be joined by the Subscriber. The order of the parts is that in which they were sent and received (the transport is required to provide ordered delivery).

The subscriber's driver will typically join the parts transparently and deliver a single message to the application.
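Reassembly in the subscriber's driver can buffer fragments keyed by `(subscriber, element)`; since the transport guarantees ordered delivery, parts only need to be appended in arrival order. An illustrative sketch:

```python
class Reassembler:
    """Join onNextPart/onNextLastPart fragments into whole elements.
    The transport guarantees ordered delivery, so parts arrive in order."""

    def __init__(self) -> None:
        self._parts: dict[tuple[int, int], list[bytes]] = {}

    def on_next_part(self, subscriber: int, element: int, data: bytes) -> None:
        # Buffer a non-final fragment of this element.
        self._parts.setdefault((subscriber, element), []).append(data)

    def on_next_last_part(self, subscriber: int, element: int, data: bytes) -> bytes:
        # Final fragment: join everything buffered for this element Id and
        # deliver it to the application as a single onNext element.
        parts = self._parts.pop((subscriber, element), [])
        parts.append(data)
        return b"".join(parts)
```

After `on_next_last_part` the element Id can be reused by the publisher for a later split element on the same stream.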

## Closing the connection

When the underlying transport is closed, both sides should release all related resources. This protocol version does not specify reusing previously negotiated state after reconnecting.

The orderly way of closing a connection is to send a `goodbye` message, wait for acknowledgement and then close the underlying connection:

--> goodbye(reason: String)
<-- goodbyeAck(message: String)

Sending `goodbye` implicitly closes all open streams, as if `cancel` or `onError` messages had been received.


Might be good to use ACK for the acknowledgement instead. That way it is differentiated from the goodbye.