Skip to content

Commit

Permalink
Issue 781 monitoring (#782)
Browse files Browse the repository at this point in the history
* process message in batches

* adjust thresholds

* add logging

* remove print statements

* remove print, fix flake8

* remove cast to str

* turn metrics-collector into a class
  • Loading branch information
maaikelimper authored Oct 1, 2024
1 parent 97bdaf7 commit 707d4df
Showing 1 changed file with 170 additions and 136 deletions.
306 changes: 170 additions & 136 deletions wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,17 @@
import json
import time

from threading import Lock
from threading import Thread

from prometheus_client import start_http_server, Counter, Gauge

# de-register default-collectors
from prometheus_client import REGISTRY, PROCESS_COLLECTOR, PLATFORM_COLLECTOR

message_buffer = []
buffer_lock = Lock()

REGISTRY.unregister(PROCESS_COLLECTOR)
REGISTRY.unregister(PLATFORM_COLLECTOR)

Expand Down Expand Up @@ -86,148 +92,176 @@
["WSI"])


def update_stations_gauge(station_list):
station_wsi._metrics.clear()
for station in station_list:
station_wsi.labels(station).set(1)
notify_wsi_total.labels(station).inc(0)
failure_wsi_total.labels(station).inc(0)


def init_stations_gauge():
station_list = []
# read currently configured stations from wis2box-api
url = 'http://wis2box-api:80/oapi/collections/stations/items?f=json'
res = requests.get(url)
try:
json_data = json.loads(res.content)
if 'description' in json_data:
if json_data['description'] == 'Collection not found':
logger.error("No stations configured yet")
station_list.append('none') # placeholder to init series
class MetricsCollector:
def __init__(self):
self.message_buffer = []
self.buffer_lock = Lock()

def update_stations_gauge(self, station_list):
"""
function to update the stations-gauge
:param station_list: list of stations
:returns: `None`
"""

station_wsi._metrics.clear()
for station in station_list:
station_wsi.labels(station).set(1)
notify_wsi_total.labels(station).inc(0)
failure_wsi_total.labels(station).inc(0)

def init_stations_gauge(self):
"""
function to initialize the stations-gauge
:returns: `None`
"""

station_list = []
url = 'http://wis2box-api:80/oapi/collections/stations/items?f=json'
try:
res = requests.get(url)
json_data = json.loads(res.content)
if 'description' in json_data:
if json_data['description'] == 'Collection not found':
logger.error("No stations configured yet")
station_list.append('none')
else:
logger.error(json_data['description'])
else:
logger.error(json_data['description'])
station_list = [item['id'] for item in json_data["features"]]
except Exception as err:
logger.error(f'Failed to update stations-gauge: {err}')
self.update_stations_gauge(station_list)

def sub_connect(self, client, userdata, flags, rc, properties=None):
"""
function executed 'on_connect' for paho.mqtt.client
:param client: client-object associated to 'on_connect'
:param userdata: userdata
:param flags: flags
:param rc: return-code received 'on_connect'
:param properties: properties
:returns: `None`
"""

logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}")
for s in ["wis2box/#", '$SYS/broker/messages/#']:
client.subscribe(s, qos=1)

def sub_mqtt_metrics(self, client, userdata, msg):
"""
function executed 'on_message' for paho.mqtt.client
updates counters for each new message received
:param client: client-object associated to 'on_message'
:param userdata: MQTT-userdata
:param msg: MQTT-message-object received by subscriber
:returns: `None`
"""

logger.debug(f"Received message on topic={msg.topic}")

if msg.topic.startswith('$SYS/broker/messages/'):
if msg.topic.endswith('/sent'):
broker_msg_sent.set(float(msg.payload))
elif msg.topic.endswith('/received'):
broker_msg_received.set(float(msg.payload))
elif msg.topic.endswith('/stored'):
broker_msg_stored.set(float(msg.payload))
elif msg.topic.endswith('/dropped'):
broker_msg_dropped.set(float(msg.payload))
else:
for item in json_data["features"]:
station_list.append(item['id'])
except Exception as err:
logger.error(f'Failed to update stations-gauge: {err}')
update_stations_gauge(station_list)


def sub_connect(client, userdata, flags, rc, properties=None):
"""
function executed 'on_connect' for paho.mqtt.client
:param client: client-object associated to 'on_connect'
:param userdata: userdata
:param flags: flags
:param rc: return-code received 'on_connect'
:param properties: properties
:returns: `None`
"""

logger.info(f"on connection to subscribe: {mqtt.connack_string(rc)}")
for s in ["wis2box/#", '$SYS/broker/messages/#']:
print(f'subscribe to: {s}')
client.subscribe(s, qos=0)


def sub_mqtt_metrics(client, userdata, msg):
"""
function executed 'on_message' for paho.mqtt.client
updates counters for each new message received
:param client: client-object associated to 'on_message'
:param userdata: MQTT-userdata
:param msg: MQTT-message-object received by subscriber
:returns: `None`
"""

logger.debug(f"Received message on topic={msg.topic}")

