Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the new on_cancel callback to ReliableMessageListener [API-2195] #655

Merged
merged 5 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions docs/using_python_client_with_hazelcast.rst
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,76 @@ A Reliable Topic usage example is shown below.
# Publish a message to the Topic
topic.publish("Hello to distributed world")

Using ReliableMessageListener
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

`ReliableMessageListener
<https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/main/java/com/hazelcast/topic/ReliableMessageListener.java>`__ is a `MessageListener
<https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/main/java/com/hazelcast/topic/MessageListener.java>`__ to better integrate with the reliable topic.

If a regular MessageListener is registered on a reliable topic, the message listener works fine, but it can't do much more than listen to messages.
If a ReliableMessageListener is registered on a normal topic, only the MessageListener methods are called.

The following is an example Reliable Message Listener class.

.. code:: python

class MyListener(ReliableMessageListener):
def on_message(self, message):
print("Received new message: ", message)

def retrieve_initial_sequence(self):
print("Listener function retrieve_initial_sequence is called")
return 0

def store_sequence(self, sequence):
print("Listener function store_sequence is called with sequence: ", sequence)
pass

def is_loss_tolerant(self):
print("Listener function is_loss_tolerant is called")
return True

def is_terminal(self, error):
print("Listener function is_terminal is called with error: ", error)
return False

def on_cancel(self):
print("Listener function on_cancel is called")

Durable Subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think it might make sense to move these detailed information to the ReliableMessageListener's documentation (docstring comment) and give a link to it here.

''''''''''''''''''''

The ReliableMessageListener allows you to control where you want to start processing a message when the listener is registered. This makes it possible to create a durable subscription by storing the sequence of the last message and using this sequenceId as the sequenceId to start from.

Exception Handling
''''''''''''''''''

The ReliableMessageListener also gives the ability to deal with exceptions using the `is_terminal(error)` method. This method allows you to control which exceptions should terminate the execution of the listener and cancel it. If a plain MessageListener is used, it won't terminate on exceptions and it will keep on running. But in some cases it is better to stop running.

Global Order
''''''''''''

The ReliableMessageListener will always get all events in order (global order). It will not get duplicates and there will only be gaps (loss of messages) if it is too slow. For more information see `is_loss_tolerant()`.
srknzl marked this conversation as resolved.
Show resolved Hide resolved

Delivery Guarantees
'''''''''''''''''''

Because the ReliableMessageListener controls which item it wants to continue from upon restart, it is very easy to provide an at-least-once or at-most-once delivery guarantee. The `store_sequence(self, sequence)` is always called before a message is processed; so it can be persisted on some non-volatile storage. When the `retrieve_initial_sequence()` returns the stored sequence, then an at-least-once delivery is implemented since the same item is now being processed twice. To implement an at-most-once delivery guarantee, add 1 to the stored sequence when the `retrieve_initial_sequence()` is called.

Loss Tolerance
''''''''''''''

You can provide the `is_loss_tolerant(self) -> bool` method return true if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a MessageListener is too slow. Eventually the message won't be available anymore.

If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener.

