Skip to content

Commit

Permalink
Parquet Initial Design (#2)
Browse files Browse the repository at this point in the history
Buggy now, trying to resolve.
  • Loading branch information
Teddy-van-Jerry committed Aug 30, 2023
1 parent dfa7b2b commit 4083e64
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,8 @@
/bin
/build

# Dataset Output
*.parquet

# macOS
.DS_Store
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"typeinfo": "cpp",
"variant": "cpp",
"bit": "cpp",
"unordered_set": "cpp"
"unordered_set": "cpp",
"*.ipp": "cpp"
}
}
13 changes: 7 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ else()
COMPONENTS iostreams program_options filesystem
)
endif()
set(ARROW_HTTP OFF CACHE BOOL "Do not want http" FORCE)
set(ARROW_WITH_CURL OFF CACHE BOOL "Do not want curl dependency" FORCE)
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)

if (UNIX)
set(USE_THREADS TRUE BOOL "Use Threads in Unix")
Expand All @@ -39,7 +43,7 @@ include_directories(
)
link_directories(${Boost_LIBRARY_DIRS})

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED TRUE)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib)
Expand All @@ -51,10 +55,6 @@ if (NOT CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
set(CMAKE_CXX_FLAGS -Wno-error=narrowing)
endif()

if (MMCESIM_BUILD_ASTYLE)
add_subdirectory(ext/astyle)
endif()

set(SOURCES
"src/pkt_receive.cpp"
"src/pkt_parse.cpp"
Expand All @@ -64,7 +64,8 @@ set(SOURCES
)
add_executable(${PROJECT_NAME} ${SOURCES})

target_link_libraries(${PROJECT_NAME} LINK_PUBLIC ${Boost_LIBRARIES} ryml::ryml)
target_link_libraries(${PROJECT_NAME} LINK_PRIVATE ${Boost_LIBRARIES} ryml::ryml
CURL::libcurl Arrow::arrow_shared Parquet::parquet_shared)
if (USE_THREADS)
target_link_libraries(${PROJECT_NAME} PRIVATE Threads::Threads)
endif()
5 changes: 4 additions & 1 deletion include/pkt_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ const inline constexpr static size_t PKT_PAYLOAD_LENGTH = 1040;
// Length of the packet header (presumably in bytes, like the other *_LENGTH values — TODO confirm).
const inline constexpr static size_t PKT_HEADER_LENGTH = 8;
// Total packet length: header followed by payload.
const inline constexpr static size_t PKT_LENGTH = PKT_HEADER_LENGTH + PKT_PAYLOAD_LENGTH;
// Receive buffer size: one full packet plus 8 spare units of slack.
const inline constexpr static size_t PKT_BUFFER_SIZE = PKT_LENGTH + 8;
// Number of CIR entries decoded from one payload (size of Pkt_Parse::Data::CIR).
const inline constexpr static size_t CIR_LENGTH = 256;
// Two bytes per CIR entry for one component.
const inline constexpr static size_t CIR_BYTES = CIR_LENGTH * 2; // for either I or Q
// Payload offset at which the payload parser starts reading CIR entries.
const inline constexpr static size_t CIR_BEGIN_BYTE_N = 16;

using Pkt_Payload = std::array<std::byte, PKT_PAYLOAD_LENGTH>;
using Pkt_Payload = std::string;
using IP_Addr = std::string;
using Port_N = unsigned short;
using Boost_Time_Unit = boost::posix_time::microseconds;
Expand Down
27 changes: 27 additions & 0 deletions include/pkt_parse.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

#include "pkt_def.h"
#include "pkt_receive.h"
#include "utils.h"
#include <bit>
#include <arrow/io/file.h>
#include <parquet/arrow/writer.h>
#include <parquet/stream_writer.h>
#include <arrow/util/type_fwd.h>
#include <iostream>
#include <queue>

Expand All @@ -12,11 +18,32 @@ class Pkt_Parse {
public:
Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive);

// ~Pkt_Parse();

bool parse();

private:
Pkt_Receive& pkt_receive;
std::mutex& mtx_pkt_receive;
std::shared_ptr<parquet::WriterProperties> writer_props;
std::shared_ptr<parquet::schema::GroupNode> writer_schema;
std::shared_ptr<arrow::io::FileOutputStream> outfile;

// Decoded contents of a single UDP payload; also declares the `data` member of Pkt_Parse.
struct Data {
// Value decoded from payload bytes 2-5 by the payload constructor.
uint32_t timestamp;
// Value decoded from payload bytes 6-7 (zero-extended to 32 bits) by the payload constructor.
uint32_t noise;
// CIR_LENGTH entries decoded from the payload, starting at byte CIR_BEGIN_BYTE_N.
std::array<uint32_t, CIR_LENGTH> CIR;

// Leaves all fields uninitialized.
Data() = default;

// Decodes timestamp, noise and CIR from a raw payload.
Data(const Pkt_Payload& payload);

private:
// Converts 4 raw bytes to a uint32_t (least-significant byte first).
// NOTE(review): not called by any code visible in this change — confirm whether it is still needed.
uint32_t bytesToUInt32(const std::array<std::byte,4>& bytes);

// Converts a 4-character string to a uint32_t (most-significant byte first).
uint32_t strToUInt32(const std::string& str);

} data;
};

#endif
15 changes: 15 additions & 0 deletions include/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef _UTILS_H_
#define _UTILS_H_

#include <array>

namespace _dirty {
/// View N consecutive bytes of `arr`, starting at element `index`, as a byte array.
///
/// No copy is made: the returned reference aliases the storage of `arr`, so it
/// is only valid while `arr` is alive. No bounds checking is performed.
/// NOTE(review): this relies on reinterpret_cast type punning — confirm the
/// element type of `arr` is byte-sized and that the aliasing is acceptable.
template<std::size_t N, typename T>
const std::array<std::byte, N>& arraySubBytes(const T& arr, std::size_t index) {
    using Window = std::array<std::byte, N>;
    const auto* first = &arr[index];
    return *reinterpret_cast<const Window*>(first);
}
}

// using namespace _dirty;

#endif
44 changes: 44 additions & 0 deletions reader/reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# read parquet file
import pandas as pd
# import pyarrow.parquet as pq
# import pyarrow as pa
import os
import sys
import time
import datetime
import numpy as np


def read_parquet_file(file_path):
    """
    Load a parquet file through pandas.

    :param file_path: path of the parquet file to load
    :return: whatever ``pd.read_parquet`` returns for that path (a DataFrame)
    """
    return pd.read_parquet(file_path)

# print the data table
def print_table(df):
    """
    Write the table to standard output using its default string form.

    :param df: pandas dataframe to display
    :return: None
    """
    print(df)

# main
if __name__ == '__main__':
    # Time the whole load-and-print cycle.
    started = time.time()

    # Load the dataset written by the UDP processor and show it.
    table = read_parquet_file('test1.parquet')
    print_table(table)

    # Report how long the cycle took.
    elapsed = time.time() - started
    print('Time elapsed: ', elapsed, ' seconds')
2 changes: 1 addition & 1 deletion scripts/self_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
exe_name="bin/m2c-udp-process"

# First setup Python UDP packets sender
python3 tests/udp_sender.py 127.0.0.1 1234 -q -i0.1 &
python3 tests/udp_sender.py 127.0.0.1 1234 -q -i1 &
python_pid=$!

# Then run the test
Expand Down
5 changes: 3 additions & 2 deletions src/cli_cmd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ void cmd_receive(const CLI_Options& opt) {
auto cfg = cfg_read(opt);
Pkt_Receive r(cfg);
Pkt_Parse p(r, payload_queue_mutex);
std::thread parser(&Pkt_Parse::parse, &p);
// std::thread parser(&Pkt_Parse::parse, &p);
r.startReceiving();
p.parse();

parser.join();
// parser.join();
}

void cmd_convert(const CLI_Options& opt) {}
92 changes: 89 additions & 3 deletions src/pkt_parse.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,106 @@
#include "pkt_parse.h"
#include <exception>
#include <utility>
// #include <arrow/api.h>
// #include <arrow/io/api.h>
// #include <parquet/arrow/reader.h>
// #include <parquet/arrow/writer.h>
// #include <parquet/exception.h>


// using parquet::ArrowWriterProperties;

// Binds the parser to a receiver and its queue mutex, builds the parquet writer
// properties, opens the output file and defines the output schema.
// PARQUET_ASSIGN_OR_THROW reports a failure to open the file by throwing.
Pkt_Parse::Pkt_Parse(Pkt_Receive& pkt_receive, std::mutex& mtx_pkt_receive)
: pkt_receive(pkt_receive), mtx_pkt_receive(mtx_pkt_receive) {}
: pkt_receive(pkt_receive), mtx_pkt_receive(mtx_pkt_receive), writer_props(
// Writer properties: only the creator string and the (disabled) compression are set.
parquet::WriterProperties::Builder()
.created_by("M2C UDP Process")
// ->max_row_group_length(10000)
// ->version(parquet::ParquetVersion::PARQUET_2_6)
// ->data_page_version(parquet::ParquetDataPageVersion::V2)
// ->compression(parquet::Compression::SNAPPY)
->compression(parquet::Compression::UNCOMPRESSED)
->build()
) {
// The output path is hard-coded.
PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open("test1.parquet"));
parquet::schema::NodeVector fields;
// The schema currently has a single required 32-bit signed column, "timestamp";
// the noise and CIR columns below are disabled.
fields.push_back(parquet::schema::PrimitiveNode::Make(
"timestamp", parquet::Repetition::REQUIRED, parquet::Type::INT32, parquet::ConvertedType::INT_32));
// fields.push_back(parquet::schema::PrimitiveNode::Make(
// "noise", parquet::Repetition::REQUIRED, parquet::Type::INT32, parquet::ConvertedType::INT_32));
// for (size_t i = 0; i != CIR_LENGTH; ++i) {
// fields.push_back(parquet::schema::PrimitiveNode::Make(
// "CIR" + std::to_string(i), parquet::Repetition::REQUIRED, parquet::Type::INT32, parquet::ConvertedType::UINT_32));
// }
// Root group node wrapping the columns.
writer_schema = std::static_pointer_cast<parquet::schema::GroupNode>(
parquet::schema::GroupNode::Make("UDP Payload", parquet::Repetition::REQUIRED, fields));
}

