Skip to content

Commit

Permalink
Allow tasks to change the visibility timeout of the message they are …
Browse files Browse the repository at this point in the history
…handling.

When a task fails, immediately make it available again instead of waiting for visibility timeout.

Fixes spulec#47
  • Loading branch information
jhorman committed Jul 16, 2019
1 parent df7edbd commit fb8cce3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pyqs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .decorator import task # noqa

__title__ = 'pyqs'
__version__ = '0.1.2'
__version__ = '0.1.4'
5 changes: 5 additions & 0 deletions pyqs/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def wrapper(*args, **kwargs):


class task(object):
""" Decorator that enables sqs based task execution. If the function
accepts an optional `_context` argument, an instance of TaskContext is
passed to the task function. The context allows the function to do things
like change message visibility. """

def __init__(self, queue=None, delay_seconds=None,
custom_function_path=None):
self.queue_name = queue
Expand Down
18 changes: 18 additions & 0 deletions pyqs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pickle

import boto3
from datetime import timedelta


def decode_message(message):
Expand Down Expand Up @@ -36,3 +37,20 @@ def get_aws_region_name():
region_name = 'us-east-1'

return region_name


class TaskContext(object):
""" Tasks may optionally accept a _context variable. If they do, an
instance of this object is passed as the context. """

def __init__(self, conn, queue_url, receipt_handle):
self.conn = conn
self.queue_url = queue_url
self.receipt_handle = receipt_handle

def change_message_visibility(self, timeout=timedelta(minutes=10)):
self.conn.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=self.receipt_handle,
VisibilityTimeout=int(timeout.total_seconds())
)
28 changes: 26 additions & 2 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
import time

from multiprocessing import Event, Process, Queue

try:
from queue import Empty, Full
except ImportError:
from Queue import Empty, Full

try:
from inspect import getfullargspec as get_args
except ImportError:
from inspect import getargspec as get_args

import boto3

from pyqs.utils import get_aws_region_name, decode_message
from pyqs.utils import get_aws_region_name, decode_message, TaskContext

MESSAGE_DOWNLOAD_BATCH_SIZE = 10
LONG_POLLING_INTERVAL = 20
Expand Down Expand Up @@ -180,6 +186,7 @@ def process_message(self):
full_task_path = message_body['task']
args = message_body['args']
kwargs = message_body['kwargs']
receipt_handle = message['ReceiptHandle']

task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])
Expand All @@ -188,6 +195,15 @@ def process_message(self):

task = getattr(task_module, task_name)

# if the task accepts the optional _context argument, pass it the TaskContext
if '_context' in get_args(task).args:
kwargs = dict(kwargs)
kwargs['_context'] = TaskContext(
conn=self.conn,
queue_url=queue_url,
receipt_handle=receipt_handle
)

current_time = time.time()
if int(current_time - fetch_time) >= timeout:
logger.warning(
Expand All @@ -214,12 +230,20 @@ def process_message(self):
traceback.format_exc(),
)
)

# since the task failed, mark it is available again quickly (10 seconds)
self.conn.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=10
)

return True
else:
end_time = time.clock()
self.conn.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
ReceiptHandle=receipt_handle
)
logger.info(
"Processed task {} in {:.4f} seconds with args: {} "
Expand Down

0 comments on commit fb8cce3

Please sign in to comment.