Skip to content

Commit

Permalink
Refactoring logging and ipc subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
svpcom committed Nov 29, 2024
1 parent fbd9a61 commit d1b3600
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 103 deletions.
89 changes: 47 additions & 42 deletions src/rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void Receiver::loop_iter(void)
} /* while more rt headers */

if (ret != -ENOENT && ant_idx < RX_ANT_MAX){
fprintf(stderr, "Error parsing radiotap header!\n");
WFB_ERR("Error parsing radiotap header!\n");
continue;
}

Expand All @@ -244,21 +244,20 @@ void Receiver::loop_iter(void)

if (flags & IEEE80211_RADIOTAP_F_BADFCS)
{
fprintf(stderr, "Got packet with bad fsc\n");
WFB_ERR("Got packet with bad fsc\n");
continue;
}

/* discard the radiotap header part */
pkt += iterator._max_length;
pktlen -= iterator._max_length;

//fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth);
if (pktlen > (int)sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header),
wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL);
} else {
fprintf(stderr, "Short packet (ieee header)\n");
WFB_ERR("Short packet (ieee header)\n");
continue;
}
}
Expand Down Expand Up @@ -428,9 +427,7 @@ int Aggregator::rx_ring_push(void)
2. Reduce packet injection speed or try to unify RX hardware.
*/

#if 0
fprintf(stderr, "Override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments);
#endif
WFB_DBG("AGG: Override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments);

count_p_override += 1;

Expand Down Expand Up @@ -480,31 +477,31 @@ int Aggregator::get_block_ring_idx(uint64_t block_idx)
return ring_idx;
}

void Aggregator::dump_stats(FILE *fp)
void Aggregator::dump_stats(void)
{
//timestamp in ms
uint64_t ts = get_time_ms();

for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
IPC_MSG("%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all,
it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max,
it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max);
}

fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
IPC_MSG("%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing);
fflush(fp);
IPC_MSG_SEND();

if(count_p_override)
{
fprintf(stderr, "%u block overrides\n", count_p_override);
WFB_ERR("%u block overrides\n", count_p_override);
}

if(count_p_lost)
{
fprintf(stderr, "%u packets lost\n", count_p_lost);
WFB_ERR("%u packets lost\n", count_p_lost);
}

clear_stats();
Expand Down Expand Up @@ -571,7 +568,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id

if (size > MAX_FORWARDER_PACKET_SIZE)
{
fprintf(stderr, "Long packet (fec payload)\n");
WFB_ERR("Long packet (fec payload)\n");
count_p_bad += 1;
return;
}
Expand All @@ -581,7 +578,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
case WFB_PACKET_DATA:
if(size < sizeof(wblock_hdr_t) + crypto_aead_chacha20poly1305_ABYTES + sizeof(wpacket_hdr_t))
{
fprintf(stderr, "Short packet (fec header)\n");
WFB_ERR("Short packet (fec header)\n");
count_p_bad += 1;
return;
}
Expand All @@ -593,7 +590,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if(size < sizeof(wsession_hdr_t) + sizeof(wsession_data_t) + crypto_box_MACBYTES || \
size > MAX_SESSION_PACKET_SIZE)
{
fprintf(stderr, "Invalid session key packet\n");
WFB_ERR("Invalid session key packet\n");
count_p_bad += 1;
return;
}
Expand All @@ -604,7 +601,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
((wsession_hdr_t*)buf)->session_nonce,
tx_publickey, rx_secretkey) != 0)
{
fprintf(stderr, "Unable to decrypt session key\n");
WFB_ERR("Unable to decrypt session key\n");
count_p_dec_err += 1;
return;
}
Expand All @@ -613,35 +610,35 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id

