requestChannel() returns a Flowable that one can subscribe to and later unsubscribe from. Subscribing to the same Flowable a second time does not work.
Expected Behavior
After re-subscribing, one will receive values again with the given subscriber. No warning will occur on console.
Actual Behavior
A warning is logged on console (RSocketClient: re-entrant call to request n before initial channel established.). No new values are received by the subscriber.
Steps to Reproduce
const metadata = ...;
const client = new RSocketClient(...);
const processor = new FlowableProcessor(sub => {});
const inputFlowable = processor.map(i => {
  return { data: i, metadata: metadata };
});

client.connect().then(socket => {
  var subscription;
  const subscriber = {
    onNext: value => console.log('Value from Responder', value.data),
    onSubscribe: sub => {
      subscription = sub;
      sub.request(0x7fffffff);
    },
  };

  // responder will multiply the given value by 2
  const channelFlowable = socket.requestChannel(inputFlowable);
  channelFlowable.subscribe(subscriber);

  setTimeout(() => processor.onNext(1), 1000);
  setTimeout(() => processor.onNext(2), 2000);
  setTimeout(() => {
    console.log("unsubscribe");
    subscription.cancel();
  }, 3000);
  setTimeout(() => {
    console.log("subscribe again");
    channelFlowable.subscribe(subscriber);
  }, 4000);
  setTimeout(() => processor.onNext(3), 5000);

  // output on console
  // Value from Responder 2
  // Value from Responder 4
  // Unsubscribe
  // subscribe again
  // RSocketClient: re-entrant call to request n before initial channel established.
});
Workaround
Instead of re-subscribing, call requestChannel() again.
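The workaround could be wrapped in a small helper so that each subscribe opens a fresh channel. This is only a sketch: `resubscribableChannel` and `fakeSocket` are hypothetical names that are not part of rsocket-js, and the fake socket merely stands in for the socket returned by client.connect() so the snippet runs on its own.

```javascript
// Hypothetical helper (not part of rsocket-js): every subscribe() calls
// requestChannel() again instead of reusing an already cancelled Flowable.
function resubscribableChannel(socket, inputFlowable) {
  return {
    subscribe(subscriber) {
      socket.requestChannel(inputFlowable).subscribe(subscriber);
    },
  };
}

// Stand-in socket for illustration only. It counts how many channels were
// opened and emits one value per channel so the effect is visible.
const fakeSocket = {
  channelsOpened: 0,
  requestChannel(_input) {
    this.channelsOpened += 1;
    const id = this.channelsOpened;
    return {
      subscribe(subscriber) {
        subscriber.onSubscribe({ request() {}, cancel() {} });
        subscriber.onNext({ data: `channel ${id}` });
      },
    };
  },
};

const received = [];
let subscription;
const subscriber = {
  onNext: value => received.push(value.data),
  onSubscribe: sub => {
    subscription = sub;
    sub.request(0x7fffffff);
  },
};

const channel = resubscribableChannel(fakeSocket, null);
channel.subscribe(subscriber);   // opens the first channel
subscription.cancel();           // unsubscribe
channel.subscribe(subscriber);   // opens a second channel
console.log(received);
```

With a real socket, the input Flowable is subscribed to once per channel, so it is worth checking that the input source (here a FlowableProcessor) tolerates being subscribed to again.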
Possible Solution
Looks like the variables 'initialized' and 'payloadsSubscribed' in the requestChannel()-implementation of RSocketMachine are not reset.
Your Environment
RSocket version(s) used: 0.0.27
Happy new year!
If I am interpreting the protocol documentation correctly, I believe that this is working as per protocol spec. The Protocol spec for Request Channel states:
Upon receiving a CANCEL, the stream is terminated on the Responder.
Upon sending a CANCEL, the stream is terminated on the Requester.
Given that, if either the Requester or the Responder of a Request Channel stream sends CANCEL, you must re-establish the stream with a subsequent call to requestChannel. You cannot simply re-subscribe to the stream returned from requestChannel, as it has already been terminated.
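To illustrate the point, here is a toy model of a stream's lifecycle. It is not the rsocket-js implementation: `ChannelStream` and `openChannel` are made-up names, and the model only captures the rule that a cancelled stream stays terminated while a new request creates a new stream.

```javascript
// Toy model only: a stream accepts payloads until it is cancelled.
class ChannelStream {
  constructor(id) {
    this.id = id;
    this.state = 'open';
    this.received = [];
  }

  // CANCEL terminates the stream; there is no transition back to 'open'.
  cancel() {
    this.state = 'terminated';
  }

  // Returns true if the payload was accepted, false if the stream is terminated.
  onPayload(data) {
    if (this.state !== 'open') return false;
    this.received.push(data);
    return true;
  }
}

// Stands in for a new requestChannel() call: each call yields a new stream.
let nextId = 1;
const openChannel = () => new ChannelStream(nextId++);

const first = openChannel();
first.onPayload(2);
first.onPayload(4);
first.cancel();

// Re-using the cancelled stream: the payload is dropped.
const acceptedAfterCancel = first.onPayload(6);

// Opening a new stream: the payload is accepted.
const second = openChannel();
const acceptedOnNewStream = second.onPayload(6);

console.log(acceptedAfterCancel, acceptedOnNewStream); // false true
```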
I will also note that while the protocol has not changed, these APIs have changed recently with 1.0.0-alpha.1, and the flowable API is no longer available.