Skip to content

Commit

Permalink
Separate Thread of Receiving and Parsing (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Teddy-van-Jerry committed Aug 26, 2023
1 parent b17dd74 commit c286ace
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 8 deletions.
18 changes: 17 additions & 1 deletion include/pkt_parse.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
#ifndef _PKT_PARSE_H_
#define _PKT_PARSE_H_

class Pkt_Parse {};
#include "pkt_def.h"
#include "pkt_receive.h"
#include <iostream>
#include <queue>

// TODO: Implement options for parsing.

class Pkt_Parse {
public:
Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive);

bool parse();

private:
Pkt_Receive& pkt_receive;
std::mutex& mtx_pkt_receive;
};

#endif
1 change: 1 addition & 0 deletions include/pkt_receive.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Pkt_Receive {
Pkt_Receive(const Cfg& cfg);
void startReceiving();
void stopReceiving();
bool isRunning() noexcept;
std::queue<Pkt_Payload>& getPayloadQueue(); // Return a reference

private:
Expand Down
2 changes: 1 addition & 1 deletion src/cfg_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Cfg cfg_read(const CLI_Options& opt) {
std::cout << " r.local.port: " << cfg.r.local_port << std::endl;
std::cout << " r.remote.ip: " << cfg.r.remote_ip << std::endl;
std::cout << " r.remote.port: " << cfg.r.remote_port << std::endl;
std::cout << " r.timeout: " << timeout_n << std::endl;
std::cout << " r.timeout: " << cfg.r.timeout << std::endl;
}

return cfg;
Expand Down
9 changes: 3 additions & 6 deletions src/cli_cmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@
void cmd_receive(const CLI_Options& opt) {
auto cfg = cfg_read(opt);
Pkt_Receive r(cfg);
Pkt_Parse p(r, payload_queue_mutex);
std::thread parser(&Pkt_Parse::parse, &p);
r.startReceiving();
// r.stopReceiving();

{
std::unique_lock<std::mutex> lock(payload_queue_mutex);
auto&& queue = r.getPayloadQueue();
std::cout << "Number of Payloads: " << queue.size() << std::endl;
}
parser.join();
}

void cmd_convert(const CLI_Options& opt) {}
19 changes: 19 additions & 0 deletions src/pkt_parse.cpp
Original file line number Diff line number Diff line change
@@ -1 +1,20 @@
#include "pkt_parse.h"

Pkt_Parse::Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive)
: pkt_receive(pkt_receive), mtx_pkt_receive(mtx_pkt_receive) {}

bool Pkt_Parse::parse() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_pkt_receive);
auto&& queue = pkt_receive.getPayloadQueue();
if (auto queue_size = queue.size(); queue_size > 0) {
std::cout << "Number of Payloads: " << queue_size << std::endl;
auto payload = queue.front();
queue.pop();
std::cout << "Payload Size: " << payload.size() << std::endl;
} else if (!pkt_receive.isRunning()) {
std::cout << "Finished parsing and receiving has stopped." << std::endl;
return true;
}
}
}
2 changes: 2 additions & 0 deletions src/pkt_receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ void Pkt_Receive::handleIncomingPacket(const boost::system::error_code& error, s

void Pkt_Receive::stopReceiving() { is_running = false; }

bool Pkt_Receive::isRunning() noexcept { return is_running; }

std::queue<Pkt_Payload>& Pkt_Receive::getPayloadQueue() { return payload_queue; }

void Pkt_Receive::processIOContext() {
Expand Down

0 comments on commit c286ace

Please sign in to comment.