Skip to content

Commit

Permalink
Add verify blocks after sync (#715)
Browse files Browse the repository at this point in the history
Co-authored-by: 杨赫然 <[email protected]>
  • Loading branch information
feiniks and 杨赫然 authored Nov 18, 2024
1 parent 2cf6b99 commit 665d008
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 0 deletions.
5 changes: 5 additions & 0 deletions fileserver/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
WindowsEncoding string
SkipBlockHash bool
FsCacheLimit int64
VerifyClientBlocks bool

// general options
CloudMode bool
Expand Down Expand Up @@ -79,6 +80,7 @@ func initDefaultOptions() {
ClusterSharedTempFileMode = 0600
DefaultQuota = InfiniteQuota
FsCacheLimit = 2 << 30
VerifyClientBlocks = true
FsIdListRequestTimeout = -1
DBOpTimeout = 60 * time.Second
}
Expand Down Expand Up @@ -211,6 +213,9 @@ func parseFileServerSection(section *ini.Section) {
FsIdListRequestTimeout = fsIdListRequestTimeout
}
}
if key, err := section.GetKey("verify_client_blocks_after_sync"); err == nil {
VerifyClientBlocks, _ = key.Bool()
}
}

func parseQuota(quotaStr string) int64 {
Expand Down
81 changes: 81 additions & 0 deletions fileserver/sync_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,12 @@ func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError {
return &appError{nil, msg, seafHTTPResNoQuota}
}

if option.VerifyClientBlocks {
if body, err := checkBlocks(r.Context(), repo, base, newCommit); err != nil {
return &appError{nil, body, seafHTTPResBlockMissing}
}
}

token := r.Header.Get("Seafile-Repo-Token")
if token == "" {
token = utils.GetAuthorizationToken(r.Header)
Expand All @@ -1072,6 +1078,81 @@ func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError {
return nil
}

// checkBlockAux is the state passed to the diff callbacks (via
// DiffOptions.Data) while checkBlocks compares two commits.
type checkBlockAux struct {
	// fileList collects the names of files that reference at least one
	// block the block manager reports as not existing.
	fileList []string

	// storeID is the store ID used for seafile-object and block lookups.
	storeID string
	// version is copied from the repo's Version field.
	version int
}

// checkBlocks diffs the root trees of base and remote and reports changed
// files that reference blocks missing from the block store.
//
// Return values:
//   - ("", nil) when the diff succeeds and no file with a missing block is found;
//   - ("", err) when diff.DiffTrees itself fails;
//   - (body, non-nil error) when at least one file has a missing block, where
//     body is a JSON array of the affected file names.
func checkBlocks(ctx context.Context, repo *repomgr.Repo, base, remote *commitmgr.Commit) (string, error) {
	state := &checkBlockAux{
		storeID: repo.StoreID,
		version: repo.Version,
	}

	opts := &diff.DiffOptions{
		FileCB: checkFileBlocks,
		DirCB:  checkDirCB,
		Ctx:    ctx,
		RepoID: repo.StoreID,
	}
	// The callbacks receive state back through their data argument.
	opts.Data = state

	roots := []string{base.RootID, remote.RootID}
	if err := diff.DiffTrees(roots, opts); err != nil {
		return "", err
	}

	if len(state.fileList) > 0 {
		// The Marshal error is ignored; on failure the body would be empty.
		encoded, _ := json.Marshal(state.fileList)
		return string(encoded), fmt.Errorf("block is missing")
	}

	return "", nil
}

// checkFileBlocks is the per-file diff callback used by checkBlocks.
//
// files[0] is the entry from the first tree passed to the diff and files[1]
// the entry from the second. data must be a *checkBlockAux. If the second
// entry is new or changed and any of its blocks does not exist in the block
// store, the entry's name is appended to the aux file list.
//
// It returns context.Canceled when ctx is done, an error when data has the
// wrong type or the seafile object cannot be loaded, and nil otherwise
// (including when a missing block was recorded).
func checkFileBlocks(ctx context.Context, baseDir string, files []*fsmgr.SeafDirent, data interface{}) error {
	if ctx.Err() != nil {
		return context.Canceled
	}

	oldEnt, newEnt := files[0], files[1]

	state, ok := data.(*checkBlockAux)
	if !ok {
		return fmt.Errorf("failed to assert results")
	}

	// Nothing to verify for deleted entries or empty files.
	if newEnt == nil || newEnt.ID == emptySHA1 {
		return nil
	}
	// Unchanged content: same object ID on both sides.
	if oldEnt != nil && oldEnt.ID == newEnt.ID {
		return nil
	}

	seafile, err := fsmgr.GetSeafile(state.storeID, newEnt.ID)
	if err != nil {
		return err
	}

	for i := range seafile.BlkIDs {
		if blockmgr.Exists(state.storeID, seafile.BlkIDs[i]) {
			continue
		}
		// One missing block is enough to flag the file; stop scanning.
		state.fileList = append(state.fileList, newEnt.Name)
		break
	}

	return nil
}

// checkDirCB is the per-directory diff callback used by checkBlocks. It does
// no directory-level work: it only aborts with context.Canceled once ctx is
// done, and leaves *recurse untouched.
func checkDirCB(ctx context.Context, baseDir string, dirs []*fsmgr.SeafDirent, data interface{}, recurse *bool) error {
	if ctx.Err() != nil {
		return context.Canceled
	}
	return nil
}

func getHeadCommit(rsp http.ResponseWriter, r *http.Request) *appError {
vars := mux.Vars(r)
repoID := vars["repoid"]
Expand Down
115 changes: 115 additions & 0 deletions server/http-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session)
int worker_threads;
char *encoding;
char *cluster_shared_temp_file_mode = NULL;
gboolean verify_client_blocks;

host = fileserver_config_get_string (session->config, HOST, &error);
if (!error) {
Expand Down Expand Up @@ -194,6 +195,18 @@ load_http_config (HttpServerStruct *htp_server, SeafileSession *session)
}
seaf_message ("fileserver: worker_threads = %d\n", htp_server->worker_threads);

verify_client_blocks = fileserver_config_get_boolean (session->config,
"verify_client_blocks_after_sync",
&error);
if (error) {
htp_server->verify_client_blocks = TRUE;
g_clear_error(&error);
} else {
htp_server->verify_client_blocks = verify_client_blocks;
}
seaf_message ("fileserver: verify_client_blocks = %d\n",
htp_server->verify_client_blocks);

cluster_shared_temp_file_mode = fileserver_config_get_string (session->config,
"cluster_shared_temp_file_mode",
&error);
Expand Down Expand Up @@ -1090,6 +1103,95 @@ fast_forward_or_merge (const char *repo_id,
return ret;
}

/* State handed to the diff callbacks through DiffOptions.data by check_blocks(). */
typedef struct CheckBlockAux {
    /* Store id used for fs-object and block lookups; points at the repo's
     * store_id and is not owned by this struct. */
    const char *store_id;
    /* Repo version, passed to the fs and block managers. */
    int version;
    /* g_strdup'd names of files found to reference a missing block. */
    GList *file_list;
} CheckBlockAux;

/*
 * Per-file diff callback used by check_blocks().
 *
 * files[0] is the entry from the first tree, files[1] the entry from the
 * second; @data is a CheckBlockAux. When the second entry is new or changed
 * and one of its blocks does not exist in the block manager, a copy of the
 * entry's name is prepended to aux->file_list.
 *
 * Returns -1 if the seafile object cannot be loaded, 0 otherwise (including
 * when a missing block was recorded).
 */
static int
check_file_blocks (int n, const char *basedir, SeafDirent *files[], void *data)
{
    CheckBlockAux *aux = data;
    SeafDirent *old_ent = files[0];
    SeafDirent *new_ent = files[1];
    Seafile *seafile;
    int idx;

    /* Deleted entry: nothing to verify. */
    if (!new_ent)
        return 0;
    /* Empty file: nothing to verify. */
    if (strcmp (new_ent->id, EMPTY_SHA1) == 0)
        return 0;
    /* Same object id on both sides: content unchanged. */
    if (old_ent && strcmp (old_ent->id, new_ent->id) == 0)
        return 0;

    seafile = seaf_fs_manager_get_seafile (seaf->fs_mgr, aux->store_id,
                                           aux->version, new_ent->id);
    if (!seafile)
        return -1;

    for (idx = 0; idx < seafile->n_blocks; idx++) {
        if (seaf_block_manager_block_exists (seaf->block_mgr, aux->store_id,
                                             aux->version, seafile->blk_sha1s[idx]))
            continue;

        /* One missing block is enough to flag the file; stop scanning. */
        aux->file_list = g_list_prepend (aux->file_list, g_strdup (new_ent->name));
        break;
    }

    seafile_unref (seafile);
    return 0;
}

/*
 * Per-directory diff callback used by check_blocks(). Does no directory-level
 * work: it always returns 0 and leaves *recurse untouched.
 */
static int
check_dir_cb (int n,
              const char *basedir,
              SeafDirent *dirs[],
              void *data,
              gboolean *recurse)
{
    return 0;
}

/*
 * Diff the root trees of @base and @remote and check that every block of each
 * new or changed file exists in the block manager.
 *
 * Returns 0 when the diff succeeds and no file with a missing block is found.
 * Returns -1 when either:
 *   - diff_trees() fails; *ret_body is left untouched, or
 *   - at least one file has a missing block; *ret_body is set to a compact
 *     JSON array of the affected file names, which the caller must free.
 */
static int
check_blocks (SeafRepo *repo, SeafCommit *base, SeafCommit *remote, char **ret_body) {
    DiffOptions opts;
    memset (&opts, 0, sizeof(opts));
    memcpy (opts.store_id, repo->store_id, 36);
    opts.version = repo->version;

    opts.file_cb = check_file_blocks;
    opts.dir_cb = check_dir_cb;

    CheckBlockAux aux;
    memset (&aux, 0, sizeof(aux));
    aux.store_id = repo->store_id;
    aux.version = repo->version;
    opts.data = &aux;

    const char *trees[2];
    trees[0] = base->root_id;
    trees[1] = remote->root_id;

    if (diff_trees (2, trees, &opts) < 0) {
        seaf_warning ("Failed to diff base and remote head for repo %.8s.\n",
                      repo->id);
        /* The file callback can fail after earlier files were already
         * recorded, so release any names collected before the diff aborted. */
        g_list_free_full (aux.file_list, g_free);
        return -1;
    }

    if (!aux.file_list) {
        return 0;
    }

    /* Move the collected names into a JSON array, freeing each as we go. */
    json_t *obj_array = json_array ();
    GList *ptr;
    for (ptr = aux.file_list; ptr; ptr = ptr->next) {
        json_array_append_new (obj_array, json_string (ptr->data));
        g_free (ptr->data);
    }
    g_list_free (aux.file_list);

    *ret_body = json_dumps (obj_array, JSON_COMPACT);
    json_decref (obj_array);

    return -1;
}

static void
put_update_branch_cb (evhtp_request_t *req, void *arg)
{
Expand Down Expand Up @@ -1154,6 +1256,19 @@ put_update_branch_cb (evhtp_request_t *req, void *arg)

token = get_auth_token (req);

if (seaf->http_server->verify_client_blocks) {
char *ret_body = NULL;
int rc = check_blocks(repo, base, new_commit, &ret_body);
if (rc < 0) {
if (ret_body) {
evbuffer_add (req->buffer_out, ret_body, strlen (ret_body));
}
evhtp_send_reply (req, SEAF_HTTP_RES_BLOCK_MISSING);
g_free (ret_body);
goto out;
}
}

if (fast_forward_or_merge (repo_id, base, new_commit, token) < 0) {
seaf_warning ("Fast forward merge for repo %s is failed.\n", repo_id);
evhtp_send_reply (req, EVHTP_RES_SERVERR);
Expand Down
2 changes: 2 additions & 0 deletions server/http-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ struct _HttpServerStruct {
char *windows_encoding;
int worker_threads;
int cluster_shared_temp_file_mode;

gboolean verify_client_blocks;
};

typedef struct _HttpServerStruct HttpServerStruct;
Expand Down

0 comments on commit 665d008

Please sign in to comment.