diff --git a/include/pkt_parse.h b/include/pkt_parse.h index edb359c..9fc129a 100644 --- a/include/pkt_parse.h +++ b/include/pkt_parse.h @@ -1,6 +1,22 @@ #ifndef _PKT_PARSE_H_ #define _PKT_PARSE_H_ -class Pkt_Parse {}; +#include "pkt_def.h" +#include "pkt_receive.h" +#include +#include + +// TODO: Implement options for parsing. + +class Pkt_Parse { + public: + Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive); + + bool parse(); + + private: + Pkt_Receive& pkt_receive; + std::mutex& mtx_pkt_receive; +}; #endif diff --git a/include/pkt_receive.h b/include/pkt_receive.h index c083ac8..998c572 100644 --- a/include/pkt_receive.h +++ b/include/pkt_receive.h @@ -18,6 +18,7 @@ class Pkt_Receive { Pkt_Receive(const Cfg& cfg); void startReceiving(); void stopReceiving(); + bool isRunning() noexcept; std::queue& getPayloadQueue(); // Return a reference private: diff --git a/src/cfg_read.cpp b/src/cfg_read.cpp index 9ed21c9..06fd549 100644 --- a/src/cfg_read.cpp +++ b/src/cfg_read.cpp @@ -31,7 +31,7 @@ Cfg cfg_read(const CLI_Options& opt) { std::cout << " r.local.port: " << cfg.r.local_port << std::endl; std::cout << " r.remote.ip: " << cfg.r.remote_ip << std::endl; std::cout << " r.remote.port: " << cfg.r.remote_port << std::endl; - std::cout << " r.timeout: " << timeout_n << std::endl; + std::cout << " r.timeout: " << cfg.r.timeout << std::endl; } return cfg; diff --git a/src/cli_cmd.cpp b/src/cli_cmd.cpp index 9d7d6b7..e4e5506 100644 --- a/src/cli_cmd.cpp +++ b/src/cli_cmd.cpp @@ -4,14 +4,11 @@ void cmd_receive(const CLI_Options& opt) { auto cfg = cfg_read(opt); Pkt_Receive r(cfg); + Pkt_Parse p(r, payload_queue_mutex); + std::thread parser(&Pkt_Parse::parse, &p); r.startReceiving(); - // r.stopReceiving(); - { - std::unique_lock lock(payload_queue_mutex); - auto&& queue = r.getPayloadQueue(); - std::cout << "Number of Payloads: " << queue.size() << std::endl; - } + parser.join(); } void cmd_convert(const CLI_Options& opt) {} diff --git a/src/pkt_parse.cpp b/src/pkt_parse.cpp index daef049..6dad161 100644 --- a/src/pkt_parse.cpp +++ b/src/pkt_parse.cpp @@ -1 +1,20 @@ #include "pkt_parse.h" + +Pkt_Parse::Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive) + : pkt_receive(pkt_receive), mtx_pkt_receive(mtx_pkt_receive) {} + +bool Pkt_Parse::parse() { + while (true) { + std::unique_lock lock(mtx_pkt_receive); + auto&& queue = pkt_receive.getPayloadQueue(); + if (auto queue_size = queue.size(); queue_size > 0) { + std::cout << "Number of Payloads: " << queue_size << std::endl; + auto payload = queue.front(); + queue.pop(); + std::cout << "Payload Size: " << payload.size() << std::endl; + } else if (!pkt_receive.isRunning()) { + std::cout << "Finished parsing and receiving has stopped." << std::endl; + return true; + } + } +} diff --git a/src/pkt_receive.cpp b/src/pkt_receive.cpp index 7ac6d74..dd1965c 100644 --- a/src/pkt_receive.cpp +++ b/src/pkt_receive.cpp @@ -59,6 +59,8 @@ void Pkt_Receive::handleIncomingPacket(const boost::system::error_code& error, s void Pkt_Receive::stopReceiving() { is_running = false; } +bool Pkt_Receive::isRunning() noexcept { return is_running; } + std::queue& Pkt_Receive::getPayloadQueue() { return payload_queue; } void Pkt_Receive::processIOContext() {