Skip to content

Commit

Permalink
Add online gc (#706)
Browse files Browse the repository at this point in the history
* Add online gc core

* Add check gc when update repo

* Go add check gc when update repo

* Add gc unit test

* Support set pwd_hash for gc and fsck

* Optimize to check if block exists

---------

Co-authored-by: 杨赫然 <[email protected]>
  • Loading branch information
feiniks and 杨赫然 authored Oct 24, 2024
1 parent 6af96fe commit 5899f11
Show file tree
Hide file tree
Showing 24 changed files with 1,660 additions and 246 deletions.
53 changes: 52 additions & 1 deletion common/branch-mgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,70 @@ on_branch_updated (SeafBranchManager *mgr, SeafBranch *branch)
publish_repo_update_event (branch->repo_id, branch->commit_id);
}

static gboolean
get_gc_id (SeafDBRow *row, void *data)
{
char **out_gc_id = data;

*out_gc_id = g_strdup(seaf_db_row_get_column_text (row, 0));

return FALSE;
}

int
seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr,
SeafBranch *branch,
const char *old_commit_id)
const char *old_commit_id,
gboolean check_gc,
const char *last_gc_id,
const char *origin_repo_id,
gboolean *gc_conflict)
{
SeafDBTrans *trans;
char *sql;
char commit_id[41] = { 0 };
char *gc_id = NULL;

if (check_gc)
*gc_conflict = FALSE;

trans = seaf_db_begin_transaction (mgr->seaf->db);
if (!trans)
return -1;

if (check_gc) {
sql = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE";
if (!origin_repo_id) {
if (seaf_db_trans_foreach_selected_row (trans, sql,
get_gc_id, &gc_id,
1, "string", branch->repo_id) < 0) {
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
return -1;
}
}
else {
if (seaf_db_trans_foreach_selected_row (trans, sql,
get_gc_id, &gc_id,
1, "string", origin_repo_id) < 0) {
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
return -1;
}
}

if (g_strcmp0 (last_gc_id, gc_id) != 0) {
seaf_warning ("Head branch update for repo %s conflicts with GC.\n",
branch->repo_id);
seaf_db_rollback (trans);
seaf_db_trans_close (trans);
*gc_conflict = TRUE;
g_free (gc_id);
return -1;
}
g_free (gc_id);
}

switch (seaf_db_type (mgr->seaf->db)) {
case SEAF_DB_TYPE_MYSQL:
case SEAF_DB_TYPE_PGSQL:
Expand Down
6 changes: 5 additions & 1 deletion common/branch-mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ seaf_branch_manager_update_branch (SeafBranchManager *mgr,
int
seaf_branch_manager_test_and_update_branch (SeafBranchManager *mgr,
SeafBranch *branch,
const char *old_commit_id);
const char *old_commit_id,
gboolean check_gc,
const char *last_gc_id,
const char *origin_repo_id,
gboolean *gc_conflict);
#endif

SeafBranch *
Expand Down
13 changes: 12 additions & 1 deletion common/rpc-service.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,8 @@ seafile_change_repo_passwd (const char *repo_id,
seaf_branch_set_commit (repo->head, commit->commit_id);
if (seaf_branch_manager_test_and_update_branch (seaf->branch_mgr,
repo->head,
parent->commit_id) < 0) {
parent->commit_id,
FALSE, NULL, NULL, NULL) < 0) {
seaf_repo_unref (repo);
seaf_commit_unref (commit);
seaf_commit_unref (parent);
Expand Down Expand Up @@ -1365,6 +1366,16 @@ seafile_get_repo_history_limit (const char *repo_id,
return seaf_repo_manager_get_repo_history_limit (seaf->repo_mgr, repo_id);
}

int
seafile_set_repo_valid_since (const char *repo_id,
gint64 timestamp,
GError **error)
{
return seaf_repo_manager_set_repo_valid_since (seaf->repo_mgr,
repo_id,
timestamp);
}

int
seafile_repo_set_access_property (const char *repo_id, const char *ap, GError **error)
{
Expand Down
112 changes: 82 additions & 30 deletions fileserver/fileop.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ func mkdirWithParents(repoID, parentDir, newDirPath, user string) error {
}

buf := fmt.Sprintf("Added directory \"%s\"", relativeDirCan)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, true)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return err
Expand Down Expand Up @@ -1714,7 +1714,13 @@ func postMultiFiles(rsp http.ResponseWriter, r *http.Request, repoID, parentDir,
}
}

retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify)
gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id for repo %s: %v", repoID, err)
return &appError{err, "", http.StatusInternalServerError}
}

