Skip to content

Commit

Permalink
Add JSON API #382
Browse files Browse the repository at this point in the history
  • Loading branch information
svpcom committed Nov 23, 2024
1 parent 6dfd2c0 commit b1ccf75
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 13 deletions.
9 changes: 7 additions & 2 deletions wfb_ng/conf/master.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ base_port_node = 11000 # UDP ports allocated on nodes

## Required fiels in top level profiles
## 1. "streams" -- that have required keys: "name", "service_type" and "profiles". Any other k/v pairs can be used to override values inherided from low level profiles.
## 2. "stats_port" -- UDP port for CLI. Should be int or None to disable statistics completely.
## 3. "link_domain" -- moved from common.link_id
## 2. "stats_port" -- TCP port for CLI. Should be int or None to disable statistics completely.
## 3. "api_port" -- TCP port for public JSON API. Should be int or None to disable API.
## 4. "link_domain" -- moved from common.link_id

## All streams can be point-to-point(one- or two-way transfers) or point-to-multipoint(one-way transfers only), but not multipoint-to-point or multipoint-to-multipoint
## In case of one-way stream you can specify only "stream_rx" on RX side and "stream_tx" on TX side.
Expand All @@ -125,6 +126,7 @@ streams = [{'name': 'video', 'stream_rx': None, 'stream_tx': 0x00, 'service_ty
]

stats_port = 8002 # used by wfb-cli, set to None completely disable statistics API
api_port = 8102 # public JSON API

link_domain = "default" # It will be hashed and mapped to three bytes of MAC
# You can use different link ids for multi-vehicle setup without stream remapping.
Expand All @@ -140,6 +142,7 @@ streams = [{'name': 'video', 'stream_rx': 0x00, 'stream_tx': None, 'service_ty
]

stats_port = 8003 # used by wfb-cli
api_port = 8103 # public JSON API
link_domain = "default"

## Example of custom profiles not related to drones
Expand All @@ -149,13 +152,15 @@ streams = [{'name': 'audio', 'stream_rx': 0xb0, 'stream_tx': 0x30, 'service_type
'profiles': ['base', 'radio_base'], 'peer': 'listen://127.0.0.1:1235', 'keypair': 'drone.key'}]

stats_port = 8004
api_port = 8104
link_domain = "test_two_way_udp"

[two_way_udp_example_side_b]
streams = [{'name': 'audio', 'stream_rx': 0x30, 'stream_tx': 0xb0, 'service_type': 'udp_proxy',
'profiles': ['base', 'radio_base'], 'peer': 'connect://127.0.0.1:1234', 'keypair': 'gs.key'}]

stats_port = 8005
api_port = 8105
link_domain = "test_two_way_udp"

###########################################################################################################################
Expand Down
69 changes: 61 additions & 8 deletions wfb_ng/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import msgpack
import os
import time
import json

from itertools import groupby
from copy import deepcopy
from twisted.python import log, failure
from twisted.internet import reactor, defer, threads, task
from twisted.internet.protocol import ProcessProtocol, Factory
Expand All @@ -42,9 +44,7 @@ class WFBFlags(object):

fec_types = {1: 'VDM_RS'}

class StatisticsProtocol(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024

class StatisticsMsgPackProtocol(Int32StringReceiver):
def connectionMade(self):
# Push all config values for CLI into session
# to allow CLI run without config file
Expand All @@ -69,6 +69,43 @@ def send_stats(self, data):
self.sendString(msgpack.packb(data, use_bin_type=True))


class StatisticsJSONProtocol(LineReceiver):
delimiter = b'\n'

def connectionMade(self):
# Push all config values on the start
msg = json.dumps(dict(type='settings',
profile=self.factory.profile,
is_cluster=self.factory.is_cluster,
settings = deepcopy(settings)))

self.sendLine(msg.encode('utf-8'))
self.factory.ui_sessions.append(self)

def lineReceived(self, line):
pass

def connectionLost(self, reason):
self.factory.ui_sessions.remove(self)

def send_stats(self, data):
data = dict(data)

if data['type'] == 'rx':
ka = ('ant', 'freq', 'mcs', 'bw')
va = ('pkt_recv', 'rssi_min', 'rssi_avg', 'rssi_max', 'snr_min', 'snr_avg', 'snr_max')
data['rx_ant_stats'] = list(dict(zip(ka + va, (ant_id,) + k + v))
for (k, ant_id), v in data.pop('rx_ant_stats').items())
elif data['type'] == 'tx':
ka = ('ant',)
va = ('pkt_sent', 'pkt_drop', 'lat_min', 'lat_avg', 'lat_max')
data['tx_ant_stats'] = list(dict(zip(ka + va, (k,) + v))
for k, v in data.pop('latency').items())

msg = json.dumps(data)
self.sendLine(msg.encode('utf-8'))


class RFTempMeter(object):
def __init__(self, wlans, measurement_interval):
# RF module temperature by rf_path
Expand Down Expand Up @@ -112,16 +149,33 @@ def _got_temp(temp_d):
return threads.deferToThread(_read_temperature).addCallback(_got_temp)


class StatsAndSelectorFactory(Factory):

class MsgPackAPIFactory(Factory):
noisy = False
protocol = StatisticsProtocol
protocol = StatisticsMsgPackProtocol

def __init__(self, ui_sessions, is_cluster=False, cli_title=None):
self.ui_sessions = ui_sessions
self.is_cluster = is_cluster
self.cli_title = cli_title


class JSONAPIFactory(Factory):
noisy = False
protocol = StatisticsJSONProtocol

def __init__(self, ui_sessions, is_cluster=False, profile=None):
self.ui_sessions = ui_sessions
self.is_cluster = is_cluster
self.profile = profile


class AntStatsAndSelector(object):
"""
Aggregate RX stats and select TX antenna
"""

def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False, rx_only_wlan_ids=None):
self.is_cluster = is_cluster
def __init__(self, logger, rx_only_wlan_ids=None, rf_temp_meter=None):
self.rx_only_wlan_ids = rx_only_wlan_ids or set()
self.ant_sel_cb_list = []
self.rssi_cb_l = []
Expand All @@ -141,7 +195,6 @@ def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False,
if logger is not None:
self.ui_sessions.append(logger)

self.cli_title = cli_title
self.rf_temp_meter = rf_temp_meter

self.lc = task.LoopingCall(self.aggregate_stats)
Expand Down
11 changes: 8 additions & 3 deletions wfb_ng/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError, version_msg
from .common import abort_on_crash, exit_status, df_sleep, search_attr
from .protocols import StatsAndSelectorFactory, RFTempMeter, SSHClientProtocol
from .protocols import AntStatsAndSelector, RFTempMeter, SSHClientProtocol, MsgPackAPIFactory, JSONAPIFactory
from .services import parse_services, init_udp_direct_tx, init_udp_direct_rx, init_mavlink, init_tunnel, init_udp_proxy, hash_link_domain, bandwidth_map
from .cluster import parse_cluster_services, gen_cluster_scripts
from .conf import settings, cfg_files
Expand Down Expand Up @@ -216,13 +216,18 @@ def _cleanup(x):
'cluster' if is_cluster else ', '.join(wlans),
profile_cfg.link_domain)

ant_sel_f = StatsAndSelectorFactory(logger, cli_title, rf_temp_meter, is_cluster, rx_only_wlan_ids)
ant_sel_f = AntStatsAndSelector(logger, rx_only_wlan_ids, rf_temp_meter)
cleanup_l.append(ant_sel_f)

link_id = hash_link_domain(profile_cfg.link_domain)

if profile_cfg.stats_port:
sockets.append(reactor.listenTCP(profile_cfg.stats_port, ant_sel_f))
p_f = MsgPackAPIFactory(ant_sel_f.ui_sessions, is_cluster, cli_title)
sockets.append(reactor.listenTCP(profile_cfg.stats_port, p_f))

if profile_cfg.api_port:
p_f = JSONAPIFactory(ant_sel_f.ui_sessions, is_cluster, profile)
sockets.append(reactor.listenTCP(profile_cfg.api_port, p_f))

for service_name, service_type, srv_cfg in service_list:
log.msg('Starting %s/%s@%s' % (profile, service_name, profile_cfg.link_domain))
Expand Down

0 comments on commit b1ccf75

Please sign in to comment.