bool Pkt_Parse::parse() {
parquet::StreamWriter os{ parquet::ParquetFileWriter::Open(outfile, writer_schema, writer_props) };
while (true) {
std::unique_lock<std::mutex> lock(mtx_pkt_receive);
auto&& queue = pkt_receive.getPayloadQueue();
if (auto queue_size = queue.size(); queue_size > 0) {
std::cout << "Number of Payloads: " << queue_size << std::endl;
auto payload = queue.front();
queue.pop();
std::cout << "Payload Size: " << payload.size() << std::endl;
Data data(payload);
// os << data.timestamp << data.noise;
try {
// os << int32_t(1) << int32_t(2);
os << int32_t(1);
std::cout << data.timestamp << ' ' << data.noise << std::endl;

// for (auto&& cir : data.CIR) os << cir;
os << parquet::EndRow;
// os.EndRow();
} catch (...) {
std::cerr << "Error writing to parquet file." << std::endl;
return false;
}
} else if (!pkt_receive.isRunning()) {
std::cout << "Finished parsing and receiving has stopped." << std::endl;
return true;
break;
}
}
os.EndRowGroup();
// outfile->CloseAsync();
if (outfile->Close().ok()) {
std::cout << "n_cols: " << os.num_columns() << std::endl;
std::cout << "cu_row: " << os.current_row() << std::endl;
return true;
} else {
std::cerr << "File Close Not OK!" << std::endl;
return false;
}
}

