Skip to content

Commit

Permalink
Ensure FIFO queues get MessageGroupId parameter in queues.publish()
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanblock committed Aug 16, 2023
1 parent b29e3ce commit d50d35b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
9 changes: 8 additions & 1 deletion arc/queues/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import os
import urllib.request
import uuid
import boto3
from arc.services import _services
from arc._lib import use_aws, get_ports
Expand Down Expand Up @@ -37,8 +39,13 @@ def publish_aws(name, payload):
_sqs_client_cache = boto3.client("sqs")

def pub(arn):
stack = os.environ.get("ARC_STACK_NAME")
return _sqs_client_cache.send_message(
QueueUrl=arn, MessageBody=json.dumps(payload), DelaySeconds=0
QueueUrl=arn,
MessageBody=json.dumps(payload),
DelaySeconds=0,
MessageGroupId=stack,
MessageDeduplicationId=str(uuid.uuid4()),
)

if _queues_cache.get(name):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def test_parse():

def test_queue_publish(arc_services, sqs_client):
queue_name = "continuum"
sqs_client.create_queue(QueueName=queue_name)
sqs_client.create_queue(
QueueName=queue_name + ".fifo", Attributes={"FifoQueue": "true"}
)
queues = sqs_client.list_queues()

arc_services(params={f"queues/{queue_name}": queues["QueueUrls"][0]})
Expand Down

0 comments on commit d50d35b

Please sign in to comment.