Implementation of Context-based methods for Channel #389
base: master
Conversation
Thank you so much for working on this!
I like this but would like to suggest a couple of amendments to this PR.
- How about we rename *Context functions (e.g. CloseContext) to *WithContext (so, CloseWithContext and such)?
- It would be great to add a few basic tests for key channel operations (open, close, queue declaration, etc.) that use these new functions.
This is great. To bikeshed the names a bit, I think the existing names like CloseContext already follow an established convention. With the async nature of client or server channel closure, cancelling the protocol is prone to subtle bugs. Please add some test coverage for the context cancellation cases.
@michaelklishin @streadway thanks for the feedback. I wanted to make sure the implementation would be OK with you. I will add some tests and documentation for the new public methods. To avoid out-of-sequence received messages, I cancel the current channel if an RPC call is cancelled. Do you think that is the right thing to do?
OK, if this follows an existing naming convention then we should use those names. @AlexisMontagne I'm not sure I understand the last question of yours, sorry.
The naming follows an existing convention.
@michaelklishin sorry, I submitted my comment by mistake. I meant: in order to deal with canceled RPC calls, should we close the channel or keep it open? cf. https://github.com/streadway/amqp/pull/389/files#diff-861e77702f49a71d32be4b3c7600818fR185
My first reaction is to keep the channel open. The AMQP channel lifecycle is complicated and shares client and server resources. If the context methods only relate to RPC round trips instead of server resources, it would be easier to document.
When looking at error values, can you suggest a way of promoting a client-side context.Context.Err() into an *amqp.Error?
@streadway you can find a standardized *amqp.Error here: 95dafc3
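For reference, a minimal sketch of what such a promoted error could look like, assuming this library's Error struct fields (Code, Reason, Server, Recover) and the client-specific Canceled = 542 constant that appears later in this PR's diff; the field values chosen here are illustrative, not necessarily what the referenced commit does:

// Hypothetical client-side error for a cancelled context. The code 542 comes
// from the Canceled constant added in this PR and is not defined by the AMQP
// specification.
var ErrCanceled = &Error{
	Code:    Canceled,           // 542, client-specific
	Reason:  "context canceled", // mirrors context.Canceled's message
	Server:  false,              // originated in this library, not the broker
	Recover: true,               // assumption: the channel stays usable after a cancelled RPC
}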
If we keep the channel open, I'm scared a late reply could cause issues. Let's say the reply for a canceled call only arrives after a later call has started waiting. Is there any way to correlate request and response frames (a sequence ID?) in order to discard frames received for canceled calls?
@AlexisMontagne other clients address this by not allowing channel sharing between threads/routines/etc. There are no sequence identifiers in the protocol, only response order guarantees for synchronous methods. Note that sharing a channel for concurrent publishing, for example, is inherently unsafe because a published message is 2 or 3 frames on the wire and you can end up with incorrect frame interleaving. While closing the channel might be a decent option since there is no way to undo a sent frame, the above makes me wonder whether it is really necessary.
Keeping a channel open and documenting known limitations would be closer to what virtually every other RabbitMQ client does. WDYT?
This library is concurrency-safe for all messages, including publishing, by holding a channel-specific mutex until all publish frames are sent before sending others.
The primary use case for the context package is to represent a distributed, cancelable transaction. It sounds like this is a discussion of "does a cancelled context propagate to the server or not?" If we were to close a channel on cancellation during any RPC, would we also close the connection should we not be able to open a channel (RPC on channel id 0)? Let's start with discarding the in-flight reply after cancellation to keep the channel open, and let the application determine how to propagate a cancellation to the server based on what the application considers a transaction.
Then it's up to the client to filter out the responses it cannot or no longer needs to handle. The Java client and Bunny do this by, yes, ignoring the responses they can detect as stale/mismatching the pending continuation (pending operation). The only scenario where one cannot open a channel is if you run into the negotiated channel_max limit.
@streadway I updated the pull request according to your last message, and added a test to ensure that messages received after the call is canceled are dropped.
I got one of the new tests to hang and also hit this failure in one of the runs:
=== RUN TestTimeoutQosContextCanceled
panic: Fail in goroutine after TestTimeoutQosContextTimeoutDropping has completed
goroutine 397 [running]:
testing.(*common).Fail(0xc0000e4500)
/usr/local/Cellar/go/1.11.5/libexec/src/testing/testing.go:546 +0x1c6
testing.(*common).FailNow(0xc0000e4500)
/usr/local/Cellar/go/1.11.5/libexec/src/testing/testing.go:568 +0x39
testing.(*common).Fatalf(0xc0000e4500, 0x149810b, 0x13, 0xc000115da0, 0x1, 0x1)
/usr/local/Cellar/go/1.11.5/libexec/src/testing/testing.go:634 +0x95
github.com/streadway/amqp.(*server).recv(0xc0001dc870, 0x1, 0x14fd940, 0xc0004d11e8, 0x0, 0x0)
/path/to/gopath/src/github.com/streadway/amqp/client_test.go:105 +0x1a6
github.com/streadway/amqp.TestTimeoutQosContextTimeoutDropping.func1(0xc0001dc870)
/path/to/gopath/src/github.com/streadway/amqp/client_test.go:766 +0x170
created by github.com/streadway/amqp.TestTimeoutQosContextTimeoutDropping
/path/to/gopath/src/github.com/streadway/amqp/client_test.go:758 +0x10a
exit status 2
while running tests in a loop with
for i in {1..10}; do go test -v -cpu 1,3 -tags integration -race; done
This is tricky to get right because of the message ordering requirement for protocol sequencing.
I don't think a non-blocking select { default: } that drops messages on send/recv to the rpc channel is the right thing to do, because it ties the ordering to the runtime scheduler.
Can you come up with a different approach where the reply to every cancelled request is explicitly consumed, either in dispatch() or call()?
connection.go (outdated)
-	c.rpc <- m
+	select {
+	case c.rpc <- m:
+	default:
Why does this now drop if there is no longer a receiver on the rpc buffered channel?
channel.go (outdated)
-	ch.rpc <- msg
+	select {
+	case ch.rpc <- msg:
+	default:
Same as on the connection, this seems like it could drop replies for unintended reasons.
channel.go (outdated)
	default:
	}

	if req.wait() {
The reply from a cancelled request may still be in flight and not reside on the rpc channel at this point, so it will not drain.
Take this scenario, all with different contexts:
client send(a)
client send(b)
client wait(a) - cancelled - returned
server reply(a)
client wait(b) - recv reply(a)
I don't think this use case is possible. Every RPC call is surrounded by a mutex.
Therefore it could only end up as:
client send(a)
client wait(a) - cancelled - returned
server reply(a) - message dropped
client send(b)
client wait(b)
server reply(b)
recv reply(b)
Or (the case I exposed here: #389 (comment)):
client send(a)
client wait(a) - cancelled - returned
client send(b)
client wait(b)
server reply(a) - message dropped
recv reply(a)
And that causes issues.
But I cannot figure out any way to handle this case with the current protocol. IMHO the safest and most deterministic way of dealing with this situation is to close the channel in case of cancellation.
I still think the most obvious behavior is not to close the channel when a context is Done(). I believe a done context is closer to a late no-wait than to a client error.
This is a matter of sequencing the consumption of the replies in the same order as the requests were written. It looks like communication between the writer goroutine (application) and the reader goroutine (dispatch) is how the sequencing would be implemented.
@streadway what do you think of #389 (comment)?
@streadway I think I got what you mean. Tell me what you think of that last commit. Are we really sure the server will always reply to the client? Will it never give up on replying to any message? All my reasoning was based on the assumption that the server could be flaky and skip replies.
AMQP requires TCP, which offers transport reliability. The heartbeater will close the connection on read timeout, which will return an error should we experience any message loss. I've found RabbitMQ very reliable: it does not drop replies or show flaky behavior, so we can count on every protocol request receiving a reply. A counter is one approach, but that assumes contexts will be cancelled in order, which I don't think we would be able to depend on; consider a message exchange where a later request's context is cancelled before an earlier one's.
@streadway for some reason I was thinking a mutex was surrounding every RPC call. I updated my PR to handle the use case you described (and added a test to ensure it).
@streadway it should be good now. I wrapped the "sending" part of the RPC call as well, which also makes the frame sending thread-safe: previously https://github.com/streadway/amqp/blob/master/channel.go#L219 was not limited by any mutex.
@michaelklishin the added tests should now pass with the test loop you ran above.
@AlexisMontagne thank you for fixing the problematic test.
I admire the amount of effort you've put into the docs, and perhaps 50 to 65% of them can be retained. Unfortunately the docs are not always correct and reinvent RabbitMQ doc guides where it's not really necessary. I'd recommend linking to the relevant RabbitMQ guides more. If something is missing in the guides (e.g. a guide on bindings besides the tutorials), I'd be happy to work on that.
This will benefit all RabbitMQ users regardless of the client.
Thank you!
@michaelklishin I did not write any of this documentation. I used the one already in the code for the context-free methods.
@AlexisMontagne OK, thanks. I will revisit that together with the new functions then.
The docs were copied from existing functions
@streadway any opinion on this final version?
This PR represents four different changes: support for in-flight cancellation of RPC messages, context propagation to RPC, an exported API change, and linter fixes. I'd like to merge each of these changes separately.
Can you please split this up?
The linter fixes can get merged right away.
The in-flight cancellation (the call() + dispatch() coordination) is the next change; it shouldn't affect anything, but it needs to be released so we can capture any deadlocks or panics from unknown causes.
The context propagation is next and this can be done via non-exported methods first:
func (ch *Channel) Qos(...) error {
return ch.qosContext(context.Background(), ...)
}
After releasing those three changes, we can make sure there is enough time for adoption before exporting the Context-based methods along with the copied documentation strings.
	}

	select {
	case <-ch.NotifyClose(make(chan *Error)):
NotifyClose will not have been registered prior to the Channel.Qos call, so there will never be a message to receive; a sketch of the intended registration order follows.
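A minimal sketch of the registration order such a test needs, assuming the library's NotifyClose and the QosContext method added by this PR; the helper name is illustrative:

// qosObservingClose registers the close listener before issuing the RPC, so a
// channel-level close that happens during the call can actually be received.
func qosObservingClose(ctx context.Context, ch *Channel) error {
	closed := ch.NotifyClose(make(chan *Error, 1)) // register first

	err := ch.QosContext(ctx, 2, 0, false) // context-aware Qos from this PR

	select {
	case closeErr := <-closed:
		return closeErr // the channel was closed while the RPC was in flight
	default:
		return err // channel still open; report the RPC's own result
	}
}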
	defer cancel()

	if err := ch.QosContext(ctx, 2, 0, false); err != ErrCanceled {
		t.Fatalf("wrong timeout error: %v", err)
When writing test messages, it's important to communicate the why behind a test failure. The messages in Errorf and Fatalf should clearly communicate why the condition is considered "wrong" and what that means for someone trying to fix it.
Please review all your test Fatalf and Errorf calls and ask: "if the test name and this message are all I know about the library, do I know what to do next?" An example follows.
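For example, an assumed rewrite of the assertion from the hunk above; the exact wording is illustrative, the point is that the message names the call, the expected sentinel, and the behavior being protected:

if err := ch.QosContext(ctx, 2, 0, false); err != ErrCanceled {
	// A failure should be actionable without reading the test body.
	t.Fatalf("QosContext with a cancelled context should return ErrCanceled so callers can tell cancellation from protocol errors, got: %v", err)
}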
	srv.channelOpen(1)

	srv.recv(1, &basicQos{})
	time.Sleep(150 * time.Millisecond)
Please coordinate client/server conditions with channels or wait groups rather than with the wall clock; see the sketch below. I have witnessed TravisCI have pauses of over 500ms during test execution, which would cause this test to fail with a false negative.
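A sketch of replacing the sleep with explicit coordination; srv.channelOpen, srv.recv and basicQos come from the hunk above, while the received channel and the timeout guard are illustrative additions:

// Signal when the fake server has consumed the qos frame instead of sleeping.
received := make(chan struct{})

go func() {
	srv.channelOpen(1)
	srv.recv(1, &basicQos{})
	close(received) // the server side has now seen basic.qos
}()

select {
case <-received:
	// safe to continue with client-side assertions
case <-time.After(5 * time.Second):
	t.Fatal("fake server never received the basic.qos frame") // generous safety net only
}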
t.Fatalf("could not open channel: %v (%s)", ch, err) | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) |
You can use context.WithCancel to avoid depending on time; a sketch follows.
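A sketch of the time-free variant, assuming the QosContext method from this PR; cancelling explicitly removes the 100ms race from the test:

// Cancel deterministically instead of waiting for a wall-clock deadline.
ctx, cancel := context.WithCancel(context.Background())
cancel()

if err := ch.QosContext(ctx, 2, 0, false); err == nil {
	t.Fatal("QosContext should fail once its context has been cancelled")
}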
	srv.recv(1, &basicQos{})
	srv.recv(1, &channelFlow{})

	time.Sleep(10 * time.Millisecond)
Please remove the dependency on time for this and other tests.
@@ -19,6 +19,11 @@ const (
 	ExchangeHeaders = "headers"
 )
 
+// Constants for custom error code
+const (
+	Canceled = 542
@michaelklishin how do other clients represent client-specific error codes? This number is not defined in the AMQP spec, so I wonder what the closest one would be, or if we need a different error type for client-specific errors.
func (ch *Channel) call(ctx context.Context, req message, res ...message) error {
	select {
	case <-ctx.Done():
		return ErrCanceled
From the context package documentation:
// If Done is closed, Err returns a non-nil error explaining why:
// Canceled if the context was canceled
// or DeadlineExceeded if the context's deadline passed.
This return value should also represent the underlying reason and be either ErrCanceled or ErrDeadlineExceeded; a sketch of that mapping follows.
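A sketch of that mapping; ErrCanceled is the value used elsewhere in this PR, while ErrDeadlineExceeded is a hypothetical sibling that would need to be introduced alongside it:

// contextErr translates the context's termination reason into a client-side
// error so callers can distinguish cancellation from a missed deadline.
func contextErr(ctx context.Context) error {
	switch ctx.Err() {
	case context.Canceled:
		return ErrCanceled
	case context.DeadlineExceeded:
		return ErrDeadlineExceeded // hypothetical: not defined by this PR
	default:
		return nil // the context is not done
	}
}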
	if req.wait() {
		select {
		case <-ctx.Done():
			atomic.StoreUint32(&status, 1)
There is a race condition between receiving ctx.Done(), storing status = 1, and loading status in dispatch().
Instead, let's collect the in-flight contexts, so we coordinate by communicating over Context.Done() rather than by sharing the status state.
I think this would mean a responses slice of []atomic.Value, storing and loading context.Context values.
Alternatively, the responses slice could contain a buffered chan of messages that would be used instead of rpc; a sketch of that shape follows.
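A minimal sketch of that shape, with stand-in names for everything the library already has (message, the mutex, the frame writer); the point is that each request owns a buffered reply slot, so a reply can never be delivered to the wrong caller and no shared status flag is needed:

import (
	"context"
	"sync"
)

type message interface{} // stand-in for the library's message interface

type channelSketch struct {
	responsesM sync.Mutex
	responses  []chan message      // FIFO: one buffered slot per in-flight request
	send       func(message) error // stand-in for writing the request frame(s)
}

// call assumes, as the library does, that whole RPCs on a channel are serialized.
func (ch *channelSketch) call(ctx context.Context, req message) error {
	reply := make(chan message, 1) // capacity 1: dispatch never blocks on delivery

	ch.responsesM.Lock()
	ch.responses = append(ch.responses, reply) // same order the request is written in
	ch.responsesM.Unlock()

	if err := ch.send(req); err != nil {
		return err
	}

	select {
	case <-reply:
		return nil // the reply matched to this request
	case <-ctx.Done():
		// When the reply arrives it still lands in this buffered slot and is
		// simply discarded, so a later caller can never receive it by mistake.
		return ctx.Err()
	}
}

// dispatch is run by the reader goroutine for every synchronous reply.
func (ch *channelSketch) dispatch(msg message) {
	ch.responsesM.Lock()
	reply := ch.responses[0] // exactly one reply is expected per request
	ch.responses = ch.responses[1:]
	ch.responsesM.Unlock()

	reply <- msg // buffered, so this never blocks the reader
}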
	ch.responsesM.Lock()

	if len(ch.responses) == 0 {
This should never happen. I think we ought to panic to reveal any bugs.
	status, ch.responses = ch.responses[0], ch.responses[1:]
	ch.responsesM.Unlock()

	if atomic.LoadUint32(status) == 0 {
If we were using channels instead of shared state, this would be:
select {
case <-status.Done():
default:
	ch.rpc <- msg
}
@michaelklishin This would be great! The method docs were written in 2012 and tried to strike a balance between sharing enough context about AMQP principles, RabbitMQ extensions/constraints, and client library choices. I'd very much welcome documentation review and improvements from the RabbitMQ team!
@streadway and I discussed this on a Hangout. Besides splitting this PR into a few (@AlexisMontagne thank you for your truly exemplary patience with us!), we'd like to revisit the internal concurrency model used by this client to both simplify it and make changes such as this one easier to reason about. @streadway is to file a new issue about that, but basically operation dispatch should be synchronised per channel and not over the entire connection. Channels are assumed to be "independent entities" in the protocol. Publishing on a shared [protocol] channel is not supported, so other clients guarantee total ordering on a channel, not across channels. A single channel in the server never dispatches its operations concurrently either. This would also make context support such as this PR's easier to implement.
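A rough sketch of what per-channel dispatch could look like, with stand-in types throughout (frame, the inboxes map, the dispatch callback); it only illustrates the idea that the connection reader demultiplexes frames while ordering is enforced per channel rather than across the whole connection:

type frame interface {
	channel() uint16 // the protocol channel the frame belongs to
}

type connSketch struct {
	inboxes map[uint16]chan frame // one inbox per open protocol channel
}

// readLoop only demultiplexes frames to the owning channel's inbox.
func (c *connSketch) readLoop(frames <-chan frame) {
	for f := range frames {
		c.inboxes[f.channel()] <- f
	}
}

// channelLoop dispatches a single channel's operations strictly in order,
// matching the broker, which never dispatches one channel's operations concurrently.
func channelLoop(inbox <-chan frame, dispatch func(frame)) {
	for f := range inbox {
		dispatch(f)
	}
}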
Hey folks, I'm posting this on behalf of the core team. As you have noticed, this client hasn't seen a lot of activity recently. Because this client has a long tradition of "no breaking public API changes", certain reasonable changes have not been possible.
We would like to thank @streadway for many years of work on this library.
Team RabbitMQ has adopted a "hard fork" of this client (rabbitmq/amqp091-go).
What do we mean by "hard fork" and what does it mean for you? The entire history of the project is retained in the fork. What does change is that this new fork will accept reasonable breaking API changes according to semantic versioning practices.
If your PR hasn't been accepted or reviewed, you are welcome to re-submit it against the fork.
Note that it is a high season for holidays in some parts of the world, so we may be slower to respond.
Thank you for using RabbitMQ and contributing to this client. On behalf of the RabbitMQ core team,
Related to #266.
Implements a second set of RPC methods on the Channel, each taking a context.Context as its first argument.
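A usage sketch of the added shape, using QosContext as the example because it appears in this PR's tests; it mirrors the existing Qos(prefetchCount, prefetchSize int, global bool) signature with a leading context.Context:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Behaves like ch.Qos(10, 0, false), but gives up with a client-side error
// once ctx is cancelled or its deadline passes while waiting for qos-ok.
if err := ch.QosContext(ctx, 10, 0, false); err != nil {
	log.Printf("qos was not applied: %v", err)
}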