Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT CLOSE OR MERGE] Rti 12596 compress to file #32

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions c_src/zstd_nif.c
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
#include "erl_nif.h"

#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <zstd.h>

/* Input size threshold, in bytes: zstd_nif_compress_to_file compares the
 * inspected payload size against this value and, when the payload is larger,
 * reschedules the work onto a dirty CPU-bound scheduler.
 * NOTE(review): the rationale for 20000 is not documented here - TODO confirm. */
#define MAX_BYTES_TO_NIF 20000
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How and why was this value chosen?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/* Thread-specific-data keys, created in load(). The compress-to-file key is
 * used by do_compress_to_file to cache one ZSTD_CCtx per thread; the other two
 * are presumably used the same way by compress/decompress - verify against
 * those functions. */
ErlNifTSDKey zstdDecompressContextKey;
ErlNifTSDKey zstdCompressContextKey;
ErlNifTSDKey zstdCompressToFileContextKey;

/* Forward declaration: zstd_nif_compress_to_file refers to this function
 * before its definition. */
static ERL_NIF_TERM do_compress_to_file(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);


static ERL_NIF_TERM zstd_nif_compress(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifBinary bin, ret_bin;
Expand Down Expand Up @@ -62,15 +69,89 @@ static ERL_NIF_TERM zstd_nif_decompress(ErlNifEnv* env, int argc, const ERL_NIF_
return out;
}



/*
 * Appends buffSize bytes from buff to the file named fileName, creating the
 * file if it does not exist.
 *
 * The file is opened in binary append mode ("ab") because the payload is
 * compressed data; text mode would corrupt it on platforms that translate
 * line endings.
 *
 * Returns 1 when every byte was written and the stream closed cleanly,
 * 0 if the open, the write or the close failed.
 */
static int save_file(const char* fileName, const void* buff, size_t buffSize)
{
    FILE* const oFile = fopen(fileName, "ab");
    if (!oFile) {
        return 0;
    }
    size_t const wSize = fwrite(buff, 1, buffSize, oFile);
    /* Close unconditionally so a short write never leaks the handle. fclose
     * also flushes buffered data, so its result is part of the verdict. */
    int const closeFailed = fclose(oFile);
    return (wSize == buffSize && !closeFailed) ? 1 : 0;
}
Comment on lines +74 to +88
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this part is I/O bound but is not marked as such. On top of that, opening and closing a file are expensive operations, so doing them on every call is going to be slow. There is also no fsync to ensure that the data is actually written to disk.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I highly recommend reading the I/O Queues part of the erl_nif documentation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also probably it should be scheduled with ERL_NIF_DIRTY_JOB_IO_BOUND

Copy link
Collaborator Author

@tak30 tak30 Aug 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding fsync: I tried to use fsync or any variation like fdatasync, but they are not compliant with C99 : error: implicit declaration of function 'fsync' is invalid in C99 [-Werror,-Wimplicit-function-declaration] fsync(); ^ 1 error generated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding I/O Queues, that is a good idea, I just found this very good example, we could replicate it: https://github.com/erlang/otp/blob/master/erts/emulator/nifs/common/zlib_nif.c

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also probably it should be scheduled with ERL_NIF_DIRTY_JOB_IO_BOUND

I'm not sure about that as compressing uses a lot of cpu

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because fsync is under unistd.h

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if we have a way with IO queues, then I would rather follow that approach rather than directly writing to disk.

/*
 * NIF entry point registered as "compress_to_file"/3.
 *
 * Inspects argv[0] as an iolist only to learn its size; returns badarg when
 * that inspection fails. Payloads larger than MAX_BYTES_TO_NIF bytes are
 * handed to do_compress_to_file on a dirty CPU-bound scheduler; smaller ones
 * are processed by calling do_compress_to_file directly.
 */
static ERL_NIF_TERM zstd_nif_compress_to_file(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
ErlNifBinary bin;
if(!enif_inspect_iolist_as_binary(env, argv[0], &bin)) return enif_make_badarg(env);
if (bin.size > MAX_BYTES_TO_NIF) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data should be splitted and handle in shorter nif calls. I recommend reading Yielding NIF and Long-Running NIFs https://www.erlang.org/doc/man/erl_nif.html

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should do the same as zlib

/* Large payload: reschedule with the same arguments on a dirty CPU scheduler. */
return enif_schedule_nif(env, "do_compress_to_file", ERL_NIF_DIRTY_JOB_CPU_BOUND, do_compress_to_file, argc, argv);
}

/* Small payload: do the work in this call. */
return do_compress_to_file(env, argc, argv);
}

