Skip to content

Commit

Permalink
Add an example of reliable consumer
Browse files Browse the repository at this point in the history
The example function Example_consume() shows how to write a consumer
with reconnection support.

This commit also changes the notification channels to be buffered with
capacity of 1. This is important to avoid potential deadlocks during an
abnormal disconnection. See #32 and #18 for more details.

Both example functions now have a context with timeout, so that they
don't run forever. The QoS of 1 is set to slowdown the consumption; this
is helpful to test the reconnection capabilities, by giving enough time
to close the connection via the Management UI (or any other means).

Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Aug 18, 2022
1 parent e8a1547 commit feb4ba5
Showing 1 changed file with 86 additions and 8 deletions.
94 changes: 86 additions & 8 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,82 @@ func Example() {
addr := "amqp://guest:guest@localhost:5672/"
queue := New(queueName, addr)
message := []byte("message")
// Attempt to push a message every 2 seconds

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*20))
defer cancel()
loop:
for {
time.Sleep(time.Second * 2)
if err := queue.Push(message); err != nil {
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
select {
// Attempt to push a message every 2 seconds
case <-time.After(time.Second * 2):
if err := queue.Push(message); err != nil {
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
}
case <-ctx.Done():
queue.Close()
break loop
}
}
}

func Example_consume() {
queueName := "job_queue"
addr := "amqp://guest:guest@localhost:5672/"
queue := New(queueName, addr)

// Give the connection sometime to setup
<-time.After(time.Second)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

deliveries, err := queue.Consume()
if err != nil {
fmt.Printf("Could not start consuming: %s\n", err)
return
}

// This channel will receive a notification when a channel closed event
// happens. This must be different than Client.notifyChanClose because the
// library sends only one notification and Client.notifyChanClose already has
// a receiver in handleReconnect().
// Recommended to make it buffered to avoid deadlocks
chClosedCh := make(chan *amqp.Error, 1)
queue.channel.NotifyClose(chClosedCh)

for {
select {
case <-ctx.Done():
queue.Close()
return

case amqErr := <-chClosedCh:
// This case handles the event of closed channel e.g. abnormal shutdown
fmt.Printf("AMQP Channel closed due to: %s\n", amqErr)

deliveries, err = queue.Consume()
if err != nil {
// If the AMQP channel is not ready, it will continue the loop. Next
// iteration will enter this case because chClosedCh is closed by the
// library
fmt.Println("Error trying to consume, will try again")
continue
}

// Re-set channel to receive notifications
// The library closes this channel after abnormal shutdown
chClosedCh = make(chan *amqp.Error, 1)
queue.channel.NotifyClose(chClosedCh)

case delivery := <-deliveries:
// Ack a message every 2 seconds
fmt.Printf("Received message: %s\n", delivery.Body)
if err := delivery.Ack(false); err != nil {
fmt.Printf("Error acknowledging message: %s\n", err)
}
<-time.After(time.Second * 2)
}
}
}
Expand Down Expand Up @@ -189,15 +258,15 @@ func (client *Client) init(conn *amqp.Connection) error {
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
client.connection = connection
client.notifyConnClose = make(chan *amqp.Error)
client.notifyConnClose = make(chan *amqp.Error, 1)
client.connection.NotifyClose(client.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
client.channel = channel
client.notifyChanClose = make(chan *amqp.Error)
client.notifyChanClose = make(chan *amqp.Error, 1)
client.notifyConfirm = make(chan amqp.Confirmation, 1)
client.channel.NotifyClose(client.notifyChanClose)
client.channel.NotifyPublish(client.notifyConfirm)
Expand Down Expand Up @@ -268,6 +337,15 @@ func (client *Client) Consume() (<-chan amqp.Delivery, error) {
if !client.isReady {
return nil, errNotConnected
}

if err := client.channel.Qos(
1, // prefetchCount
0, // prefrechSize
false, // global
); err != nil {
return nil, err
}

return client.channel.Consume(
client.queueName,
"", // Consumer
Expand Down

0 comments on commit feb4ba5

Please sign in to comment.