Skip to content

Commit

Permalink
Add second celery health chat that uses ping instead of executing a t…
Browse files Browse the repository at this point in the history
…ask (#272)



Co-authored-by: Witold Greń <[email protected]>
  • Loading branch information
maszaa and witold-gren authored Nov 4, 2020
1 parent f5244fb commit 60ac341
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 7 deletions.
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The following health checks are bundled with this project:
- disk and memory utilization (via ``psutil``)
- AWS S3 storage
- Celery task queue
- Celery ping
- RabbitMQ
- Migrations

Expand Down Expand Up @@ -70,6 +71,7 @@ Add the ``health_check`` applications to your ``INSTALLED_APPS``:
'health_check.storage',
'health_check.contrib.migrations',
'health_check.contrib.celery', # requires celery
'health_check.contrib.celery_ping', # requires celery
'health_check.contrib.psutil', # disk and memory utilization; requires psutil
'health_check.contrib.s3boto3_storage', # requires boto3 and S3BotoStorage backend
'health_check.contrib.rabbitmq', # requires RabbitMQ broker
Expand Down
21 changes: 21 additions & 0 deletions docs/contrib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,24 @@ to disable any of these checks, set its value to ``None``.
'DISK_USAGE_MAX': 90, # percent
'MEMORY_MIN' = 100, # in MB
}
``celery``
----------

If you are using Celery you may choose between two different Celery checks.

`health_check.contrib.celery` sends a task to the queue and it expects that task
to be executed in `HEALTHCHECK_CELERY_TIMEOUT` seconds which by default is three seconds.
You may override that in your Django settings module. This check is suitable for use cases
which require that tasks can be processed frequently all the time.

`health_check.contrib.celery_ping` is a different check. It checks that each predefined
Celery task queue has a consumer (i.e. worker) that responds `{"ok": "pong"}` in
`HEALTHCHECK_CELERY_PING_TIMEOUT` seconds. The default for this is one second.
You may override that in your Django settings module. This check is suitable for use cases
which don't require that tasks are executed almost instantly but require that they are going
to be executed in sometime the future i.e. that the worker process is alive and processing tasks
all the time.

You may also use both of them. To use these checks add them to `INSTALLED_APPS` in your
Django settings module.
1 change: 1 addition & 0 deletions health_check/contrib/celery_ping/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = 'health_check.contrib.celery_ping.apps.HealthCheckConfig'
12 changes: 12 additions & 0 deletions health_check/contrib/celery_ping/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from django.apps import AppConfig

from health_check.plugins import plugin_dir


class HealthCheckConfig(AppConfig):
name = 'health_check.contrib.celery_ping'

def ready(self):
from .backends import CeleryPingHealthCheck

plugin_dir.register(CeleryPingHealthCheck)
66 changes: 66 additions & 0 deletions health_check/contrib/celery_ping/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from celery.app import default_app as app
from django.conf import settings

from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceUnavailable


class CeleryPingHealthCheck(BaseHealthCheckBackend):
CORRECT_PING_RESPONSE = {"ok": "pong"}

def check_status(self):
timeout = getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 1)

try:
ping_result = app.control.ping(timeout=timeout)
except IOError as e:
self.add_error(ServiceUnavailable("IOError"), e)
except NotImplementedError as exc:
self.add_error(
ServiceUnavailable(
"NotImplementedError: Make sure CELERY_RESULT_BACKEND is set"
),
exc,
)
except BaseException as exc:
self.add_error(ServiceUnavailable("Unknown error"), exc)
else:
if not ping_result:
self.add_error(
ServiceUnavailable("Celery workers unavailable"),
)
else:
self._check_ping_result(ping_result)

def _check_ping_result(self, ping_result):
active_workers = []

for worker, response in ping_result[0].items():
if response != self.CORRECT_PING_RESPONSE:
self.add_error(
ServiceUnavailable(
f"Celery worker {worker} response was incorrect"
),
)
continue
active_workers.append(worker)

if not self.errors:
self._check_active_queues(active_workers)

def _check_active_queues(self, active_workers):
defined_queues = app.conf.CELERY_QUEUES

if not defined_queues:
return

defined_queues = set([queue.name for queue in defined_queues])
active_queues = set()

for queues in app.control.inspect(active_workers).active_queues().values():
active_queues.update([queue.get("name") for queue in queues])

for queue in defined_queues.difference(active_queues):
self.add_error(
ServiceUnavailable(f"No worker for Celery task queue {queue}"),
)
7 changes: 5 additions & 2 deletions tests/test_autodiscover.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
from django.conf import settings

from health_check.contrib.celery.backends import CeleryHealthCheck
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck
from health_check.plugins import plugin_dir


class TestAutoDiscover:

def test_autodiscover(self):
health_check_plugins = list(filter(
lambda x: 'health_check.' in x and 'celery' not in x,
lambda x: x.startswith('health_check.') and 'celery' not in x,
settings.INSTALLED_APPS
))