retStr, err := postFilesAndGenCommit(fileNames, repo.ID, user, canonPath, replace, ids, sizes, lastModify, gcID)
if err != nil {
err := fmt.Errorf("failed to post files and gen commit: %v", err)
return &appError{err, "", http.StatusInternalServerError}
Expand Down Expand Up @@ -1770,7 +1776,7 @@ func checkFilesWithSameName(repo *repomgr.Repo, canonPath string, fileNames []st
return false
}

func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64) (string, error) {
func postFilesAndGenCommit(fileNames []string, repoID string, user, canonPath string, replace bool, ids []string, sizes []int64, lastModify int64, lastGCID string) (string, error) {
handleConncurrentUpdate := true
if !replace {
handleConncurrentUpdate = false
Expand Down Expand Up @@ -1816,7 +1822,7 @@ retry:
buf = fmt.Sprintf("Added \"%s\".", fileNames[0])
}

_, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate)
_, err = genNewCommit(repo, headCommit, rootID, user, buf, handleConncurrentUpdate, lastGCID, true)
if err != nil {
if err != ErrConflict {
err := fmt.Errorf("failed to generate new commit: %v", err)
Expand Down Expand Up @@ -1880,7 +1886,7 @@ func getCanonPath(p string) string {

var ErrConflict = fmt.Errorf("Concurent upload conflict")

func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool) (string, error) {
func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, desc string, handleConncurrentUpdate bool, lastGCID string, checkGC bool) (string, error) {
var retryCnt int
repoID := repo.ID
commit := commitmgr.NewCommit(repoID, base.CommitID, newRoot, user, desc)
Expand All @@ -1895,7 +1901,7 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, des
maxRetryCnt := 10

for {
retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID)
retry, err := genCommitNeedRetry(repo, base, commit, newRoot, user, handleConncurrentUpdate, &commitID, lastGCID, checkGC)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -1925,10 +1931,19 @@ func genNewCommit(repo *repomgr.Repo, base *commitmgr.Commit, newRoot, user, des
return commitID, nil
}

func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error {
func fastForwardOrMerge(user, token string, repo *repomgr.Repo, base, newCommit *commitmgr.Commit) error {
var retryCnt int
checkGC, err := repomgr.HasLastGCID(repo.ID, token)
if err != nil {
return err
}
var lastGCID string
if checkGC {
lastGCID, _ = repomgr.GetLastGCID(repo.ID, token)
repomgr.RemoveLastGCID(repo.ID, token)
}
for {
retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil)
retry, err := genCommitNeedRetry(repo, base, newCommit, newCommit.RootID, user, true, nil, lastGCID, checkGC)
if err != nil {
return err
}
Expand All @@ -1948,7 +1963,7 @@ func fastForwardOrMerge(user string, repo *repomgr.Repo, base, newCommit *commit
return nil
}

func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string) (bool, error) {
func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *commitmgr.Commit, newRoot, user string, handleConncurrentUpdate bool, commitID *string, lastGCID string, checkGC bool) (bool, error) {
var secondParentID string
repoID := repo.ID
var mergeDesc string
Expand Down Expand Up @@ -2001,7 +2016,10 @@ func genCommitNeedRetry(repo *repomgr.Repo, base *commitmgr.Commit, commit *comm
mergedCommit = commit
}

err = updateBranch(repoID, mergedCommit.CommitID, currentHead.CommitID, secondParentID)
gcConflict, err := updateBranch(repoID, repo.StoreID, mergedCommit.CommitID, currentHead.CommitID, secondParentID, checkGC, lastGCID)
if gcConflict {
return false, err
}
if err != nil {
return true, nil
}
Expand All @@ -2024,56 +2042,80 @@ func genMergeDesc(repo *repomgr.Repo, mergedRoot, p1Root, p2Root string) string
return desc
}

func updateBranch(repoID, newCommitID, oldCommitID, secondParentID string) error {
func updateBranch(repoID, originRepoID, newCommitID, oldCommitID, secondParentID string, checkGC bool, lastGCID string) (gcConflict bool, err error) {
ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout)
defer cancel()
trans, err := seafileDB.BeginTx(ctx, nil)
if err != nil {
err := fmt.Errorf("failed to start transaction: %v", err)
return false, err
}

var row *sql.Row
var sqlStr string
if checkGC {
sqlStr = "SELECT gc_id FROM GCID WHERE repo_id = ? FOR UPDATE"
if originRepoID == "" {
row = trans.QueryRowContext(ctx, sqlStr, repoID)
} else {
row = trans.QueryRowContext(ctx, sqlStr, originRepoID)
}
var gcID sql.NullString
if err := row.Scan(&gcID); err != nil {
if err != sql.ErrNoRows {
trans.Rollback()
return false, err
}
}

if lastGCID != gcID.String {
err = fmt.Errorf("Head branch update for repo %s conflicts with GC.", repoID)
trans.Rollback()
return true, err
}
}

var commitID string
name := "master"
var sqlStr string
if strings.EqualFold(dbType, "mysql") {
sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ? FOR UPDATE"
} else {
sqlStr = "SELECT commit_id FROM Branch WHERE name = ? AND repo_id = ?"
}

ctx, cancel := context.WithTimeout(context.Background(), option.DBOpTimeout)
defer cancel()
trans, err := seafileDB.BeginTx(ctx, nil)
if err != nil {
err := fmt.Errorf("failed to start transaction: %v", err)
return err
}
row := trans.QueryRowContext(ctx, sqlStr, name, repoID)
row = trans.QueryRowContext(ctx, sqlStr, name, repoID)
if err := row.Scan(&commitID); err != nil {
if err != sql.ErrNoRows {
trans.Rollback()
return err
return false, err
}
}
if oldCommitID != commitID {
trans.Rollback()
err := fmt.Errorf("head commit id has changed")
return err
return false, err
}

sqlStr = "UPDATE Branch SET commit_id = ? WHERE name = ? AND repo_id = ?"
_, err = trans.ExecContext(ctx, sqlStr, newCommitID, name, repoID)
if err != nil {
trans.Rollback()
return err
return false, err
}

trans.Commit()

if secondParentID != "" {
if err := onBranchUpdated(repoID, secondParentID, false); err != nil {
return err
return false, err
}
}

if err := onBranchUpdated(repoID, newCommitID, true); err != nil {
return err
return false, err
}

return nil
return false, nil
}

func onBranchUpdated(repoID string, commitID string, updateRepoInfo bool) error {
Expand Down Expand Up @@ -2726,7 +2768,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) {
if commitDesc == "" {
commitDesc = "Auto merge by system"
}
newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true)
newCommitID, err := genNewCommit(repo, headCommit, newDirID, user, commitDesc, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", err
Expand Down Expand Up @@ -2767,7 +2809,7 @@ func updateDir(repoID, dirPath, newDirID, user, headID string) (string, error) {
commitDesc = "Auto merge by system"
}

newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true)
newCommitID, err := genNewCommit(repo, headCommit, rootID, user, commitDesc, true, "", false)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", err
Expand Down Expand Up @@ -3166,8 +3208,13 @@ func putFile(rsp http.ResponseWriter, r *http.Request, repoID, parentDir, user,
return &appError{err, "", http.StatusInternalServerError}
}

gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id: %v", err)
return &appError{err, "", http.StatusInternalServerError}
}
desc := fmt.Sprintf("Modified \"%s\"", fileName)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return &appError{err, "", http.StatusInternalServerError}
Expand Down Expand Up @@ -3398,8 +3445,13 @@ func commitFileBlocks(repoID, parentDir, fileName, blockIDsJSON, user string, fi
return "", &appError{err, "", http.StatusInternalServerError}
}

gcID, err := repomgr.GetCurrentGCID(repo.StoreID)
if err != nil {
err := fmt.Errorf("failed to get current gc id: %v", err)
return "", &appError{err, "", http.StatusInternalServerError}
}
desc := fmt.Sprintf("Added \"%s\"", fileName)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true)
_, err = genNewCommit(repo, headCommit, rootID, user, desc, true, gcID, true)
if err != nil {
err := fmt.Errorf("failed to generate new commit: %v", err)
return "", &appError{err, "", http.StatusInternalServerError}
Expand Down
Loading

0 comments on commit 5899f11

Please sign in to comment.