/*
 * Compresses argv[0] with zstd and appends the result to the file named by
 * argv[1].
 *
 *   argv[0]  iolist with the uncompressed payload
 *   argv[1]  file path as an Erlang string (list of Latin-1 characters)
 *   argv[2]  compression level, unsigned, at most ZSTD_maxCLevel()
 *
 * Returns the atom ok on success, the atom error when allocation, compression
 * or the file write fails, and raises badarg for malformed arguments.
 *
 * A ZSTD_CCtx is created on first use and cached in thread-specific data
 * under zstdCompressToFileContextKey, then reused on later calls from the
 * same thread.
 */
static ERL_NIF_TERM do_compress_to_file(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary bin, ret_bin;
    size_t buff_size, compressed_size;
    unsigned int compression_level, path_len;
    char* path;
    ERL_NIF_TERM result;

    /* Validate every argument before touching any resource. The list-length
     * result must be checked: on failure path_len is left unset and must not
     * be used to size the path buffer. */
    if(!enif_inspect_iolist_as_binary(env, argv[0], &bin)
       || !enif_get_list_length(env, argv[1], &path_len)
       || !enif_get_uint(env, argv[2], &compression_level)
       || compression_level > (unsigned int)ZSTD_maxCLevel())
        return enif_make_badarg(env);

    ZSTD_CCtx* ctx = (ZSTD_CCtx*)enif_tsd_get(zstdCompressToFileContextKey);
    if (!ctx) {
        ctx = ZSTD_createCCtx();
        /* Do not cache or use a context that failed to allocate. */
        if (!ctx)
            return enif_make_atom(env, "error");
        enif_tsd_set(zstdCompressToFileContextKey, ctx);
    }

    /* The path length comes from the caller, so keep it off the stack. */
    path = (char*)enif_alloc((size_t)path_len + 1);
    if (!path)
        return enif_make_atom(env, "error");

    /* enif_get_string returns a value <= 0 when the term is not a Latin-1
     * string or does not fit in the buffer. */
    if (enif_get_string(env, argv[1], path, path_len + 1, ERL_NIF_LATIN1) <= 0) {
        enif_free(path);
        return enif_make_badarg(env);
    }

    buff_size = ZSTD_compressBound(bin.size);
    if(!enif_alloc_binary(buff_size, &ret_bin)) {
        enif_free(path);
        return enif_make_atom(env, "error");
    }

    /* The scratch buffer is only written to disk and then released, so there
     * is no need to shrink it to compressed_size first. */
    compressed_size = ZSTD_compressCCtx(ctx, ret_bin.data, buff_size, bin.data, bin.size, compression_level);
    if (ZSTD_isError(compressed_size) || !save_file(path, ret_bin.data, compressed_size))
        result = enif_make_atom(env, "error");
    else
        result = enif_make_atom(env, "ok");

    enif_release_binary(&ret_bin);
    enif_free(path);
    return result;
}

static ErlNifFunc nif_funcs[] = {
{"compress", 2, zstd_nif_compress},
{"decompress", 1, zstd_nif_decompress}
{"decompress", 1, zstd_nif_decompress},
{"compress_to_file", 3, zstd_nif_compress_to_file},
};

/*
 * NIF library load callback: creates the three thread-specific-data keys
 * used to look up per-thread zstd contexts.
 *
 * Returns 0 on success. If any key cannot be created, the keys created so far
 * are destroyed and a non-zero value is returned so that loading the library
 * fails instead of running with an unusable key.
 */