if (be64toh(new_session_data->epoch) < epoch)
{
fprintf(stderr, "Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data->epoch), epoch);
WFB_ERR("Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data->epoch), epoch);
count_p_dec_err += 1;
return;
}

if (be32toh(new_session_data->channel_id) != channel_id)
{
fprintf(stderr, "Session channel_id doesn't match: %u != %u\n", be32toh(new_session_data->channel_id), channel_id);
WFB_ERR("Session channel_id doesn't match: %u != %u\n", be32toh(new_session_data->channel_id), channel_id);
count_p_dec_err += 1;
return;
}

if (new_session_data->fec_type != WFB_FEC_VDM_RS)
{
fprintf(stderr, "Unsupported FEC codec type: %d\n", new_session_data->fec_type);
WFB_ERR("Unsupported FEC codec type: %d\n", new_session_data->fec_type);
count_p_dec_err += 1;
return;
}

if (new_session_data->n < 1)
{
fprintf(stderr, "Invalid FEC N: %d\n", new_session_data->n);
WFB_ERR("Invalid FEC N: %d\n", new_session_data->n);
count_p_dec_err += 1;
return;
}

if (new_session_data->k < 1 || new_session_data->k > new_session_data->n)
{
fprintf(stderr, "Invalid FEC K: %d\n", new_session_data->k);
WFB_ERR("Invalid FEC K: %d\n", new_session_data->k);
count_p_dec_err += 1;
return;
}
Expand All @@ -661,15 +658,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id

init_fec(new_session_data->k, new_session_data->n);

fprintf(stdout, "%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n);
fflush(stdout);

IPC_MSG("%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n);
IPC_MSG_SEND();
}

return;

default:
fprintf(stderr, "Unknown packet type 0x%x\n", buf[0]);
WFB_ERR("Unknown packet type 0x%x\n", buf[0]);
count_p_bad += 1;
return;
}
Expand All @@ -685,7 +681,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
sizeof(wblock_hdr_t),
(uint8_t*)(&(block_hdr->data_nonce)), session_key) != 0)
{
fprintf(stderr, "Unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->data_nonce));
WFB_ERR("Unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->data_nonce));
count_p_dec_err += 1;
return;
}
Expand All @@ -702,14 +698,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
// Should never happend due to generating new session key on tx side
if (block_idx > MAX_BLOCK_IDX)
{
fprintf(stderr, "block_idx overflow\n");
WFB_ERR("block_idx overflow\n");
count_p_bad += 1;
return;
}

if (fragment_idx >= fec_n)
{
fprintf(stderr, "Invalid fragment_idx: %d\n", fragment_idx);
WFB_ERR("Invalid fragment_idx: %d\n", fragment_idx);
count_p_bad += 1;
return;
}
Expand Down Expand Up @@ -782,6 +778,8 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
{
if(! p->fragment_map[f_idx])
{
uint32_t fec_count = 0;

//Recover missed fragments using FEC
apply_fec(ring_idx);

Expand All @@ -790,9 +788,15 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
{
if(! p->fragment_map[f_idx])
{
count_p_fec_recovered += 1;
fec_count += 1;
}
}

if(fec_count)
{
count_p_fec_recovered += fec_count;
WFB_DBG("FEC recovered %d packets\n", fec_count);
}
break;
}
}
Expand Down Expand Up @@ -820,14 +824,15 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)

if (packet_seq > seq + 1 && seq > 0)
{
ANDROID_IPC_MSG("PKT_LOST\t%d", (packet_seq - seq - 1));
count_p_lost += (packet_seq - seq - 1);
}

seq = packet_seq;

if(packet_size > MAX_PAYLOAD_SIZE)
{
fprintf(stderr, "Corrupted packet %u\n", seq);
WFB_ERR("Corrupted packet %u\n", seq);
count_p_bad += 1;
}
else if(!(flags & WFB_PACKET_FEC_ONLY))
Expand Down Expand Up @@ -914,7 +919,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, un
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg->dump_stats(stdout);
agg->dump_stats();
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

Expand Down Expand Up @@ -961,7 +966,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg.dump_stats(stdout);
agg.dump_stats();
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

Expand Down Expand Up @@ -1064,12 +1069,12 @@ int main(int argc, char* const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "WFB-ng version %s\n", WFB_VERSION);
fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n");
WFB_INFO("Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
WFB_INFO("Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]);
WFB_INFO("Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
WFB_INFO("Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
WFB_INFO("WFB-ng version %s\n", WFB_VERSION);
WFB_INFO("WFB-ng home page: <http://wfb-ng.org>\n");
exit(1);
}
}
Expand All @@ -1080,7 +1085,7 @@ int main(int argc, char* const *argv)

if ((fd = open("/dev/random", O_RDONLY)) != -1) {
if (ioctl(fd, RNDGETENTCNT, &c) == 0 && c < 160) {
fprintf(stderr, "This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
WFB_ERR("This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
"Installing the rng-utils/rng-tools, jitterentropy or haveged packages may help.\n"
"On virtualized Linux environments, also consider using virtio-rng.\n"
"The service will not start until enough entropy has been collected.\n");
Expand All @@ -1091,7 +1096,7 @@ int main(int argc, char* const *argv)

if (sodium_init() < 0)
{
fprintf(stderr, "Libsodium init failed\n");
WFB_ERR("Libsodium init failed\n");
return 1;
}

Expand Down Expand Up @@ -1121,7 +1126,7 @@ int main(int argc, char* const *argv)
}
}catch(runtime_error &e)
{
fprintf(stderr, "Error: %s\n", e.what());
WFB_ERR("Error: %s\n", e.what());
exit(1);
}
return 0;
Expand Down
6 changes: 3 additions & 3 deletions src/rx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BaseAggregator
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr) = 0;

virtual void dump_stats(FILE *fp) = 0;
virtual void dump_stats(void) = 0;
};


Expand All @@ -57,7 +57,7 @@ class Forwarder : public BaseAggregator
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth,sockaddr_in *sockaddr);
virtual void dump_stats(FILE *) {}
virtual void dump_stats(void) {}
private:
int sockfd;
struct sockaddr_in saddr;
Expand Down Expand Up @@ -163,7 +163,7 @@ class Aggregator : public BaseAggregator
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);
virtual void dump_stats(void);

// Make stats public for android userspace receiver
void clear_stats(void)
Expand Down
Loading

0 comments on commit d1b3600

Please sign in to comment.