-
Notifications
You must be signed in to change notification settings - Fork 532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TCK: Enable replay-like and delay-error like Processor impls #348
TCK: Enable replay-like and delay-error like Processor impls #348
Conversation
@@ -738,6 +774,20 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr | |||
} | |||
} | |||
|
|||
public void expectAnyNextElement(ManualSubscriber<T> sub, Collection<T> expected) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't expected be a Set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set
extends Collection
, plus there is no Set.of
in pre Java-9 but there is Arrays.asList()
. For the particular test, there are 3 elements to expect at most so asList
is okay, for any other use, you can call it with a Set
if you want to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an API I'd rather have the types be specific.
private final <T extends Comparable<T>> Set<T> toSet(T... ts) {
final Set<T> set;
if (ts.length == 0) set = Collections.emptySet();
else {
set = new HashSet<T>();
for(final T t : ts)
set.add(t);
}
return set;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, this is an API that should be as enabling as possible, that means using the most general type from the type hierarchy that has a contains
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@akarnokd I disagree, the semantics of expected is that of a Set, which is why Set is the most general type with the correct semantics, instead of requiring a HashSet or a TreeSet etc.
@@ -973,6 +977,21 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru | |||
} // else, ok | |||
} | |||
|
|||
public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException { | |||
while (timeoutMillis-- > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be wrong as it assumes it'll take a millisecond per pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
} | ||
return false; | ||
} | ||
Thread.sleep(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this ISO poll(remaining)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this question. This routine uses peek because in case the next received element is not a terminal indicator, that element should remain available for any of the other next
calls that may want to verify the item.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@akarnokd if you use abq.poll(remaining) instead of abq.peek() + abp.poll() you do not need to do the sleep(1)-dance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- There is no such method of
abq.poll(arg0)
: Javadoc - The purpose of the method is to consume a non-defined, non-null
Optional
and return true indicating the next event in line was a terminal event. If the next event is not a terminal event, don't consume it and return false and leave up to the test what to do with that situation. In other terms,poll()
is not good because it consumes the item unconditionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)
2: Good point. Seems like the API is missing a peek(timeout, unit)!
@@ -738,6 +774,20 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr | |||
} | |||
} | |||
|
|||
public void expectAnyNextElement(ManualSubscriber<T> sub, Set<T> expected) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
* indicating that {@code onError} may cut ahead and get emitted even if there are | ||
* {@code onNext} events ready for consumption (via {@code request()}) by the {@code Subscriber}. | ||
*/ | ||
public boolean strictEventOrdering() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm n ot super-convinced by the name. Isn't this more about eagerTermination
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or shortCircuitTermination
?
@reactive-streams/contributors Yay or nay? |
I think such non-identity processor could have separate standalone TCK class instead. |
This PR extends the TCK to support two features of
Processor
implementations:Replay all
Some
Processor
implementations may replay allonNext
items they received over their lifetime to the incomingSubscriber
. Therequired_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo
however expected that a lateSubscriber
receives only the very latestonNext
item and fail the test if any of the first two was observed. The enhancement allows observing(x, y, z)
or(y, z)
or(z)
.Note that indeed this change may let
Processor
implementations pass which were supposed to emit onlyz
. My opinion is that such implementation should already have unit tests verifying its emission pattern and the TCK should verify the implementation honors the RS protocol, even if it means getting 1-3onNext
s before the terminal event. If this is a problem, the change could be gated behind aIdentityProcessorVerification
overridable property method, similar to the next change.Emit all onNexts before an onError
Some
Processor
implementations may relay/replay all receivedonNext
s items before they relay/replay a receivedonError
event. This allows valid items to be processed before the sequence terminates with an error. Since this behavior may be specified explicitly on aProcessor
implementation, the TCK'sIdentityProcessorVerification
has been extended with astrictEventOrdering()
method (default false) that when true, the otherwise failingrequired_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
first consumes the pending item and then verifies the error state.Since this change requires overriding a method, existing
Processor
implementations that eagerly callonError
should still pass or fail this test as before.ReplayProcessor
To demonstrate a
Processor
implementation that can exhibit both behaviors, a custom and package-privateReplayProcessor
implementation has been added along with its TCK tests for non-strict and strict mode. (Note that this particular processor cancels its upstream if all downstream Subscribers have cancelled before the upstream's terminal event. This behavior is uncommon as ReactiveX styleProcessor
s keep running and relay new events to laterSubscriber
s after the first set is gone.)Related: reactive-streams/reactive-streams-dotnet#30