static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
    if (enif_tsd_key_create("zstd_decompress_context_key", &zstdDecompressContextKey) != 0)
        return 1;

    if (enif_tsd_key_create("zstd_compress_context_key", &zstdCompressContextKey) != 0) {
        enif_tsd_key_destroy(zstdDecompressContextKey);
        return 1;
    }

    if (enif_tsd_key_create("zstd_compress_to_file_context_key", &zstdCompressToFileContextKey) != 0) {
        enif_tsd_key_destroy(zstdCompressContextKey);
        enif_tsd_key_destroy(zstdDecompressContextKey);
        return 1;
    }

    return 0;
}

Expand Down
12 changes: 12 additions & 0 deletions src/zstd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-export([compress/1, compress/2]).
-export([decompress/1]).
-export([compress_to_file/2, compress_to_file/3]).

-on_load init/0.

Expand All @@ -21,6 +22,17 @@ compress(_, _) ->
decompress(_) ->
erlang:nif_error(?LINE).

%% @doc Compresses `Uncompressed' with compression level 1 and writes the
%% result to the file at `Path'. Equivalent to
%% `compress_to_file(Uncompressed, Path, 1)'.
-spec compress_to_file(Uncompressed :: iodata(), Path :: string()) -> ok | error.
compress_to_file(IoData, Path) ->
    compress_to_file(IoData, Path, 1).

%% @doc Compresses `Uncompressed' with the given `CompressionLevel' and
%% appends the compressed bytes to the file at `Path', creating the file if
%% it does not exist.
%%
%% `Uncompressed' may be a binary or an iolist: the NIF reads it with
%% `enif_inspect_iolist_as_binary', which accepts both. `Path' must be a
%% character list.
%%
%% Returns `ok' on success and `error' when compression or the file write
%% fails; malformed arguments raise `badarg'. This Erlang body is only a stub
%% that is replaced when the NIF library loads; calling it without the NIF
%% raises a `nif_error'.
-spec compress_to_file(Uncompressed :: iodata(),
                       Path :: string(),
                       CompressionLevel :: 0..22) ->
                          ok | error.
compress_to_file(_, _, _) ->
    erlang:nif_error(?LINE).

init() ->
SoName =
case code:priv_dir(?APPNAME) of
Expand Down
26 changes: 25 additions & 1 deletion test/zstd_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,31 @@
-include_lib("eunit/include/eunit.hrl").

%% Round-trip check: compressing and then decompressing a binary must give
%% back the original bytes. `Data' is bound exactly once; a second, different
%% binding of the same variable would fail with badmatch before the assertion.
zstd_test() ->
    Data = <<"Hello, World!\n">>,
    ?assertEqual(Data, zstd:decompress(zstd:compress(Data))).

%% Small iolist payload written to a file and read back.
compress_to_file_test() ->
    compress_to_file_and_check("/tmp/zstd_test.zst", [<<"Hello">>, <<" there!">>]).

%% Large payload: 64000 random bytes, base64-encoded and wrapped in a list,
%% written to a file and read back. The size is intended to push the NIF onto
%% its dirty-scheduler path.
compress_to_file_using_dirty_scheduler_test() ->
    RandomBytes = crypto:strong_rand_bytes(64000),
    Payload = [base64:encode(RandomBytes)],
    compress_to_file_and_check("/tmp/zstd_dirty_scheduler_test.zst", Payload).

%% Writes `Data' to `Path' through zstd:compress_to_file/2, reads the file
%% back, and asserts that decompressing it yields the original bytes.
compress_to_file_and_check(Path, Data) ->
    %% compress_to_file appends, so the file must not exist beforehand.
    %% The result is deliberately ignored: {error, enoent} just means there
    %% was nothing to remove.
    _ = file:delete(Path),
    try
        ?assertEqual(ok, zstd:compress_to_file(Data, Path)),
        {ok, Compressed} = file:read_file(Path),
        %% Expected value first, actual second, so failure reports read correctly.
        ?assertEqual(erlang:iolist_to_binary(Data), zstd:decompress(Compressed))
    after
        %% Best-effort cleanup so no artefact is left behind, pass or fail.
        _ = file:delete(Path)
    end.