From 73240756694ec49de08ea597ebeee59d92a04738 Mon Sep 17 00:00:00 2001 From: Yifan Yuan Date: Wed, 13 Sep 2023 18:25:17 +0800 Subject: [PATCH] [Feat] GzMeta StreamConvertor StreamConvertor is a UDS server that can accept gzip data from Unix domain socket and generate gzip index & tar headers Signed-off-by: Yifan Yuan --- src/overlaybd/CMakeLists.txt | 3 + src/overlaybd/gzindex/CMakeLists.txt | 2 +- src/overlaybd/gzindex/gzfile_index.h | 3 +- src/overlaybd/gzindex/gzip_index_create.cpp | 13 +- src/overlaybd/gzindex/test/test.cpp | 91 +++++++----- src/overlaybd/gzip/CMakeLists.txt | 2 +- src/overlaybd/gzip/gz.cpp | 118 ++++++++++----- src/overlaybd/gzip/gz.h | 13 +- src/overlaybd/stream_convertor/CMakeLists.txt | 16 ++ .../stream_convertor/stream_conv.cpp | 140 ++++++++++++++++++ src/overlaybd/tar/libtar.h | 5 +- src/overlaybd/tar/test/test.cpp | 114 ++++++++++---- 12 files changed, 406 insertions(+), 114 deletions(-) create mode 100644 src/overlaybd/stream_convertor/CMakeLists.txt create mode 100644 src/overlaybd/stream_convertor/stream_conv.cpp diff --git a/src/overlaybd/CMakeLists.txt b/src/overlaybd/CMakeLists.txt index f39ddfdd..3f000a6b 100644 --- a/src/overlaybd/CMakeLists.txt +++ b/src/overlaybd/CMakeLists.txt @@ -7,6 +7,9 @@ add_subdirectory(extfs) add_subdirectory(gzip) add_subdirectory(gzindex) +add_subdirectory(stream_convertor) + + add_library(overlaybd_lib INTERFACE) target_include_directories(overlaybd_lib INTERFACE ${PHOTON_INCLUDE_DIR} diff --git a/src/overlaybd/gzindex/CMakeLists.txt b/src/overlaybd/gzindex/CMakeLists.txt index 0ccd496f..cbbaeb86 100644 --- a/src/overlaybd/gzindex/CMakeLists.txt +++ b/src/overlaybd/gzindex/CMakeLists.txt @@ -1,5 +1,5 @@ file(GLOB SOURCE_TAR "*.cpp") -add_library(gzindex_lib STATIC ${SOURCE_TAR}) +add_library(gzindex_lib STATIC ${SOURCE_TAR} ) target_include_directories(gzindex_lib PUBLIC ${PHOTON_INCLUDE_DIR}) target_link_libraries(gzindex_lib photon_static) diff --git a/src/overlaybd/gzindex/gzfile_index.h b/src/overlaybd/gzindex/gzfile_index.h index 8840d9c9..34f6d19e 100644 --- a/src/overlaybd/gzindex/gzfile_index.h +++ b/src/overlaybd/gzindex/gzfile_index.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include "photon/common/checksum/crc32c.h" @@ -91,4 +92,4 @@ int init_index_header(photon::fs::IFile* src, IndexFileHeader &h, off_t span, i int create_index_entry(z_stream strm, IndexFilterRecorder *filter, off_t en_pos, off_t de_pos, unsigned char *window); -int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file); +int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size = -1); diff --git a/src/overlaybd/gzindex/gzip_index_create.cpp b/src/overlaybd/gzindex/gzip_index_create.cpp index 8330b1aa..91532451 100644 --- a/src/overlaybd/gzindex/gzip_index_create.cpp +++ b/src/overlaybd/gzindex/gzip_index_create.cpp @@ -13,13 +13,16 @@ See the License for the specific language governing permissions and limitations under the License. */ +// #include "../zfile/crc32/crc32c.h" #include #include #include #include #include + #include "gzfile_index.h" + #include "photon/common/alog.h" #include "photon/common/alog-stdstring.h" #include "photon/fs/localfs.h" @@ -271,15 +274,20 @@ static int get_compressed_index(const IndexFileHeader& h, const INDEX& index, un out_len = index_len; return 0; } - + LOG_INFO("index crc: `", crc32(0, buf, index_len)); return zlib_compress(h.dict_compress_level, buf, index_len, out, out_len); } -int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file) { +int save_index_to_file(IndexFileHeader &h, INDEX& index, photon::fs::IFile *index_file, ssize_t gzip_file_size) { int indx_cmpr_buf_len = index.size() * sizeof(IndexEntry) * 2 + 4096; unsigned char *buf = new unsigned char[indx_cmpr_buf_len]; DEFER(delete []buf); + if (gzip_file_size != -1) { + LOG_INFO("save gzip file size: `", gzip_file_size); + h.gzip_file_size = gzip_file_size; + } + if (get_compressed_index(h, index, buf, indx_cmpr_buf_len) != 0) { LOG_ERROR_RETURN(0, -1, "Failed to get_compress_index"); } @@ -355,7 +363,6 @@ int create_gz_index(photon::fs::IFile* gzip_file, const char *index_file_path, o if (init_index_header(gzip_file, h, span, dict_compress_algo, dict_compress_level) != 0) { LOG_ERRNO_RETURN(0, -1, "init index header failed."); } - INDEX index; int ret = build_index(h, gzip_file, index, index_file); if (ret != 0) { diff --git a/src/overlaybd/gzindex/test/test.cpp b/src/overlaybd/gzindex/test/test.cpp index 0130f5dd..6d598c15 100644 --- a/src/overlaybd/gzindex/test/test.cpp +++ b/src/overlaybd/gzindex/test/test.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -198,11 +199,11 @@ const char *GzIndexTest::fn_defile = "fdata"; const char *GzIndexTest::fn_gzdata = "fdata.gz"; const char *GzIndexTest::fn_gzindex = "findex"; -char uds_path[] = "/tmp/udstest.sock"; +char uds_path[] = "/tmp/gzstream_test/stream_conv.sock"; int download(const std::string &url, const std::string &out) { - if (::access(out.c_str(), 0) == 0) - return 0; + // if (::access(out.c_str(), 0) == 0) + // return 0; auto base = std::string(basename(url.c_str())); // download file std::string cmd = "curl -s -o " + out + " " + url; @@ -221,27 +222,27 @@ void handler(photon::net::ISocketStream *sock) { char recv[65536]; size_t count = 0; auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT | O_RDWR); - auto idx_file = - photon::fs::open_localfile_adaptor("/tmp/dest.gz_idx", O_TRUNC | O_CREAT | O_RDWR); DEFER(delete dst); - DEFER(delete idx_file); - - sock->read(recv, sizeof(size_t)); - // inf(sock, dst); - auto st_size = *(ssize_t *)recv; - auto gzfile = open_gzstream_file(sock, st_size, idx_file); - ASSERT_NE(gzfile, nullptr); - DEFER(delete gzfile); + // sock->read(recv, sizeof(ssize_t)); + // auto st_size = *(size_t*)recv; + auto gzstream = open_gzstream_file(sock, 0); + ASSERT_NE(gzstream, nullptr); + DEFER(delete gzstream); while (true) { - auto readn = gzfile->read(recv, 65536); + auto readn = gzstream->read(recv, 65536); if (readn <= 0) break; + count += readn; dst->write(recv, readn); } - LOG_INFO("RECV `", count); - save_gzip_index(gzfile); - ASSERT_STREQ(sha256sum("/tmp/dest").c_str(), "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b"); + auto fn_idx = gzstream->save_index(); + LOG_INFO("RECV `, fn_idx: `", count, fn_idx.c_str()); + + + // ASSERT_STREQ(sha256sum("/tmp/dest").c_str(), "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b"); + // ASSERT_STREQ(sha256sum(fn_idx.c_str()).c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82"); + } void uds_server() { @@ -258,6 +259,7 @@ void uds_server() { } void uds_client(photon::fs::IFile *file) { + photon::thread_yield_to(nullptr); auto cli = photon::net::new_uds_client(); DEFER({ delete cli; }); @@ -271,45 +273,56 @@ void uds_client(photon::fs::IFile *file) { struct stat st; file->fstat(&st); LOG_INFO("Connected `, start send file data(size: `)", path, st.st_size); + // sock->write(&st.st_size, sizeof(st.st_size)); char buff[65536]; - sock->write((void *)&st.st_size, sizeof(st.st_size)); auto count = 0; while (true) { auto readn = file->read(buff, 65536); ASSERT_NE(readn, -1); auto ret = sock->write(buff, readn); count += readn; - LOG_INFO("write ` bytes", ret); ASSERT_EQ(ret, readn); if (readn != 65536) break; } LOG_INFO("SEND: `", count); + + return; } TEST_F(GzIndexTest, stream) { - set_log_output_level(0); + std::string workdir = "/tmp/gzstream_test/"; + mkdir(workdir.c_str(), 0755); + auto lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); LOG_INFO("start streamFile test"); - std::string fn_test_tgz = "/tmp/go1.17.6.linux-amd64.tar.gz"; - ASSERT_EQ( - 0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz", - fn_test_tgz.c_str())); - - auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server)); - - auto file = photon::fs::open_localfile_adaptor(fn_test_tgz.c_str(), O_RDONLY); - // auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT |O_RDWR, 0644); - ASSERT_NE(file, nullptr); - // inf(file, dst); - uds_client(file); - - photon::thread_join(jh1); - remove(uds_path); - file->lseek(0, SEEK_SET); - auto fn_test_tgz_idx = fn_test_tgz + ".index"; - if (::access(fn_test_tgz_idx.c_str(), 0) != 0){ - ASSERT_EQ(create_gz_index(file, fn_test_tgz_idx.c_str()), 0); + std::vector filelist = { + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/cri-containerd-cni-1.5.2-linux-amd64.tar.gz", + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/containerd-1.4.4-linux-amd64.tar.gz", + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.13.linux-amd64.tar.gz", + "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" + }; + std::vector tar_sha256sum = { + "sha256:05e8b01c1ddb6ba4f8c84e7dbc76529bdc09861f9ce17c213a49e8c334f184ed", + "sha256:0ccf983abf0b0fb64cc969079982bc34761ce22d7a3236a40d49d840d150e09a", + "sha256:1041ec4e2f40156e0731be175388be4c67aeceb44829f988df213e9fd5f26dc9", + "sha256:562688d70dcd1596556e7c671c1266f6e9c22b4f4fb8344efa8bed88fc2bac7b" + }; + int i = 0; + for (auto test_tgz : filelist) { + auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server)); + std::string fn_test_tgz = basename(test_tgz.c_str()); + ASSERT_EQ( + 0, download(test_tgz, (workdir + fn_test_tgz).c_str())); + auto file = lfs->open(fn_test_tgz.c_str(), O_RDONLY); + std::string tar_sha256="", idx_sha256=""; + uds_client(file); + photon::thread_join(jh1); + auto dst_sha256 = sha256sum("/tmp/dest"); + ASSERT_STREQ(dst_sha256.c_str(), tar_sha256sum[i++].c_str()); + lfs->unlink(fn_test_tgz.c_str()); } + remove(uds_path); + } TEST_F(GzIndexTest, pread) { diff --git a/src/overlaybd/gzip/CMakeLists.txt b/src/overlaybd/gzip/CMakeLists.txt index 62f8fa1a..b5555c87 100644 --- a/src/overlaybd/gzip/CMakeLists.txt +++ b/src/overlaybd/gzip/CMakeLists.txt @@ -4,7 +4,7 @@ add_library(gzip_lib STATIC ${SOURCE_GZIP}) target_include_directories(gzip_lib PUBLIC ${PHOTON_INCLUDE_DIR} ) -target_link_libraries(gzip_lib photon_static) +target_link_libraries(gzip_lib photon_static checksum_lib) # if(BUILD_TESTING) # add_subdirectory(test) diff --git a/src/overlaybd/gzip/gz.cpp b/src/overlaybd/gzip/gz.cpp index 79e2cb28..52bff7c1 100644 --- a/src/overlaybd/gzip/gz.cpp +++ b/src/overlaybd/gzip/gz.cpp @@ -16,16 +16,23 @@ #include "gz.h" #include +#include #include #include #include +#include +#include #include #include +#include + #include #include #include #include #include +#include +#include "../../tools/sha256file.h" #include "../gzindex/gzfile_index.h" class GzAdaptorFile : public photon::fs::VirtualReadOnlyFile { public: @@ -65,15 +72,23 @@ class GzAdaptorFile : public photon::fs::VirtualReadOnlyFile { int m_cur = 0, m_left = 0; }; -// class StreamFile : public photon::fs::IFile { -// public: - -// } - -class GzStreamFile : public GzAdaptorFile { +class GzStreamFile : public IGzFile { public: - GzStreamFile(IStream *sock, ssize_t st_size, IFile *index_save) - : sock(sock), st_size(st_size), m_idx_file(index_save) { + GzStreamFile(IStream *sock, ssize_t st_size, bool index_save, + const char* uid, const char *_workdir) + : fstream(sock), st_size(st_size), workdir(_workdir){ + + if (uid == nullptr) { + timeval now; + gettimeofday(&now, NULL); + char suffix[32]{}; + sprintf(suffix, ".%lu", now.tv_sec*1000000 + now.tv_usec); + fn_idx = fn_idx + suffix; + fn_buff = fn_buff + suffix; + } else { + fn_idx = fn_idx + uid; + fn_buff = fn_buff + uid; + } strm.zalloc = Z_NULL; strm.zfree = Z_NULL; @@ -85,22 +100,35 @@ class GzStreamFile : public GzAdaptorFile { ttin = ttout = strm.avail_out = 0; init_index_header(this, m_idx_header, GZ_CHUNK_SIZE, GZ_DICT_COMPERSS_ALGO, GZ_COMPRESS_LEVEL); - if (m_idx_file) { + lfs = photon::fs::new_localfs_adaptor(workdir.c_str()); + m_indexes.clear(); + + LOG_INFO("create buffer file(`) and indexfile(`)", fn_buff, fn_idx); + if (index_save) { + m_idx_file = lfs->open(fn_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); m_idx_filter = new_index_filter(&m_idx_header, &m_indexes, m_idx_file); + fstream = new_sha256_file((IFile*)fstream, false); } - buffer_fs = photon::fs::new_localfs_adaptor(); - buffer_file = buffer_fs->open(FN_BUFF_FILE, O_TRUNC | O_CREAT | O_RDWR); - - LOG_INFO("create a GzStreamFile. expected size: `", st_size); + buffer_file = lfs->open(fn_buff.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + LOG_INFO("create a GzStreamFile. workdir: `", workdir); }; ~GzStreamFile() { (void)inflateEnd(&strm); - buffer_fs->unlink(FN_BUFF_FILE); + delete m_idx_file; + delete buffer_file; + delete m_idx_filter; + delete fstream; + for (auto it:m_indexes) { + delete it; + } + lfs->unlink(fn_buff.c_str()); } - off_t lseek(off_t offset, int whence) override { + UNIMPLEMENTED_POINTER(photon::fs::IFileSystem* filesystem() override); + + virtual off_t lseek(off_t offset, int whence) override { if (whence == SEEK_END) { return st_size - offset; } @@ -119,12 +147,12 @@ class GzStreamFile : public GzAdaptorFile { } LOG_ERRNO_RETURN(ESPIPE, -1, "unimplemented in GzStreamFile"); } - int fstat(struct stat *buf) override { + virtual int fstat(struct stat *buf) override { buf->st_size = st_size; return 0; } - ssize_t read(void *buf, size_t count) override { + virtual ssize_t read(void *buf, size_t count) override { size_t n = 0; LOG_DEBUG("count: `", count); while (count > 0) { @@ -142,14 +170,15 @@ class GzStreamFile : public GzAdaptorFile { LOG_DEBUG("trucate buffer file."); buffer_file->ftruncate(0); buffer_file->lseek(0, SEEK_SET); - strm.avail_in = sock->read(in, CHUNK); + strm.avail_in = fstream->read(in, CHUNK); if (strm.avail_in < 0) { LOG_ERRNO_RETURN(0, -1, "read buffer from uds failed"); } size_t readn = strm.avail_in; if (strm.avail_in == 0) break; - LOG_INFO("recv: `", strm.avail_in); + LOG_DEBUG("recv: `", strm.avail_in); + st_size += strm.avail_in; strm.next_in = in; int ret = 0; bf_start = 0; @@ -203,25 +232,45 @@ class GzStreamFile : public GzAdaptorFile { return n; } - int save_index() { - return save_index_to_file(m_idx_header, m_indexes, m_idx_file); + virtual std::string sha256_checksum() override { + if (sha256sum.empty()) { + sha256sum = ((SHA256File*)fstream)->sha256_checksum(); + } + return sha256sum; } - ssize_t st_size = -1; + + virtual std::string save_index() override { + if (save_index_to_file(m_idx_header, m_indexes, m_idx_file, st_size) != 0){ + LOG_ERRNO_RETURN(0, "", "save index failed"); + } + auto dst_file = this->sha256_checksum() + ".gz_idx"; + LOG_INFO("save index as: `", dst_file); + lfs->rename(fn_idx.c_str(), dst_file.c_str()); + return std::string(workdir) + "/" + dst_file; + } + ssize_t st_size = 0; IFile *m_file = nullptr; photon::fs::IFileSystem *m_fs = nullptr; const static int CHUNK = 32768; - IStream *sock; + IStream *fstream; z_stream strm; - Byte in[CHUNK], out[CHUNK]; // buffer[10485760]; + Byte in[CHUNK]{}, out[CHUNK]{}; // buffer[10485760]; /* allocate inflate state */ size_t have = 0; - off_t ttin, ttout, bf_start, bf_len; + off_t ttin, ttout; + off_t bf_start = 0, bf_len = 0; + off_t cur_offset = 0; - photon::fs::IFileSystem *buffer_fs = nullptr; + photon::fs::IFileSystem *lfs = nullptr; IFile *buffer_file = nullptr; IFile *m_idx_file = nullptr; - const char *FN_BUFF_FILE = "/tmp/decompbuffer"; + // const char *FN_BUFF_PREFIX = "/tmp/decompbuffer"; + // const char *FN_IDX_PREFIX = "/tmp/gzidx"; + std::string workdir; + std::string fn_buff = "decomp_buffer"; + std::string fn_idx = "gz_idx"; + std::string sha256sum = ""; IndexFilterRecorder *m_idx_filter = nullptr; INDEX m_indexes; IndexFileHeader m_idx_header; @@ -234,11 +283,12 @@ photon::fs::IFile *open_gzfile_adaptor(const char *path) { return new GzAdaptorFile(gzf); } -photon::fs::IFile *open_gzstream_file(IStream *sock, ssize_t st_size, - photon::fs::IFile *save_idx_as) { - return new GzStreamFile(sock, st_size, save_idx_as); +IGzFile *open_gzstream_file(IStream *sock, ssize_t st_size, + bool save_idx, const char *uid, const char *workdir) { + char buffer[1024]{}; + if (workdir == nullptr) { + getcwd(buffer, sizeof(buffer)); + workdir = (char*)buffer; + } + return new GzStreamFile(sock, st_size, true, uid, workdir); } - -int save_gzip_index(photon::fs::IFile *file) { - return ((GzStreamFile *)file)->save_index(); -} \ No newline at end of file diff --git a/src/overlaybd/gzip/gz.h b/src/overlaybd/gzip/gz.h index f918846f..9d8d70e4 100644 --- a/src/overlaybd/gzip/gz.h +++ b/src/overlaybd/gzip/gz.h @@ -18,9 +18,16 @@ #include #include +#include #include -photon::fs::IFile* open_gzfile_adaptor(const char *path); -photon::fs::IFile* open_gzstream_file(IStream *sock, ssize_t st_size, photon::fs::IFile *save_idx_as = nullptr); +class IGzFile : public photon::fs::VirtualReadOnlyFile { +public : + // return filename + virtual std::string save_index() = 0; + virtual std::string sha256_checksum() = 0; +}; -int save_gzip_index(photon::fs::IFile *gz_stream_file); +photon::fs::IFile* open_gzfile_adaptor(const char *path); +IGzFile* open_gzstream_file(IStream *sock, ssize_t st_size, + bool save_index = true, const char *uid = nullptr, const char *workdir = nullptr); diff --git a/src/overlaybd/stream_convertor/CMakeLists.txt b/src/overlaybd/stream_convertor/CMakeLists.txt new file mode 100644 index 00000000..17a7707a --- /dev/null +++ b/src/overlaybd/stream_convertor/CMakeLists.txt @@ -0,0 +1,16 @@ +file(GLOB SOURCE_SERV "*.cpp") + +add_executable(stream_conv ${SOURCE_SERV}) +target_include_directories(stream_conv PUBLIC + ${PHOTON_INCLUDE_DIR} +) +target_link_libraries(stream_conv +photon_static +gzip_lib +gzindex_lib +tar_lib +) + +# if(BUILD_TESTING) +# add_subdirectory(test) +# endif() diff --git a/src/overlaybd/stream_convertor/stream_conv.cpp b/src/overlaybd/stream_convertor/stream_conv.cpp new file mode 100644 index 00000000..b17bfaef --- /dev/null +++ b/src/overlaybd/stream_convertor/stream_conv.cpp @@ -0,0 +1,140 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../gzip/gz.h" +#include "../tar/libtar.h" +#include "../../tools/sha256file.h" + + +class StreamConvertor { +public: + std::string get_task_id() { + auto now = std::chrono::system_clock::now(); + uint64_t us = std::chrono::duration_cast(now.time_since_epoch()).count(); + return std::to_string(us)+"." + std::to_string(rand() % 1000000); + } + + void serve(photon::net::ISocketStream *sock) { + auto start = std::chrono::steady_clock::now(); + LOG_DEBUG("Accepted"); + auto task_id = get_task_id(); + char recv[65536]; + + auto streamfile = open_gzstream_file(sock, 0, true, task_id.c_str()); + DEFER(delete streamfile); + auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); + DEFER(delete turboOCI_stream); + auto fn_tar_idx = task_id + ".tar.meta"; + auto tar_idx = lfs->open(fn_tar_idx.c_str(), O_TRUNC | O_CREAT | O_RDWR, 0644); + DEFER(delete tar_idx); + auto nitems = turboOCI_stream->dump_tar_headers(tar_idx); + LOG_INFO("` items get in `",nitems, fn_tar_idx); + streamfile->save_index(); + auto dst_tar_idx = streamfile->sha256_checksum() + ".tar.meta"; + if (lfs->rename(fn_tar_idx.c_str(), dst_tar_idx.c_str()) != 0) { + LOG_ERROR("rename metafile (` --> `) failed.", fn_tar_idx, dst_tar_idx); + } + LOG_INFO("save tar meta success. `", dst_tar_idx); + auto end = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast( end - start); + LOG_INFO("task ` finish. time_elapsed: `ms", task_id, elapsed.count()); + } + + int start_uds_server() { + + srand(time(NULL)); + lfs = photon::fs::new_localfs_adaptor(workdir); + auto uds_handler = [&](photon::net::ISocketStream *s) -> int { + LOG_INFO("Accept UDS"); + this->serve(s); + return 0; + }; + serv = photon::net::new_uds_server(true); + assert(0 == serv->bind(uds_path)); + assert(0 == serv->listen(100)); + char path[256]; + serv->getsockname(path, 256); + if (strcmp(path, uds_path) != 0) { + LOG_ERRNO_RETURN(0, -1, "get socket name error. ['`' != '`'(expected)]", + path, uds_path); + } + LOG_INFO("uds server listening `", path); + serv->set_handler(uds_handler); + serv->start_loop(true); + return 0; + } + + int stop() { + serv->terminate(); + delete serv; + return 0; + } + + photon::fs::IFileSystem *lfs = nullptr; + photon::net::ISocketServer *serv = nullptr; + + const char *uds_path= "/var/run/stream_conv.sock"; + const char *workdir = "/tmp"; +} *server; + + +static void stop_by_signal(int signal) { + LOG_INFO("Got signal ", signal); + server->stop(); + LOG_INFO("server stopped"); +} + +int main(int argc, char *argv[]){ + mallopt(M_TRIM_THRESHOLD, 128 * 1024); + // prctl(PR_SET_THP_DISABLE, 1); + + set_log_output_level(1); + photon::init(photon::INIT_EVENT_DEFAULT | photon::INIT_IO_DEFAULT | + photon::INIT_EVENT_SIGNAL); + //... + photon::block_all_signal(); + photon::sync_signal(SIGTERM, &stop_by_signal); + photon::sync_signal(SIGINT, &stop_by_signal); + photon::sync_signal(SIGTSTP, &stop_by_signal); + // photon::sync_signal(SIGPIPE, &ignore_signal); + // photon::sync_signal(SIGUSR2, &restart_by_signal); + DEFER(photon::fini()); + server = new StreamConvertor; + DEFER(delete server); + server->start_uds_server(); + +} + +// TEST_F(GzIndexTest, stream) { +// set_log_output_level(0); +// LOG_INFO("start streamFile test"); +// std::string fn_test_tgz = "/tmp/go1.17.6.linux-amd64.tar.gz"; +// ASSERT_EQ( +// 0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz", +// fn_test_tgz.c_str())); + +// auto jh1 = photon::thread_enable_join(photon::thread_create11(uds_server)); + +// auto file = photon::fs::open_localfile_adaptor(fn_test_tgz.c_str(), O_RDONLY); +// // auto dst = photon::fs::open_localfile_adaptor("/tmp/dest", O_TRUNC | O_CREAT |O_RDWR, 0644); +// ASSERT_NE(file, nullptr); +// // inf(file, dst); +// uds_client(file); + +// photon::thread_join(jh1); +// remove(uds_path); +// file->lseek(0, SEEK_SET); +// auto fn_test_tgz_idx = fn_test_tgz + ".index"; +// if (::access(fn_test_tgz_idx.c_str(), 0) != 0){ +// ASSERT_EQ(create_gz_index(file, fn_test_tgz_idx.c_str()), 0); +// } +// } \ No newline at end of file diff --git a/src/overlaybd/tar/libtar.h b/src/overlaybd/tar/libtar.h index d373e706..4222d6ab 100644 --- a/src/overlaybd/tar/libtar.h +++ b/src/overlaybd/tar/libtar.h @@ -189,13 +189,14 @@ class UnTar : public TarCore { fs_base_file(bf), meta_only(meta_only), from_tar_idx(from_tar_idx){} int extract_all(); + // return number of objects in this tarfile ssize_t dump_tar_headers(photon::fs::IFile* file); private: photon::fs::IFileSystem *fs = nullptr; // target photon::fs::IFile *fs_base_file = nullptr; - bool meta_only; - bool from_tar_idx; + bool meta_only = false; + bool from_tar_idx = false; std::set unpackedPaths; std::list> dirs; // diff --git a/src/overlaybd/tar/test/test.cpp b/src/overlaybd/tar/test/test.cpp index d9c1d132..a2308dcc 100644 --- a/src/overlaybd/tar/test/test.cpp +++ b/src/overlaybd/tar/test/test.cpp @@ -22,6 +22,7 @@ #include #include #include +#include "../../gzindex/gzfile.h" #include "../../extfs/extfs.h" #include "../../lsmt/file.h" #include "../libtar.h" @@ -54,12 +55,14 @@ class TarTest : public ::testing::Test { delete fs; } - int download(const std::string &url, const std::string &out) { + int download(const std::string &url, std::string out) { + if (out == "") { + out = workdir + "/" + std::string(basename(url.c_str())); + } if (fs->access(out.c_str(), 0) == 0) return 0; - auto base = std::string(basename(url.c_str())); // download file - std::string cmd = "curl -s -o " + workdir + "/" + out + " " + url; + std::string cmd = "curl -s -o " + out + " " + url; LOG_INFO(VALUE(cmd.c_str())); auto ret = system(cmd.c_str()); if (ret != 0) { @@ -216,40 +219,91 @@ TEST_F(TarTest, tar_meta) { TEST_F(TarTest, stream) { set_log_output_level(1); - std::string fn_test_idx = "dest.gz_idx"; std::string fn_test_tgz = "go1.17.6.linux-amd64.tar.gz"; ASSERT_EQ( 0, download("https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz", - fn_test_tgz.c_str())); - - auto src_file = fs->open(fn_test_tgz.c_str(), O_RDONLY, 0666); - auto idx_file = fs->open(fn_test_idx.c_str(), O_TRUNC|O_CREAT|O_RDWR, 0644); - struct stat st; - src_file->fstat(&st); - auto streamfile = open_gzstream_file(src_file, st.st_size, idx_file); - ASSERT_NE(nullptr, src_file); - DEFER(delete src_file); - + "")); set_log_output_level(0); - auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); - DEFER(delete turboOCI_stream); - - auto tar_idx = fs->open("stream.tar.meta", O_TRUNC | O_CREAT | O_RDWR, 0644); - DEFER(delete tar_idx); - auto obj_count = turboOCI_stream->dump_tar_headers(tar_idx); - EXPECT_NE(-1, obj_count); - tar_idx->lseek(0, SEEK_SET); - auto tar_meta_sha256 = new_sha256_file(tar_idx, false); - DEFER(delete tar_meta_sha256); - ASSERT_STREQ(tar_meta_sha256->sha256_checksum().c_str(), "sha256:c5aaa64a1b70964758e190b88b3e65528607b0002bffe42513bc65ac6e65f337"); - save_gzip_index(streamfile); - idx_file->lseek(0, SEEK_SET); - auto gz_idx_sha256 = new_sha256_file(idx_file, false); - DEFER(delete gz_idx_sha256); - ASSERT_STREQ(tar_meta_sha256->sha256_checksum().c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82"); + for (int i = 0; i < 3; i++) { + auto src_file = fs->open(fn_test_tgz.c_str(), O_RDONLY, 0644); + struct stat st; + src_file->fstat(&st); + auto streamfile = open_gzstream_file(src_file, 0); + auto fn = ("/tmp/tar_test/" + fn_test_tgz); + ASSERT_NE(nullptr, src_file); + DEFER(delete src_file); + + auto turboOCI_stream = new UnTar(streamfile, nullptr, 0, 4096, nullptr, true); + DEFER(delete turboOCI_stream); + + auto tar_idx = fs->open("stream.tar.meta", O_TRUNC | O_CREAT | O_RDWR, 0644); + DEFER(delete tar_idx); + auto obj_count = turboOCI_stream->dump_tar_headers(tar_idx); + EXPECT_NE(-1, obj_count); + tar_idx->lseek(0, SEEK_SET); + auto tar_meta_sha256 = new_sha256_file(tar_idx, false); + DEFER(delete tar_meta_sha256); + ASSERT_STREQ(tar_meta_sha256->sha256_checksum().c_str(), "sha256:c5aaa64a1b70964758e190b88b3e65528607b0002bffe42513bc65ac6e65f337"); + auto idx_fn = streamfile->save_index(); + // auto idx_fn = "/tmp/test.idx"; + + // create_gz_index(src_file, idx_fn); + auto idx_sha256 = sha256sum(idx_fn.c_str()); + delete streamfile; + ASSERT_STREQ(idx_sha256.c_str(), "sha256:af3ffd4965d83f3d235c48ce75e16a1f2edf12d0e5d82816d7066a8485aade82"); + } } +// TEST_F(TarTest, stream_tar_meta) { +// // set_log_output_level(0); +// std::vector filelist { +// "https://dadi-shared.oss-cn-beijing.aliyuncs.com/go1.17.6.linux-amd64.tar.gz" +// }; +// for (auto file : filelist){ +// ASSERT_EQ(0, download(, "")); + + +// auto src_file = fs->open("latest.tar", O_RDONLY, 0666); +// ASSERT_NE(nullptr, src_file); +// DEFER(delete src_file); +// auto verify_dev = createDevice("verify", src_file); +// make_extfs(verify_dev); +// auto verify_ext4fs = new_extfs(verify_dev, false); +// auto verifyfs = new_subfs(verify_ext4fs, "/", true); +// auto turboOCI_verify = new UnTar(src_file, verifyfs, 0, 4096, verify_dev, true); +// ASSERT_EQ(0, turboOCI_verify->extract_all()); +// verify_ext4fs->sync(); +// delete turboOCI_verify; +// delete verifyfs; + +// src_file->lseek(0, 0); + +// auto tar_idx = fs->open("latest.tar.meta", O_TRUNC | O_CREAT | O_RDWR, 0644); +// auto imgfile = createDevice("mock", src_file); +// DEFER(delete imgfile;); +// auto tar = new UnTar(src_file, nullptr, 0, 4096, nullptr, true); +// auto obj_count = tar->dump_tar_headers(tar_idx); +// EXPECT_NE(-1, obj_count); +// LOG_INFO("objects count: `", obj_count); +// tar_idx->lseek(0,0); + +// make_extfs(imgfile); +// auto extfs = new_extfs(imgfile, false); +// auto target = new_subfs(extfs, "/", true); +// auto turboOCI_mock = new UnTar(tar_idx, target, TAR_IGNORE_CRC, 4096, imgfile, true, true); +// auto ret = turboOCI_mock->extract_all(); +// delete turboOCI_mock; +// delete target; + +// ASSERT_EQ(0, ret); +// EXPECT_EQ(0, do_verify(verify_dev, imgfile)); +// delete tar_idx; +// delete tar; +// } + +// } + TEST_F(TarTest, tar_header_check) { auto fn = "data"; auto tarfs = new_tar_fs_adaptor(fs);