/// Decode one raw payload (a std::string of bytes) into timestamp, noise and CIR.
///
/// Layout read by this constructor:
///   bytes 2-5  : timestamp (4 bytes)
///   bytes 6-7  : noise (2 bytes, zero-extended to 32 bits)
///   bytes CIR_BEGIN_BYTE_N onward : CIR_LENGTH entries of 4 bytes each
///
/// @param payload raw payload; expected to hold at least
///        CIR_BEGIN_BYTE_N + 4 * CIR_LENGTH bytes (std::string::substr throws
///        std::out_of_range if a start offset lies beyond the end).
Pkt_Parse::Data::Data(const Pkt_Payload& payload) {
    // Each CIR entry occupies 4 bytes, so consecutive entries are 4 bytes apart.
    // Advancing one byte per entry made the 4-byte windows overlap.
    // NOTE(review): assumes each entry is 4 consecutive bytes (16 + 256 * 4 = 1040,
    // the full payload length) — confirm the I/Q layout against the sender.
    constexpr size_t CIR_ENTRY_BYTES = 4;

    // timestamp is the bytes 2-5
    timestamp = strToUInt32(payload.substr(2, 4));
    // noise is the bytes 6-7, padded with two leading zero bytes
    noise = strToUInt32(std::string(2, '\0').append(payload.substr(6, 2)));
    for (size_t i = 0; i != CIR_LENGTH; ++i) {
        CIR[i] = strToUInt32(payload.substr(CIR_BEGIN_BYTE_N + CIR_ENTRY_BYTES * i, CIR_ENTRY_BYTES));
    }
}

