diff --git a/common/branch-mgr.c b/common/branch-mgr.c index ffdb8b27..4539318e 100644 --- a/common/branch-mgr.c +++ b/common/branch-mgr.c @@ -368,19 +368,70 @@ on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch) publish_repo_update_event (branch->repo_id, branch->commit_id); } +static gboolean +get_gc_id (SeafDBRow *row, void *data) +{ + char **out_gc_id = data; + + *out_gc_id = g_strdup(seaf_db_row_get_column_text (row, 0)); + + return FALSE; +} + int seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr, SeafBranch *branch, - const char *old_commit_id) + const char *old_commit_id, + gboolean check_gc, + const char *last_gc_id, + const char *origin_repo_id, + gboolean *gc_conflict) { SeafDBTrans *trans; char *sql; char commit_id[41] = { 0 }; + char *gc_id = NULL; + + if (check_gc) + *gc_conflict = FALSE; trans = seaf_db_begin_transaction (mgr->seaf->db); if (!trans) return -1; + if (check_gc) { + sql = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE"; + if (!origin_repo_id) { + if (seaf_db_trans_foreach_selected_row (trans, sql, + get_gc_id, &gc_id, + 1, "string", branch->repo_id) < 0) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + return -1; + } + } + else { + if (seaf_db_trans_foreach_selected_row (trans, sql, + get_gc_id, &gc_id, + 1, "string", origin_repo_id) < 0) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + return -1; + } + } + + if (g_strcmp0 (last_gc_id, gc_id) != 0) { + seaf_warning ("Head branch update for repo %s conflicts with GC.\n", + branch->repo_id); + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + *gc_conflict = TRUE; + g_free (gc_id); + return -1; + } + g_free (gc_id); + } + switch (seaf_db_type (mgr->seaf->db)) { case SEAF_DB_TYPE_MYSQL: case SEAF_DB_TYPE_PGSQL: diff --git a/common/branch-mgr.h b/common/branch-mgr.h index 9d459fb5..43bf9b52 100644 --- a/common/branch-mgr.h +++ b/common/branch-mgr.h @@ -59,7 +59,11 @@ seaf_branch_manager_update_branch (SeafBranchManager 
*mgr, int seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr, SeafBranch *branch, - const char *old_commit_id); + const char *old_commit_id, + gboolean check_gc, + const char *last_gc_id, + const char *origin_repo_id, + gboolean *gc_conflict); #endif SeafBranch * diff --git a/common/rpc-service.c b/common/rpc-service.c index 3b216e60..b86dbbe3 100644 --- a/common/rpc-service.c +++ b/common/rpc-service.c @@ -1108,7 +1108,8 @@ seafile_change_repo_passwd (const char *repo_id, seaf_branch_set_commit (repo->head, commit->commit_id); if (seaf_branch_manager_test_and_update_branch (seaf->branch_mgr, repo->head, - parent->commit_id) < 0) { + parent->commit_id, + FALSE, NULL, NULL, NULL) < 0) { seaf_repo_unref (repo); seaf_commit_unref (commit); seaf_commit_unref (parent); @@ -1365,6 +1366,16 @@ seafile_get_repo_history_limit (const char *repo_id, return seaf_repo_manager_get_repo_history_limit (seaf->repo_mgr, repo_id); } +int +seafile_set_repo_valid_since (const char *repo_id, + gint64 timestamp, + GError **error) +{ + return seaf_repo_manager_set_repo_valid_since (seaf->repo_mgr, + repo_id, + timestamp); +} + int seafile_repo_set_access_property (const char *repo_id, const char *ap, GError **error) { diff --git a/fileserver/fileop.go b/fileserver/fileop.go index c06bbda3..5e932126 100644 --- a/fileserver/fileop.go +++ b/fileserver/fileop.go @@ -1455,7 +1455,7 @@ func mkdirWithParents(repoID, parentDir, newDirPath, user string) error { } buf := fmt.Sprintf("Added directory \"%s\"", relativeDirCan) - _, err = genNewCommit(repo, headCommit, rootID, user, buf, true) + _, err = genNewCommit(repo, headCommit, rootID, user, buf, true, "", false) if err != nil { err := fmt.Errorf("failed to generate new commit: %v", err) return err @@ -1714,7 +1714,13 @@ func postMultiFiles(rsp http.ResponseWriter, r *http.Request, repoID, parentDir, } } - retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify) + gcID, err := 
repomgr.GetCurrentGCID(repo.StoreID) + if err != nil { + err := fmt.Errorf("failed to get current gc id for repo %s: %v", repoID, err) + return &appError{err, "", http.StatusInternalServerError} + } + + retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify, gcID) if err != nil { err := fmt.Errorf("failed to post files and gen commit: %v", err) return &appError{err, "", http.StatusInternalServerError} @@ -1770,7 +1776,7 @@ func checkFilesWithSameName(repo *repomgr.Repo, canonPath string, fileNames []st return false } -func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64) (string, error) { +func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64, lastGCID string) (string, error) { handleConncurrentUpdate := true if !replace { handleConncurrentUpdate = false @@ -1816,7 +1822,7 @@ retry: buf = fmt.Sprintf("Added \"%s\".", fileNames[0]) } - _, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate) + _, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate, lastGCID, true) if err != nil { if err != ErrConflict { err := fmt.Errorf("failed to generate new commit: %v", err) @@ -1880,7 +1886,7 @@ func getCanonPath(p string) string { var ErrConflict = fmt.Errorf("Concurent upload conflict") -func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool) (string, error) { +func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool, lastGCID string, checkGC bool) (string, error) { var retryCnt int repoID := repo.ID commit := commitmgr.NewCommit(repoID, base.CommitID, newRoot, user, desc) @@ -1895,7 +1901,7 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, 
user, des maxRetryCnt := 10 for { - retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID) + retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID, lastGCID, checkGC) if err != nil { return "", err } @@ -1925,10 +1931,19 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, des return commitID, nil } -func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error { +func fastForwardOrMerge(user, token string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error { var retryCnt int + checkGC, err := repomgr.HasLastGCID(repo.ID, token) + if err != nil { + return err + } + var lastGCID string + if checkGC { + lastGCID, _ = repomgr.GetLastGCID(repo.ID, token) + repomgr.RemoveLastGCID(repo.ID, token) + } for { - retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil) + retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil, lastGCID, checkGC) if err != nil { return err } @@ -1948,7 +1963,7 @@ func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commit return nil } -func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string) (bool, error) { +func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string, lastGCID string, checkGC bool) (bool, error) { var secondParentID string repoID := repo.ID var mergeDesc string @@ -2001,7 +2016,10 @@ func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *comm mergedCommit = commit } - err = updateBranch(repoID, mergedCommit.CommitID, currentHead.CommitID, secondParentID) + gcConflict, err := updateBranch(repoID, repo.StoreID, mergedCommit.CommitID, 
currentHead.CommitID, secondParentID, checkGC, lastGCID) + if gcConflict { + return false, err + } if err != nil { return true, nil } @@ -2024,56 +2042,80 @@ func genMergeDesc(repo *repomgr.Repo, mergedRoot, p1Root, p2Root string) string return desc } -func updateBranch(repoID, newCommitID, oldCommitID, secondParentID string) error { +func updateBranch(repoID, originRepoID, newCommitID, oldCommitID, secondParentID string, checkGC bool, lastGCID string) (gcConflict bool, err error) { + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + trans, err := seafileDB.BeginTx(ctx, nil) + if err != nil { + err := fmt.Errorf("failed to start transaction: %v", err) + return false, err + } + + var row *sql.Row + var sqlStr string + if checkGC { + sqlStr = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE" + if originRepoID == "" { + row = trans.QueryRowContext(ctx, sqlStr, repoID) + } else { + row = trans.QueryRowContext(ctx, sqlStr, originRepoID) + } + var gcID sql.NullString + if err := row.Scan(&gcID); err != nil { + if err != sql.ErrNoRows { + trans.Rollback() + return false, err + } + } + + if lastGCID != gcID.String { + err = fmt.Errorf("Head branch update for repo %s conflicts with GC.", repoID) + trans.Rollback() + return true, err + } + } + var commitID string name := "master" - var sqlStr string if strings.EqualFold(dbType, "mysql") { sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ? FOR UPDATE" } else { sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ?" 
} - ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) - defer cancel() - trans, err := seafileDB.BeginTx(ctx, nil) - if err != nil { - err := fmt.Errorf("failed to start transaction: %v", err) - return err - } - row := trans.QueryRowContext(ctx, sqlStr, name, repoID) + row = trans.QueryRowContext(ctx, sqlStr, name, repoID) if err := row.Scan(&commitID); err != nil { if err != sql.ErrNoRows { trans.Rollback() - return err + return false, err } } if oldCommitID != commitID { trans.Rollback() err := fmt.Errorf("head commit id has changed") - return err + return false, err } sqlStr = "UPDATE Branch SET commit_id = ? WHERE name = ? AND repo_id = ?" _, err = trans.ExecContext(ctx, sqlStr, newCommitID, name, repoID) if err != nil { trans.Rollback() - return err + return false, err } trans.Commit() if secondParentID != "" { if err := onBranchUpdated(repoID, secondParentID, false); err != nil { - return err + return false, err } } if err := onBranchUpdated(repoID, newCommitID, true); err != nil { - return err + return false, err } - return nil + return false, nil } func onBranchUpdated(repoID string, commitID string, updateRepoInfo bool) error { @@ -2726,7 +2768,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) { if commitDesc == "" { commitDesc = "Auto merge by system" } - newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true) + newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true, "", false) if err != nil { err := fmt.Errorf("failed to generate new commit: %v", err) return "", err @@ -2767,7 +2809,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) { commitDesc = "Auto merge by system" } - newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true) + newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true, "", false) if err != nil { err := fmt.Errorf("failed to generate 
new commit: %v", err) return "", err @@ -3166,8 +3208,13 @@ func putFile(rsp http.ResponseWriter, r *http.Request, repoID, parentDir, user, return &appError{err, "", http.StatusInternalServerError} } + gcID, err := repomgr.GetCurrentGCID(repo.StoreID) + if err != nil { + err := fmt.Errorf("failed to get current gc id: %v", err) + return &appError{err, "", http.StatusInternalServerError} + } desc := fmt.Sprintf("Modified \"%s\"", fileName) - _, err = genNewCommit(repo, headCommit, rootID, user, desc, true) + _, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true) if err != nil { err := fmt.Errorf("failed to generate new commit: %v", err) return &appError{err, "", http.StatusInternalServerError} @@ -3398,8 +3445,13 @@ func commitFileBlocks(repoID, parentDir, fileName, blockIDsJSON, user string, fi return "", &appError{err, "", http.StatusInternalServerError} } + gcID, err := repomgr.GetCurrentGCID(repo.StoreID) + if err != nil { + err := fmt.Errorf("failed to get current gc id: %v", err) + return "", &appError{err, "", http.StatusInternalServerError} + } desc := fmt.Sprintf("Added \"%s\"", fileName) - _, err = genNewCommit(repo, headCommit, rootID, user, desc, true) + _, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true) if err != nil { err := fmt.Errorf("failed to generate new commit: %v", err) return "", &appError{err, "", http.StatusInternalServerError} diff --git a/fileserver/repomgr/repomgr.go b/fileserver/repomgr/repomgr.go index ad642fdb..6c9602cf 100644 --- a/fileserver/repomgr/repomgr.go +++ b/fileserver/repomgr/repomgr.go @@ -720,3 +720,84 @@ func UpdateRepoInfo(repoID, commitID string) error { return nil } + +func HasLastGCID(repoID, clientID string) (bool, error) { + sqlStr := "SELECT 1 FROM LastGCID WHERE repo_id = ? AND client_id = ?" 
+ + var exist int + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + row := seafileDB.QueryRowContext(ctx, sqlStr, repoID, clientID) + if err := row.Scan(&exist); err != nil { + if err != sql.ErrNoRows { + return false, err + } + } + if exist == 0 { + return false, nil + } + return true, nil +} + +func GetLastGCID(repoID, clientID string) (string, error) { + sqlStr := "SELECT gc_id FROM LastGCID WHERE repo_id = ? AND client_id = ?" + + var gcID sql.NullString + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + row := seafileDB.QueryRowContext(ctx, sqlStr, repoID, clientID) + if err := row.Scan(&gcID); err != nil { + if err != sql.ErrNoRows { + return "", err + } + } + + return gcID.String, nil +} + +func GetCurrentGCID(repoID string) (string, error) { + sqlStr := "SELECT gc_id FROM GCID WHERE repo_id = ?" + + var gcID sql.NullString + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + row := seafileDB.QueryRowContext(ctx, sqlStr, repoID) + if err := row.Scan(&gcID); err != nil { + if err != sql.ErrNoRows { + return "", err + } + } + + return gcID.String, nil +} + +func RemoveLastGCID(repoID, clientID string) error { + sqlStr := "DELETE FROM LastGCID WHERE repo_id = ? AND client_id = ?" + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + if _, err := seafileDB.ExecContext(ctx, sqlStr, repoID, clientID); err != nil { + return err + } + return nil +} + +func SetLastGCID(repoID, clientID, gcID string) error { + exist, err := HasLastGCID(repoID, clientID) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout) + defer cancel() + if exist { + sqlStr := "UPDATE LastGCID SET gc_id = ? WHERE repo_id = ? AND client_id = ?" 
+ if _, err = seafileDB.ExecContext(ctx, sqlStr, gcID, repoID, clientID); err != nil { + return err + } + } else { + sqlStr := "INSERT INTO LastGCID (repo_id, client_id, gc_id) VALUES (?, ?, ?)" + if _, err = seafileDB.ExecContext(ctx, sqlStr, repoID, clientID, gcID); err != nil { + return err + } + } + return nil +} diff --git a/fileserver/sync_api.go b/fileserver/sync_api.go index 363e040b..9351ff67 100644 --- a/fileserver/sync_api.go +++ b/fileserver/sync_api.go @@ -918,6 +918,18 @@ func publishStatusEvent(rData *statusEventData) { } } +func saveLastGCID(repoID, token string) error { + repo := repomgr.Get(repoID) + if repo == nil { + return fmt.Errorf("failed to get repo: %s", repoID) + } + gcID, err := repomgr.GetCurrentGCID(repo.StoreID) + if err != nil { + return err + } + return repomgr.SetLastGCID(repoID, token, gcID) +} + func putCommitCB(rsp http.ResponseWriter, r *http.Request) *appError { vars := mux.Vars(r) repoID := vars["repoid"] @@ -949,6 +961,15 @@ func putCommitCB(rsp http.ResponseWriter, r *http.Request) *appError { if err := commitmgr.Save(commit); err != nil { err := fmt.Errorf("Failed to add commit %s: %v", commitID, err) return &appError{err, "", http.StatusInternalServerError} + } else { + token := r.Header.Get("Seafile-Repo-Token") + if token == "" { + token = utils.GetAuthorizationToken(r.Header) + } + if err := saveLastGCID(repoID, token); err != nil { + err := fmt.Errorf("Failed to save gc id: %v", err) + return &appError{err, "", http.StatusInternalServerError} + } } return nil @@ -1034,7 +1055,11 @@ func putUpdateBranchCB(rsp http.ResponseWriter, r *http.Request) *appError { return &appError{nil, msg, seafHTTPResNoQuota} } - if err := fastForwardOrMerge(user, repo, base, newCommit); err != nil { + token := r.Header.Get("Seafile-Repo-Token") + if token == "" { + token = utils.GetAuthorizationToken(r.Header) + } + if err := fastForwardOrMerge(user, token, repo, base, newCommit); err != nil { err := fmt.Errorf("Fast forward merge for repo 
%s is failed: %v", repoID, err) return &appError{err, "", http.StatusInternalServerError} } diff --git a/fileserver/virtual_repo.go b/fileserver/virtual_repo.go index fb6be165..0d60134c 100644 --- a/fileserver/virtual_repo.go +++ b/fileserver/virtual_repo.go @@ -362,7 +362,7 @@ func editRepoNeedRetry(repoID, name, desc, user string) (bool, error) { return false, err } - err = updateBranch(repoID, commit.CommitID, parent.CommitID, "") + _, err = updateBranch(repoID, repo.StoreID, commit.CommitID, parent.CommitID, "", false, "") if err != nil { return true, nil } diff --git a/fuse/repo-mgr.c b/fuse/repo-mgr.c index 000c1fa3..eafb1c51 100644 --- a/fuse/repo-mgr.c +++ b/fuse/repo-mgr.c @@ -94,11 +94,6 @@ seaf_repo_from_commit (SeafRepo *repo, SeafCommit *commit) repo->name = g_strdup (commit->repo_name); repo->desc = g_strdup (commit->repo_desc); repo->encrypted = commit->encrypted; - if (repo->encrypted) { - repo->enc_version = commit->enc_version; - if (repo->enc_version >= 1) - memcpy (repo->magic, commit->magic, 33); - } repo->no_local_history = commit->no_local_history; repo->version = commit->version; } @@ -109,11 +104,6 @@ seaf_repo_to_commit (SeafRepo *repo, SeafCommit *commit) commit->repo_name = g_strdup (repo->name); commit->repo_desc = g_strdup (repo->desc); commit->encrypted = repo->encrypted; - if (commit->encrypted) { - commit->enc_version = repo->enc_version; - if (commit->enc_version >= 1) - commit->magic = g_strdup (repo->magic); - } commit->no_local_history = repo->no_local_history; commit->version = repo->version; } diff --git a/include/seafile-rpc.h b/include/seafile-rpc.h index 0b70da20..6d6bd4db 100644 --- a/include/seafile-rpc.h +++ b/include/seafile-rpc.h @@ -625,6 +625,11 @@ int seafile_get_repo_history_limit (const char *repo_id, GError **error); +int +seafile_set_repo_valid_since (const char *repo_id, + gint64 timestamp, + GError **error); + int seafile_check_passwd (const char *repo_id, const char *magic, diff --git 
a/python/seafile/rpcclient.py b/python/seafile/rpcclient.py index 1c0aae2c..f73e40d8 100644 --- a/python/seafile/rpcclient.py +++ b/python/seafile/rpcclient.py @@ -612,6 +612,10 @@ def set_repo_history_limit(repo_id, days): def get_repo_history_limit(repo_id): pass + @searpc_func("int", ["string", "int64"]) + def set_repo_valid_since(repo_id, timestamp): + pass + # virtual repo @searpc_func("string", ["string", "string", "string", "string", "string", "string"]) def create_virtual_repo(origin_repo_id, path, repo_name, repo_desc, owner, passwd=''): diff --git a/python/seaserv/api.py b/python/seaserv/api.py index 291f9a16..ca350bda 100644 --- a/python/seaserv/api.py +++ b/python/seaserv/api.py @@ -399,6 +399,9 @@ def set_repo_history_limit(self, repo_id, days): """ return seafserv_threaded_rpc.set_repo_history_limit(repo_id, days) + def set_repo_valid_since(self, repo_id, timestamp): + return seafserv_threaded_rpc.set_repo_valid_since(repo_id, timestamp) + def check_repo_blocks_missing(self, repo_id, blklist): return seafserv_threaded_rpc.check_repo_blocks_missing(repo_id, blklist) diff --git a/server/gc/gc-core.c b/server/gc/gc-core.c index 595ace77..0a4fbef2 100644 --- a/server/gc/gc-core.c +++ b/server/gc/gc-core.c @@ -10,8 +10,12 @@ #define DEBUG_FLAG SEAFILE_DEBUG_OTHER #include "log.h" +#include #define MAX_BF_SIZE (((size_t)1) << 29) /* 64 MB */ +#define KEEP_ALIVE_PER_OBJS 100 +#define KEEP_ALIVE_PER_SECOND 1 + /* * The number of bits in the bloom filter is 4 times the number of all blocks. * Let m be the bits in the bf, n be the number of blocks to be added to the bf @@ -30,15 +34,40 @@ * If total_blocks is a small number (e.g. < 100), we should try to clean all dead blocks. * So we set the minimal size of the bf to 1KB. */ + +/* + * Online GC algorithm + * + * There is a table `GCID` in the seafile database. Every time GC is run for a repo, + * a new GC ID (UUID) will be generated and inserted into this table. 
+ * + * Other threads that want to update the branch head of a repo must do so as follows: + * 1. Read the GC ID from the table before wrting blocks; + * 2. begin a transaction; + * 3. Read the GC ID again with `SELECT ... FOR UPDATE`; + * 4. Compare the new GC ID with the previous one. If they are the same, proceed to + * update the branch head; otherwise, a GC operation has been run between + * steps 1 and 3, the branch update operation must be failed. + * 5. Commit or rollback the transaction. + * + * For syncing clients, the algorithm is a bit more complicated. + * Because writing blocks and updating the branch head is not executed in the same + * context (or more precisely, not in the same thread), the GC ID read in step 1 + * has to be stored into a database table `LastGCID (client_token, gc_id)`. + * After step 4, no matter the branch update succeeds or not, the entry in `LastGCID` + * table has to be deleted. + */ + static Bloom * -alloc_gc_index (guint64 total_objs) +alloc_gc_index (const char *repo_id, guint64 total_blocks) { size_t size; - size = (size_t) MAX(total_objs << 2, 1 << 13); + size = (size_t) MAX(total_blocks << 2, 1 << 13); size = MIN (size, MAX_BF_SIZE); - seaf_message ("GC index size is %u Byte.\n", (int)size >> 3); + seaf_message ("GC index size is %u Byte for repo %.8s.\n", + (int)size >> 3, repo_id); return bloom_create (size, 3, 0); } @@ -48,6 +77,7 @@ typedef struct { Bloom *blocks_index; Bloom *fs_index; GHashTable *visited; + GHashTable *visited_commits; /* > 0: keep a period of history; * == 0: only keep data in head commit; @@ -61,6 +91,10 @@ typedef struct { int verbose; gint64 traversed_fs_objs; + + SeafDBTrans *trans; + gint64 keep_alive_last_time; + gint64 keep_alive_obj_counter; } GCData; static int @@ -118,6 +152,18 @@ fs_callback (SeafFSManager *mgr, g_hash_table_replace (data->visited, key, key); } + if (data->trans) { + ++(data->keep_alive_obj_counter); + + if (data->keep_alive_obj_counter >= KEEP_ALIVE_PER_OBJS && + 
((gint64)time(NULL) - data->keep_alive_last_time) >= KEEP_ALIVE_PER_SECOND) + { + data->keep_alive_last_time = (gint64)time(NULL); + data->keep_alive_obj_counter = 0; + seaf_db_trans_query(data->trans, "SELECT 1;", 0); + } + } + add_fs_to_index(data, obj_id); if (type == SEAF_METADATA_TYPE_FILE && @@ -133,6 +179,12 @@ traverse_commit (SeafCommit *commit, void *vdata, gboolean *stop) GCData *data = vdata; int ret; + if (g_hash_table_lookup (data->visited_commits, commit->commit_id)) { + // Has traversed on prev head commit, stop traverse from this branch + *stop = TRUE; + return TRUE; + } + if (data->truncate_time == 0) { *stop = TRUE; @@ -154,7 +206,8 @@ traverse_commit (SeafCommit *commit, void *vdata, gboolean *stop) data->traversed_head = TRUE; if (data->verbose) - seaf_message ("Traversing commit %.8s.\n", commit->commit_id); + seaf_message ("Traversing commit %.8s for repo %.8s.\n", + commit->commit_id, data->repo->id); ++data->traversed_commits; @@ -168,75 +221,193 @@ traverse_commit (SeafCommit *commit, void *vdata, gboolean *stop) if (ret < 0) return FALSE; + int dummy; + g_hash_table_replace (data->visited_commits, + g_strdup (commit->commit_id), &dummy); + if (data->verbose) - seaf_message ("Traversed %"G_GINT64_FORMAT" fs objects.\n", - data->traversed_fs_objs); + seaf_message ("Traversed %"G_GINT64_FORMAT" fs objects for repo %.8s.\n", + data->traversed_fs_objs, data->repo->id); return TRUE; } -static gint64 -populate_gc_index_for_repo (SeafRepo *repo, Bloom *blocks_index, Bloom *fs_index, int verbose) +static int +update_gc_id (SeafRepo *repo, SeafDBTrans *trans) { - GList *branches, *ptr; - SeafBranch *branch; - GCData *data; - int ret = 0; + char *sql; + char *gc_id; + gboolean id_exists, db_err = FALSE; + int ret; - if (!repo->is_virtual) - seaf_message ("Populating index for repo %.8s.\n", repo->id); - else - seaf_message ("Populating index for sub-repo %.8s.\n", repo->id); + sql = "SELECT 1 FROM GCID WHERE repo_id = ?"; + id_exists = 
seaf_db_trans_check_for_existence (trans, sql, &db_err, + 1, "string", repo->id); - branches = seaf_branch_manager_get_branch_list (seaf->branch_mgr, repo->id); - if (branches == NULL) { - seaf_warning ("[GC] Failed to get branch list of repo %s.\n", repo->id); - return -1; + gc_id = gen_uuid (); + if (id_exists) { + sql = "UPDATE GCID SET gc_id = ? WHERE repo_id = ?"; + ret = seaf_db_trans_query (trans, sql, 2, + "string", gc_id, "string", repo->id); + } else { + sql = "INSERT INTO GCID (repo_id, gc_id) VALUES (?, ?)"; + ret = seaf_db_trans_query (trans, sql, 2, + "string", repo->id, "string", gc_id); } + g_free (gc_id); - data = g_new0(GCData, 1); - data->repo = repo; - data->blocks_index = blocks_index; - data->fs_index = fs_index; - data->visited = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); - data->verbose = verbose; + return ret; +} - gint64 truncate_time = seaf_repo_manager_get_repo_truncate_time (repo->manager, - repo->id); - if (truncate_time > 0) { - seaf_repo_manager_set_repo_valid_since (repo->manager, - repo->id, - truncate_time); - } else if (truncate_time == 0) { +static void +update_valid_since_time (SeafRepo *repo, gint64 new_time) +{ + gint64 old_time = seaf_repo_manager_get_repo_valid_since (repo->manager, + repo->id); + + if (new_time > 0) { + if (new_time > old_time) + seaf_repo_manager_set_repo_valid_since (repo->manager, + repo->id, + new_time); + } else if (new_time == 0) { /* Only the head commit is valid after GC if no history is kept. 
*/ SeafCommit *head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version, repo->head->commit_id); - if (head) + if (head && (old_time < 0 || head->ctime > (guint64)old_time)) seaf_repo_manager_set_repo_valid_since (repo->manager, repo->id, head->ctime); seaf_commit_unref (head); } +} + +static GCData * +gc_data_new (SeafRepo *repo, Bloom *blocks_index, Bloom *fs_index, int verbose) +{ + GCData *data; + data = g_new0(GCData, 1); + seaf_repo_ref(repo); + data->repo = repo; + data->blocks_index = blocks_index; + data->fs_index = fs_index; + data->visited = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); + data->visited_commits = g_hash_table_new_full (g_str_hash, g_str_equal, + g_free, NULL); + data->verbose = verbose; + gint64 truncate_time; + truncate_time = seaf_repo_manager_get_repo_truncate_time (repo->manager, + repo->id); + update_valid_since_time (repo, truncate_time); data->truncate_time = truncate_time; - for (ptr = branches; ptr != NULL; ptr = ptr->next) { - branch = ptr->data; - gboolean res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, - repo->id, - repo->version, - branch->commit_id, - traverse_commit, - data, - FALSE); - seaf_branch_unref (branch); + data->keep_alive_last_time = (gint64)time(NULL); + data->keep_alive_obj_counter = 0; + + return data; +} + +static void +gc_data_free (GCData *data) +{ + if (!data) + return; + + seaf_repo_unref(data->repo); + g_hash_table_destroy (data->visited); + g_hash_table_destroy (data->visited_commits); + g_free (data); + + return; +} + +static gint64 +populate_gc_index_for_repo_for_new_commits (GCData *data, SeafDBTrans *trans) +{ + SeafBranch *new_branch = NULL; + gint64 n_blocks_last = 0; + int n_commits_last = 0; + gboolean res; + gint64 ret = 0; + SeafRepo *repo = data->repo; + + if (!repo->is_virtual) { + if (trans != NULL && update_gc_id (repo, trans) < 0) { + seaf_warning ("Failed to update GCID for repo %s.\n", repo->id); + ret = -1; + goto out; + } + } 
+ + n_blocks_last = data->traversed_blocks; + n_commits_last = data->traversed_commits; + data->traversed_blocks = 0; + data->traversed_commits = 0; + data->trans = trans; + + new_branch = seaf_branch_manager_get_branch (seaf->branch_mgr, repo->id, "master"); + if (!new_branch) { + seaf_warning ("Failed to get master branch of repo %.8s.\n", repo->id); + ret = -1; + goto out; + } + + if (g_strcmp0 (repo->head->commit_id, new_branch->commit_id) != 0) { + res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, + repo->id, repo->version, + new_branch->commit_id, + traverse_commit, + data, + FALSE); if (!res) { ret = -1; - break; + seaf_warning ("Failed to populate index for repo %.8s.\n", repo->id); + goto out; } } + seaf_message ("Traversed %d commits, %"G_GINT64_FORMAT" blocks for repo %.8s.\n", + data->traversed_commits + n_commits_last, + data->traversed_blocks + n_blocks_last, + repo->id); + + ret = data->traversed_blocks; + +out: + seaf_branch_unref (new_branch); + + return ret; + +} + +static gint64 +populate_gc_index_for_repo (GCData *data, SeafDBTrans *trans) +{ + gboolean res; + gint64 ret = 0; + SeafRepo *repo = data->repo; + + data->trans = trans; + + if (!repo->is_virtual) + seaf_message ("Populating index for repo %.8s.\n", repo->id); + else + seaf_message ("Populating index for sub-repo %.8s.\n", repo->id); + + res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, + repo->id, repo->version, + repo->head->commit_id, + traverse_commit, + data, + FALSE); + if (!res) { + ret = -1; + seaf_warning ("Failed to populate index for repo %.8s.\n", repo->id); + return -1; + } + // Traverse the base commit of the virtual repo. Otherwise, if the virtual repo has not been updated for a long time, // the fs object corresponding to the base commit will be removed by mistake. 
if (!repo->is_virtual) { @@ -251,80 +422,200 @@ populate_gc_index_for_repo (SeafRepo *repo, Bloom *blocks_index, Bloom *fs_index if (!vinfo) { continue; } - gboolean res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, - repo->store_id, repo->version, - vinfo->base_commit, - traverse_commit, - data, - FALSE); + res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, + repo->store_id, repo->version, + vinfo->base_commit, + traverse_commit, + data, + FALSE); seaf_virtual_repo_info_free (vinfo); if (!res) { seaf_warning ("Failed to traverse base commit %s for virtual repo %s.\n", vinfo->base_commit, repo_id); - ret = -1; - break; + string_list_free (vrepo_ids); + return -1; } } string_list_free (vrepo_ids); } - seaf_message ("Traversed %d commits, %"G_GINT64_FORMAT" blocks.\n", - data->traversed_commits, data->traversed_blocks); ret = data->traversed_blocks; - g_list_free (branches); - g_hash_table_destroy (data->visited); - g_free (data); - return ret; } -typedef struct { +#define MAX_THREADS 10 + +typedef struct CheckBlockParam { + char *store_id; + int repo_version; + Bloom *index; + int dry_run; + GAsyncQueue *async_queue; + pthread_mutex_t counter_lock; + gint64 removed_blocks; +} CheckBlockParam; + +typedef struct CheckFSParam { + char *store_id; + int repo_version; Bloom *index; int dry_run; - guint64 removed_blocks; -} CheckBlocksData; + GAsyncQueue *async_queue; + pthread_mutex_t counter_lock; + gint64 removed_fs; +} CheckFSParam; -static gboolean -check_block_liveness (const char *store_id, int version, - const char *block_id, void *vdata) +static void +check_block_liveness (gpointer data, gpointer user_data) { - CheckBlocksData *data = vdata; - Bloom *index = data->index; - - if (!bloom_test (index, block_id)) { - data->removed_blocks++; - if (!data->dry_run) + char *block_id = data; + CheckBlockParam *param = user_data; + + if (!bloom_test (param->index, block_id)) { + pthread_mutex_lock (¶m->counter_lock); + param->removed_blocks 
++; + pthread_mutex_unlock (¶m->counter_lock); + if (!param->dry_run) seaf_block_manager_remove_block (seaf->block_mgr, - store_id, version, + param->store_id, param->repo_version, block_id); } + g_async_queue_push (param->async_queue, block_id); +} + +static gint64 +check_existing_blocks (char *store_id, int repo_version, GHashTable *exist_blocks, + Bloom *blocks_index, int dry_run) +{ + char *block_id; + GThreadPool *tpool = NULL; + GAsyncQueue *async_queue = NULL; + CheckBlockParam *param = NULL; + GHashTableIter iter; + gpointer key, value; + gint64 ret = 0; + + async_queue = g_async_queue_new (); + param = g_new0 (CheckBlockParam, 1); + param->store_id = store_id; + param->repo_version = repo_version; + param->index = blocks_index; + param->dry_run = dry_run; + param->async_queue = async_queue; + pthread_mutex_init (¶m->counter_lock, NULL); + + tpool = g_thread_pool_new (check_block_liveness, param, MAX_THREADS, FALSE, NULL); + if (!tpool) { + seaf_warning ("Failed to create thread pool for repo %s, stop gc.\n", + store_id); + ret = -1; + goto out; + } + + g_hash_table_iter_init (&iter, exist_blocks); + + while (g_hash_table_iter_next (&iter, &key, &value)) { + g_thread_pool_push (tpool, (char *)key, NULL); + } + + while ((block_id = g_async_queue_pop (async_queue))) { + g_hash_table_remove (exist_blocks, block_id); + if (g_hash_table_size (exist_blocks) == 0) { + break; + } + } + + ret = param->removed_blocks; + +out: + g_thread_pool_free (tpool, TRUE, TRUE); + g_async_queue_unref (async_queue); + g_free (param); + + return ret; +} + +static gboolean +collect_exist_blocks (const char *store_id, int version, + const char *block_id, void *vdata) +{ + GHashTable *exist_blocks = vdata; + int dummy; + + g_hash_table_replace (exist_blocks, g_strdup (block_id), &dummy); + return TRUE; } -#define MAX_THREADS 10 +static void +check_fs_liveness (gpointer data, gpointer user_data) +{ + char *fs_id = data; + CheckFSParam *param = user_data; + + if (!bloom_test 
(param->index, fs_id)) { + pthread_mutex_lock (¶m->counter_lock); + param->removed_fs ++; + pthread_mutex_unlock (¶m->counter_lock); + if (!param->dry_run) + seaf_fs_manager_delete_object(seaf->fs_mgr, + param->store_id, param->repo_version, + fs_id); + } + + g_async_queue_push (param->async_queue, fs_id); +} static gint64 check_existing_fs (char *store_id, int repo_version, GHashTable *exist_fs, Bloom *fs_index, int dry_run) { + char *fs_id; + GThreadPool *tpool = NULL; + GAsyncQueue *async_queue = NULL; + CheckFSParam *param = NULL; GHashTableIter iter; gpointer key, value; gint64 ret = 0; + async_queue = g_async_queue_new (); + param = g_new0 (CheckFSParam, 1); + param->store_id = store_id; + param->repo_version = repo_version; + param->index = fs_index; + param->dry_run = dry_run; + param->async_queue = async_queue; + pthread_mutex_init (¶m->counter_lock, NULL); + + tpool = g_thread_pool_new (check_fs_liveness, param, MAX_THREADS, FALSE, NULL); + if (!tpool) { + seaf_warning ("Failed to create thread pool for repo %s, stop gc.\n", + store_id); + ret = -1; + goto out; + } + g_hash_table_iter_init (&iter, exist_fs); while (g_hash_table_iter_next (&iter, &key, &value)) { - if (!bloom_test (fs_index, (char *)key)) { - ret++; - if (dry_run) - continue; - seaf_fs_manager_delete_object(seaf->fs_mgr, - store_id, repo_version, - (char *)key); + g_thread_pool_push (tpool, (char *)key, NULL); + } + + while ((fs_id = g_async_queue_pop (async_queue))) { + g_hash_table_remove (exist_fs, fs_id); + if (g_hash_table_size (exist_fs) == 0) { + break; } } + ret = param->removed_fs; + +out: + g_thread_pool_free (tpool, TRUE, TRUE); + g_async_queue_unref (async_queue); + g_free (param); + return ret; } @@ -341,13 +632,51 @@ collect_exist_fs (const char *store_id, int version, } static gint64 -populate_gc_index_for_virtual_repos (SeafRepo *repo, Bloom *blocks_index, Bloom *fs_index, int verbose) +populate_gc_index_for_virtual_repos_for_new_commits (GList *virtual_repos, + SeafDBTrans 
*trans) +{ + GList *ptr; + SeafRepo *vrepo; + gint64 scan_ret = 0; + gint64 ret = 0; + GCData *data = NULL; + + for (ptr = virtual_repos; ptr; ptr = ptr->next) { + data = ptr->data; + if (!data) + continue; + + vrepo = data->repo; + if (!vrepo) { + continue; + } + + scan_ret = populate_gc_index_for_repo_for_new_commits (data, trans); + if (scan_ret < 0) { + ret = -1; + goto out; + } + ret += scan_ret; + } + +out: + return ret; +} + +static gint64 +populate_gc_index_for_virtual_repos (SeafRepo *repo, + GList **virtual_repos, + Bloom *blocks_index, + Bloom *fs_index, + SeafDBTrans *trans, + int verbose) { GList *vrepo_ids = NULL, *ptr; char *repo_id; SeafRepo *vrepo; - gint64 ret = 0; gint64 scan_ret = 0; + gint64 ret = 0; + GCData *data; vrepo_ids = seaf_repo_manager_get_virtual_repo_ids_by_origin (seaf->repo_mgr, repo->id); @@ -360,8 +689,11 @@ populate_gc_index_for_virtual_repos (SeafRepo *repo, Bloom *blocks_index, Bloom goto out; } - scan_ret = populate_gc_index_for_repo (vrepo, blocks_index, fs_index, verbose); - seaf_repo_unref (vrepo); + data = gc_data_new (vrepo, blocks_index, fs_index, verbose); + *virtual_repos = g_list_prepend (*virtual_repos, data); + + scan_ret = populate_gc_index_for_repo (data, trans); + seaf_repo_unref(vrepo); if (scan_ret < 0) { ret = -1; goto out; @@ -374,29 +706,46 @@ populate_gc_index_for_virtual_repos (SeafRepo *repo, Bloom *blocks_index, Bloom return ret; } +/* + * @keep_days: explicitly sepecify how many days of history to keep after GC. + * This has higher priority than the history limit set in database. + * @online: is running online GC. Online GC is not supported for SQLite DB. 
+ */ gint64 -gc_v1_repo (SeafRepo *repo, int dry_run, int verbose, int rm_fs) +gc_v1_repo (SeafRepo *repo, int dry_run, int online, int verbose, int rm_fs) { Bloom *blocks_index = NULL; Bloom *fs_index = NULL; + GHashTable *exist_blocks = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); GHashTable *exist_fs = NULL; - guint64 total_blocks; - guint64 removed_blocks; - guint64 reachable_blocks; + GList *virtual_repos = NULL; + guint64 total_blocks = 0; guint64 total_fs = 0; + guint64 reachable_blocks = 0; gint64 removed_fs = 0; gint64 ret; + GCData *data; + SeafDBTrans *trans = NULL; - total_blocks = seaf_block_manager_get_block_number (seaf->block_mgr, - repo->store_id, repo->version); - reachable_blocks = 0; + ret = seaf_block_manager_foreach_block (seaf->block_mgr, + repo->store_id, repo->version, + collect_exist_blocks, + exist_blocks); + if (ret < 0) { + seaf_warning ("Failed to collect existing blocks for repo %.8s, stop GC.\n\n", + repo->id); + g_hash_table_destroy (exist_blocks); + return ret; + } + total_blocks = g_hash_table_size (exist_blocks); if (total_blocks == 0) { - seaf_message ("No blocks. Skip GC.\n\n"); + seaf_message ("No blocks for repo %.8s, skip GC.\n\n", repo->id); + g_hash_table_destroy (exist_blocks); return 0; } - if (rm_fs) { + if (rm_fs) { exist_fs = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); ret = seaf_obj_store_foreach_obj (seaf->fs_mgr->obj_store, repo->store_id, repo->version, @@ -412,9 +761,11 @@ gc_v1_repo (SeafRepo *repo, int dry_run, int verbose, int rm_fs) } if (rm_fs) - seaf_message ("GC started. Total block number is %"G_GUINT64_FORMAT", total fs number is %"G_GUINT64_FORMAT".\n", total_blocks, total_fs); + seaf_message ("GC started for repo %.8s. Total block number is %"G_GUINT64_FORMAT", total fs number is %"G_GUINT64_FORMAT".\n", + repo->id, total_blocks, total_fs); else - seaf_message ("GC started. 
Total block number is %"G_GUINT64_FORMAT".\n", total_blocks); + seaf_message ("GC started for repo %.8s. Total block number is %"G_GUINT64_FORMAT".\n", + repo->id, total_blocks); /* * Store the index of live blocks in bloom filter to save memory. @@ -422,15 +773,16 @@ gc_v1_repo (SeafRepo *repo, int dry_run, int verbose, int rm_fs) * may skip some garbage blocks, but we won't delete * blocks that are still alive. */ - blocks_index = alloc_gc_index (total_blocks); + blocks_index = alloc_gc_index (repo->id, total_blocks); if (!blocks_index) { - seaf_warning ("GC: Failed to allocate blocks_index.\n"); + seaf_warning ("GC: Failed to allocate blocks index for repo %.8s, stop gc.\n", + repo->id); ret = -1; goto out; } if (rm_fs && total_fs > 0) { - fs_index = alloc_gc_index (total_fs); + fs_index = alloc_gc_index (repo->id, total_fs); if (!fs_index) { seaf_warning ("GC: Failed to allocate fs index for repo %.8s, stop gc.\n", repo->id); @@ -439,49 +791,79 @@ gc_v1_repo (SeafRepo *repo, int dry_run, int verbose, int rm_fs) } } - seaf_message ("Populating index.\n"); - - ret = populate_gc_index_for_repo (repo, blocks_index, fs_index, verbose); - if (ret < 0) + data = gc_data_new (repo, blocks_index, fs_index, verbose); + ret = populate_gc_index_for_repo (data, trans); + if (ret < 0) { goto out; - + } + reachable_blocks += ret; /* Since virtual repos share fs and block store with the origin repo, * it's necessary to do GC for them together. 
*/ - ret = populate_gc_index_for_virtual_repos (repo, blocks_index, fs_index, verbose); - if (ret < 0) + ret = populate_gc_index_for_virtual_repos (repo, &virtual_repos, + blocks_index, fs_index, trans, verbose); + if (ret < 0) { goto out; + } reachable_blocks += ret; - if (!dry_run) - seaf_message ("Scanning and deleting unused blocks.\n"); - else - seaf_message ("Scanning unused blocks.\n"); + if (online) { + trans = seaf_db_begin_transaction (seaf->db); + if (!trans) + goto out; + } - CheckBlocksData data; - data.index = blocks_index; - data.dry_run = dry_run; - data.removed_blocks = 0; + ret = populate_gc_index_for_repo_for_new_commits (data, trans); + if (ret < 0) { + if (online) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + } + goto out; + } + + reachable_blocks += ret; + + + ret = populate_gc_index_for_virtual_repos_for_new_commits (virtual_repos, trans); - ret = seaf_block_manager_foreach_block (seaf->block_mgr, - repo->store_id, repo->version, - check_block_liveness, - &data); if (ret < 0) { - seaf_warning ("GC: Failed to clean dead blocks.\n"); + if (online) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + } goto out; } - removed_blocks = data.removed_blocks; - ret = removed_blocks; + reachable_blocks += ret; + + if (!dry_run) + seaf_message ("Scanning and deleting unused blocks for repo %.8s.\n", + repo->id); + else + seaf_message ("Scanning unused blocks for repo %.8s.\n", repo->id); + + ret = check_existing_blocks (repo->store_id, repo->version, exist_blocks, + blocks_index, dry_run); + if (ret < 0) { + if (online) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + } + goto out; + } if (rm_fs && total_fs > 0) { removed_fs = check_existing_fs(repo->store_id, repo->version, exist_fs, fs_index, dry_run); if (removed_fs < 0) { + if (online) { + seaf_db_rollback (trans); + seaf_db_trans_close (trans); + } goto out; } } @@ -492,81 +874,288 @@ gc_v1_repo (SeafRepo *repo, int dry_run, int verbose, int rm_fs) "about 
%"G_GUINT64_FORMAT" reachable blocks, " "%"G_GUINT64_FORMAT" blocks are removed. " "%"G_GUINT64_FORMAT" fs are removed.\n", - repo->id, total_blocks, reachable_blocks, removed_blocks, removed_fs); + repo->id, total_blocks, reachable_blocks, ret, removed_fs); else - seaf_message ("GC finished. %"G_GUINT64_FORMAT" blocks total, " + seaf_message ("GC finished for repo %.8s. %"G_GUINT64_FORMAT" blocks total, " "about %"G_GUINT64_FORMAT" reachable blocks, " "%"G_GUINT64_FORMAT" blocks are removed.\n", - total_blocks, reachable_blocks, removed_blocks); + repo->id, total_blocks, reachable_blocks, ret); } else { if (rm_fs) seaf_message ("GC finished for repo %.8s. %"G_GUINT64_FORMAT" blocks total, " "about %"G_GUINT64_FORMAT" reachable blocks, " "%"G_GUINT64_FORMAT" blocks can be removed. " "%"G_GUINT64_FORMAT" fs can be removed.\n", - repo->id, total_blocks, reachable_blocks, removed_blocks, removed_fs); + repo->id, total_blocks, reachable_blocks, ret, removed_fs); else - seaf_message ("GC finished. %"G_GUINT64_FORMAT" blocks total, " + seaf_message ("GC finished for repo %.8s. 
%"G_GUINT64_FORMAT" blocks total, " "about %"G_GUINT64_FORMAT" reachable blocks, " "%"G_GUINT64_FORMAT" blocks can be removed.\n", - total_blocks, reachable_blocks, removed_blocks); + repo->id, total_blocks, reachable_blocks, ret); + } + + if (online) { + if (seaf_db_commit (trans) < 0) { + seaf_db_rollback (trans); + } + seaf_db_trans_close (trans); } out: printf ("\n"); - if (exist_fs) - g_hash_table_destroy (exist_fs); - if (blocks_index) bloom_destroy (blocks_index); if (fs_index) - bloom_destroy (fs_index); + bloom_destroy(fs_index); + g_hash_table_destroy (exist_blocks); + if (exist_fs) + g_hash_table_destroy (exist_fs); + gc_data_free (data); + g_list_free_full(virtual_repos, (GDestroyNotify)gc_data_free); return ret; } +typedef enum RemoveType { + COMMIT, + FS, + BLOCK +} RemoveType; + +typedef struct RemoveTask { + const char *repo_id; + RemoveType remove_type; + gboolean success; +} RemoveTask; + +static void +remove_store (gpointer data, gpointer user_data) +{ + RemoveTask *task = data; + GAsyncQueue *async_queue = user_data; + int ret = 0; + + switch (task->remove_type) { + case COMMIT: + seaf_message ("Deleting commits for repo %s.\n", task->repo_id); + ret = seaf_commit_manager_remove_store (seaf->commit_mgr, task->repo_id); + if (ret == 0) { + task->success = TRUE; + } + break; + case FS: + seaf_message ("Deleting fs objects for repo %s.\n", task->repo_id); + ret = seaf_fs_manager_remove_store (seaf->fs_mgr, task->repo_id); + if (ret == 0) { + task->success = TRUE; + } + break; + case BLOCK: + seaf_message ("Deleting blocks for repo %s.\n", task->repo_id); + ret = seaf_block_manager_remove_store (seaf->block_mgr, task->repo_id); + if (ret == 0) { + task->success = TRUE; + } + break; + default: + break; + } + + g_async_queue_push (async_queue, task); +} + void -delete_garbaged_repos (int dry_run) +delete_garbaged_repos (int dry_run, int thread_num) { GList *del_repos = NULL; GList *ptr; + GAsyncQueue *async_queue = NULL; + int tnum; + GThreadPool 
*tpool = NULL; + RemoveTask *task = NULL; + int n_tasks = 0; + char *repo_id; + char *dup_id; + GHashTableIter iter; + gpointer key, value; + GHashTable *deleted; + + deleted = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); seaf_message ("=== Repos deleted by users ===\n"); del_repos = seaf_repo_manager_list_garbage_repos (seaf->repo_mgr); + + if (!dry_run && del_repos) { + async_queue = g_async_queue_new (); + if (!async_queue) { + seaf_warning ("Failed to create async queue.\n"); + goto out; + } + + tnum = thread_num <= 0 ? MAX_THREADS : thread_num; + tpool = g_thread_pool_new (remove_store, async_queue, tnum, FALSE, NULL); + if (!tpool) { + seaf_warning ("Failed to create thread pool.\n"); + goto out; + } + } + for (ptr = del_repos; ptr; ptr = ptr->next) { - char *repo_id = ptr->data; + repo_id = ptr->data; /* Confirm repo doesn't exist before removing blocks. */ if (!seaf_repo_manager_repo_exists (seaf->repo_mgr, repo_id)) { if (!dry_run) { - seaf_message ("GC deleted repo %.8s.\n", repo_id); - seaf_commit_manager_remove_store (seaf->commit_mgr, repo_id); - seaf_fs_manager_remove_store (seaf->fs_mgr, repo_id); - seaf_block_manager_remove_store (seaf->block_mgr, repo_id); + seaf_message ("Start to GC deleted repo %s.\n", repo_id); + // Remove commit + task = g_new0 (RemoveTask, 1); + task->repo_id = repo_id; + task->remove_type = COMMIT; + g_thread_pool_push (tpool, task, NULL); + + // Remove fs + task = g_new0 (RemoveTask, 1); + task->repo_id = repo_id; + task->remove_type = FS; + g_thread_pool_push (tpool, task, NULL); + + // Remove block + task = g_new0 (RemoveTask, 1); + task->repo_id = repo_id; + task->remove_type = BLOCK; + g_thread_pool_push (tpool, task, NULL); + + n_tasks += 3; + + dup_id = g_strdup (repo_id); + g_hash_table_insert (deleted, dup_id, dup_id); } else { - seaf_message ("Repo %.8s can be GC'ed.\n", repo_id); + seaf_message ("Repo %s can be GC'ed.\n", repo_id); + } + } + } + + while (n_tasks > 0 && (task = g_async_queue_pop 
(async_queue))) { + n_tasks--; + if (!task->success) { + if (g_hash_table_lookup (deleted, task->repo_id)) { + g_hash_table_remove(deleted, task->repo_id); } } + g_free (task); + } - if (!dry_run) - seaf_repo_manager_remove_garbage_repo (seaf->repo_mgr, repo_id); - g_free (repo_id); + if (!dry_run) { + g_hash_table_iter_init (&iter, deleted); + while (g_hash_table_iter_next (&iter, &key, &value)) { + seaf_repo_manager_remove_garbage_repo (seaf->repo_mgr, (char *)key); + } } - g_list_free (del_repos); + +out: + g_hash_table_destroy (deleted); + if (tpool) + g_thread_pool_free (tpool, TRUE, TRUE); + if (async_queue) + g_async_queue_unref (async_queue); + string_list_free (del_repos); +} + +typedef struct GCRepoParam { + int dry_run; + int verbose; + int rm_fs; + gboolean online; + GAsyncQueue *async_queue; +} GCRepoParam; + +typedef struct GCRepo { + SeafRepo *repo; + gint64 gc_ret; +} GCRepo; + +static void +free_gc_repo (GCRepo *gc_repo) +{ + if (!gc_repo) + return; + + seaf_repo_unref (gc_repo->repo); + g_free (gc_repo); +} + +static void +gc_repo_cb (gpointer data, gpointer user_data) +{ + GCRepo *gc_repo = data; + GCRepoParam *param = user_data; + SeafRepo *repo = gc_repo->repo; + + seaf_message ("GC version %d repo %s(%s)\n", + repo->version, repo->name, repo->id); + + gc_repo->gc_ret = gc_v1_repo (repo, param->dry_run, + param->online, param->verbose, param->rm_fs); + + g_async_queue_push (param->async_queue, gc_repo); } int -gc_core_run (GList *repo_id_list, int dry_run, int verbose, int rm_fs) +gc_core_run (GList *repo_id_list, const char *id_prefix, + int dry_run, int verbose, int thread_num, int rm_fs) { GList *ptr; SeafRepo *repo; GList *corrupt_repos = NULL; GList *del_block_repos = NULL; gboolean del_garbage = FALSE; - gint64 gc_ret; + GAsyncQueue *async_queue = NULL; + GCRepoParam *param = NULL; + int tnum; + GThreadPool *tpool = NULL; + int gc_repo_num = 0; + GCRepo *gc_repo = NULL; char *repo_id; + gboolean online; + + if (seaf_db_type (seaf->db) == 
SEAF_DB_TYPE_SQLITE) { + online = FALSE; + seaf_message ("Database is SQLite, use offline GC.\n"); + } else { + online = TRUE; + seaf_message ("Database is MySQL/Postgre/Oracle, use online GC.\n"); + } + + async_queue = g_async_queue_new (); + if (!async_queue) { + seaf_warning ("Failed to create async queue, stop gc.\n"); + return -1; + } - if (repo_id_list == NULL) { + param = g_new0 (GCRepoParam, 1); + param->dry_run = dry_run; + param->verbose = verbose; + param->rm_fs = rm_fs; + param->online = online; + param->async_queue = async_queue; + + tnum = thread_num <= 0 ? MAX_THREADS : thread_num; + tpool = g_thread_pool_new (gc_repo_cb, param, tnum, FALSE, NULL); + if (!tpool) { + seaf_warning ("Failed to create thread pool, stop gc.\n"); + g_async_queue_unref (async_queue); + g_free (param); + return -1; + } + + seaf_message ("Using up to %d threads to run GC.\n", tnum); + + if (id_prefix) { + if (repo_id_list) + g_list_free (repo_id_list); + repo_id_list = seaf_repo_manager_get_repo_id_list_by_prefix (seaf->repo_mgr, id_prefix); + del_garbage = TRUE; + } else if (repo_id_list == NULL) { repo_id_list = seaf_repo_manager_get_repo_id_list (seaf->repo_mgr); del_garbage = TRUE; } @@ -582,25 +1171,33 @@ gc_core_run (GList *repo_id_list, int dry_run, int verbose, int rm_fs) if (repo->is_corrupted) { corrupt_repos = g_list_prepend (corrupt_repos, g_strdup(repo->id)); seaf_message ("Repo %s is damaged, skip GC.\n\n", repo->id); + seaf_repo_unref (repo); continue; } if (!repo->is_virtual) { - seaf_message ("GC version %d repo %s(%s)\n", - repo->version, repo->name, repo->id); - gc_ret = gc_v1_repo (repo, dry_run, verbose, rm_fs); - if (gc_ret < 0) { - corrupt_repos = g_list_prepend (corrupt_repos, g_strdup(repo->id)); - } else if (dry_run && gc_ret) { - del_block_repos = g_list_prepend (del_block_repos, g_strdup(repo->id)); - } + gc_repo = g_new0 (GCRepo, 1); + gc_repo->repo = repo; + g_thread_pool_push (tpool, gc_repo, NULL); + gc_repo_num++; + } else { + seaf_repo_unref 
(repo); } - seaf_repo_unref (repo); } g_list_free (repo_id_list); + while (gc_repo_num > 0 && (gc_repo = g_async_queue_pop (async_queue))) { + if (gc_repo->gc_ret < 0) { + corrupt_repos = g_list_prepend (corrupt_repos, g_strdup(gc_repo->repo->id)); + } else if (dry_run && gc_repo->gc_ret) { + del_block_repos = g_list_prepend (del_block_repos, g_strdup(gc_repo->repo->id)); + } + free_gc_repo (gc_repo); + gc_repo_num--; + } + if (del_garbage) { - delete_garbaged_repos (dry_run); + delete_garbaged_repos (dry_run, tnum); } seaf_message ("=== GC is finished ===\n"); @@ -627,5 +1224,9 @@ gc_core_run (GList *repo_id_list, int dry_run, int verbose, int rm_fs) g_list_free (del_block_repos); } + g_thread_pool_free (tpool, TRUE, TRUE); + g_async_queue_unref (async_queue); + g_free (param); + return 0; } diff --git a/server/gc/gc-core.h b/server/gc/gc-core.h index b4e9b210..9647879e 100644 --- a/server/gc/gc-core.h +++ b/server/gc/gc-core.h @@ -1,9 +1,10 @@ #ifndef GC_CORE_H #define GC_CORE_H -int gc_core_run (GList *repo_id_list, int dry_run, int verbose, int rm_fs); +int gc_core_run (GList *repo_id_list, const char *id_prefix, + int dry_run, int verbose, int thread_num, int rm_fs); void -delete_garbaged_repos (int dry_run); +delete_garbaged_repos (int dry_run, int thread_num); #endif diff --git a/server/gc/repo-mgr.c b/server/gc/repo-mgr.c index 58f21cce..8ecff66f 100644 --- a/server/gc/repo-mgr.c +++ b/server/gc/repo-mgr.c @@ -60,6 +60,8 @@ seaf_repo_free (SeafRepo *repo) if (repo->desc) g_free (repo->desc); if (repo->category) g_free (repo->category); if (repo->head) seaf_branch_unref (repo->head); + g_free (repo->pwd_hash_algo); + g_free (repo->pwd_hash_params); g_free (repo); } @@ -97,11 +99,24 @@ seaf_repo_from_commit (SeafRepo *repo, SeafCommit *commit) repo->repaired = commit->repaired; if (repo->encrypted) { repo->enc_version = commit->enc_version; - if (repo->enc_version == 1) + if (repo->enc_version == 1 && !commit->pwd_hash_algo) memcpy (repo->magic, 
commit->magic, 32); else if (repo->enc_version == 2) { - memcpy (repo->magic, commit->magic, 64); memcpy (repo->random_key, commit->random_key, 96); + } else if (repo->enc_version == 3) { + memcpy (repo->random_key, commit->random_key, 96); + memcpy (repo->salt, commit->salt, 64); + } else if (repo->enc_version == 4) { + memcpy (repo->random_key, commit->random_key, 96); + memcpy (repo->salt, commit->salt, 64); + } + if (repo->enc_version >= 2 && !commit->pwd_hash_algo) { + memcpy (repo->magic, commit->magic, 64); + } + if (commit->pwd_hash_algo) { + memcpy (repo->pwd_hash, commit->pwd_hash, 64); + repo->pwd_hash_algo = g_strdup (commit->pwd_hash_algo); + repo->pwd_hash_params = g_strdup (commit->pwd_hash_params); } } repo->no_local_history = commit->no_local_history; @@ -117,11 +132,24 @@ seaf_repo_to_commit (SeafRepo *repo, SeafCommit *commit) commit->repaired = repo->repaired; if (commit->encrypted) { commit->enc_version = repo->enc_version; - if (commit->enc_version == 1) + if (commit->enc_version == 1 && !repo->pwd_hash_algo) commit->magic = g_strdup (repo->magic); else if (commit->enc_version == 2) { - commit->magic = g_strdup (repo->magic); commit->random_key = g_strdup (repo->random_key); + } else if (commit->enc_version == 3) { + commit->random_key = g_strdup (repo->random_key); + commit->salt = g_strdup (repo->salt); + } else if (commit->enc_version == 4) { + commit->random_key = g_strdup (repo->random_key); + commit->salt = g_strdup (repo->salt); + } + if (commit->enc_version >= 2 && !repo->pwd_hash_algo) { + commit->magic = g_strdup (repo->magic); + } + if (repo->pwd_hash_algo) { + commit->pwd_hash = g_strdup (repo->pwd_hash); + commit->pwd_hash_algo = g_strdup (repo->pwd_hash_algo); + commit->pwd_hash_params = g_strdup (repo->pwd_hash_params); } } commit->no_local_history = repo->no_local_history; @@ -375,6 +403,23 @@ seaf_repo_manager_get_repo_id_list (SeafRepoManager *mgr) return ret; } +GList * +seaf_repo_manager_get_repo_id_list_by_prefix 
(SeafRepoManager *mgr, + const char *prefix) +{ + GList *ret = NULL; + char sql[256]; + + snprintf (sql, 256, "SELECT repo_id FROM Repo WHERE repo_id LIKE '%s%%'", prefix); + + if (seaf_db_foreach_selected_row (mgr->seaf->db, sql, + collect_repo_id, &ret) < 0) { + return NULL; + } + + return ret; +} + GList * seaf_repo_manager_get_repo_list (SeafRepoManager *mgr, int start, int limit, diff --git a/server/gc/repo-mgr.h b/server/gc/repo-mgr.h index 0368117e..f2caed63 100644 --- a/server/gc/repo-mgr.h +++ b/server/gc/repo-mgr.h @@ -28,7 +28,11 @@ struct _SeafRepo { gboolean encrypted; int enc_version; gchar magic[65]; /* hash(repo_id + passwd), key stretched. */ + gchar pwd_hash[65]; /* hash(repo_id + passwd), key stretched. */ + gchar *pwd_hash_algo; + gchar *pwd_hash_params; gchar random_key[97]; + gchar salt[65]; gboolean no_local_history; SeafBranch *head; @@ -113,6 +117,10 @@ seaf_repo_manager_get_repo_list (SeafRepoManager *mgr, GList * seaf_repo_manager_get_repo_id_list (SeafRepoManager *mgr); +GList * +seaf_repo_manager_get_repo_id_list_by_prefix (SeafRepoManager *mgr, + const char *prefix); + int seaf_repo_manager_set_repo_history_limit (SeafRepoManager *mgr, const char *repo_id, diff --git a/server/gc/seafserv-gc.c b/server/gc/seafserv-gc.c index 313b5c27..65acafa4 100644 --- a/server/gc/seafserv-gc.c +++ b/server/gc/seafserv-gc.c @@ -4,6 +4,7 @@ #include #include "seafile-session.h" +#include "seaf-utils.h" #include "gc-core.h" #include "verify.h" @@ -15,7 +16,7 @@ static char *central_config_dir = NULL; SeafileSession *seaf; -static const char *short_opts = "hvc:d:VDrRF:"; +static const char *short_opts = "hvc:d:VDrRF:Ct:i:"; static const struct option long_opts[] = { { "help", no_argument, NULL, 'h', }, { "version", no_argument, NULL, 'v', }, @@ -26,18 +27,24 @@ static const struct option long_opts[] = { { "dry-run", no_argument, NULL, 'D' }, { "rm-deleted", no_argument, NULL, 'r' }, { "rm-fs", no_argument, NULL, 'R' }, + { "check", no_argument, NULL, 'C' 
}, + { "thread-num", required_argument, NULL, 't', }, + { "id-prefix", required_argument, NULL, 'i', }, { 0, 0, 0, 0 }, }; static void usage () { - fprintf (stderr, "usage: seafserv-gc [-c config_dir] [-d seafile_dir] " - "[repo_id_1 [repo_id_2 ...]]\n" - "Additional options:\n" - "-r, --rm-deleted: remove garbaged repos\n" - "-R, --rm-fs: remove fs object\n" - "-D, --dry-run: report blocks that can be remove, but not remove them\n" - "-V, --verbose: verbose output messages\n"); + fprintf (stderr, + "usage: seafserv-gc [-c config_dir] [-d seafile_dir] " + "[repo_id_1 [repo_id_2 ...]]\n" + "Additional options:\n" + "-r, --rm-deleted: remove garbaged repos\n" + "-R, --rm-fs: remove fs object\n" + "-D, --dry-run: report blocks that can be remove, but not remove them\n" + "-V, --verbose: verbose output messages\n" + "-C, --check: check data integrity\n" + "-t, --thread-num: thread number for gc repos\n"); } #ifdef WIN32 @@ -66,6 +73,8 @@ get_argv_utf8 (int *argc) } #endif +#define DEFAULT_THREAD_NUM 10 + int main(int argc, char *argv[]) { @@ -74,6 +83,10 @@ main(int argc, char *argv[]) int dry_run = 0; int rm_garbage = 0; int rm_fs = 0; + int check_integrity = 0; + int thread_num = 1; + const char *debug_str = NULL; + char *id_prefix = NULL; #ifdef WIN32 argv = get_argv_utf8 (&argc); @@ -111,6 +124,15 @@ main(int argc, char *argv[]) case 'R': rm_fs = 1; break; + case 'C': + check_integrity = 1; + break; + case 't': + thread_num = atoi(optarg); + break; + case 'i': + id_prefix = g_strdup(optarg); + break; default: usage(); exit(-1); @@ -121,6 +143,10 @@ main(int argc, char *argv[]) g_type_init(); #endif + if (!debug_str) + debug_str = g_getenv("SEAFILE_DEBUG"); + seafile_debug_set_flags_string (debug_str); + if (seafile_log_init ("-", "info", "debug", "seafserv-gc") < 0) { fprintf (stderr, "Failed to init log.\n"); exit (1); @@ -136,7 +162,7 @@ main(int argc, char *argv[]) } if (rm_garbage) { - delete_garbaged_repos (dry_run); + delete_garbaged_repos (dry_run, 
thread_num); return 0; } @@ -145,7 +171,13 @@ main(int argc, char *argv[]) for (i = optind; i < argc; i++) repo_id_list = g_list_append (repo_id_list, g_strdup(argv[i])); - gc_core_run (repo_id_list, dry_run, verbose, rm_fs); + if (check_integrity) { + return verify_repos (repo_id_list); + } + + gc_core_run (repo_id_list, id_prefix, dry_run, verbose, thread_num, rm_fs); + + g_free (id_prefix); return 0; } diff --git a/server/gc/verify.c b/server/gc/verify.c index d517004c..9347557b 100644 --- a/server/gc/verify.c +++ b/server/gc/verify.c @@ -1,10 +1,12 @@ #include "seafile-session.h" +#include "utils.h" #include "log.h" typedef struct VerifyData { SeafRepo *repo; gint64 truncate_time; gboolean traversed_head; + GHashTable *exist_blocks; } VerifyData; static int @@ -24,11 +26,9 @@ check_blocks (VerifyData *data, const char *file_id) } for (i = 0; i < seafile->n_blocks; ++i) { - if (!seaf_block_manager_block_exists (seaf->block_mgr, - repo->store_id, - repo->version, - seafile->blk_sha1s[i])) - g_message ("Block %s is missing.\n", seafile->blk_sha1s[i]); + if (!g_hash_table_lookup(data->exist_blocks, seafile->blk_sha1s[i])) { + seaf_message ("Block %s is missing.\n", seafile->blk_sha1s[i]); + } } seafile_unref (seafile); @@ -69,8 +69,12 @@ traverse_commit (SeafCommit *commit, void *vdata, gboolean *stop) (gint64)(commit->ctime) < data->truncate_time && data->traversed_head) { + /* Still traverse the first commit older than truncate_time. + * If a file in the child commit of this commit is deleted, + * we need to access this commit in order to restore it + * from trash. 
+ */ *stop = TRUE; - return TRUE; } if (!data->traversed_head) @@ -88,6 +92,61 @@ traverse_commit (SeafCommit *commit, void *vdata, gboolean *stop) return TRUE; } +static int +verify_virtual_repos (VerifyData *data) +{ + SeafRepo *repo = data->repo; + if (repo->is_virtual) { + return 0; + } + + GList *vrepo_ids = NULL, *ptr; + char *repo_id; + SeafVirtRepo *vinfo; + int ret = 0; + + vrepo_ids = seaf_repo_manager_get_virtual_repo_ids_by_origin (seaf->repo_mgr, + repo->id); + + for (ptr = vrepo_ids; ptr; ptr = ptr->next) { + repo_id = ptr->data; + vinfo = seaf_repo_manager_get_virtual_repo_info (seaf->repo_mgr, repo_id); + if (!vinfo) { + continue; + } + + gboolean res = seaf_commit_manager_traverse_commit_tree (seaf->commit_mgr, + repo->store_id, repo->version, + vinfo->base_commit, + traverse_commit, + data, + FALSE); + seaf_virtual_repo_info_free (vinfo); + if (!res) { + seaf_warning ("Failed to traverse base commit %s for virtual repo %s.\n", vinfo->base_commit, repo_id); + ret = -1; + goto out; + } + } + +out: + string_list_free (vrepo_ids); + return ret; + +} + +static gboolean +collect_exist_blocks (const char *store_id, int version, + const char *block_id, void *vdata) +{ + GHashTable *exist_blocks = vdata; + char *copy = g_strdup (block_id); + + g_hash_table_replace (exist_blocks, copy, copy); + + return TRUE; +} + static int verify_repo (SeafRepo *repo) { @@ -99,10 +158,22 @@ verify_repo (SeafRepo *repo) data.repo = repo; data.truncate_time = seaf_repo_manager_get_repo_truncate_time (repo->manager, repo->id); + data.exist_blocks = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); + ret = seaf_block_manager_foreach_block (seaf->block_mgr, + repo->store_id, repo->version, + collect_exist_blocks, + data.exist_blocks); + if (ret < 0) { + seaf_warning ("Failed to collect existing blocks for repo %.8s, stop GC.\n\n", + repo->id); + g_hash_table_destroy (data.exist_blocks); + return ret; + } branches = seaf_branch_manager_get_branch_list 
(seaf->branch_mgr, repo->id); if (branches == NULL) { seaf_warning ("[GC] Failed to get branch list of repo %s.\n", repo->id); + g_hash_table_destroy (data.exist_blocks); return -1; } @@ -123,6 +194,14 @@ verify_repo (SeafRepo *repo) g_list_free (branches); + if (ret < 0) { + g_hash_table_destroy (data.exist_blocks); + return ret; + } + + ret = verify_virtual_repos (&data); + + g_hash_table_destroy (data.exist_blocks); return ret; } @@ -144,13 +223,18 @@ verify_repos (GList *repo_id_list) if (!repo) continue; + seaf_message ("Start to verify repo %s\n", repo->id); if (repo->is_corrupted) { seaf_warning ("Repo %s is corrupted.\n", repo->id); } else { ret = verify_repo (repo); + if (ret < 0) { + seaf_warning ("Failed to verify repo %s\n", repo->id); + seaf_repo_unref (repo); + continue; + } + seaf_message ("Verify repo %s success\n", repo->id); seaf_repo_unref (repo); - if (ret < 0) - break; } } diff --git a/server/http-server.c b/server/http-server.c index 8839c211..c7999908 100644 --- a/server/http-server.c +++ b/server/http-server.c @@ -313,6 +313,21 @@ lookup_perm_cache (HttpServer *htp_server, const char *repo_id, const char *user return perm; } +static char * +get_auth_token (evhtp_request_t *req) +{ + const char *token = evhtp_kv_find (req->headers_in, "Seafile-Repo-Token"); + if (token) { + return g_strdup (token); + } + + char *tmp_token = NULL; + const char *auth_token = evhtp_kv_find (req->headers_in, "Authorization"); + tmp_token = seaf_parse_auth_token (auth_token); + + return tmp_token; +} + static void insert_perm_cache (HttpServer *htp_server, const char *repo_id, const char *username, @@ -918,7 +933,8 @@ gen_merge_description (SeafRepo *repo, static int fast_forward_or_merge (const char *repo_id, SeafCommit *base, - SeafCommit *new_commit) + SeafCommit *new_commit, + const char *token) { #define MAX_RETRY_COUNT 3 @@ -926,6 +942,9 @@ fast_forward_or_merge (const char *repo_id, SeafCommit *current_head = NULL, *merged_commit = NULL; int retry_cnt = 0; 
int ret = 0; + char *last_gc_id = NULL; + gboolean check_gc; + gboolean gc_conflict = FALSE; repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); if (!repo) { @@ -934,6 +953,26 @@ fast_forward_or_merge (const char *repo_id, goto out; } + /* In some uploads, no blocks need to be uploaded. For example, deleting + * a file or folder. In such cases, checkbl won't be called. + * So the last gc id is not inserted to the database. We don't need to + * check gc for these cases since no new blocks are uploaded. + * + * Note that having a 'NULL' gc id in database is not the same as not having + * a last gc id record. The former one indicates that, before block upload, + * no GC has been performed; the latter one indicates no _new_ blocks are + * being referenced by this new commit. + */ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + check_gc = FALSE; + else + check_gc = seaf_repo_has_last_gc_id (repo, token); + + if (check_gc) { + last_gc_id = seaf_repo_get_last_gc_id (repo, token); + seaf_repo_remove_last_gc_id (repo, token); + } + retry: current_head = seaf_commit_manager_get_commit (seaf->commit_mgr, repo->id, repo->version, @@ -1002,10 +1041,22 @@ fast_forward_or_merge (const char *repo_id, seaf_branch_set_commit(repo->head, merged_commit->commit_id); + gc_conflict = FALSE; + if (seaf_branch_manager_test_and_update_branch(seaf->branch_mgr, repo->head, - current_head->commit_id) < 0) + current_head->commit_id, + check_gc, last_gc_id, + repo->store_id, + &gc_conflict) < 0) { + if (gc_conflict) { + seaf_warning ("Head branch update for repo %s conflicts with GC.\n", + repo_id); + ret = -1; + goto out; + } + seaf_repo_unref (repo); repo = NULL; seaf_commit_unref (current_head); @@ -1032,6 +1083,7 @@ fast_forward_or_merge (const char *repo_id, } out: + g_free (last_gc_id); seaf_commit_unref (current_head); seaf_commit_unref (merged_commit); seaf_repo_unref (repo); @@ -1047,6 +1099,7 @@ put_update_branch_cb (evhtp_request_t *req, void *arg) char *username = 
NULL; SeafRepo *repo = NULL; SeafCommit *new_commit = NULL, *base = NULL; + char *token = NULL; const char *new_commit_id = evhtp_kv_find (req->uri->query, "head"); if (new_commit_id == NULL || !is_object_id_valid (new_commit_id)) { @@ -1099,7 +1152,9 @@ put_update_branch_cb (evhtp_request_t *req, void *arg) goto out; } - if (fast_forward_or_merge (repo_id, base, new_commit) < 0) { + token = get_auth_token (req); + + if (fast_forward_or_merge (repo_id, base, new_commit, token) < 0) { seaf_warning ("Fast forward merge for repo %s is failed.\n", repo_id); evhtp_send_reply (req, EVHTP_RES_SERVERR); goto out; @@ -1112,6 +1167,7 @@ put_update_branch_cb (evhtp_request_t *req, void *arg) evhtp_send_reply (req, EVHTP_RES_OK); out: + g_free (token); seaf_repo_unref (repo); seaf_commit_unref (new_commit); seaf_commit_unref (base); @@ -1280,6 +1336,28 @@ get_commit_info_cb (evhtp_request_t *req, void *arg) g_strfreev (parts); } +static int +save_last_gc_id (const char *repo_id, const char *token) +{ + SeafRepo *repo; + char *gc_id; + + repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); + if (!repo) { + seaf_warning ("Failed to find repo %s.\n", repo_id); + return -1; + } + + gc_id = seaf_repo_get_current_gc_id (repo); + + seaf_repo_set_last_gc_id (repo, token, gc_id); + + g_free (gc_id); + seaf_repo_unref (repo); + + return 0; +} + static void put_commit_cb (evhtp_request_t *req, void *arg) { @@ -1331,7 +1409,22 @@ put_commit_cb (evhtp_request_t *req, void *arg) if (seaf_commit_manager_add_commit (seaf->commit_mgr, commit) < 0) { evhtp_send_reply (req, EVHTP_RES_SERVERR); } else { - evhtp_send_reply (req, EVHTP_RES_OK); + /* Last GCID must be set before checking blocks. However, in http sync, + * block list may be sent in multiple http requests. There is no way to + * tell which one is the first check block request. + * + * So we set the last GCID just before replying to upload commit + * request. 
One consequence is that even if the following upload + * doesn't upload new blocks, we still need to check gc conflict in + * update-branch request. Since gc conflict is a rare case, this solution + * won't introduce many more gc conflicts. + */ + char *token = get_auth_token (req); + if (save_last_gc_id (repo_id, token) < 0) { + evhtp_send_reply (req, EVHTP_RES_SERVERR); + } else + evhtp_send_reply (req, EVHTP_RES_OK); + g_free (token); } seaf_commit_unref (commit); diff --git a/server/index-blocks-mgr.c b/server/index-blocks-mgr.c index be9839d3..f88ed375 100644 --- a/server/index-blocks-mgr.c +++ b/server/index-blocks-mgr.c @@ -145,12 +145,14 @@ start_index_task (gpointer data, gpointer user_data) GList *ptr = NULL, *id_list = NULL, *size_list = NULL; char *path = NULL; char *ret_json = NULL; + char *gc_id = NULL; char hex[41]; unsigned char sha1[20]; int ret = 0; IdxProgress *progress = idx_para->progress; SeafileCrypt *crypt = idx_para->crypt; + gc_id = seaf_repo_get_current_gc_id(repo); gint64 *size; for (ptr = idx_para->paths; ptr; ptr = ptr->next) { path = ptr->data; @@ -179,6 +181,7 @@ start_index_task (gpointer data, gpointer user_data) id_list, size_list, 0, + gc_id, NULL); progress->status = ret; if (idx_para->ret_json) { @@ -194,6 +197,7 @@ start_index_task (gpointer data, gpointer user_data) g_list_free_full (id_list, g_free); g_list_free_full (size_list, g_free); free_index_para (idx_para); + g_free (gc_id); return; } diff --git a/server/repo-mgr.c b/server/repo-mgr.c index 53bd95e3..b8d998ed 100644 --- a/server/repo-mgr.c +++ b/server/repo-mgr.c @@ -1072,6 +1072,18 @@ create_tables_mysql (SeafRepoManager *mgr) if (seaf_db_query (db, sql) < 0) return -1; + /* Tables for online GC */ + + sql = "CREATE TABLE IF NOT EXISTS GCID (id BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT, " + "repo_id CHAR(36), gc_id CHAR(36), UNIQUE INDEX(repo_id)) ENGINE=INNODB"; + if (seaf_db_query (db, sql) < 0) + return -1; + + sql = "CREATE TABLE IF NOT EXISTS LastGCID (id 
BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT, " + "repo_id CHAR(36), client_id VARCHAR(128), gc_id CHAR(36), UNIQUE INDEX(repo_id, client_id)) ENGINE=INNODB"; + if (seaf_db_query (db, sql) < 0) + return -1; + sql = "CREATE TABLE IF NOT EXISTS RepoTrash (id BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT, " "repo_id CHAR(36)," "repo_name VARCHAR(255), head_id CHAR(40), owner_id VARCHAR(255)," @@ -4272,7 +4284,8 @@ seaf_repo_manager_edit_repo (const char *repo_id, seaf_branch_set_commit (repo->head, commit->commit_id); if (seaf_branch_manager_test_and_update_branch (seaf->branch_mgr, repo->head, - parent->commit_id) < 0) { + parent->commit_id, + FALSE, NULL, NULL, NULL) < 0) { seaf_repo_unref (repo); seaf_commit_unref (commit); seaf_commit_unref (parent); @@ -4367,6 +4380,97 @@ seaf_get_total_storage (GError **error) return size; } +/* Online GC related */ + +char * +seaf_repo_get_current_gc_id (SeafRepo *repo) +{ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + return NULL; + + char *sql = "SELECT gc_id FROM GCID WHERE repo_id = ?"; + char *gc_id; + + if (!repo->virtual_info) + gc_id = seaf_db_statement_get_string (seaf->db, sql, 1, "string", repo->id); + else { + gc_id = seaf_db_statement_get_string (seaf->db, sql, 1, "string", repo->store_id); + } + + return gc_id; +} + +char * +seaf_repo_get_last_gc_id (SeafRepo *repo, const char *client_id) +{ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + return NULL; + + char *sql = "SELECT gc_id FROM LastGCID WHERE repo_id = ? AND client_id = ?"; + char *gc_id; + + gc_id = seaf_db_statement_get_string (seaf->db, sql, + 2, "string", repo->id, + "string", client_id); + + return gc_id; +} + +gboolean +seaf_repo_has_last_gc_id (SeafRepo *repo, const char *client_id) +{ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + return FALSE; + + char *sql = "SELECT 1 FROM LastGCID WHERE repo_id = ? 
AND client_id = ?"; + gboolean db_err; + + return seaf_db_statement_exists (seaf->db, sql, &db_err, + 2, "string", repo->id, "string", client_id); +} + +int +seaf_repo_set_last_gc_id (SeafRepo *repo, + const char *client_id, + const char *gc_id) +{ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + return 0; + + gboolean id_exists, db_err = FALSE; + char *sql; + int ret = 0; + + sql = "SELECT 1 FROM LastGCID WHERE repo_id = ? AND client_id = ?"; + id_exists = seaf_db_statement_exists (seaf->db, sql, &db_err, + 2, "string", repo->id, "string", client_id); + if (id_exists) { + sql = "UPDATE LastGCID SET gc_id = ? WHERE repo_id = ? AND client_id = ?"; + ret = seaf_db_statement_query (seaf->db, sql, + 3, "string", gc_id, + "string", repo->id, "string", client_id); + } else { + sql = "INSERT INTO LastGCID (repo_id, client_id, gc_id) VALUES (?, ?, ?)"; + ret = seaf_db_statement_query (seaf->db, sql, + 3, "string", repo->id, + "string", client_id, "string", gc_id); + } + + return ret; +} + +int +seaf_repo_remove_last_gc_id (SeafRepo *repo, + const char *client_id) +{ + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + return 0; + + char *sql = "DELETE FROM LastGCID WHERE repo_id = ? 
AND client_id = ?"; + seaf_db_statement_query (seaf->db, sql, 2, "string", repo->id, "string", client_id); + return 0; +} + int seaf_repo_manager_add_upload_tmp_file (SeafRepoManager *mgr, const char *repo_id, diff --git a/server/repo-mgr.h b/server/repo-mgr.h index 3c2ae0ad..66269dcf 100644 --- a/server/repo-mgr.h +++ b/server/repo-mgr.h @@ -859,6 +859,26 @@ seaf_get_total_file_number (GError **error); gint64 seaf_get_total_storage (GError **error); +/* Online GC related */ + +char * +seaf_repo_get_current_gc_id (SeafRepo *repo); + +char * +seaf_repo_get_last_gc_id (SeafRepo *repo, const char *client_id); + +gboolean +seaf_repo_has_last_gc_id (SeafRepo *repo, const char *client_id); + +int +seaf_repo_set_last_gc_id (SeafRepo *repo, + const char *client_id, + const char *gc_id); + +int +seaf_repo_remove_last_gc_id (SeafRepo *repo, + const char *client_id); + int seaf_repo_manager_add_upload_tmp_file (SeafRepoManager *mgr, const char *repo_id, @@ -927,6 +947,7 @@ post_files_and_gen_commit (GList *filenames, GList *id_list, GList *size_list, gint64 mtime, + char *last_gc_id, GError **error); char * diff --git a/server/repo-op.c b/server/repo-op.c index 21fd2e89..6c9993ad 100644 --- a/server/repo-op.c +++ b/server/repo-op.c @@ -54,6 +54,7 @@ post_files_and_gen_commit (GList *filenames, GList *id_list, GList *size_list, gint64 mtime, + char *last_gc_id, GError **error); /* @@ -444,6 +445,8 @@ gen_new_commit (const char *repo_id, const char *desc, char *new_commit_id, gboolean handle_concurrent_update, + gboolean check_gc, + const char *last_gc_id, GError **error) { #define MAX_RETRY_COUNT 10 @@ -451,6 +454,7 @@ gen_new_commit (const char *repo_id, SeafRepo *repo = NULL; SeafCommit *new_commit = NULL, *current_head = NULL, *merged_commit = NULL; int retry_cnt = 0; + gboolean gc_conflict = FALSE; int ret = 0; repo = seaf_repo_manager_get_repo (seaf->repo_mgr, repo_id); @@ -558,10 +562,27 @@ gen_new_commit (const char *repo_id, seaf_branch_set_commit(repo->head, 
merged_commit->commit_id); + if (seaf_db_type(seaf->db) == SEAF_DB_TYPE_SQLITE) + check_gc = FALSE; + + if (check_gc) + gc_conflict = FALSE; + if (seaf_branch_manager_test_and_update_branch(seaf->branch_mgr, repo->head, - current_head->commit_id) < 0) + current_head->commit_id, + check_gc, + last_gc_id, + repo->store_id, + &gc_conflict) < 0) { + if (check_gc && gc_conflict) { + seaf_warning ("Head branch update for repo %s conflicts with GC.\n", + repo->id); + g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, "GC Conflict"); + ret = -1; + goto out; + } if (!handle_concurrent_update) { g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_CONCURRENT_UPLOAD, "Concurrent upload"); ret = -1; @@ -630,6 +651,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, SeafileCrypt *crypt = NULL; SeafDirent *new_dent = NULL; char hex[41]; + char *gc_id = NULL; int ret = 0; int retry_cnt = 0; @@ -678,6 +700,8 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, crypt = seafile_crypt_new (repo->enc_version, key, iv); } + gc_id = seaf_repo_get_current_gc_id (repo); + gint64 size; if (seaf_fs_manager_index_blocks (seaf->fs_mgr, repo->store_id, repo->version, @@ -709,7 +733,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, FALSE, error) < 0) { + user, buf, NULL, FALSE, TRUE, gc_id, error) < 0) { if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) { ret = -1; goto out; @@ -743,6 +767,7 @@ seaf_repo_manager_post_file (SeafRepoManager *mgr, g_free (root_id); g_free (canon_path); g_free (crypt); + g_free (gc_id); if (ret == 0) update_repo_size(repo_id); @@ -1076,6 +1101,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, char *canon_path = NULL; GList *filenames = NULL, *paths = NULL, *id_list = NULL, *size_list = NULL, *ptr; char *filename, *path; + char *gc_id = NULL; unsigned char sha1[20]; SeafileCrypt *crypt = NULL; char 
hex[41]; @@ -1139,6 +1165,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, if (!task_id) { gint64 *size; + gc_id = seaf_repo_get_current_gc_id(repo); for (ptr = paths; ptr; ptr = ptr->next) { path = ptr->data; @@ -1169,6 +1196,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, id_list, size_list, mtime, + gc_id, error); } else { ret = index_blocks_mgr_start_index (seaf->index_blocks_mgr, @@ -1194,6 +1222,7 @@ seaf_repo_manager_post_multi_files (SeafRepoManager *mgr, g_list_free (size_list); g_free (canon_path); g_free (crypt); + g_free (gc_id); return ret; } @@ -1208,6 +1237,7 @@ post_files_and_gen_commit (GList *filenames, GList *id_list, GList *size_list, gint64 mtime, + char *last_gc_id, GError **error) { SeafRepo *repo = NULL; @@ -1247,7 +1277,7 @@ post_files_and_gen_commit (GList *filenames, g_string_printf (buf, "Added \"%s\".", (char *)(filenames->data)); if (gen_new_commit (repo->id, head_commit, root_id, - user, buf->str, NULL, handle_concurrent_update, error) < 0) { + user, buf->str, NULL, handle_concurrent_update, TRUE, last_gc_id, error) < 0) { if (*error == NULL || (*error)->code != SEAF_ERR_CONCURRENT_UPLOAD) { ret = -1; goto out; @@ -1515,6 +1545,7 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr, SeafDirent *new_dent = NULL; GList *blockids = NULL; char hex[41]; + char *gc_id = NULL; int ret = 0; blockids = json_to_file_list (blockids_json); @@ -1549,6 +1580,8 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr, goto out; } + gc_id = seaf_repo_get_current_gc_id (repo); + /* Write blocks. 
*/ if (seaf_fs_manager_index_existed_file_blocks ( seaf->fs_mgr, repo->store_id, repo->version, @@ -1582,7 +1615,7 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr, *new_id = g_strdup(hex); snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) + user, buf, NULL, TRUE, TRUE, gc_id, error) < 0) ret = -1; out: @@ -1594,6 +1627,7 @@ seaf_repo_manager_commit_file_blocks (SeafRepoManager *mgr, seaf_dirent_free (new_dent); g_free (root_id); g_free (canon_path); + g_free (gc_id); if (ret == 0) update_repo_size(repo_id); @@ -1806,7 +1840,7 @@ seaf_repo_manager_del_file (SeafRepoManager *mgr, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -1931,7 +1965,7 @@ seaf_repo_manager_batch_del_files (SeafRepoManager *mgr, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -2021,6 +2055,8 @@ put_dirent_and_commit (SeafRepo *repo, int n_dents, int replace, const char *user, + gboolean check_gc, + const char *last_gc_id, GError **error) { SeafCommit *head_commit = NULL; @@ -2068,7 +2104,7 @@ put_dirent_and_commit (SeafRepo *repo, } if (gen_new_commit (repo->id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) + user, buf, NULL, TRUE, check_gc, last_gc_id, error) < 0) ret = -1; out: @@ -2498,6 +2534,7 @@ cross_repo_copy (const char *src_repo_id, int check_quota_ret; SeafileCrypt *src_crypt = NULL; SeafileCrypt *dst_crypt = NULL; + char *gc_id = NULL; src_repo = seaf_repo_manager_get_repo (seaf->repo_mgr, src_repo_id); if (!src_repo) { @@ -2539,6 +2576,8 @@ cross_repo_copy (const char *src_repo_id, dst_names = json_to_file_list (dst_filename); file_num = g_list_length (src_names); + gc_id = seaf_repo_get_current_gc_id (dst_repo); + src_dents 
= g_new0 (SeafDirent *, file_num); dst_dents = g_new0 (SeafDirent *, file_num); @@ -2630,6 +2669,8 @@ cross_repo_copy (const char *src_repo_id, file_num, replace, modifier, + TRUE, + gc_id, NULL) < 0) { err_str = COPY_ERR_INTERNAL; ret = -1; @@ -2658,6 +2699,7 @@ cross_repo_copy (const char *src_repo_id, string_list_free (src_names); if (dst_names) string_list_free (dst_names); + g_free (gc_id); if (ret == 0) { update_repo_size (dst_repo_id); @@ -2818,6 +2860,8 @@ seaf_repo_manager_copy_file (SeafRepoManager *mgr, 1, 0, user, + FALSE, + NULL, error) < 0) { if (!error) g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, @@ -3017,6 +3061,8 @@ seaf_repo_manager_copy_multiple_files (SeafRepoManager *mgr, file_num, 0, user, + FALSE, + NULL, error) < 0) { if (!error) g_set_error (error, SEAFILE_DOMAIN, SEAF_ERR_GENERAL, @@ -3162,7 +3208,7 @@ move_file_same_repo (const char *repo_id, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) ret = -1; out: @@ -3202,6 +3248,7 @@ cross_repo_move (const char *src_repo_id, int check_quota_ret; SeafileCrypt *src_crypt = NULL; SeafileCrypt *dst_crypt = NULL; + char *gc_id = NULL; src_repo = seaf_repo_manager_get_repo (seaf->repo_mgr, src_repo_id); if (!src_repo) { @@ -3241,6 +3288,7 @@ cross_repo_move (const char *src_repo_id, src_names = json_to_file_list (src_filename); dst_names = json_to_file_list (dst_filename); + gc_id = seaf_repo_get_current_gc_id (dst_repo); file_num = g_list_length (src_names); @@ -3335,6 +3383,8 @@ cross_repo_move (const char *src_repo_id, file_num, replace, modifier, + TRUE, + gc_id, NULL) < 0) { err_str = COPY_ERR_INTERNAL; ret = -1; @@ -3372,6 +3422,7 @@ cross_repo_move (const char *src_repo_id, string_list_free (src_names); if (dst_names) string_list_free (dst_names); + g_free (gc_id); if (ret == 0) { update_repo_size (dst_repo_id); @@ -3556,6 +3607,8 @@ seaf_repo_manager_move_multiple_files (SeafRepoManager *mgr, 
file_num, replace, user, + FALSE, + NULL, NULL) < 0) { ret = -1; goto out; @@ -3780,7 +3833,7 @@ seaf_repo_manager_mkdir_with_parents (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", relative_dir_can); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; g_free (root_id); goto out; @@ -3861,7 +3914,7 @@ seaf_repo_manager_post_dir (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added directory \"%s\"", new_dir_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -3933,7 +3986,7 @@ seaf_repo_manager_post_empty_file (SeafRepoManager *mgr, /* Commit. */ snprintf(buf, SEAF_PATH_MAX, "Added \"%s\"", new_file_name); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -4126,7 +4179,7 @@ seaf_repo_manager_rename_file (SeafRepoManager *mgr, } if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -4269,6 +4322,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, SeafDirent *new_dent = NULL; char hex[41]; char *old_file_id = NULL, *fullpath = NULL; + char *gc_id = NULL; int ret = 0; if (g_access (temp_file_path, R_OK) != 0) { @@ -4320,6 +4374,8 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, crypt = seafile_crypt_new (repo->enc_version, key, iv); } + gc_id = seaf_repo_get_current_gc_id (repo); + gint64 size; if (seaf_fs_manager_index_blocks (seaf->fs_mgr, repo->store_id, repo->version, @@ -4366,7 +4422,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, /* Commit. 
*/ snprintf(buf, SEAF_PATH_MAX, "Modified \"%s\"", file_name); - if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, TRUE, error) < 0) { + if (gen_new_commit (repo_id, head_commit, root_id, user, buf, NULL, TRUE, TRUE, gc_id, error) < 0) { ret = -1; goto out; } @@ -4387,6 +4443,7 @@ seaf_repo_manager_put_file (SeafRepoManager *mgr, g_free (crypt); g_free (old_file_id); g_free (fullpath); + g_free (gc_id); if (ret == 0) { update_repo_size (repo_id); @@ -4448,7 +4505,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr, commit_desc = g_strdup("Auto merge by system"); if (gen_new_commit (repo_id, head_commit, new_dir_id, - user, commit_desc, new_commit_id, TRUE, error) < 0) + user, commit_desc, new_commit_id, TRUE, FALSE, NULL, error) < 0) ret = -1; g_free (commit_desc); goto out; @@ -4481,7 +4538,7 @@ seaf_repo_manager_update_dir (SeafRepoManager *mgr, commit_desc = g_strdup("Auto merge by system"); if (gen_new_commit (repo_id, head_commit, root_id, - user, commit_desc, new_commit_id, TRUE, error) < 0) { + user, commit_desc, new_commit_id, TRUE, FALSE, NULL, error) < 0) { ret = -1; g_free (commit_desc); goto out; @@ -4986,7 +5043,7 @@ seaf_repo_manager_revert_file (SeafRepoManager *mgr, #endif snprintf(buf, SEAF_PATH_MAX, "Reverted file \"%s\" to status at %s", filename, time_str); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -5208,7 +5265,7 @@ seaf_repo_manager_revert_dir (SeafRepoManager *mgr, /* Commit. 
*/ snprintf(buf, SEAF_PATH_MAX, "Recovered deleted directory \"%s\"", dirname); if (gen_new_commit (repo_id, head_commit, root_id, - user, buf, NULL, TRUE, error) < 0) { + user, buf, NULL, TRUE, FALSE, NULL, error) < 0) { ret = -1; goto out; } @@ -6174,7 +6231,8 @@ seaf_repo_manager_revert_on_server (SeafRepoManager *mgr, seaf_branch_set_commit (repo->head, new_commit->commit_id); if (seaf_branch_manager_test_and_update_branch (seaf->branch_mgr, repo->head, - new_commit->parent_id) < 0) + new_commit->parent_id, + FALSE, NULL, NULL, NULL) < 0) { seaf_repo_unref (repo); seaf_commit_unref (commit); diff --git a/server/seaf-server.c b/server/seaf-server.c index a7ddc247..5f261e03 100644 --- a/server/seaf-server.c +++ b/server/seaf-server.c @@ -716,6 +716,10 @@ static void start_rpc_service (const char *seafile_dir, seafile_get_repo_history_limit, "get_repo_history_limit", searpc_signature_int__string()); + searpc_server_register_function ("seafserv-threaded-rpcserver", + seafile_set_repo_valid_since, + "set_repo_valid_since", + searpc_signature_int__string_int64()); /* System default library */ searpc_server_register_function ("seafserv-threaded-rpcserver", diff --git a/tests/test_gc/test_gc.py b/tests/test_gc/test_gc.py new file mode 100644 index 00000000..db0cb210 --- /dev/null +++ b/tests/test_gc/test_gc.py @@ -0,0 +1,133 @@ +import pytest +import requests +import os +import time +from subprocess import run +from tests.config import USER, USER2 +from seaserv import seafile_api as api + +file_name = 'file.txt' +first_name = 'first.txt' +first_path = os.getcwd() + '/' + first_name +first_content = 'Fist file content.\r\n' + +second_name = 'second.txt' +second_content = 'Second file content.\r\n' +second_path = os.getcwd() + '/' + second_name + +third_name = 'third.txt' +third_path = os.getcwd() + '/' + third_name +third_content = 'Third file content.\r\n' + +def create_test_file(): + fp = open(first_path, 'w') + fp.write(first_content) + fp.close() + fp = 
open(second_path, 'w') + fp.write(second_content) + fp.close() + fp = open(third_path, 'w') + fp.write(third_content) + fp.close() + +def del_local_files(): + os.remove(first_path) + os.remove(second_path) + os.remove(third_path) + +def create_test_dir(repo, dir_name): + parent_dir = '/' + api.post_dir(repo.id,parent_dir,dir_name,USER) + +def run_gc(repo_id, rm_fs, check): + cmdStr = 'seafserv-gc --verbose -F /tmp/seafile-tests/conf -d /tmp/seafile-tests/seafile-data %s %s %s'%(rm_fs, check, repo_id) + cmd=cmdStr.split(' ') + ret = run (cmd) + assert ret.returncode == 0 + +@pytest.mark.parametrize('rm_fs', ['', '--rm-fs']) +def test_gc_full_history(repo, rm_fs): + create_test_file() + + api.set_repo_valid_since (repo.id, -1) + + create_test_dir(repo,'subdir') + v_repo_id = api.share_subdir_to_user(repo.id, '/subdir', USER, USER2, 'rw') + assert v_repo_id is not None + assert api.post_file(repo.id, first_path, '/subdir', file_name, USER) == 0 + + assert api.post_empty_file(repo.id, '/', file_name, USER) == 0 + t_repo = api.get_repo(repo.id) + assert api.put_file(repo.id, first_path, '/', file_name, USER, t_repo.head_cmmt_id) + t_repo = api.get_repo(repo.id) + assert api.put_file(repo.id, second_path, '/', file_name, USER, t_repo.head_cmmt_id) + t_repo = api.get_repo(repo.id) + assert api.put_file(repo.id, third_path, '/', file_name, USER, t_repo.head_cmmt_id) + time.sleep(1) + + api.del_file(repo.id, '/', '[\"'+file_name+'\"]', USER) + + run_gc(repo.id, rm_fs, '') + run_gc(repo.id, '', '--check') + + del_local_files() + +@pytest.mark.parametrize('rm_fs', ['', '--rm-fs']) +def test_gc_no_history(repo, rm_fs): + create_test_file() + + api.set_repo_valid_since (repo.id, 0) + + create_test_dir(repo,'subdir') + v_repo_id = api.share_subdir_to_user(repo.id, '/subdir', USER, USER2, 'rw') + assert v_repo_id is not None + assert api.post_file(repo.id, first_path, '/subdir', file_name, USER) == 0 + + assert api.post_empty_file(repo.id, '/', file_name, USER) == 0 + t_repo =
api.get_repo(repo.id) + assert api.put_file(repo.id, first_path, '/', file_name, USER, t_repo.head_cmmt_id) + t_repo = api.get_repo(repo.id) + assert api.put_file(repo.id, second_path, '/', file_name, USER, t_repo.head_cmmt_id) + t_repo = api.get_repo(repo.id) + time.sleep(1) + assert api.put_file(repo.id, third_path, '/', file_name, USER, t_repo.head_cmmt_id) + + time.sleep(1) + api.del_file(repo.id, '/', '[\"'+file_name+'\"]', USER) + + run_gc(repo.id, rm_fs, '') + api.set_repo_valid_since (repo.id, 0) + run_gc(repo.id, '', '--check') + + del_local_files() + +@pytest.mark.parametrize('rm_fs', ['', '--rm-fs']) +def test_gc_partial_history(repo, rm_fs): + create_test_file() + + create_test_dir(repo,'subdir') + v_repo_id = api.share_subdir_to_user(repo.id, '/subdir', USER, USER2, 'rw') + assert v_repo_id is not None + assert api.post_file(repo.id, first_path, '/subdir', file_name, USER) == 0 + + assert api.post_empty_file(repo.id, '/', file_name, USER) == 0 + t_repo = api.get_repo(repo.id) + time.sleep(1) + assert api.put_file(repo.id, first_path, '/', file_name, USER, t_repo.head_cmmt_id) + t_repo = api.get_repo(repo.id) + time.sleep(1) + assert api.put_file(repo.id, second_path, '/', file_name, USER, t_repo.head_cmmt_id) + + t_repo = api.get_repo(repo.id) + t_commit = api.get_commit(t_repo.id, t_repo.version, t_repo.head_cmmt_id) + api.set_repo_valid_since (repo.id, t_commit.ctime) + + time.sleep(1) + assert api.put_file(repo.id, third_path, '/', file_name, USER, t_repo.head_cmmt_id) + + api.del_file(repo.id, '/', '[\"'+file_name+'\"]', USER) + + run_gc(repo.id, rm_fs, '') + run_gc(repo.id, '', '--check') + + del_local_files()