non_celery_plugins = [x for x in plugin_dir._registry if not issubclass(x[0], CeleryHealthCheck)]
non_celery_plugins = [x for x in plugin_dir._registry
if not issubclass(x[0], (CeleryHealthCheck, CeleryPingHealthCheck))]

# The number of installed apps excluding celery should equal to all plugins except celery
assert len(non_celery_plugins) == len(health_check_plugins)
Expand Down
130 changes: 130 additions & 0 deletions tests/test_celery_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import pytest
from django.apps import apps
from django.conf import settings
from mock import patch

from health_check.contrib.celery_ping.apps import HealthCheckConfig
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck


class TestCeleryPingHealthCheck:
CELERY_APP_CONTROL_PING = (
"health_check.contrib.celery_ping.backends.app.control.ping"
)
CELERY_APP_CONTROL_INSPECT_ACTIVE_QUEUES = (
"health_check.contrib.celery_ping.backends.app.control.inspect.active_queues"
)

@pytest.fixture
def health_check(self):
return CeleryPingHealthCheck()

def test_check_status_doesnt_add_errors_when_ping_successfull(self, health_check):
celery_worker = "celery@4cc150a7b49b"

with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[{celery_worker: CeleryPingHealthCheck.CORRECT_PING_RESPONSE}],
), patch(
self.CELERY_APP_CONTROL_INSPECT_ACTIVE_QUEUES,
return_value={
celery_worker: [
{"name": queue.name} for queue in settings.CELERY_QUEUES
]
},
):
health_check.check_status()

assert not health_check.errors

def test_check_status_reports_errors_if_ping_responses_are_incorrect(
self, health_check
):
with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[
{
"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE,
"celery2@4cc150a7b49b": {},
"celery3@4cc150a7b49b": {"error": "pong"},
}
],
):
health_check.check_status()

assert len(health_check.errors) == 2

def test_check_status_adds_errors_when_ping_successfull_but_not_all_defined_queues_have_consumers(
self,
health_check,
):
celery_worker = "celery@4cc150a7b49b"
queues = list(settings.CELERY_QUEUES)

with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[{celery_worker: CeleryPingHealthCheck.CORRECT_PING_RESPONSE}],
), patch(
self.CELERY_APP_CONTROL_INSPECT_ACTIVE_QUEUES,
return_value={celery_worker: [{"name": queues.pop().name}]},
):
health_check.check_status()

assert len(health_check.errors) == len(queues)

@pytest.mark.parametrize(
"exception_to_raise",
[
IOError,
TimeoutError,
],
)
def test_check_status_add_error_when_io_error_raised_from_ping(
self, exception_to_raise, health_check
):
with patch(self.CELERY_APP_CONTROL_PING, side_effect=exception_to_raise):
health_check.check_status()

assert len(health_check.errors) == 1
assert "ioerror" in health_check.errors[0].message.lower()

@pytest.mark.parametrize(
"exception_to_raise", [ValueError, SystemError, IndexError, MemoryError]
)
def test_check_status_add_error_when_any_exception_raised_from_ping(
self, exception_to_raise, health_check
):
with patch(self.CELERY_APP_CONTROL_PING, side_effect=exception_to_raise):
health_check.check_status()

assert len(health_check.errors) == 1
assert health_check.errors[0].message.lower() == "unknown error"

def test_check_status_when_raised_exception_notimplementederror(self, health_check):
expected_error_message = (
"notimplementederror: make sure celery_result_backend is set"
)

with patch(self.CELERY_APP_CONTROL_PING, side_effect=NotImplementedError):
health_check.check_status()

assert len(health_check.errors) == 1
assert health_check.errors[0].message.lower() == expected_error_message

@pytest.mark.parametrize("ping_result", [None, list()])
def test_check_status_add_error_when_ping_result_failed(
self, ping_result, health_check
):
with patch(self.CELERY_APP_CONTROL_PING, return_value=ping_result):
health_check.check_status()

assert len(health_check.errors) == 1
assert "workers unavailable" in health_check.errors[0].message.lower()


class TestCeleryPingHealthCheckApps:
def test_apps(self):
assert HealthCheckConfig.name == "health_check.contrib.celery_ping"

celery_ping = apps.get_app_config("celery_ping")
assert celery_ping.name == "health_check.contrib.celery_ping"
2 changes: 1 addition & 1 deletion tests/testapp/celery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from celery import Celery

app = Celery('testapp')
app = Celery('testapp', broker='memory://')
app.config_from_object('django.conf:settings', namespace='CELERY')
11 changes: 7 additions & 4 deletions tests/testapp/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os.path
import uuid

from kombu import Queue

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DEBUG = True

Expand All @@ -28,6 +30,7 @@
'health_check.storage',
'health_check.contrib.celery',
'health_check.contrib.migrations',
'health_check.contrib.celery_ping',
'health_check.contrib.s3boto_storage',
'tests',
)
Expand Down Expand Up @@ -59,7 +62,7 @@

USE_L10N = True

CELERY_TASK_QUEUES = {
'default': {},
'queue2': {}
}
CELERY_QUEUES = [
Queue('default'),
Queue('queue2'),
]

0 comments on commit 60ac341

Please sign in to comment.