
Reactive Streams Signal Processor #5

Closed
jbrisbin opened this issue Mar 11, 2015 · 27 comments

Comments

@jbrisbin
Contributor

Reactor 2.0 will introduce a new implementation of Reactive Streams signal dispatching that uses the Reactive Streams Processor to schedule work in the form of sending an RS signal to a Subscriber asynchronously.

It's easy enough to simply invoke one of the Subscriber methods directly in the calling thread, but to do "proper" (according to the spec) asynchronous signal publication efficiently we should provide a Processor<Signal> (a Signal consisting of a type, a payload, and the Subscriber) which allows publishers to send signals to subscribers asynchronously, safely, and in an efficient (and ordered) fashion.
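As a sketch only (the names below are illustrative, not committed API; the proposal literally suggests a Reactive Streams Processor<Signal>, for which a simple dispatch interface stands in here), the shape might be:

import org.reactivestreams.Subscriber;

// Illustrative only: a Signal carries its type, its payload, and the
// Subscriber it is destined for, so it can be enqueued and replayed later.
final class Signal {
    enum Type { ON_NEXT, ON_ERROR, ON_COMPLETE }

    final Type type;
    final Object payload;            // the item for ON_NEXT, the Throwable for ON_ERROR
    final Subscriber<Object> target; // the Subscriber to deliver to

    Signal(Type type, Object payload, Subscriber<Object> target) {
        this.type = type;
        this.payload = payload;
        this.target = target;
    }
}

// Implementations decide whether dispatch crosses a thread boundary
// (e.g. a Disruptor RingBuffer) but must preserve per-Subscriber ordering.
interface SignalProcessor {
    void dispatch(Signal signal);
}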

We call these components "dispatchers" in Reactor and they are key to the efficient publication of signals. I think we want the same capability here. Whether that publication involves jumping from one thread to another would be determined by the implementation (e.g. with a Disruptor RingBuffer).

Note that this is, IMO, not the same thing as intentionally "scheduling" work to be done on another thread based on foreknowledge of the work being done (like submitting blocking IO to a work queue). We could add a delay to the publication of a signal, which we would need at a minimum to provide abstract timeout capabilities (i.e. not tied to a particular action, like a write). Being able to schedule signals in the future is simply an extension of being able to schedule signals at all. This would be similar to the event-loop functionality of "next tick" or "trampolining" signals, which is important when dispatching signals recursively.
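Continuing the illustrative sketch above, a delayed signal for the abstract timeout case could be expressed with plain JDK scheduling (the dispatcher and subscriber variables are assumed to exist; no Reactor API is implied):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

// Unless cancelled first, publish an onError signal in 5 seconds through
// the same ordered dispatch path as any other signal.
timer.schedule(
        () -> dispatcher.dispatch(new Signal(
                Signal.Type.ON_ERROR, new TimeoutException("timed out"), subscriber)),
        5, TimeUnit.SECONDS);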

@benjchristensen
Contributor

I don't fully understand the use cases here, so can you please elaborate? What does the Reactive-IPC core need to do with this that the transport implementations wouldn't take care of? For example, if Netty is being used as the transport impl, then it has its own event loops, schedules work as needed, and will perform callbacks that we would then model as Reactive Streams.
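For concreteness, a hedged sketch of that arrangement: Netty's event loop delivers the callback, and the transport impl simply forwards it as an RS signal. The subscriber field is an assumption, and demand/cancellation handling is elided.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reactivestreams.Subscriber;

final class ReadStreamBridge extends ChannelInboundHandlerAdapter {
    private final Subscriber<? super Object> subscriber; // assumed wired up at subscribe time

    ReadStreamBridge(Subscriber<? super Object> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        subscriber.onNext(msg); // already on the channel's event loop; no extra dispatch
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        subscriber.onError(cause);
    }
}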

@jbrisbin
Contributor Author

If we don't have a way to dispatch signals to subscribers, then we can't create any reusable abstractions in core. I was trying to implement a simple BoundedSubscription abstraction that takes a capacity value and a Subscriber that could be used generically in a number of places. If I don't have a way to dispatch signals onto the Subscriber, then I have to simply invoke the onXxx methods directly, which could cause issues with recursion if using a small capacity and repeatedly calling the Subscription.request() method.
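To make the recursion concern concrete, here is a toy sketch (hypothetical; not the actual BoundedSubscription code) in which buffered data served synchronously inside request(n) grows the stack by one frame per element whenever the Subscriber re-requests from onNext:

import java.util.Arrays;
import java.util.Iterator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Toy illustration only: buffered data served synchronously inside request(n).
final class BufferedSubscription implements Subscription {
    private final Subscriber<? super Integer> subscriber;
    private final Iterator<Integer> buffered = Arrays.asList(1, 2, 3).iterator();

    BufferedSubscription(Subscriber<? super Integer> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        for (long i = 0; i < n && buffered.hasNext(); i++) {
            // If the Subscriber calls request(1) from onNext, this method
            // re-enters before returning: request -> onNext -> request -> ...
            subscriber.onNext(buffered.next());
        }
    }

    @Override
    public void cancel() { }
}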

If I have a single Subscriber as a handler, but I have multiple connections using that shared Subscriber, then I cannot dispatch signals from all those connections onto the Subscriber without providing some kind of ordering per the spec. It's unrealistic to think that synchronizing on the Subscriber from a scoped anonymous inner class or putting synchronized on the Subscriber.onNext(T) method is a solution to not sending concurrent onNext signals.

The RingBuffer thread is not a Netty thread anyway, so no matter how we slice it, IMO we can't push this functionality off onto the implementation and ignore the need to send signals to subscribers in an efficient and orderly fashion. Without this concept in the core, any code that touches a Subscriber would have to resort to direct invocation of the onXxx methods. This wouldn't work in the case of a write in a Subscriber.onNext invocation running in the scope of a RingBuffer thread. It would have to be properly dispatched back onto Netty's EventLoop for that ChannelContext. Without the concept of dispatching signals, no generic abstraction would know how to do that.
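A sketch of the hop being described; Netty's write path does schedule onto the right event loop internally (as noted later in this thread), but a generic component outside the Netty module has to express the idea somehow. ByteBuf and the class shape are illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

final class EventLoopWriter {
    private final ChannelHandlerContext ctx;

    EventLoopWriter(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    // May be called from a RingBuffer thread; marshal back onto the event loop.
    void write(ByteBuf buf) {
        if (ctx.channel().eventLoop().inEventLoop()) {
            ctx.writeAndFlush(buf);
        } else {
            ctx.channel().eventLoop().execute(() -> ctx.writeAndFlush(buf));
        }
    }
}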

@benjchristensen
Contributor

which could cause issues with recursion

Under what circumstances is this the concern of this layer of code? Why is this not just the transport layer worrying about this?

The only purpose of what we're building is reactive APIs over IO, and IO implementations will always have their own approaches to scheduling (Netty uses eventloops, Servlet 3.1 uses native threads), and they will all do their callbacks as they wish. Where and why do we introduce further dispatching or scheduling?

It's unrealistic to think that synchronizing on the Subscriber from a scoped anonymous inner class or putting synchronized on the Subscriber.onNext(T) method is a solution to not sending concurrent onNext signals.

Why would this ever be happening? If I perform IO on Netty it will always come back synchronously. Nothing at this level would merge streams.

The RingBuffer thread is not a Netty thread anyway,

What does this have to do with IO?

This wouldn't work in the case of a write in a Subscriber.onNext invocation running in the scope of a RingBuffer thread

Why would it be in the RingBuffer thread? And even then I still see it working. For example, Netty would correctly schedule it onto the right event loop if you weren't already on that event loop.

Please walk me through the use case you are considering, as I'm not understanding it.

@jbrisbin
Contributor Author

Where

In core.

and why do we introduce further dispatching or scheduling?

Because I can't reuse almost anything we have in Reactor without the concept of dispatching.

But as @smaldini has already suggested to me, we might just do this in Reactor and not rely on the ripc-core for any foundational abstractions.

Nothing at this level would merge streams.

Why not?

@smaldini
Contributor

Correct me if I'm wrong, @jbrisbin. I think what is being suggested is some sort of base abstraction around dispatching signals, the base machinery to get RS working (which is trivial for publishers, but not for processors, subscribers, and subscriptions).
Let's assume we minimize these behaviors and replicate them where required (NPE checks, request tracking, recursion protection...) to satisfy the TCK with the Publisher/Subscriber implementations proposed here.

The bonus would be to deal with a rather important constraint: a reactive stream coming off any of our IO impls (like Netty) will often need to jump to another thread as fast as it can, to avoid blocking things like, say, the event loop in Netty.
But in the end the async dispatching is a decision for the user, with the help of:

  • the composition layer, such as RxJava plus an RS bridge;
  • ad-hoc tools, such as the coming standalone async Processors from reactor-core;
  • a combination of the two previous elements (a sync subject connected to a Reactor processor connected to an RIPC input; sketched below).
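A hedged sketch of that third option, assuming Reactor 2.0's RingBufferProcessor as the standalone async processor (treat the exact API as an assumption; the method and variable names are illustrative):

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.processor.RingBufferProcessor;

final class AsyncHandoff {
    // Hand the IO thread's signals off through a ring buffer so the user's
    // Subscriber runs on the processor's thread, not Netty's event loop.
    static <T> void connect(Publisher<T> ripcInput, Subscriber<T> userSubscriber) {
        RingBufferProcessor<T> processor = RingBufferProcessor.create();
        processor.subscribe(userSubscriber); // user code on the processor thread
        ripcInput.subscribe(processor);      // IO thread only enqueues
    }
}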

@smaldini
Contributor

In theory the difficulty will come with the Output abstraction (server reply or client request). We need to listen to a Publisher and so create our own RIPC Subscriber for this purpose. @jbrisbin mentioned merging because in our HTTP impl, for instance, we apply backpressure across multiple matching URL endpoints and use Streams.merge() to make sure we manage the writes in a single fan-in endpoint before flushing. I think we can live without that for now.

@NiteshKant
Contributor

I think we are missing a point here (I think @smaldini was referring to this, but I will still put it across): none of the applications would use the reactive-ipc-nettyX module directly, as dealing with RS interfaces directly is like dealing with callbacks. They would use a stream abstraction like RxJava, Reactor Streams, etc. So, IMHO, we should not reinvent what either a networking library (Netty) or a stream implementation (RxJava) already provides us.

In this context, dispatching semantics overlap with event loops, and jumping threads overlaps with subscribeOn/observeOn semantics in RxJava.

I do not see a case where I would use a custom dispatcher directly with Netty's event loops. The use case of switching threads for blocking operations should idiomatically leverage RxJava's subscribeOn/observeOn semantics, and I do not think we should intend to change that idiomatic way. What would really make sense is a dispatcher-based Scheduler for RxJava which can be used with subscribeOn/observeOn, though I do not think this is the correct repo to provide such an implementation.
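A sketch of that suggestion against RxJava 1.x as it existed at the time; Schedulers.from(Executor) is a real API, while the single-thread Executor here merely stands in for a ring-buffer-backed dispatcher:

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

// A stand-in for a dispatcher; a real one might drain a RingBuffer.
Executor dispatcher = Executors.newSingleThreadExecutor();
Scheduler dispatchScheduler = Schedulers.from(dispatcher);

Observable.just("data-from-io")
        .observeOn(dispatchScheduler) // the user opts into the thread hop here
        .subscribe(System.out::println);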

I think what is being suggested is some sort of base abstraction around dispatching signals, the base machinery to get RS working (which is trivial for publisher, but not for processors, subscribers and subscription).

I actually am not able to understand the non-trivial nature of the subscriber/subscription implementation. I see a few points mentioned here:

  • Adhering to the RS contract of sequential notifications to the Subscriber: I think all notifications would come from Netty's ChannelHandler, which guarantees sequential notifications.
  • Recursion in the request(n) -> onNext() -> request(n) loop: IMO this produces a deep stack only if the data is buffered. Otherwise the stack stays shallow, because onNext() is invoked from within the event loop (when data is available) and not in the call stack of request(n).
  • Semantic checks for a negative requested count, null items, etc.: this isn't a big deal, I guess.
  • Request tracking: not sure I understand this.

RS Processors are Subjects, and I do not think we should have Subjects anywhere in this implementation, as they break backpressure.

@jbrisbin
Contributor Author

I'm not suggesting we reinvent anything; this is a codification of existing functionality and not the introduction of a new concept.

Any generic abstraction that lives in ripc-core has no foreknowledge of the tasks being run inside a delegate Subscriber that it is supposed to send signals to and shouldn't rely on synchronous, recursive, direct method invocation. That component cannot trampoline signals without some abstraction that understands that concept. If the only component which can dispatch signals in a way that adheres to the RS spec resides in the Netty implementation module, then that precludes providing abstractions in core that do some of the heavy lifting that's common to any "real" RS component.

At the user level this is for the purpose of intentionally switching threads based on the developer's foreknowledge of the processing pipeline being created. Generic components have no such foreknowledge but still have a need to send signals to a Subscriber in a way that isn't assuming synchronous invocation.

In practical terms, my local experiments show this has meant only two abstractions are required: a Signal class and a SignalProcessor interface (which is exposed to generic components by passing a Subscriber<Signal> through a constructor).

@jbrisbin
Contributor Author

As an example of the importance of this concept of dispatching signals, consider that even this most basic of sketches violates the spec because it provides no protection against concurrent onNext signals.

@smaldini
Contributor

@NiteshKant just a quick amendment: Processors are not entirely equivalent to Subjects. As soon as you subscribe one to your Publisher it has to fulfill the demand contract; I'm currently fighting with that on one of our schedulers to make sure it is respected. But anyway, yes, that's not a target here; we will leave thread ownership to the IO layer unless an additional layer makes that easy to change.

There are at least 44 tests to pass, plus 22 untested rules, and around 30% of them are genuinely hard to implement (talking about IdentityProcessorTest from the RS TCK).

@jbrisbin
Contributor Author

Just to clarify: this isn't about providing a way to intentionally switch threads. This is entirely about publishing RS signals to subscribers from generic components in an orderly fashion in a way that doesn't violate the spec.

I point to my example above. The Subscriber is a singleton that is passed in from the test code. Relying on synchronous invocation means I am knowingly violating the spec.

@jbrisbin
Contributor Author

This point about ZeroMQ, just brought up on reactive-streams-io issue 1, IMO serves to illustrate that abstractions other than those directly dependent on Netty will need to do async message passing. ZeroMQ has no Netty-like facility for scheduling tasks asynchronously. It makes sense that the core interactions of Reactive IPC be abstracted into a module not dependent on Netty, which could also be used from a ZeroMQ implementation.

@benjchristensen
Contributor

will often need to jump to another thread as fast as it can, to avoid blocking things like say the event loop in Netty.

Why? If computation is being done like 'map' then switching threads is unnecessary and actually worse. Computation moves only as fast as the available CPUs.

If there are 8 cores and 8 event loops, moving work to another thread is not speeding anything up.

I agree with @smaldini on the following:

the async dispatching is a decision for the user in the end with the help of ...

The IO layer should do the simplest possible thing – call back via a Publisher on whatever thread the transport layer wishes.

we apply backpressure across multiple matching URL endpoints and use Streams.merge() to make sure we manage the writes in a single fan-in endpoint before flushing. I think we can live without that for now.

Can you please provide more information on this? I don't understand why streams need to be merged. I have only ever seen the need to merge streams as a user defined semantic for application behavior.

we should not reinvent what either a networking library (netty) or a stream implementation (RxJava) already provides us.

I agree with @NiteshKant on this point.

As an example of the importance of this concept of dispatching signals, consider that even this most basic of sketches violates the spec because it provides no protection against concurrent onNext signals.

Jon, I don't understand the use case you're trying to solve. Netty will not give me concurrent onNext for a single channel. A Publisher would only ever emit a stream of events sequentially from the transport layer.

@benjchristensen
Contributor

By the way, this was said earlier:

Because I can't reuse almost anything we have in [...] without the concept of dispatching.

That is not a reason for us to make or retain a design decision.

@jbrisbin
Contributor Author

Jon, I don't understand the use case you're trying to solve. Netty will not give me concurrent onNext for a single channel. A Publisher would only ever emit a stream of events sequentially from the transport layer.

@benjchristensen this isn't about a single channel, but funneling events from multiple channels into a single Subscriber, which would logically be the case in many circumstances where singletons would be provided to the transport layer as a callback.

Because I can't reuse almost anything we have in [...] without the concept of dispatching.
That is not a reason for us to make or retain a design decision.

Fair enough. That's why I put forward other reasons for suggesting this functionality live in core.

I remain unconvinced that building a kernel that calls itself Reactive can do any meaningful work without providing for asynchronous message passing.

@smaldini
Contributor

@benjchristensen

will often need to jump to another thread as fast as it can, to avoid blocking things like say the event loop in Netty.

I should have said it's not always the case, rather than often. E.g., we have customers running an ingestion box with more than 16 CPUs and 4 long-living connections (corresponding to 4 feeding clients). These clients must avoid blocking as much as they can, and processing in this case is non-trivial enough to be more expensive than placing an event in a queue (actually a ring buffer). The other usual use case I've seen is simple acknowledgement, where data is moved as fast as possible to a queue, possibly durable/HA, so the consumer keeps reading (and the event loop doesn't fill up). Norman gives that advice too in general.

About the HTTP merge: in Reactor you can register multiple HTTP endpoints matching a single request using wildcards or group captures. You can also group/route requests by IP, HTTP method, etc. These will coordinate to write back to a single connection. The model we use is that a connection is bound to a Stream, which is a single thread. Each of these HTTP routes may run on different threads at some point (async callbacks), and we ensure they write back through the thread assigned to the Stream which maps to the user's Netty channel. We at least ensure ordered processing (in order of declaration) for each route, so you can build interceptors if you want.
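A hedged sketch of that fan-in, assuming Reactor 2.0's reactor.rx.Streams API mentioned above (treat the exact merge signature as an assumption; the route streams are illustrative names):

import org.reactivestreams.Publisher;
import reactor.rx.Stream;
import reactor.rx.Streams;

final class FanInWrite {
    // Each route may emit on its own thread; merge() serializes them into
    // the single stream bound to the connection before the write/flush.
    static Publisher<Object> fanIn(Stream<Object> routeA, Stream<Object> routeB) {
        return Streams.merge(routeA, routeB);
    }
}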

@NiteshKant
Contributor

@smaldini @jbrisbin @benjchristensen I have added a comment in the goals & motivation issue to discuss what should belong in reactive-ipc-core and how much of an overarching abstraction across network libraries should we create. I would like to discuss that first before we start discussing the need of a buffer or a dispatcher.

@NiteshKant
Contributor

this isn't about a single channel, but funneling events from multiple channels into a single Subscriber, which would logically be the case in many circumstances

I actually disagree that this would be the case in many circumstances of concern at the network layer. As @benjchristensen pointed out, it is mostly the case in the user application layer where someone would merge streams from various channels.

@NiteshKant
Contributor

I remain unconvinced that building a kernel that calls itself Reactive can do any meaningful work without providing for asynchronous message passing.

I think this is where the clarification around what that kernel contains is important, which is good to discuss under goals & motivations.

@jbrisbin
Contributor Author

I actually disagree that this would be the case in many circumstances of concern at the network layer. As @benjchristensen pointed out, it is mostly the case in the user application layer where someone would merge streams from various channels.

@NiteshKant I can't see any other safe but efficient way to send RS signals to the composition library than by providing a facility for asynchronous signal (message) passing.

UPDATE: Maybe having two conversations isn't a good idea. I have no idea which issue this paragraph belongs on:

The entire premise of a Reactive system is that actions that happen in one component are isolated from the others via asynchronous message passing. How can we accomplish this isolation without dispatching signals? If we must dispatch signals at the transport implementation layer, then how is that accomplished? If the only way to dispatch asynchronous RS signals is by using an abstraction that has a hard reference to Netty's EventLoops, then what happens when we want to do the same thing for ZeroMQ or XNIO?

@benjchristensen
Contributor

I can't see any other safe but efficient way to send RS signals to the composition library than by providing a facility for asynchronous signal (message) passing.

The entire premise of a Reactive system is that actions that happen in one component are isolated from the others via asynchronous message passing.

This seems at the crux of this debate. Being "reactive" does not mean every onNext must go through an async dispatch.

Being reactive means something is pushed to us so we react to it. Period. This is the dual of being interactive, where we pull (Iterable).

If a user wishes for something to be async, then the composition layer offers them tools to make something async. If they want to move across threads, they can do so. If they want to join or merge across multiple threads, they can.

Here are examples ...

Most basic form of an IO event being pushed:

incomingTcpStream.subscribe(t -> {
    // receive `t` on whatever thread the transport IO invoked `onNext`
});

In this case there is no further need for dispatch or scheduling.

This next case merges 2 streams:

incomingTcpStreamA = ...
incomingTcpStreamB = ...

merge(incomingTcpStreamA, incomingTcpStreamB).subscribe(t -> {
    // receive `t` on either thread A or B  (of course sequentially as per RS spec)
});

In this case it is a combinatorial operator that merges the events being pushed at it. Since merging introduces concurrency, it is now the merge operator's responsibility to honor the Reactive Streams spec and ensure sequential emission. It is up to the merge implementation whether this is done via work-stealing, ring buffers, etc. It could be done with an enqueue/drain approach using separate threads, so the consuming thread is always the same, or the producer threads could fight for the right to emit (and steal from each other), so each onNext may be done on a different thread (A or B), but the Reactive Streams spec is still honored and emission remains sequential and thread-safe.
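A minimal sketch of the emitter-loop variant described above (not RxJava's actual implementation; the class and field names are illustrative): producers enqueue, and whichever thread wins the emitting flag drains the queue, so downstream onNext calls stay sequential even though they may run on either producer thread.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Subscriber;

final class EmitterLoop<T> {
    private final Subscriber<? super T> downstream;
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private boolean emitting; // guarded by `this`

    EmitterLoop(Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    // Called concurrently from producer threads A and B.
    void onNext(T value) {
        queue.offer(value);
        synchronized (this) {
            if (emitting) {
                return; // another thread owns the loop and will drain this item
            }
            emitting = true;
        }
        // This thread won the right to emit; drain until the queue is observed empty.
        for (;;) {
            T next;
            while ((next = queue.poll()) != null) {
                downstream.onNext(next); // sequential by construction
            }
            synchronized (this) {
                if (queue.isEmpty()) {
                    emitting = false;
                    return;
                }
            }
        }
    }
}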

Now if a user wants to move from one thread to another they can again choose to compose as follows:

incomingTcpStream
    .observeOn(someOtherThread)
    .subscribe(t -> {
        // receive `t` on someOtherThread as defined in observeOn rather than the IO thread
    });

Behaviors can be combined like this:

incomingTcpStreamA = ...
incomingTcpStreamB = ...

merge(incomingTcpStreamA, incomingTcpStreamB)
    .observeOn(someOtherThread)
    .subscribe(t -> {
        // receive `t` on someOtherThread as defined in observeOn rather than the IO thread
    });

Now we are merging IO events from potentially 2 threads and moving the output to a 3rd thread.

This is all reactive with functional composition for a user to take control of asynchrony and scheduling to meet their needs.

The IO layer just calls onNext. Async scheduling decisions are left to the user to compose.

If an application wants an event-bus then they should use an event-bus, such as Reactor. The IO layer should have as few opinions as possible and be as simple as possible and not make scheduling decisions on behalf of the user.

Note how in the examples above the IO responsibility did not include any of the dispatch or combinatorial use cases. That all lives in the layer above, which composes the Reactive Streams that the IO layer exposes as Publishers. Thus, the "Reactive IPC" core and transport layers expose Publisher APIs; users compose them and apply dispatch and asynchrony as they desire.

@smaldini
Contributor

I agree with that definition from @benjchristensen, just noting that yes, Reactive is not required to be async (Reactive Manifesto aside, onNext/onComplete still abstract the location/resource away), but it at least offers error isolation (onError).

We will find other mandatory abstractions anyway, but I propose to close this issue for now and defer the async decision to the end user or the end implementation.

@jbrisbin
Contributor Author

The IO layer just calls onNext. Async scheduling decisions are left to the user to compose.

This is the heart of the matter and where we'll have to agree to disagree.

This necessarily limits the kinds of abstractions we can provide as building blocks, but as long as that's intentional and we recognize that, then I don't have a problem coming down on the side of simplicity.

@benjchristensen
Contributor

This necessarily limits the kinds of abstractions we can provide as building blocks

What use cases are we preventing? And how would making everything async not cause performance and complexity impact for the majority of cases?

@jbrisbin
Contributor Author

I probably shouldn't have banged the async hammer so hard, because it does seem like I'm advocating for "enforced" asynchrony, and that's certainly not the case. The key point, I think, was that having only synchronous invocation is limiting. I apologize for focusing too much on that aspect. We want to keep things fast and dispatch only when we have to. But we've found that with the RingBuffer you can do both efficiently.

As to what we're preventing: some things I can think of, and some whose needs I don't even know yet. In effect, we seem to be narrowing the scope of what an IO kernel actually is down to such a simplistic level that if you were to, say, choose a transport module but not a composition library, you'd have nothing useful that you didn't bring along with you to enrich the functionality of the kernel. Going back to vert.x: this model has proven to be quite useful, and while there are some design decisions there I'm not too excited about, it's a system that does just what is needed. It allows non-blocking components to interact with the core (they do it over the event bus rather than through Reactive Streams) and provide the core needed functionality, without forcing a verticle to work in isolation from all other verticles running in the same app.

So far it seems what we're actually building here is just a very inconsequential API conversion from Netty to Reactive Streams. The usefulness of Reactive Streams is not in the fact that it provides three standard interfaces that don't exist in the JDK (otherwise we'd just use the ones from there). Its usefulness is in the spec which lays out the Reactive pattern in specifics and provides the TCK to check it. It provides error and (optional) resource isolation and a whole host of other things that intermediate components could benefit from.

My idea of this IO kernel when we originally talked about it was much closer to what vert.x has, but made fully-Reactive and reusable: a core kernel that has unlimited potential for functionality because the core has extension modules plugged into it that are exposed exclusively via Publisher and Subscriber. These modules would be available to implementation code and others by sending events/signals to them so that things like error auditing could happen by providing a transport layer implementation a Subscriber they could push onNext(Throwable) signals to which could do any manner of other things.
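A hypothetical sketch of that extension style (every name here is illustrative; nothing like this exists in the repo today): the kernel hands a transport implementation a Subscriber<Throwable>, and the module behind it decides what auditing means.

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Illustrative error-audit module: transports push onNext(Throwable) into it.
final class ErrorAuditSubscriber implements Subscriber<Throwable> {
    @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(Throwable error)     { System.err.println("audit: " + error); }
    @Override public void onError(Throwable t)        { /* the audit stream itself failed */ }
    @Override public void onComplete()                { /* transport shut down */ }
}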

So far we have proposed a very simplistic API conversion from Netty -> RIPC and then from RIPC -> RxJava. There's a lot that could happen in between those two but I don't see anything currently that would facilitate that.

@rstoyanchev
Contributor

Looking at the Reactive Streams spec, I see nothing to suggest that automated dispatching is expected. On the contrary, there is plenty to suggest the opposite: that publishers and subscribers make deliberate choices about dispatching rather than having that choice made for them.

For example, per rule 2.2, a subscriber is asynchronous when it suspects its processing would impact the producer, not by default. The example code further confirms this by showing both synchronous and asynchronous subscribers. If there is any doubt left, the section on Asynchronous vs Synchronous Processing clearly shows that async boundaries can and are expected to vary, and that implies choice.

So I don't see anything inherently wrong with intentionally scheduling work based on the expected situation. In fact, intuitively this makes sense. For example, for the IO kernel we can fully expect runtimes to be non-blocking, so we won't be exposed to synchronous open recursion through Subscription.request. I think we can agree on that much, and I see no such issues when looking at ChannelInboundHandlerSubscription.

For the open recursion, I think Jon was referring to ChannelInitializerSubscription, which is for subscribers of NettyTcpServerConnection. Here Jon does have a point. However, I don't actually agree with this part of the current code.

I see no reason for connections to be provided through a Reactive Streams Publisher, and this is further supported by another of Jon's concerns: a singleton subscriber receiving illegal concurrent onNext signals for multiple incoming connections. Again, why does this need to be a Publisher?

Instead I think it would be sufficient to model that as some sort of a ProtocolHandler along the lines of:

public interface ProtocolHandler<I,O> {

    Publisher<Void> handle(Connection<I,O> connection);

}

Such a ProtocolHandler would indeed be a singleton (we only need one per protocol), and it would mainly be responsible for setting up protocol-specific handling, so it's non-blocking by definition. In RxNetty terms this is the ConnectionHandler.
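A hypothetical usage sketch; the Connection shape below (getInput() plus write(Publisher)) is an assumption modeled loosely on RxNetty, and only the ProtocolHandler interface above comes from the proposal:

import org.reactivestreams.Publisher;

// Assumed shape of Connection, for illustration only.
interface Connection<I, O> {
    Publisher<I> getInput();
    Publisher<Void> write(Publisher<O> data);
}

// A singleton echo handler: one instance serves every connection, and all
// per-connection state lives inside handle(), so it is non-blocking and safe.
final class EchoHandler implements ProtocolHandler<String, String> {
    @Override
    public Publisher<Void> handle(Connection<String, String> connection) {
        return connection.write(connection.getInput());
    }
}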

@benjchristensen
Contributor

it allows non-blocking components to interact with the core

@jbrisbin and I spoke on Friday about use cases. I have my notes and once I have some free time will document our conversation and the requirements.

The key point of our conversation was that we need the core to be extensible, and I completely agree, but as we discussed it should not require the core to predefine how that happens (such as via an event bus); it should instead permit extension and injection of behavior. More to come later...
