Skip to content

Commit

Permalink
[Feat] GzMeta StreamConvertor
Browse files Browse the repository at this point in the history
    StreamConvertor is a UDS server that can accept gzip data from
Unix domain socket and generate gzip index & tar headers

Signed-off-by: Yifan Yuan <[email protected]>
  • Loading branch information
BigVan committed Sep 15, 2023
1 parent 2ba3d0c commit 7324075
Show file tree
Hide file tree
Showing 12 changed files with 406 additions and 114 deletions.
3 changes: 3 additions & 0 deletions src/overlaybd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ add_subdirectory(extfs)
add_subdirectory(gzip)
add_subdirectory(gzindex)

add_subdirectory(stream_convertor)


add_library(overlaybd_lib INTERFACE)
target_include_directories(overlaybd_lib INTERFACE
${PHOTON_INCLUDE_DIR}
Expand Down
2 changes: 1 addition & 1 deletion src/overlaybd/gzindex/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
file(GLOB SOURCE_TAR "*.cpp")
add_library(gzindex_lib STATIC ${SOURCE_TAR})
add_library(gzindex_lib STATIC ${SOURCE_TAR} )

target_include_directories(gzindex_lib PUBLIC ${PHOTON_INCLUDE_DIR})
target_link_libraries(gzindex_lib photon_static)
Expand Down
3 changes: 2 additions & 1 deletion src/overlaybd/gzindex/gzfile_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string>
#include <iostream>
#include <sstream>
#include <sys/types.h>
#include <vector>
#include <zlib.h>
#include "photon/common/checksum/crc32c.h"
Expand Down Expand Up @@ -91,4 +92,4 @@ int init_index_header(photon::fs::IFile* src, IndexFileHeader &h, off_t span, i

int create_index_entry(z_stream strm, IndexFilterRecorder *filter, off_t en_pos, off_t de_pos, unsigned char *window);

int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file);
int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size = -1);
13 changes: 10 additions & 3 deletions src/overlaybd/gzindex/gzip_index_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
// #include "../zfile/crc32/crc32c.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zlib.h>
#include <sys/fcntl.h>

#include "gzfile_index.h"

#include "photon/common/alog.h"
#include "photon/common/alog-stdstring.h"
#include "photon/fs/localfs.h"
Expand Down Expand Up @@ -271,15 +274,20 @@ static int get_compressed_index(const IndexFileHeader& h, const INDEX& index, un
out_len = index_len;
return 0;
}

LOG_INFO("index crc: `", crc32(0, buf, index_len));
return zlib_compress(h.dict_compress_level, buf, index_len, out, out_len);
}

