diff --git a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py index d4440f58..96e63777 100644 --- a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py +++ b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py @@ -92,169 +92,176 @@ ["WSI"]) -def update_stations_gauge(station_list): - station_wsi._metrics.clear() - for station in station_list: - station_wsi.labels(station).set(1) - notify_wsi_total.labels(station).inc(0) - failure_wsi_total.labels(station).inc(0) - - -def init_stations_gauge(): - station_list = [] - # read currently configured stations from wis2box-api - url = 'http://wis2box-api:80/oapi/collections/stations/items?f=json' - res = requests.get(url) - try: - json_data = json.loads(res.content) - if 'description' in json_data: - if json_data['description'] == 'Collection not found': - logger.error("No stations configured yet") - station_list.append('none') # placeholder to init series +class MetricsCollector: + def __init__(self): + self.message_buffer = [] + self.buffer_lock = Lock() + + def update_stations_gauge(self, station_list): + """ + function to update the stations-gauge + + :param station_list: list of stations + + :returns: `None` + """ + + station_wsi._metrics.clear() + for station in station_list: + station_wsi.labels(station).set(1) + notify_wsi_total.labels(station).inc(0) + failure_wsi_total.labels(station).inc(0) + + def init_stations_gauge(self): + """ + function to initialize the stations-gauge + + :returns: `None` + """ + + station_list = [] + url = 'http://wis2box-api:80/oapi/collections/stations/items?f=json' + try: + res = requests.get(url) + json_data = json.loads(res.content) + if 'description' in json_data: + if json_data['description'] == 'Collection not found': + logger.error("No stations configured yet") + station_list.append('none') + else: + logger.error(json_data['description']) else: - logger.error(json_data['description']) + station_list = [item['id'] for item in json_data["features"]] + except Exception as err: + logger.error(f'Failed to update stations-gauge: {err}') + self.update_stations_gauge(station_list) + + def sub_connect(self, client, userdata, flags, rc, properties=None): + """ + function executed 'on_connect' for paho.mqtt.client + + :param client: client-object associated to 'on_connect' + :param userdata: userdata + :param flags: flags + :param rc: return-code received 'on_connect' + :param properties: properties + + :returns: `None` + """ + + logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}") + for s in ["wis2box/#", '$SYS/broker/messages/#']: + client.subscribe(s, qos=1) + + def sub_mqtt_metrics(self, client, userdata, msg): + """ + function executed 'on_message' for paho.mqtt.client + updates counters for each new message received + + :param client: client-object associated to 'on_message' + :param userdata: MQTT-userdata + :param msg: MQTT-message-object received by subscriber + + :returns: `None` + """ + + logger.debug(f"Received message on topic={msg.topic}") + + if msg.topic.startswith('$SYS/broker/messages/'): + if msg.topic.endswith('/sent'): + broker_msg_sent.set(float(msg.payload)) + elif msg.topic.endswith('/received'): + broker_msg_received.set(float(msg.payload)) + elif msg.topic.endswith('/stored'): + broker_msg_stored.set(float(msg.payload)) + elif msg.topic.endswith('/dropped'): + broker_msg_dropped.set(float(msg.payload)) else: - for item in json_data["features"]: - station_list.append(item['id']) - except Exception as err: - logger.error(f'Failed to update stations-gauge: {err}') - update_stations_gauge(station_list) - - -def sub_connect(client, userdata, flags, rc, properties=None): - """ - function executed 'on_connect' for paho.mqtt.client - - :param client: client-object associated to 'on_connect' - :param userdata: userdata - :param flags: flags - :param rc: return-code received 'on_connect' - :param properties: properties - - :returns: `None` - """ - - logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}") - for s in ["wis2box/#", '$SYS/broker/messages/#']: - client.subscribe(s, qos=1) - - -def sub_mqtt_metrics(client, userdata, msg): - """ - function executed 'on_message' for paho.mqtt.client - updates counters for each new message received - - :param client: client-object associated to 'on_message' - :param userdata: MQTT-userdata - :param msg: MQTT-message-object received by subscriber - - :returns: `None` - """ - - logger.debug(f"Received message on topic={msg.topic}") - - if msg.topic.startswith('$SYS/broker/messages/sent'): - broker_msg_sent.set(msg.payload) - elif msg.topic.startswith('$SYS/broker/messages/received'): - broker_msg_received.set(msg.payload) - elif msg.topic.startswith('$SYS/broker/messages/stored'): - broker_msg_received.set(msg.payload) - elif msg.topic.startswith('$SYS/broker/messages/dropped'): - broker_msg_received.set(msg.payload) - - if msg.topic.startswith('$SYS'): - return - - with buffer_lock: - message_buffer.append((msg.topic, msg)) - # Process buffered messages if buffer size reaches a threshold - if len(message_buffer) >= 100: # Adjust this threshold as needed - process_buffered_messages() - - -def process_buffered_messages(): - global message_buffer - - with buffer_lock: - messages_to_process = message_buffer - message_buffer = [] - - for topic, msg in messages_to_process: - m = json.loads(msg.payload.decode('utf-8')) - if topic.startswith('wis2box/stations'): - update_stations_gauge(m['station_list']) - elif topic.startswith('wis2box/notifications'): - wsi = 'none' - if 'wigos_station_identifier' in m['properties']: - wsi = m['properties']['wigos_station_identifier'] - # if label wsi is not in notify_wsi_total, set to 0 and sleep 5s - if (wsi,) not in notify_wsi_total._metrics: - notify_wsi_total.labels(wsi).inc(0) + with self.buffer_lock: + self.message_buffer.append((msg.topic, msg)) + if len(self.message_buffer) >= 100: + self.process_buffered_messages() + + def process_buffered_messages(self): + """ + function to process buffered messages + + :returns: `None` + """ + + with self.buffer_lock: + messages_to_process = self.message_buffer + self.message_buffer = [] + + for topic, msg in messages_to_process: + m = json.loads(msg.payload.decode('utf-8')) + if topic.startswith('wis2box/stations'): + self.update_stations_gauge(m['station_list']) + elif topic.startswith('wis2box/notifications'): + wsi = m['properties'].get('wigos_station_identifier', 'none') + if (wsi,) not in notify_wsi_total._metrics: + notify_wsi_total.labels(wsi).inc(0) + failure_wsi_total.labels(wsi).inc(0) + station_wsi.labels(wsi).set(1) + time.sleep(5) + notify_wsi_total.labels(wsi).inc(1) failure_wsi_total.labels(wsi).inc(0) - station_wsi.labels(wsi).set(1) - time.sleep(5) - notify_wsi_total.labels(wsi).inc(1) - failure_wsi_total.labels(wsi).inc(0) - notify_total.inc(1) - elif topic.startswith('wis2box/failure'): - wsi = 'none' - if 'wigos_station_identifier' in m: - wsi = m['wigos_station_identifier'] + notify_total.inc(1) + elif topic.startswith('wis2box/failure'): + wsi = m.get('wigos_station_identifier', 'none') notify_wsi_total.labels(wsi).inc(0) failure_wsi_total.labels(wsi).inc(1) - failure_total.inc(1) - elif topic.startswith('wis2box/storage'): - if str(m["Key"]).startswith('wis2box-incoming'): - storage_incoming_total.inc(1) - if str(m["Key"]).startswith('wis2box-public'): - storage_public_total.inc(1) - - -# Call this function periodically, e.g., in a separate thread -def periodic_buffer_processing(): - while True: - process_buffered_messages() - time.sleep(1) # Adjust sleep time as needed - - -def gather_mqtt_metrics(): - """ - setup mqtt-client to monitor metrics from broker on this box - - :returns: `None` - """ - - # connect to the internal broker - broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '') - broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '') - broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '') - broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883')) - - # generate a random clientId for the mqtt-session - r = random.Random() - client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}" - try: - logger.info("setup connection") - logger.info(f"host={broker_host}, user={broker_username}") - client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5) - client.on_connect = sub_connect - client.on_message = sub_mqtt_metrics - client.username_pw_set(broker_username, broker_password) - client.connect(broker_host, broker_port) - client.loop_forever() - except Exception as err: - logger.error(f"Failed to setup MQTT-client with error: {err}") + failure_total.inc(1) + elif topic.startswith('wis2box/storage'): + if str(m["Key"]).startswith('wis2box-incoming'): + storage_incoming_total.inc(1) + if str(m["Key"]).startswith('wis2box-public'): + storage_public_total.inc(1) + + def periodic_buffer_processing(self): + """ + function to process buffered messages every second + + :returns: `None` + """ + + while True: + self.process_buffered_messages() + time.sleep(1) + + def gather_mqtt_metrics(self): + """ + setup mqtt-client to monitor metrics from broker on this box + + :returns: `None` + """ + + broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '') + broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '') + broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '') + broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883')) + + r = random.Random() + client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}" + try: + logger.info(f"setup connection: host={broker_host}, user={broker_username}") # noqa + client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5) + client.on_connect = self.sub_connect + client.on_message = self.sub_mqtt_metrics + client.username_pw_set(broker_username, broker_password) + client.connect(broker_host, broker_port) + client.loop_forever() + except Exception as err: + logger.error(f"Failed to setup MQTT-client with error: {err}") def main(): start_http_server(8001) + collector = MetricsCollector() + collector.init_stations_gauge() - init_stations_gauge() - - # Start periodic buffer processing - Thread(target=periodic_buffer_processing, daemon=True).start() - - gather_mqtt_metrics() + Thread(target=collector.periodic_buffer_processing, daemon=True).start() + collector.gather_mqtt_metrics() if __name__ == '__main__':