diff --git a/docs/using_python_client_with_hazelcast.rst b/docs/using_python_client_with_hazelcast.rst index c75cc796ad..e30acb11c7 100644 --- a/docs/using_python_client_with_hazelcast.rst +++ b/docs/using_python_client_with_hazelcast.rst @@ -378,6 +378,76 @@ A Reliable Topic usage example is shown below. # Publish a message to the Topic topic.publish("Hello to distributed world") +Using ReliableMessageListener +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +`ReliableMessageListener +`__ is a `MessageListener +`__ to better integrate with the reliable topic. + +If a regular MessageListener is registered on a reliable topic, the message listener works fine, but it can't do much more than listen to messages. +If a ReliableMessageListener is registered on a normal topic, only the MessageListener methods are called. + +The following is an example Reliable Message Listener class. + +.. code:: python + + class MyListener(ReliableMessageListener): + def on_message(self, message): + print("Received new message: ", message) + + def retrieve_initial_sequence(self): + print("Listener function retrieve_initial_sequence is called") + return 0 + + def store_sequence(self, sequence): + print("Listener function store_sequence is called with sequence: ", sequence) + pass + + def is_loss_tolerant(self): + print("Listener function is_loss_tolerant is called") + return True + + def is_terminal(self, error): + print("Listener function is_terminal is called with error: ", error) + return False + + def on_cancel(self): + print("Listener function on_cancel is called") + +Durable Subscription +'''''''''''''''''''' + +The ReliableMessageListener allows you to control where you want to start processing a message when the listener is registered. This makes it possible to create a durable subscription by storing the sequence of the last message and using this sequenceId as the sequenceId to start from. + +Exception Handling +'''''''''''''''''' + +The ReliableMessageListener also gives the ability to deal with exceptions using the `is_terminal(error)` method. This method allows you to control which exceptions should terminate the execution of the listener and cancel it. If a plain MessageListener is used, it won't terminate on exceptions and it will keep on running. But in some cases it is better to stop running. + +Global Order +'''''''''''' + +The ReliableMessageListener will always get all events in order (global order). It will not get duplicates and there will only be gaps (loss of messages) if it is too slow. For more information see `is_loss_tolerant()`. + +Delivery Guarantees +''''''''''''''''''' + +Because the ReliableMessageListener controls which item it wants to continue from upon restart, it is very easy to provide an at-least-once or at-most-once delivery guarantee. The `store_sequence(self, sequence)` is always called before a message is processed; so it can be persisted on some non-volatile storage. When the `retrieve_initial_sequence()` returns the stored sequence, then an at-least-once delivery is implemented since the same item is now being processed twice. To implement an at-most-once delivery guarantee, add 1 to the stored sequence when the `retrieve_initial_sequence()` is called. + +Loss Tolerance +'''''''''''''' + +You can provide the `is_loss_tolerant(self) -> bool` method return true if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a MessageListener is too slow. Eventually the message won't be available anymore. + +If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener. + +onCancel Callback +''''''''''''''''' + +This method is called by Hazelcast when the ReliableMessageListener is cancelled. This can happen when the listener is unregistered or cancelled due to an exception or during shutdown. + + Configuring Reliable Topic ^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/examples/reliable-topic/reliable_topic_example.py b/examples/reliable-topic/reliable_topic_example.py index f1d0207e15..9ff6b41960 100644 --- a/examples/reliable-topic/reliable_topic_example.py +++ b/examples/reliable-topic/reliable_topic_example.py @@ -25,17 +25,24 @@ def on_message(self, message): print("Second listener:", message) def retrieve_initial_sequence(self): + print("Listener function retrieve_initial_sequence is called") return 0 def store_sequence(self, sequence): + print("Listener function store_sequence is called with sequence: ", sequence) pass - def is_loss_tolerant(self): + def is_loss_tolerant(self) -> bool: + print("Listener function is_loss_tolerant is called") return True def is_terminal(self, error): + print("Listener function is_terminal is called with error: ", error) return False + def on_cancel(self): + print("Listener function on_cancel is called") + # Add a custom ReliableMessageListener topic.add_listener(MyListener()) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index 60188a239e..47c5b93454 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -150,6 +150,13 @@ def is_terminal(self, error: Exception) -> bool: """ raise NotImplementedError("is_terminal") + def on_cancel(self) -> None: + """ + Called when the ReliableMessageListener is cancelled. This can happen + when the listener is unregistered or cancelled due to an exception or during shutdown. + """ + pass + class _MessageRunner: def __init__( @@ -211,6 +218,7 @@ def cancel(self): """ self._cancelled = True self._runners.pop(self._registration_id, None) + self._listener.on_cancel() def _handle_next_batch(self, future): """Handles the result of the read_many request from diff --git a/tests/integration/backward_compatible/proxy/reliable_topic_test.py b/tests/integration/backward_compatible/proxy/reliable_topic_test.py index 46c5527490..52d2d2246f 100644 --- a/tests/integration/backward_compatible/proxy/reliable_topic_test.py +++ b/tests/integration/backward_compatible/proxy/reliable_topic_test.py @@ -1,6 +1,10 @@ import os import unittest +from hazelcast.proxy.base import TopicMessage +from hazelcast.types import MessageType +from hazelcast.util import AtomicInteger + from tests.hzrc.ttypes import Lang try: @@ -85,6 +89,8 @@ def test_add_listener(self): messages = [] + on_cancel_call_count = AtomicInteger() + class Listener(ReliableMessageListener): def on_message(self, message): messages.append(message.message) @@ -101,6 +107,9 @@ def is_loss_tolerant(self): def is_terminal(self, error): return False + def on_cancel(self): + on_cancel_call_count.add(1) + registration_id = topic.add_listener(Listener()) self.assertIsNotNone(registration_id) @@ -109,6 +118,8 @@ def is_terminal(self, error): self.assertTrueEventually(lambda: self.assertEqual(["a", "b"], messages)) + self.assertEqual(0, on_cancel_call_count.get()) + def test_add_listener_with_retrieve_initial_sequence(self): topic = self.get_topic(random_string()) @@ -233,6 +244,8 @@ def test_add_listener_when_on_message_raises_error(self): messages = [] + on_cancel_call_count = AtomicInteger() + class Listener(ReliableMessageListener): def on_message(self, message): message = message.message @@ -253,6 +266,9 @@ def is_loss_tolerant(self): def is_terminal(self, error): return isinstance(error, ValueError) + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + registration_id = topic.add_listener(Listener()) self.assertIsNotNone(registration_id) @@ -263,11 +279,15 @@ def is_terminal(self, error): # Should be cancelled since on_message raised error self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._wrapped._runners))) + self.assertEqual(1, on_cancel_call_count.get()) + def test_add_listener_when_on_message_and_is_terminal_raises_error(self): topic = self.get_topic(random_string()) messages = [] + on_cancel_call_count = AtomicInteger() + class Listener(ReliableMessageListener): def on_message(self, message): message = message.message @@ -288,6 +308,9 @@ def is_loss_tolerant(self): def is_terminal(self, error): raise error + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + registration_id = topic.add_listener(Listener()) self.assertIsNotNone(registration_id) @@ -298,6 +321,8 @@ def is_terminal(self, error): # Should be cancelled since on_message raised error self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._wrapped._runners))) + self.assertEqual(1, on_cancel_call_count.get()) + def test_add_listener_with_non_callable(self): topic = self.get_topic(random_string()) with self.assertRaises(TypeError): @@ -306,8 +331,30 @@ def test_add_listener_with_non_callable(self): def test_remove_listener(self): topic = self.get_topic(random_string()) - registration_id = topic.add_listener(lambda m: m) + on_cancel_call_count = AtomicInteger() + + class Listener(ReliableMessageListener): + def on_message(self, message: TopicMessage[MessageType]) -> None: + pass + + def retrieve_initial_sequence(self) -> int: + return -1 + + def store_sequence(self, sequence: int) -> None: + pass + + def is_loss_tolerant(self) -> bool: + pass + + def is_terminal(self, error: Exception) -> bool: + pass + + def on_cancel(self) -> None: + on_cancel_call_count.add(1) + + registration_id = topic.add_listener(Listener()) self.assertTrue(topic.remove_listener(registration_id)) + self.assertEqual(1, on_cancel_call_count.get()) def test_remove_listener_does_not_receive_messages_after_removal(self): topic = self.get_topic(random_string())