Skip to content

Commit

Permalink
Reworked reliability test
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Jul 14, 2024
1 parent c2c7b59 commit 84dd100
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions amqpstorm/tests/functional/test_reliability.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ def test_functional_verify_passive_declare(self):
self.connection.close()


class PublishAndConsume1kTest(TestFunctionalFramework):
messages_to_send = 1000
messages_consumed = 0
lock = threading.Lock()
class PublishAndConsume5kTest(TestFunctionalFramework):
messages_to_send = 5000
messages_consumed = {}
number_of_threads = 4

def configure(self):
self.disable_logging_validation()
Expand All @@ -237,35 +237,33 @@ def publish_messages(self):
routing_key=self.queue_name)

def consume_messages(self):
thread_id = threading.current_thread()
self.messages_consumed[thread_id] = 0
channel = self.connection.channel()
channel.basic.consume(queue=self.queue_name,
no_ack=False)
for message in channel.build_inbound_messages(
break_on_empty=False):
self.increment_message_count()
self.messages_consumed[thread_id] += 1
message.ack()
if self.messages_consumed == self.messages_to_send:
break

def increment_message_count(self):
with self.lock:
self.messages_consumed += 1

@setup(queue=True)
def test_functional_publish_and_consume_1k_messages(self):
def test_functional_publish_and_consume_5k_messages(self):
self.channel.queue.declare(self.queue_name)

publish_thread = threading.Thread(target=self.publish_messages, )
publish_thread.daemon = True
publish_thread.start()

for _ in range(4):
for _ in range(self.number_of_threads):
consumer_thread = threading.Thread(target=self.consume_messages, )
consumer_thread.daemon = True
consumer_thread.start()

start_time = time.time()
while self.messages_consumed != self.messages_to_send:
while sum(self.messages_consumed.values()) != self.messages_to_send:
if time.time() - start_time >= 60:
break
time.sleep(0.1)
Expand All @@ -274,8 +272,10 @@ def test_functional_publish_and_consume_1k_messages(self):
channel.stop_consuming()
channel.close()

self.assertEqual(self.messages_consumed, self.messages_to_send,
'test took too long')
self.assertEqual(
sum(self.messages_consumed.values()), self.messages_to_send,
'test took too long'
)


class Consume1kUntilEmpty(TestFunctionalFramework):
Expand Down

0 comments on commit 84dd100

Please sign in to comment.