From e3f1ad8e3a2b2f255c65af2b40eacee911876dce Mon Sep 17 00:00:00 2001 From: Ihor Ivlev Date: Thu, 28 Nov 2024 19:54:30 +0100 Subject: [PATCH] Add configurable logger support The new logger interface provides a single function, `logln`, which can be implemented by the users of the API. The interface has a default implementation, `StdErrLogger`, which logs the data to `stderr` if the level is higher than `LogLevel::INFO`. `LogLevel::ALWAYS` is always logged without a prefix. --- src/logger.hpp | 61 ++++++++++++++++++++++++++++++++++++++++++ src/rx.cpp | 72 ++++++++++++++++++++++++++++---------------------- src/rx.hpp | 13 ++++++--- 3 files changed, 111 insertions(+), 35 deletions(-) create mode 100644 src/logger.hpp diff --git a/src/logger.hpp b/src/logger.hpp new file mode 100644 index 00000000..45486263 --- /dev/null +++ b/src/logger.hpp @@ -0,0 +1,61 @@ +#pragma once + +#include +#include + + +// Since this file is most likely to be used outside of the project (like, in Android), +// using a namespace to avoid conflicts +namespace wfb { +enum class LogLevel { + ALWAYS, + ERROR, + WARNING, + INFO, + DEBUG +}; + +class Logger +{ +public: + virtual ~Logger() = default; + virtual void logln(LogLevel level, const char* format, ...) = 0; +}; + +class StdErrLogger : public Logger +{ +public: + void logln(LogLevel level, const char* format, ...) override { + const char* level_str = ""; + switch (level) { + case LogLevel::DEBUG: + case LogLevel::INFO: + return; + case LogLevel::ALWAYS: + break; + case LogLevel::WARNING: + level_str = "WARNING: "; + break; + case LogLevel::ERROR: + level_str = "ERROR: "; + break; + } + + char buffer[1024]; + va_list args; + va_start(args, format); + int prefix_len = snprintf(buffer, sizeof(buffer), "%s", level_str); + if (prefix_len < 0) + return; + + int message_len = vsnprintf(buffer + prefix_len, sizeof(buffer) - prefix_len - 2, format, args); + if (message_len < 0) + return; + + va_end(args); + + buffer[prefix_len + message_len] = '\n'; + buffer[prefix_len + message_len + 1] = '\0'; + } +}; +} // namespace wfb \ No newline at end of file diff --git a/src/rx.cpp b/src/rx.cpp index f52bb990..c041e0e5 100644 --- a/src/rx.cpp +++ b/src/rx.cpp @@ -47,10 +47,14 @@ extern "C" #include "wifibroadcast.hpp" #include "rx.hpp" +#define WFB_LOG(...) do { if (logger) { logger->logln(__VA_ARGS__); } } while(0) + using namespace std; +using LogLevel = wfb::LogLevel; -Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg, int rcv_buf_size) : wlan_idx(wlan_idx), agg(agg) +Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg, int rcv_buf_size, + std::shared_ptr logger): wlan_idx(wlan_idx), agg(agg), logger(std::move(logger)) { char errbuf[PCAP_ERRBUF_SIZE]; @@ -227,7 +231,7 @@ void Receiver::loop_iter(void) } /* while more rt headers */ if (ret != -ENOENT && ant_idx < RX_ANT_MAX){ - fprintf(stderr, "Error parsing radiotap header!\n"); + WFB_LOG(LogLevel::ERROR, "Error parsing radiotap header!\n"); continue; } @@ -244,7 +248,7 @@ void Receiver::loop_iter(void) if (flags & IEEE80211_RADIOTAP_F_BADFCS) { - fprintf(stderr, "Got packet with bad fsc\n"); + WFB_LOG(LogLevel::ERROR, "Got packet with bad fcs"); continue; } @@ -252,24 +256,24 @@ void Receiver::loop_iter(void) pkt += iterator._max_length; pktlen -= iterator._max_length; - //fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth); + WFB_LOG(LogLevel::DEBUG, "CAPTURE: mcs: %u, bw: %u", mcs_index, bandwidth); if (pktlen > (int)sizeof(ieee80211_header)) { agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL); } else { - fprintf(stderr, "Short packet (ieee header)\n"); + WFB_LOG(LogLevel::ERROR, "Short packet (ieee header)"); continue; } } } - -Aggregator::Aggregator(const string &client_addr, int client_port, const string &keypair, uint64_t epoch, uint32_t channel_id) : \ +Aggregator::Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id, + std::shared_ptr logger) :\ count_p_all(0), count_b_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0), count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0), count_b_outgoing(0), fec_p(NULL), fec_k(-1), fec_n(-1), seq(0), rx_ring{}, rx_ring_front(0), rx_ring_alloc(0), - last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id) + last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id), logger(std::move(logger)) { sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno))); @@ -364,7 +368,8 @@ void Aggregator::deinit_fec(void) } -Forwarder::Forwarder(const string &client_addr, int client_port) +Forwarder::Forwarder(const string &client_addr, int client_port, + std::shared_ptr logger) : logger(logger) { sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno))); @@ -428,10 +433,8 @@ int Aggregator::rx_ring_push(void) 2. Reduce packet injection speed or try to unify RX hardware. */ -#if 0 - fprintf(stderr, "Override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments); -#endif - + WFB_LOG(LogLevel::DEBUG, "Override block 0x%" PRIx64 " flush %d fragments", + rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments); count_p_override += 1; for(int f_idx=rx_ring[rx_ring_front].fragment_to_send_idx; f_idx < fec_k; f_idx++) @@ -491,20 +494,26 @@ void Aggregator::dump_stats(FILE *fp) ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all, it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max, it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max); + WFB_LOG(LogLevel::INFO, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d", + ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all, + it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max, + it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max); } fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing); fflush(fp); + WFB_LOG(LogLevel::INFO, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u", ts, count_p_all, count_b_all, count_p_dec_err, + count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing); if(count_p_override) { - fprintf(stderr, "%u block overrides\n", count_p_override); + WFB_LOG(LogLevel::WARNING, "%u block overrides", count_p_override); } if(count_p_lost) { - fprintf(stderr, "%u packets lost\n", count_p_lost); + WFB_LOG(LogLevel::WARNING, "%u packets lost", count_p_lost); } clear_stats(); @@ -571,7 +580,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id if (size > MAX_FORWARDER_PACKET_SIZE) { - fprintf(stderr, "Long packet (fec payload)\n"); + WFB_LOG(wfb::LogLevel::ERROR, "Long packet (forwarder payload)"); count_p_bad += 1; return; } @@ -581,7 +590,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id case WFB_PACKET_DATA: if(size < sizeof(wblock_hdr_t) + crypto_aead_chacha20poly1305_ABYTES + sizeof(wpacket_hdr_t)) { - fprintf(stderr, "Short packet (fec header)\n"); + WFB_LOG(LogLevel::ERROR, "Short packet (data header)"); count_p_bad += 1; return; } @@ -593,7 +602,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id if(size < sizeof(wsession_hdr_t) + sizeof(wsession_data_t) + crypto_box_MACBYTES || \ size > MAX_SESSION_PACKET_SIZE) { - fprintf(stderr, "Invalid session key packet\n"); + WFB_LOG(LogLevel::ERROR, "Invalid session packet"); count_p_bad += 1; return; } @@ -604,7 +613,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id ((wsession_hdr_t*)buf)->session_nonce, tx_publickey, rx_secretkey) != 0) { - fprintf(stderr, "Unable to decrypt session key\n"); + WFB_LOG(LogLevel::ERROR, "Unable to decrypt session packet"); count_p_dec_err += 1; return; } @@ -613,35 +622,35 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id if (be64toh(new_session_data->epoch) < epoch) { - fprintf(stderr, "Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data->epoch), epoch); + WFB_LOG(LogLevel::ERROR, "Session epoch is too old: %" PRIu64 " < %" PRIu64, be64toh(new_session_data->epoch), epoch); count_p_dec_err += 1; return; } if (be32toh(new_session_data->channel_id) != channel_id) { - fprintf(stderr, "Session channel_id doesn't match: %u != %u\n", be32toh(new_session_data->channel_id), channel_id); + WFB_LOG(LogLevel::ERROR, "Session channel_id doesn't match: %u != %u", be32toh(new_session_data->channel_id), channel_id); count_p_dec_err += 1; return; } if (new_session_data->fec_type != WFB_FEC_VDM_RS) { - fprintf(stderr, "Unsupported FEC codec type: %d\n", new_session_data->fec_type); + WFB_LOG(LogLevel::ERROR, "Unsupported FEC codec type: %d", new_session_data->fec_type); count_p_dec_err += 1; return; } if (new_session_data->n < 1) { - fprintf(stderr, "Invalid FEC N: %d\n", new_session_data->n); + WFB_LOG(LogLevel::ERROR, "Invalid FEC N: %d", new_session_data->n); count_p_dec_err += 1; return; } if (new_session_data->k < 1 || new_session_data->k > new_session_data->n) { - fprintf(stderr, "Invalid FEC K: %d\n", new_session_data->k); + WFB_LOG(LogLevel::ERROR, "Invalid FEC K: %d", new_session_data->k); count_p_dec_err += 1; return; } @@ -661,15 +670,13 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id init_fec(new_session_data->k, new_session_data->n); - fprintf(stdout, "%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n); - fflush(stdout); - + WFB_LOG(LogLevel::INFO, "%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n); } return; default: - fprintf(stderr, "Unknown packet type 0x%x\n", buf[0]); + WFB_LOG(LogLevel::ERROR, "Unknown packet type 0x%x", buf[0]); count_p_bad += 1; return; } @@ -685,7 +692,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id sizeof(wblock_hdr_t), (uint8_t*)(&(block_hdr->data_nonce)), session_key) != 0) { - fprintf(stderr, "Unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->data_nonce)); + WFB_LOG(LogLevel::ERROR, "Unable to decrypt packet #0x%" PRIx64, be64toh(block_hdr->data_nonce)); count_p_dec_err += 1; return; } @@ -702,14 +709,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id // Should never happend due to generating new session key on tx side if (block_idx > MAX_BLOCK_IDX) { - fprintf(stderr, "block_idx overflow\n"); + WFB_LOG(LogLevel::ERROR, "block_idx overflow"); count_p_bad += 1; return; } if (fragment_idx >= fec_n) { - fprintf(stderr, "Invalid fragment_idx: %d\n", fragment_idx); + WFB_LOG(LogLevel::ERROR, "Invalid fragment_idx: %d", fragment_idx); count_p_bad += 1; return; } @@ -820,6 +827,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx) if (packet_seq > seq + 1 && seq > 0) { + WFB_LOG(LogLevel::WARNING, "Lost packet %u\n", seq + 1); count_p_lost += (packet_seq - seq - 1); } @@ -827,7 +835,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx) if(packet_size > MAX_PAYLOAD_SIZE) { - fprintf(stderr, "Corrupted packet %u\n", seq); + WFB_LOG(LogLevel::ERROR, "Corrupted packet %u\n", seq); count_p_bad += 1; } else if(!(flags & WFB_PACKET_FEC_ONLY)) diff --git a/src/rx.hpp b/src/rx.hpp index f2def749..12b623fd 100644 --- a/src/rx.hpp +++ b/src/rx.hpp @@ -28,6 +28,7 @@ #include #include +#include "logger.hpp" #include "wifibroadcast.hpp" @@ -52,7 +53,8 @@ class BaseAggregator class Forwarder : public BaseAggregator { public: - Forwarder(const std::string &client_addr, int client_port); + Forwarder(const std::string &client_addr, int client_port, + std::shared_ptr logger = std::make_shared()); virtual ~Forwarder(); virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, @@ -61,6 +63,7 @@ class Forwarder : public BaseAggregator private: int sockfd; struct sockaddr_in saddr; + std::shared_ptr logger; }; @@ -158,7 +161,8 @@ typedef std::unordered_map rx_antenna_stat_t; class Aggregator : public BaseAggregator { public: - Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id); + Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id, + std::shared_ptr logger = std::make_shared()); virtual ~Aggregator(); virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index, @@ -220,6 +224,7 @@ class Aggregator : public BaseAggregator uint64_t last_known_block; //id of last known block uint64_t epoch; // current epoch const uint32_t channel_id; // (link_id << 8) + port_number + std::shared_ptr logger; // rx->tx keypair uint8_t rx_secretkey[crypto_box_SECRETKEYBYTES]; @@ -230,7 +235,8 @@ class Aggregator : public BaseAggregator class Receiver { public: - Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg, int rcv_buf_size); + Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg, int rcv_buf_size, + std::shared_ptr logger = std::make_shared()); ~Receiver(); void loop_iter(void); int getfd(void){ return fd; } @@ -239,4 +245,5 @@ class Receiver BaseAggregator *agg; int fd; pcap_t *ppcap; + std::shared_ptr logger; };