onCancel Callback
'''''''''''''''''

This method is called by Hazelcast when the ReliableMessageListener is cancelled. This can happen when the listener is unregistered or cancelled due to an exception or during shutdown.


Configuring Reliable Topic
^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
9 changes: 8 additions & 1 deletion examples/reliable-topic/reliable_topic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ def on_message(self, message):
print("Second listener:", message)

def retrieve_initial_sequence(self):
print("Listener function retrieve_initial_sequence is called")
return 0

def store_sequence(self, sequence):
print("Listener function store_sequence is called with sequence: ", sequence)
pass

def is_loss_tolerant(self):
def is_loss_tolerant(self) -> bool:
print("Listener function is_loss_tolerant is called")
return True

def is_terminal(self, error):
print("Listener function is_terminal is called with error: ", error)
return False

def on_cancel(self):
print("Listener function on_cancel is called")


# Add a custom ReliableMessageListener
topic.add_listener(MyListener())
Expand Down
8 changes: 8 additions & 0 deletions hazelcast/proxy/reliable_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ def is_terminal(self, error: Exception) -> bool:
"""
raise NotImplementedError("is_terminal")

def on_cancel(self) -> None:
"""
Called when the ReliableMessageListener is cancelled. This can happen
when the listener is unregistered or cancelled due to an exception or during shutdown.
"""
pass


class _MessageRunner:
def __init__(
Expand Down Expand Up @@ -211,6 +218,7 @@ def cancel(self):
"""
self._cancelled = True
self._runners.pop(self._registration_id, None)
self._listener.on_cancel()

def _handle_next_batch(self, future):
"""Handles the result of the read_many request from
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
import unittest

from hazelcast.proxy.base import TopicMessage
from hazelcast.types import MessageType
from hazelcast.util import AtomicInteger

from tests.hzrc.ttypes import Lang

try:
Expand Down Expand Up @@ -85,6 +89,8 @@ def test_add_listener(self):

messages = []

on_cancel_call_count = AtomicInteger()

class Listener(ReliableMessageListener):
def on_message(self, message):
messages.append(message.message)
Expand All @@ -101,6 +107,9 @@ def is_loss_tolerant(self):
def is_terminal(self, error):
return False

def on_cancel(self):
on_cancel_call_count.add(1)

registration_id = topic.add_listener(Listener())
self.assertIsNotNone(registration_id)

Expand All @@ -109,6 +118,8 @@ def is_terminal(self, error):

self.assertTrueEventually(lambda: self.assertEqual(["a", "b"], messages))

self.assertEqual(0, on_cancel_call_count.get())

def test_add_listener_with_retrieve_initial_sequence(self):
topic = self.get_topic(random_string())

Expand Down Expand Up @@ -233,6 +244,8 @@ def test_add_listener_when_on_message_raises_error(self):

messages = []

on_cancel_call_count = AtomicInteger()

class Listener(ReliableMessageListener):
def on_message(self, message):
message = message.message
Expand All @@ -253,6 +266,9 @@ def is_loss_tolerant(self):
def is_terminal(self, error):
return isinstance(error, ValueError)

def on_cancel(self) -> None:
on_cancel_call_count.add(1)

registration_id = topic.add_listener(Listener())
self.assertIsNotNone(registration_id)

Expand All @@ -263,11 +279,15 @@ def is_terminal(self, error):
# Should be cancelled since on_message raised error
self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._wrapped._runners)))

self.assertEqual(1, on_cancel_call_count.get())

def test_add_listener_when_on_message_and_is_terminal_raises_error(self):
topic = self.get_topic(random_string())

messages = []

on_cancel_call_count = AtomicInteger()

class Listener(ReliableMessageListener):
def on_message(self, message):
message = message.message
Expand All @@ -288,6 +308,9 @@ def is_loss_tolerant(self):
def is_terminal(self, error):
raise error

def on_cancel(self) -> None:
on_cancel_call_count.add(1)

registration_id = topic.add_listener(Listener())
self.assertIsNotNone(registration_id)

Expand All @@ -298,6 +321,8 @@ def is_terminal(self, error):
# Should be cancelled since on_message raised error
self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._wrapped._runners)))

self.assertEqual(1, on_cancel_call_count.get())

def test_add_listener_with_non_callable(self):
topic = self.get_topic(random_string())
with self.assertRaises(TypeError):
Expand All @@ -306,8 +331,30 @@ def test_add_listener_with_non_callable(self):
def test_remove_listener(self):
topic = self.get_topic(random_string())

registration_id = topic.add_listener(lambda m: m)
on_cancel_call_count = AtomicInteger()

class Listener(ReliableMessageListener):
def on_message(self, message: TopicMessage[MessageType]) -> None:
pass

def retrieve_initial_sequence(self) -> int:
return -1

def store_sequence(self, sequence: int) -> None:
pass

def is_loss_tolerant(self) -> bool:
pass

def is_terminal(self, error: Exception) -> bool:
pass

def on_cancel(self) -> None:
on_cancel_call_count.add(1)

registration_id = topic.add_listener(Listener())
self.assertTrue(topic.remove_listener(registration_id))
self.assertEqual(1, on_cancel_call_count.get())

def test_remove_listener_does_not_receive_messages_after_removal(self):
topic = self.get_topic(random_string())
Expand Down
Loading