From b0896c8df3f64c60b4ec6fe9f56e98dce37a60ea Mon Sep 17 00:00:00 2001 From: alexfilus Date: Mon, 20 Jul 2020 13:13:10 +0300 Subject: [PATCH 1/2] add PublishWithContext and PublishWithTimeout methods --- channel.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/channel.go b/channel.go index cd19ce7e..70fdaabb 100644 --- a/channel.go +++ b/channel.go @@ -6,9 +6,12 @@ package amqp import ( + "context" + "fmt" "reflect" "sync" "sync/atomic" + "time" ) // 0 1 3 7 size+7 size+8 @@ -1363,6 +1366,44 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg return nil } +/* +Publish with context can be used for canceling request and closing goroutine + +*/ +func (ch *Channel) PublishWithContext(exchange, key string, mandatory, immediate bool, msg Publishing, ctx context.Context) error { + returnChannel := make(chan error, 1) + defer close(returnChannel) + go func(ctx context.Context) { + resChannel := make(chan error, 1) + defer close(resChannel) + resChannel <- ch.Publish(exchange, key, mandatory, immediate, msg) + select { + case <-ctx.Done(): + returnChannel <- fmt.Errorf("publish canceled by context") + return + case err := <-resChannel: + returnChannel <- err + return + } + }(ctx) + select { + case err := <-returnChannel: + return err + } +} + +/* +Publish with timeout + +If MQ does not respond earlier than duration returns error + +*/ +func (ch *Channel) PublishWithTimeout(exchange, key string, mandatory, immediate bool, msg Publishing, duration time.Duration) error { + publishCtx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + return ch.PublishWithContext(exchange, key, mandatory, immediate, msg, publishCtx) +} + /* Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be From 6e70b8662eb06dd926a94ff1f90e2d417424e547 Mon Sep 17 00:00:00 2001 From: alexfilus Date: Tue, 21 Jul 2020 15:08:21 +0300 Subject: [PATCH 2/2] fixes for linter --- channel.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/channel.go b/channel.go index 70fdaabb..e019aa7e 100644 --- a/channel.go +++ b/channel.go @@ -1367,10 +1367,10 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg } /* -Publish with context can be used for canceling request and closing goroutine +PublishWithContext can be used for canceling request and closing goroutine */ -func (ch *Channel) PublishWithContext(exchange, key string, mandatory, immediate bool, msg Publishing, ctx context.Context) error { +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { returnChannel := make(chan error, 1) defer close(returnChannel) go func(ctx context.Context) { @@ -1393,15 +1393,13 @@ func (ch *Channel) PublishWithContext(exchange, key string, mandatory, immediate } /* -Publish with timeout - -If MQ does not respond earlier than duration returns error +PublishWithTimeout If MQ does not respond earlier than duration returns error */ func (ch *Channel) PublishWithTimeout(exchange, key string, mandatory, immediate bool, msg Publishing, duration time.Duration) error { publishCtx, cancel := context.WithTimeout(context.Background(), duration) defer cancel() - return ch.PublishWithContext(exchange, key, mandatory, immediate, msg, publishCtx) + return ch.PublishWithContext(publishCtx, exchange, key, mandatory, immediate, msg) } /*