int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file) {
int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size) {
int indx_cmpr_buf_len = index.size() * sizeof(IndexEntry) * 2 + 4096;
unsigned char *buf = new unsigned char[indx_cmpr_buf_len];
DEFER(delete []buf);

if (gzip_file_size != -1) {
LOG_INFO("save gzip file size: `", gzip_file_size);
h.gzip_file_size = gzip_file_size;
}

if (get_compressed_index(h, index, buf, indx_cmpr_buf_len) != 0) {
LOG_ERROR_RETURN(0, -1, "Failed to get_compress_index");
}
Expand Down Expand Up @@ -355,7 +363,6 @@ int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, o
if (init_index_header(gzip_file, h, span, dict_compress_algo, dict_compress_level) != 0) {
LOG_ERRNO_RETURN(0, -1, "init index header failed.");
}

INDEX index;
int ret = build_index(h, gzip_file, index, index_file);
if (ret != 0) {
Expand Down
91 changes: 52 additions & 39 deletions src/overlaybd/gzindex/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <photon/photon.h>
#include <photon/common/io-alloc.h>
#include <photon/common/alog.h>
#include <photon/common/alog-stdstring.h>
#include <photon/fs/localfs.h>
#include <photon/io/fd-events.h>
#include <photon/net/socket.h>
Expand Down Expand Up @@ -198,11 +199,11 @@ const char *GzIndexTest::fn_defile = "fdata";
const char *GzIndexTest::fn_gzdata = "fdata.gz";
const char *GzIndexTest::fn_gzindex = "findex";

char uds_path[] = "/tmp/udstest.sock";
char uds_path[] = "/tmp/gzstream_test/stream_conv.sock";

int download(const std::string &url, const std::string &out) {
if (::access(out.c_str(), 0) == 0)
return 0;
// if (::access(out.c_str(), 0) == 0)
// return 0;
auto base = std::string(basename(url.c_str()));
// download file
std::string cmd = "curl -s -o " + out + " " + url;
Expand All @@ -221,27 +222,27 @@ void handler(photon::net::ISocketStream *sock) {
char recv[65536];
size_t count = 0;
auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT | O_RDWR);
auto idx_file =
photon::fs::open_localfile_adaptor("/tmp/dest.gz_idx", O_TRUNC | O_CREAT | O_RDWR);
DEFER(delete dst);
DEFER(delete idx_file);

sock->read(recv, sizeof(size_t));
// inf(sock, dst);
auto st_size = *(ssize_t *)recv;
auto gzfile = open_gzstream_file(sock, st_size, idx_file);
ASSERT_NE(gzfile, nullptr);
DEFER(delete gzfile);
// sock->read(recv, sizeof(ssize_t));
// auto st_size = *(size_t*)recv;
auto gzstream = open_gzstream_file(sock, 0);
ASSERT_NE(gzstream, nullptr);
DEFER(delete gzstream);
while (true) {
auto readn = gzfile->read(recv, 65536);
auto readn = gzstream->read(recv, 65536);
if (readn <= 0)
break;

count += readn;
dst->write(recv, readn);
}
LOG_INFO("RECV `", count);
save_gzip_index(gzfile);
ASSERT_STREQ(sha256sum("/tmp/dest").c_str(), "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b");
auto fn_idx = gzstream->save_index();
LOG_INFO("RECV `, fn_idx: `", count, fn_idx.c_str());


// ASSERT_STREQ(sha256sum("/tmp/dest").c_str(), "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b");
// ASSERT_STREQ(sha256sum(fn_idx.c_str()).c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82");

}

void uds_server() {
Expand All @@ -258,6 +259,7 @@ void uds_server() {
}

void uds_client(photon::fs::IFile *file) {

photon::thread_yield_to(nullptr);
auto cli = photon::net::new_uds_client();
DEFER({ delete cli; });
Expand All @@ -271,45 +273,56 @@ void uds_client(photon::fs::IFile *file) {
struct stat st;
file->fstat(&st);
LOG_INFO("Connected `, start send file data(size: `)", path, st.st_size);
// sock->write(&st.st_size, sizeof(st.st_size));
char buff[65536];
sock->write((void *)&st.st_size, sizeof(st.st_size));
auto count = 0;
while (true) {
auto readn = file->read(buff, 65536);
ASSERT_NE(readn, -1);
auto ret = sock->write(buff, readn);
count += readn;
LOG_INFO("write ` bytes", ret);
ASSERT_EQ(ret, readn);
if (readn != 65536)
break;
}
LOG_INFO("SEND: `", count);

return;
}

TEST_F(GzIndexTest, stream) {
set_log_output_level(0);
std::string workdir = "/tmp/gzstream_test/";
mkdir(workdir.c_str(), 0755);
auto lfs = photon::fs::new_localfs_adaptor(workdir.c_str());
LOG_INFO("start streamFile test");
std::string fn_test_tgz = "/tmp/go1.17.6.linux-amd64.tar.gz";
ASSERT_EQ(
0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz",
fn_test_tgz.c_str()));

auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server));

auto file = photon::fs::open_localfile_adaptor(fn_test_tgz.c_str(), O_RDONLY);
// auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT |O_RDWR, 0644);
ASSERT_NE(file, nullptr);
// inf(file, dst);
uds_client(file);

photon::thread_join(jh1);
remove(uds_path);
file->lseek(0, SEEK_SET);
auto fn_test_tgz_idx = fn_test_tgz + ".index";
if (::access(fn_test_tgz_idx.c_str(), 0) != 0){
ASSERT_EQ(create_gz_index(file, fn_test_tgz_idx.c_str()), 0);
std::vector<std::string> filelist = {
"https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz",
"https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz",
"https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.13.linux-amd64.tar.gz",
"https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz"
};
std::vector<std::string> tar_sha256sum = {
"sha256:05e8b01c1ddb6ba4f8c84e7dbc76529bdc09861f9ce17c213a49e8c334f184ed",
"sha256:0ccf983abf0b0fb64cc969079982bc34761ce22d7a3236a40d49d840d150e09a",
"sha256:1041ec4e2f40156e0731be175388be4c67aeceb44829f988df213e9fd5f26dc9",
"sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b"
};
int i = 0;
for (auto test_tgz : filelist) {
auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server));
std::string fn_test_tgz = basename(test_tgz.c_str());
ASSERT_EQ(
0, download(test_tgz, (workdir + fn_test_tgz).c_str()));
auto file = lfs->open(fn_test_tgz.c_str(), O_RDONLY);
std::string tar_sha256="", idx_sha256="";
uds_client(file);
photon::thread_join(jh1);
auto dst_sha256 = sha256sum("/tmp/dest");
ASSERT_STREQ(dst_sha256.c_str(), tar_sha256sum[i++].c_str());
lfs->unlink(fn_test_tgz.c_str());
}
remove(uds_path);

}

TEST_F(GzIndexTest, pread) {
Expand Down
2 changes: 1 addition & 1 deletion src/overlaybd/gzip/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ add_library(gzip_lib STATIC ${SOURCE_GZIP})
target_include_directories(gzip_lib PUBLIC
${PHOTON_INCLUDE_DIR}
)
target_link_libraries(gzip_lib photon_static)
target_link_libraries(gzip_lib photon_static checksum_lib)

# if(BUILD_TESTING)
# add_subdirectory(test)
Expand Down
Loading

0 comments on commit 7324075

Please sign in to comment.