Skip to content
This repository has been archived by the owner on Nov 26, 2018. It is now read-only.

Example using Celery for plugin queue #147

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
web: manage.py runserver $WEB_PORT
plugins: manage.py run_plugins
plugins: celery -A botbot worker -l info
bot: botbot-bot -v=2 -logtostderr=true
5 changes: 5 additions & 0 deletions botbot/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
Empty file.
Empty file.
18 changes: 0 additions & 18 deletions botbot/apps/plugins/management/commands/run_plugins.py

This file was deleted.

67 changes: 12 additions & 55 deletions botbot/apps/plugins/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ class PluginRunner(object):
Calls to plugins are done via greenlets
"""

def __init__(self, use_gevent=False):
if use_gevent:
import gevent
self.gevent = gevent
def __init__(self):
self.bot_bus = redis.StrictRedis.from_url(
settings.REDIS_PLUGIN_QUEUE_URL)
self.storage = redis.StrictRedis.from_url(
Expand Down Expand Up @@ -176,32 +173,15 @@ def register(self, plugin):
getattr(self, attr.route_rule[0] + '_router').setdefault(
plugin.slug, []).append((attr.route_rule[1], attr, plugin))

def listen(self):
"""Listens for incoming messages on the Redis queue"""
while 1:
val = None
try:
val = self.bot_bus.blpop('q', 1)

# Track q length
ql = self.bot_bus.llen('q')
statsd.gauge(".".join(["plugins", "q"]), ql)

if val:
_, val = val
LOG.debug('Recieved: %s', val)
line = Line(json.loads(val), self)

# Calculate the transport latency between go and the plugins.
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
statsd.timing(".".join(["plugins", "latency"]),
delta.total_seconds() * 1000)
def process_line(self, line_json):
LOG.debug('Recieved: %s', line_json)
line = Line(json.loads(line_json), self)
# Calculate the transport latency between go and the plugins.
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
statsd.timing(".".join(["plugins", "latency"]),
delta.total_seconds() * 1000)
self.dispatch(line)

self.dispatch(line)
except Exception:
LOG.error("Line Dispatch Failed", exc_info=True, extra={
"line": val
})

def dispatch(self, line):
"""Given a line, dispatch it to the right plugins & functions."""
Expand All @@ -214,16 +194,11 @@ def dispatch(self, line):
# firehose gets everything, no rule matching
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
with statsd.timer(".".join(["plugins", plugin_slug])):
# FIXME: This will not have correct timing if go back to
# gevent.
channel_plugin = self.setup_plugin_for_channel(
plugin.__class__, line)
new_func = log_on_error(LOG, getattr(channel_plugin,
func.__name__))
if hasattr(self, 'gevent'):
self.gevent.Greenlet.spawn(new_func, line)
else:
channel_plugin.respond(new_func(line))
channel_plugin.respond(new_func(line))

# pass line to other routers
if line._is_message:
Expand Down Expand Up @@ -252,30 +227,12 @@ def check_for_plugin_route_matches(self, line, router):
if match:
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
with statsd.timer(".".join(["plugins", plugin_slug])):
# FIXME: This will not have correct timing if go back to
# gevent.
# Instantiate a plugin specific to this channel
channel_plugin = self.setup_plugin_for_channel(
plugin.__class__, line)
# get the method from the channel-specific plugin
new_func = log_on_error(LOG, getattr(channel_plugin,
func.__name__))
if hasattr(self, 'gevent'):
grnlt = self.gevent.Greenlet(new_func, line,
**match.groupdict())
grnlt.link_value(channel_plugin.greenlet_respond)
grnlt.start()
else:
channel_plugin.respond(new_func(line,
**match.groupdict()))


def start_plugins(*args, **kwargs):
"""
Used by the management command to start-up plugin listener
and register the plugins.
"""
LOG.info('Starting plugins. Gevent=%s', kwargs['use_gevent'])
app = PluginRunner(**kwargs)
app.register_all_plugins()
app.listen()
channel_plugin.respond(new_func(line,
**match.groupdict()))
14 changes: 14 additions & 0 deletions botbot/apps/plugins/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from botbot.celery import app
from .runner import PluginRunner


runner = PluginRunner()
runner.register_all_plugins()

@app.task(bind=True)
def route_line(self, line_json):
try:
runner.process_line(line_json)
# For any error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
16 changes: 16 additions & 0 deletions botbot/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'botbot.settings')

from django.conf import settings

app = Celery('botbot')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
4 changes: 3 additions & 1 deletion botbot/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@
# Third party app settings
# ==============================================================================

# SOUTH_DATABASE_ADAPTERS = {'default': 'south.db.postgresql_psycopg2'}
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']
BROKER_URL = REDIS_PLUGIN_QUEUE_URL

SOCIAL_AUTH_USER_MODEL = AUTH_USER_MODEL
SOCIAL_AUTH_PROTECTED_USER_FIELDS = ['email']
Expand Down
5 changes: 0 additions & 5 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
import sys

if __name__ == "__main__":
if (len(sys.argv) > 1 and
'run_plugins' in sys.argv and '--with-gevent' in sys.argv):
# import gevent as soon as possible
from gevent import monkey; monkey.patch_all()
from psycogreen.gevent import patch_psycopg; patch_psycopg()

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "botbot.settings")
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
django==1.7.2
celery==3.1.18

pytz
psycopg2==2.5.2
Expand Down