diff --git a/channel.go b/channel.go index cd19ce7e..e019aa7e 100644 --- a/channel.go +++ b/channel.go @@ -6,9 +6,12 @@ package amqp import ( + "context" + "fmt" "reflect" "sync" "sync/atomic" + "time" ) // 0 1 3 7 size+7 size+8 @@ -1363,6 +1366,42 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg return nil } +/* +PublishWithContext can be used for canceling request and closing goroutine + +*/ +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + returnChannel := make(chan error, 1) + defer close(returnChannel) + go func(ctx context.Context) { + resChannel := make(chan error, 1) + defer close(resChannel) + resChannel <- ch.Publish(exchange, key, mandatory, immediate, msg) + select { + case <-ctx.Done(): + returnChannel <- fmt.Errorf("publish canceled by context") + return + case err := <-resChannel: + returnChannel <- err + return + } + }(ctx) + select { + case err := <-returnChannel: + return err + } +} + +/* +PublishWithTimeout If MQ does not respond earlier than duration returns error + +*/ +func (ch *Channel) PublishWithTimeout(exchange, key string, mandatory, immediate bool, msg Publishing, duration time.Duration) error { + publishCtx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return ch.PublishWithContext(publishCtx, exchange, key, mandatory, immediate, msg) +} + /* Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be