Skip to content

Commit

Permalink
feat: added redis support as db for persistent with ssl config
Browse files Browse the repository at this point in the history
  • Loading branch information
NitinSingh8 committed Dec 5, 2024
1 parent d689888 commit dbacd26
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 11 deletions.
11 changes: 11 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ You can also run Flower using the docker image ::

In this example, Flower is using the `tasks.app` defined in the `examples/tasks.py <https://github.com/mher/flower/blob/master/examples/tasks.py>`_ file

## Use Redis as a database for Persistence
To Use Redis as a database for Persistence, you need to pass the following arguments while running Flower:
- redis_host: The host of the Redis server. (Default: None)
- redis_port: The port of the Redis server. (Default: None)
- redis_db: The database number of the Redis server. (Default: 0)
- redis_key: The key to use for storing the state data in Redis. (Default: flower)
- redis_ssl: Whether to use SSL for the Redis connection. (Default: False)

$ celery -A tasks.app flower --broker=amqp://guest:guest@localhost:5672// --redis_host=localhost --redis_port=6379 --redis_db=0 --redis_key=flower --redis_ssl=True


API
---

Expand Down
7 changes: 6 additions & 1 deletion flower/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ def __init__(self, options=None, capp=None, events=None,
enable_events=self.options.enable_events,
io_loop=self.io_loop,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
max_tasks_in_memory=self.options.max_tasks,
redis_host=self.options.redis_host,
redis_port=self.options.redis_port,
redis_db=self.options.redis_db,
redis_ssl=self.options.redis_ssl,
redis_key=self.options.redis_key)
self.started = False

def start(self):
Expand Down
47 changes: 37 additions & 10 deletions flower/events.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import collections
import logging
import pickle
import redis
import shelve
import threading
import time
from collections import Counter
from functools import partial

from celery.events import EventReceiver
from celery.events.state import State
from prometheus_client import Counter as PrometheusCounter
Expand Down Expand Up @@ -129,12 +130,32 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
self.state = None
self.state_save_timer = None

self.redis_host = kwargs.get('redis_host', None)
self.redis_port = kwargs.get('redis_port', None)
self.redis_db = kwargs.get('redis_db', None)
self.ssl = kwargs.get('ssl', False)
self.redis_key = kwargs.get('redis_key', 'flower')

# Check if we are using Redis as the database for persistence
self.redis_as_db = self.redis_host and self.redis_port and self.redis_db

if self.persistent:
logger.debug("Loading state from '%s'...", self.db)
state = shelve.open(self.db)
if state:
self.state = state['events']
state.close()
if self.redis_as_db:
logger.debug("Loading state from Redis...")
redis_client = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=self.redis_db, ssl=self.ssl)
state_data = redis_client.get(self.redis_key)
redis_client.close()
if state_data:
self.state = pickle.loads(state_data)
logger.debug(f"State loaded from Redis: {self.state}")
else:
self.state = None
else:
logger.debug(f"Loading state from file '{self.db}'...")
state = shelve.open(self.db)
if state:
self.state = state['events']
state.close()

if state_save_interval:
self.state_save_timer = PeriodicCallback(self.save_state,
Expand Down Expand Up @@ -195,10 +216,16 @@ def run(self):
time.sleep(try_interval)

def save_state(self):
logger.debug("Saving state to '%s'...", self.db)
state = shelve.open(self.db, flag='n')
state['events'] = self.state
state.close()
logger.debug("Saving state...")
if self.redis_as_db:
redis_client = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=self.redis_db, ssl=self.ssl)
state_data = pickle.dumps(self.state)
redis_client.set(self.redis_key, state_data)
redis_client.close()
else:
state = shelve.open(self.db, flag='n')
state['events'] = self.state
state.close()

def on_enable_events(self):
# Periodically enable events for workers
Expand Down
10 changes: 10 additions & 0 deletions flower/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
help="flower database file")
define("persistent", type=bool, default=False,
help="enable persistent mode")
define("redis_host", type=str, default=None,
help="Redis host")
define("redis_port", type=int, default=None,
help="Redis port")
define("redis_db", type=int, default=None,
help="Redis database")
define("redis_ssl", type=bool, default=False,
help="Use SSL for Redis connection")
define("redis_key", type=str, default='flower',
help="Redis key")
define("state_save_interval", type=int, default=0,
help="state save interval (in milliseconds)")
define("broker_api", type=str, default=None,
Expand Down

0 comments on commit dbacd26

Please sign in to comment.