/// Assemble a 32-bit value from 4 raw bytes, treating bytes[0] as the least
/// significant byte. The result does not depend on the host byte order.
///
/// @param bytes the four bytes to combine
/// @return bytes[0] | bytes[1] << 8 | bytes[2] << 16 | bytes[3] << 24
uint32_t Pkt_Parse::Data::bytesToUInt32(const std::array<std::byte, 4>& bytes) {
    uint32_t value = 0;
    // Walk from the most significant byte (last element) down to the least.
    for (size_t i = bytes.size(); i-- > 0;) {
        value = (value << 8) | std::to_integer<uint32_t>(bytes[i]);
    }
    return value;
}

/// Assemble a 32-bit value from a 4-character string, treating str[0] as the
/// most significant byte.
///
/// Each character is converted to unsigned char before widening. Where plain
/// `char` is signed, shifting it directly sign-extends any byte >= 0x80 and
/// corrupts the higher bytes of the result.
///
/// @param str exactly 4 raw bytes
/// @return str[0] << 24 | str[1] << 16 | str[2] << 8 | str[3], each byte unsigned
uint32_t Pkt_Parse::Data::strToUInt32(const std::string& str) {
    assert(str.size() == 4); // str must be length of 4
    const auto byte_at = [&str](size_t i) {
        return static_cast<uint32_t>(static_cast<unsigned char>(str[i]));
    };
    return byte_at(0) << 24 | byte_at(1) << 16 | byte_at(2) << 8 | byte_at(3);
}
15 changes: 8 additions & 7 deletions src/pkt_receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ void Pkt_Receive::handleIncomingPacket(const boost::system::error_code& error, s
std::string payload(buffer.data(), bytes_received);

if (bytes_received == PKT_PAYLOAD_LENGTH) {
Pkt_Payload bitset_data;
std::transform(payload.begin(), payload.end(), bitset_data.begin(),
[](char c) { return std::byte(c); });
{
std::unique_lock<std::mutex> lock(payload_queue_mutex);
payload_queue.push(bitset_data);
}
// Pkt_Payload bitset_data;
// std::transform(payload.begin(), payload.end(), bitset_data.begin(),
// [](char c) { return std::byte(c); });
// {
// std::unique_lock<std::mutex> lock(payload_queue_mutex);
// payload_queue.push(bitset_data);
// }
payload_queue.push(payload);
} else {
// omit this package
}
Expand Down

0 comments on commit 4083e64

Please sign in to comment.