From 87d443051cad39e76838a1bf17549aececddd7c7 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Tue, 25 Jun 2024 14:22:36 -0700 Subject: [PATCH 1/2] removing looper to sidestep shutdown complexity --- rabbit.go | 26 ++++++-------------------- rabbit_test.go | 3 --- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/rabbit.go b/rabbit.go index b08a6b2..609d6c4 100644 --- a/rabbit.go +++ b/rabbit.go @@ -21,7 +21,6 @@ import ( "github.com/pkg/errors" amqp "github.com/rabbitmq/amqp091-go" - "github.com/relistan/go-director" uuid "github.com/satori/go.uuid" ) @@ -74,7 +73,6 @@ type Rabbit struct { ReconnectInProgressMtx *sync.RWMutex ProducerServerChannel *amqp.Channel ProducerRWMutex *sync.RWMutex - ConsumeLooper director.Looper Options *Options shutdown bool @@ -221,7 +219,6 @@ func New(opts *Options) (*Rabbit, error) { ReconnectInProgress: false, ReconnectInProgressMtx: &sync.RWMutex{}, ProducerRWMutex: &sync.RWMutex{}, - ConsumeLooper: director.NewFreeLooper(director.FOREVER, make(chan error, 1)), Options: opts, ctx: ctx, @@ -381,16 +378,8 @@ func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func r.log.Debug("waiting for messages from rabbit ...") - var quit bool - - r.ConsumeLooper.Loop(func() error { - // This is needed to prevent context flood in case .Quit() wasn't picked - // up quickly enough by director - if quit { - time.Sleep(25 * time.Millisecond) - return nil - } - +MAIN: + for { select { case msg := <-r.delivery(): if _, ok := msg.Headers[ForceReconnectHeader]; ok || msg.Acknowledger == nil { @@ -403,7 +392,7 @@ func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func // No point in continuing execution of consumer func as the // delivery msg is incomplete/invalid. - return nil + continue } if err := f(msg); err != nil { @@ -414,16 +403,13 @@ func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func } case <-ctx.Done(): r.log.Warn("stopped via context") - r.ConsumeLooper.Quit() - quit = true + break MAIN case <-r.ctx.Done(): r.log.Warn("stopped via Stop()") - r.ConsumeLooper.Quit() - quit = true + break MAIN } + } - return nil - }) r.log.Debug("Consume finished - exiting") } diff --git a/rabbit_test.go b/rabbit_test.go index 8d8e954..d741e46 100644 --- a/rabbit_test.go +++ b/rabbit_test.go @@ -12,7 +12,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/pkg/errors" - "github.com/relistan/go-director" uuid "github.com/satori/go.uuid" // to test with logrus, uncomment the following // and the log initialiser in generateOptions() @@ -102,7 +101,6 @@ var _ = Describe("Rabbit", func() { Expect(r.ConsumerRWMutex).ToNot(BeNil()) Expect(r.NotifyCloseChan).ToNot(BeNil()) Expect(r.ProducerRWMutex).ToNot(BeNil()) - Expect(r.ConsumeLooper).ToNot(BeNil()) Expect(r.Options).ToNot(BeNil()) }) @@ -805,7 +803,6 @@ var _ = Describe("Rabbit", func() { ConsumerDeliveryChannel: deliveryCh, ReconnectInProgressMtx: &sync.RWMutex{}, ProducerRWMutex: &sync.RWMutex{}, - ConsumeLooper: director.NewFreeLooper(director.FOREVER, make(chan error, 1)), Options: opts, log: &NoOpLogger{}, From 311de4b74eed3c4baf953e1bda0b831b3c8645a7 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Tue, 25 Jun 2024 15:07:27 -0700 Subject: [PATCH 2/2] updated Stop() to include optional timeout --- rabbit.go | 46 +++++++++++++++++++++++++++++++++++++++------- rabbit_test.go | 1 + 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/rabbit.go b/rabbit.go index 609d6c4..3ffb9a7 100644 --- a/rabbit.go +++ b/rabbit.go @@ -29,6 +29,10 @@ const ( // to reconnect to a rabbit server DefaultRetryReconnectSec = 60 + // DefaultStopTimeout is the default amount of time Stop() will wait for + // consume function(s) to exit. + DefaultStopTimeout = 5 * time.Second + // Both means that the client is acting as both a consumer and a producer. Both Mode = 0 // Consumer means that the client is acting as a consumer. @@ -40,9 +44,8 @@ const ( ) var ( - // ErrShutdown will be returned if the underlying connection has already - // been closed (ie. if you Close()'d and then tried to Publish()) - ErrShutdown = errors.New("connection has been shutdown") + // ErrShutdown will be returned if the client is shutdown via Stop() or Close() + ErrShutdown = errors.New("client is shutdown") // DefaultConsumerTag is used for identifying consumer DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8] @@ -57,7 +60,7 @@ type IRabbit interface { Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error Publish(ctx context.Context, routingKey string, payload []byte, headers ...amqp.Table) error - Stop() error + Stop(timeout ...time.Duration) error Close() error } @@ -67,6 +70,7 @@ type Rabbit struct { Conn *amqp.Connection ConsumerDeliveryChannel <-chan amqp.Delivery ConsumerRWMutex *sync.RWMutex + ConsumerWG *sync.WaitGroup NotifyCloseChan chan *amqp.Error ReconnectChan chan struct{} ReconnectInProgress bool @@ -214,6 +218,7 @@ func New(opts *Options) (*Rabbit, error) { r := &Rabbit{ Conn: ac, ConsumerRWMutex: &sync.RWMutex{}, + ConsumerWG: &sync.WaitGroup{}, NotifyCloseChan: make(chan *amqp.Error), ReconnectChan: make(chan struct{}, 1), ReconnectInProgress: false, @@ -372,6 +377,9 @@ func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func return } + r.ConsumerWG.Add(1) + defer r.ConsumerWG.Done() + if ctx == nil { ctx = context.Background() } @@ -554,16 +562,40 @@ func (r *Rabbit) Publish(ctx context.Context, routingKey string, body []byte, he } } -// Stop stops an in-progress `Consume()` or `ConsumeOnce()`. -func (r *Rabbit) Stop() error { +// Stop stops an in-progress `Consume()` or `ConsumeOnce()` +func (r *Rabbit) Stop(timeout ...time.Duration) error { r.cancel() - return nil + + doneCh := make(chan struct{}) + + // This will leak if consumer(s) don't exit within timeout + go func() { + r.ConsumerWG.Wait() + doneCh <- struct{}{} + }() + + stopTimeout := DefaultStopTimeout + + if len(timeout) > 0 { + stopTimeout = timeout[0] + } + + select { + case <-doneCh: + return nil + case <-time.After(stopTimeout): + return fmt.Errorf("timeout waiting for consumer to stop after '%v'", stopTimeout) + } } // Close stops any active Consume and closes the amqp connection (and channels using the conn) // // You should re-instantiate the rabbit lib once this is called. func (r *Rabbit) Close() error { + if r.shutdown { + return ErrShutdown + } + r.cancel() if err := r.Conn.Close(); err != nil { diff --git a/rabbit_test.go b/rabbit_test.go index d741e46..1190288 100644 --- a/rabbit_test.go +++ b/rabbit_test.go @@ -798,6 +798,7 @@ var _ = Describe("Rabbit", func() { r := &Rabbit{ Conn: ac, ConsumerRWMutex: &sync.RWMutex{}, + ConsumerWG: &sync.WaitGroup{}, NotifyCloseChan: notifyCloseCh, ReconnectChan: reconnectCh, ConsumerDeliveryChannel: deliveryCh,