From dbacd2632c9caf57d706c0a0b4e3e3ca2c2f7297 Mon Sep 17 00:00:00 2001 From: nitinsingh Date: Thu, 5 Dec 2024 23:33:23 +0530 Subject: [PATCH] feat: added redis support as db for persistent with ssl config --- README.rst | 11 +++++++++++ flower/app.py | 7 ++++++- flower/events.py | 47 +++++++++++++++++++++++++++++++++++++---------- flower/options.py | 10 ++++++++++ 4 files changed, 64 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index c3b8b07e..609e9faf 100644 --- a/README.rst +++ b/README.rst @@ -69,6 +69,17 @@ You can also run Flower using the docker image :: In this example, Flower is using the `tasks.app` defined in the `examples/tasks.py `_ file +## Use Redis as a database for Persistence +To Use Redis as a database for Persistence, you need to pass the following arguments while running Flower: + - redis_host: The host of the Redis server. (Default: None) + - redis_port: The port of the Redis server. (Default: None) + - redis_db: The database number of the Redis server. (Default: 0) + - redis_key: The key to use for storing the state data in Redis. (Default: flower) + - redis_ssl: Whether to use SSL for the Redis connection. (Default: False) + + $ celery -A tasks.app flower --broker=amqp://guest:guest@localhost:5672// --redis_host=localhost --redis_port=6379 --redis_db=0 --redis_key=flower --redis_ssl=True + + API --- diff --git a/flower/app.py b/flower/app.py index 3427e098..3db1872c 100644 --- a/flower/app.py +++ b/flower/app.py @@ -62,7 +62,12 @@ def __init__(self, options=None, capp=None, events=None, enable_events=self.options.enable_events, io_loop=self.io_loop, max_workers_in_memory=self.options.max_workers, - max_tasks_in_memory=self.options.max_tasks) + max_tasks_in_memory=self.options.max_tasks, + redis_host=self.options.redis_host, + redis_port=self.options.redis_port, + redis_db=self.options.redis_db, + redis_ssl=self.options.redis_ssl, + redis_key=self.options.redis_key) self.started = False def start(self): diff --git a/flower/events.py b/flower/events.py index cd15d7a2..0fc82070 100644 --- a/flower/events.py +++ b/flower/events.py @@ -1,11 +1,12 @@ import collections import logging +import pickle +import redis import shelve import threading import time from collections import Counter from functools import partial - from celery.events import EventReceiver from celery.events.state import State from prometheus_client import Counter as PrometheusCounter @@ -129,12 +130,32 @@ def __init__(self, capp, io_loop, db=None, persistent=False, self.state = None self.state_save_timer = None + self.redis_host = kwargs.get('redis_host', None) + self.redis_port = kwargs.get('redis_port', None) + self.redis_db = kwargs.get('redis_db', None) + self.ssl = kwargs.get('ssl', False) + self.redis_key = kwargs.get('redis_key', 'flower') + + # Check if we are using Redis as the database for persistence + self.redis_as_db = self.redis_host and self.redis_port and self.redis_db + if self.persistent: - logger.debug("Loading state from '%s'...", self.db) - state = shelve.open(self.db) - if state: - self.state = state['events'] - state.close() + if self.redis_as_db: + logger.debug("Loading state from Redis...") + redis_client = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=self.redis_db, ssl=self.ssl) + state_data = redis_client.get(self.redis_key) + redis_client.close() + if state_data: + self.state = pickle.loads(state_data) + logger.debug(f"State loaded from Redis: {self.state}") + else: + self.state = None + else: + logger.debug(f"Loading state from file '{self.db}'...") + state = shelve.open(self.db) + if state: + self.state = state['events'] + state.close() if state_save_interval: self.state_save_timer = PeriodicCallback(self.save_state, @@ -195,10 +216,16 @@ def run(self): time.sleep(try_interval) def save_state(self): - logger.debug("Saving state to '%s'...", self.db) - state = shelve.open(self.db, flag='n') - state['events'] = self.state - state.close() + logger.debug("Saving state...") + if self.redis_as_db: + redis_client = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=self.redis_db, ssl=self.ssl) + state_data = pickle.dumps(self.state) + redis_client.set(self.redis_key, state_data) + redis_client.close() + else: + state = shelve.open(self.db, flag='n') + state['events'] = self.state + state.close() def on_enable_events(self): # Periodically enable events for workers diff --git a/flower/options.py b/flower/options.py index 083d4b5b..18e26c7f 100644 --- a/flower/options.py +++ b/flower/options.py @@ -35,6 +35,16 @@ help="flower database file") define("persistent", type=bool, default=False, help="enable persistent mode") +define("redis_host", type=str, default=None, + help="Redis host") +define("redis_port", type=int, default=None, + help="Redis port") +define("redis_db", type=int, default=None, + help="Redis database") +define("redis_ssl", type=bool, default=False, + help="Use SSL for Redis connection") +define("redis_key", type=str, default='flower', + help="Redis key") define("state_save_interval", type=int, default=0, help="state save interval (in milliseconds)") define("broker_api", type=str, default=None,