if str(msg.topic).startswith('$SYS/broker/messages/sent'):
broker_msg_sent.set(msg.payload)
elif str(msg.topic).startswith('$SYS/broker/messages/received'):
broker_msg_received.set(msg.payload)
elif str(msg.topic).startswith('$SYS/broker/messages/stored'):
broker_msg_received.set(msg.payload)
elif str(msg.topic).startswith('$SYS/broker/messages/dropped'):
broker_msg_received.set(msg.payload)

if str(msg.topic).startswith('$SYS'):
return

m = json.loads(msg.payload.decode('utf-8'))
if str(msg.topic).startswith('wis2box/stations'):
update_stations_gauge(m['station_list'])
elif str(msg.topic).startswith('wis2box/notifications'):
wsi = 'none'
if 'wigos_station_identifier' in m['properties']:
wsi = m['properties']['wigos_station_identifier']
# if label wsi is not in notify_wsi_total, set to 0 and sleep 5s
if wsi not in notify_wsi_total._metrics:
logger.info(f"new station: {wsi}, sleep 5s before incrementing")
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
time.sleep(5)
notify_wsi_total.labels(wsi).inc(1)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
notify_total.inc(1)
elif str(msg.topic).startswith('wis2box/failure'):
descr = m['description'] if 'description' in m else 'none'
wsi = 'none'
if 'wigos_station_identifier' in m:
wsi = m['wigos_station_identifier']
failure_descr_wsi_total.labels(descr, wsi).inc(1)
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(1)
station_wsi.labels(wsi).set(1)
failure_total.inc(1)
elif str(msg.topic).startswith('wis2box/storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
storage_public_total.inc(1)


def gather_mqtt_metrics():
"""
setup mqtt-client to monitor metrics from broker on this box
:returns: `None`
"""

# connect to the internal broker
broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '')
broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '')
broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '')
broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883'))

# generate a random clientId for the mqtt-session
r = random.Random()
client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}"
try:
logger.info("setup connection")
logger.info(f"host={broker_host}, user={broker_username}")
client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)
client.on_connect = sub_connect
client.on_message = sub_mqtt_metrics
client.username_pw_set(broker_username, broker_password)
client.connect(broker_host, broker_port)
client.loop_forever()
except Exception as err:
logger.error(f"Failed to setup MQTT-client with error: {err}")
with self.buffer_lock:
self.message_buffer.append((msg.topic, msg))
if len(self.message_buffer) >= 100:
self.process_buffered_messages()

def process_buffered_messages(self):
"""
function to process buffered messages
:returns: `None`
"""

with self.buffer_lock:
messages_to_process = self.message_buffer
self.message_buffer = []

for topic, msg in messages_to_process:
m = json.loads(msg.payload.decode('utf-8'))
if topic.startswith('wis2box/stations'):
self.update_stations_gauge(m['station_list'])
elif topic.startswith('wis2box/notifications'):
wsi = m['properties'].get('wigos_station_identifier', 'none')
if (wsi,) not in notify_wsi_total._metrics:
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(0)
station_wsi.labels(wsi).set(1)
time.sleep(5)
notify_wsi_total.labels(wsi).inc(1)
failure_wsi_total.labels(wsi).inc(0)
notify_total.inc(1)
elif topic.startswith('wis2box/failure'):
wsi = m.get('wigos_station_identifier', 'none')
notify_wsi_total.labels(wsi).inc(0)
failure_wsi_total.labels(wsi).inc(1)
failure_total.inc(1)
elif topic.startswith('wis2box/storage'):
if str(m["Key"]).startswith('wis2box-incoming'):
storage_incoming_total.inc(1)
if str(m["Key"]).startswith('wis2box-public'):
storage_public_total.inc(1)

def periodic_buffer_processing(self):
"""
function to process buffered messages every second
:returns: `None`
"""

while True:
self.process_buffered_messages()
time.sleep(1)

def gather_mqtt_metrics(self):
"""
setup mqtt-client to monitor metrics from broker on this box
:returns: `None`
"""

broker_host = os.environ.get('WIS2BOX_BROKER_HOST', '')
broker_username = os.environ.get('WIS2BOX_BROKER_USERNAME', '')
broker_password = os.environ.get('WIS2BOX_BROKER_PASSWORD', '')
broker_port = int(os.environ.get('WIS2BOX_BROKER_PORT', '1883'))

r = random.Random()
client_id = f"mqtt_metrics_collector_{r.randint(1,1000):04d}"
try:
logger.info(f"setup connection: host={broker_host}, user={broker_username}") # noqa
client = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5)
client.on_connect = self.sub_connect
client.on_message = self.sub_mqtt_metrics
client.username_pw_set(broker_username, broker_password)
client.connect(broker_host, broker_port)
client.loop_forever()
except Exception as err:
logger.error(f"Failed to setup MQTT-client with error: {err}")


def main():
start_http_server(8001)
init_stations_gauge()
gather_mqtt_metrics()
collector = MetricsCollector()
collector.init_stations_gauge()

Thread(target=collector.periodic_buffer_processing, daemon=True).start()
collector.gather_mqtt_metrics()


if __name__ == '__main__':
Expand Down

0 comments on commit 707d4df

Please sign in to comment.