Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log warnings when internal queue is full #31

Open
wants to merge 5 commits into
base: python2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions motorway/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import traceback
import uuid
import datetime

import zmq.backend.cython.constants
from isodate import duration_isoformat
from motorway.utils import DateTimeAwareJsonEncoder
import logging
Expand Down Expand Up @@ -81,8 +83,10 @@ def send(self, queue, producer_uuid=None):
self.producer_uuid = producer_uuid
elif not self.producer_uuid:
assert self.producer_uuid
queue.send(
self.as_json()

queue.send_string(
self.as_json(),
flags=zmq.backend.cython.constants.NOBLOCK # Raise an exception if the queue is full.
)

def send_control_message(self, controller_queue, time_consumed=None, process_name=None, destination_endpoint=None,
Expand Down Expand Up @@ -113,7 +117,7 @@ def send_control_message(self, controller_queue, time_consumed=None, process_nam
'producer_uuid': self.producer_uuid,
'destination_endpoint': destination_endpoint,
'destination_uuid': destination_uuid
})
}, flags=zmq.backend.cython.constants.NOBLOCK)

def ack(self, time_consumed=None):
"""
Expand Down
28 changes: 21 additions & 7 deletions motorway/mixins.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime

import itertools
import logging
import random
import socket
import uuid
Expand All @@ -14,6 +15,9 @@
from motorway.utils import set_timeouts_on_socket, get_connections_block, get_ip


logger = logging.getLogger(__name__)


class GrouperMixin(object):
def get_grouper(self, grouper_name):
if grouper_name == "HashRingGrouper":
Expand Down Expand Up @@ -42,14 +46,23 @@ def send_message(self, message, process_id, time_consumed=None, sender=None, con
try:
socket_addresses = self.grouper_instance.get_destinations_for(message.grouping_value)
except GroupingValueMissing:
raise GroupingValueMissing("Message '%s' provided an invalid grouping_value: '%s'" % (message.content, message.grouping_value))
raise GroupingValueMissing("Message '%s' provided an invalid grouping_value: '%s'" %
(message.content, message.grouping_value))
for destination in socket_addresses:
message.send(
self.send_socks[destination],
process_id
)
# Keep trying to send the message until successful
message_retries = 0
while True:
try:
message.send(self.send_socks[destination], process_id)
break
# Queue was full or not available
except zmq.Again as e:
message_retries +=1
logger.warning("Failed to send message from %s (retry # %s)", process_id, message_retries)
time.sleep(5)
guy881 marked this conversation as resolved.
Show resolved Hide resolved

if self.controller_sock and self.send_control_messages and control_message:
for index in xrange(0, retries):
for index in range(0, retries):
try:
message.send_control_message(
self.controller_sock,
Expand All @@ -59,8 +72,9 @@ def send_message(self, message, process_id, time_consumed=None, sender=None, con
destination_endpoint=destination,
sender=sender
)
break
break # Exit retry
except KeyError: # If destination is not known, lookup fails
logger.warning("Destination unknown for message %s, sender %s", message, sender)
time.sleep(random.randrange(1, 3)) # random to avoid peak loads when running multiple processes
except zmq.Again: # If connection fails, retry
time.sleep(random.randrange(1, 3))
Expand Down
2 changes: 1 addition & 1 deletion motorway/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class ZMQSockMock(object):
def __init__(self, control_messages):
self.control_messages = control_messages

def send_json(self, message):
def send_json(self, message, flags=None):
guy881 marked this conversation as resolved.
Show resolved Hide resolved
self.control_messages.append(message)


Expand Down