Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel repair and batch key for pause cancel upload #1713

Draft
wants to merge 4 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 77 additions & 29 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
const FileOperationInsert = "insert"

var (
downloadDirContextMap = make(map[string]context.CancelCauseFunc)
downloadDirLock = sync.Mutex{}
opCancelContextMap = make(map[string]context.CancelCauseFunc)
opCancelLock = sync.Mutex{}
ErrUnderRepair = errors.New("allocation is under repair")
)

// listObjects list allocation objects from its blobbers
Expand Down Expand Up @@ -73,27 +74,45 @@ func listObjectsFromAuthTicket(allocationID, authTicket, lookupHash string, offs
}

// cancelUpload cancel the upload operation of the file
// - allocationID is the allocation id
// - remotePath is the remote path of the file
func cancelUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
// - batchKey is the batch key of the operation
func cancelUpload(batchKey string) error {
opCancelLock.Lock()
defer opCancelLock.Unlock()
if cancel, ok := opCancelContextMap[batchKey]; ok {
cancel(sdk.ErrCancelUpload)
} else {
return errors.New("invalid batch key")
}
return nil
}

// pauseUpload pause the upload operation of the file
// - batchKey is the batch key of the operation
func pauseUpload(batchKey string) error {
opCancelLock.Lock()
defer opCancelLock.Unlock()
if cancel, ok := opCancelContextMap[batchKey]; ok {
cancel(sdk.ErrPauseUpload)
} else {
return errors.New("invalid batch key")
}
return nil
}

func cancelRepair(allocationID string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return err
}
return allocationObj.CancelUpload(remotePath)
return alloc.CancelRepair()
}

// pauseUpload pause the upload operation of the file
// - allocationID is the allocation id
// - remotePath is the remote path of the file
func pauseUpload(allocationID, remotePath string) error {
allocationObj, err := getAllocation(allocationID)
func cancelDownload(allocationID, remotePath string) error {
alloc, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return err
}
return allocationObj.PauseUpload(remotePath)
return alloc.CancelDownload(remotePath)
}

// createDir create a directory on blobbers
Expand Down Expand Up @@ -600,10 +619,11 @@ type MultiDownloadOption struct {
// ## Inputs
// - allocationID
// - jsonMultiUploadOptions: Json Array of MultiOperationOption. eg: "[{"operationType":"move","remotePath":"/README.md","destPath":"/folder1/"},{"operationType":"delete","remotePath":"/t3.txt"}]"
// - batchKey: batch key for the operation
//
// ## Outputs
// - error
func MultiOperation(allocationID string, jsonMultiUploadOptions string) error {
func MultiOperation(allocationID, jsonMultiUploadOptions string) error {
if allocationID == "" {
return errors.New("AllocationID is required")
}
Expand Down Expand Up @@ -632,6 +652,9 @@ func MultiOperation(allocationID string, jsonMultiUploadOptions string) error {
if err != nil {
return err
}
if allocationObj.IsUnderRepair() {
return ErrUnderRepair
}
return allocationObj.DoMultiOperation(operations)
}

Expand Down Expand Up @@ -697,7 +720,7 @@ func setUploadMode(mode int) {

// multiUpload upload multiple files in parallel
// - jsonBulkUploadOptions is the json array of BulkUploadOption. Follows the BulkUploadOption struct
func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
func multiUpload(jsonBulkUploadOptions, batchKey string) (MultiUploadResult, error) {
defer func() {
if r := recover(); r != nil {
PrintError("Recovered in multiupload Error", r)
Expand Down Expand Up @@ -730,6 +753,11 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
result.Success = false
return result, err
}
if allocationObj.IsUnderRepair() {
result.Error = ErrUnderRepair.Error()
result.Success = false
return result, ErrUnderRepair
}

operationRequests := make([]sdk.OperationRequest, n)
for idx, option := range options {
Expand Down Expand Up @@ -812,14 +840,34 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
}

}
err = allocationObj.DoMultiOperation(operationRequests)
if err != nil {
result.Error = err.Error()
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
opCancelLock.Lock()
opCancelContextMap[batchKey] = cancel
opCancelLock.Unlock()
defer func() {
opCancelLock.Lock()
delete(opCancelContextMap, batchKey)
opCancelLock.Unlock()
}()
errChan := make(chan error, 1)
go func() {
errChan <- allocationObj.DoMultiOperation(operationRequests, sdk.WithContext(ctx))
}()
select {
case <-ctx.Done():
result.Error = ctx.Err().Error()
result.Success = false
return result, err
return result, ctx.Err()
case err := <-errChan:
if err != nil {
result.Error = err.Error()
result.Success = false
return result, err
}
result.Success = true
return result, nil
}
result.Success = true
return result, nil
}

func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string, fileSize int64, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int, callbackFuncName string) (bool, error) {
Expand Down Expand Up @@ -1227,9 +1275,9 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st
go func() {
errChan <- alloc.DownloadDirectory(ctx, remotePath, "", authticket, statusBar)
}()
downloadDirLock.Lock()
downloadDirContextMap[remotePath] = cancel
downloadDirLock.Unlock()
opCancelLock.Lock()
opCancelContextMap[remotePath] = cancel
opCancelLock.Unlock()
select {
case err = <-errChan:
if err != nil {
Expand All @@ -1244,12 +1292,12 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st
// cancelDownloadDirectory cancel the download directory operation
// - remotePath : remote path of the directory
func cancelDownloadDirectory(remotePath string) {
downloadDirLock.Lock()
cancel, ok := downloadDirContextMap[remotePath]
opCancelLock.Lock()
cancel, ok := opCancelContextMap[remotePath]
if ok {
cancel(errors.New("download directory canceled by user"))
}
downloadDirLock.Unlock()
opCancelLock.Unlock()
}

func cancelDownloadBlocks(allocationID, remotePath string, start, end int64) error {
Expand Down
2 changes: 2 additions & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func main() {
"downloadDirectory": downloadDirectory,
"cancelDownloadDirectory": cancelDownloadDirectory,
"cancelDownloadBlocks": cancelDownloadBlocks,
"cancelRepair": cancelRepair,
"cancelDownload": cancelDownload,

// player
"play": play,
Expand Down
30 changes: 22 additions & 8 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2924,18 +2924,21 @@ func (a *Allocation) StartRepair(localRootPath, pathToRepair string, statusCB St
return err
}
}

repairCtx, repairCtxCancel := context.WithCancel(a.ctx)
repairReq := &RepairRequest{
listDir: listDir,
localRootPath: localRootPath,
statusCB: statusCB,
repairPath: pathToRepair,
listDir: listDir,
localRootPath: localRootPath,
statusCB: statusCB,
repairPath: pathToRepair,
repairCtx: repairCtx,
repairCtxCancel: repairCtxCancel,
}

repairReq.completedCallback = func() {
a.mutex.Lock()
defer a.mutex.Unlock()
a.repairRequestInProgress = nil
repairCtxCancel()
}

go func() {
Expand Down Expand Up @@ -2975,9 +2978,11 @@ func (a *Allocation) RepairSize(remotePath string) (RepairSize, error) {
if err != nil {
return RepairSize{}, err
}

repairCtx, repairCtxCancel := context.WithCancel(a.ctx)
repairReq := RepairRequest{
allocation: a,
allocation: a,
repairCtx: repairCtx,
repairCtxCancel: repairCtxCancel,
}
return repairReq.Size(context.Background(), dir)
}
Expand All @@ -2992,7 +2997,7 @@ func (a *Allocation) CancelUpload(remotePath string) error {
if !ok {
return errors.New("remote_path_not_found", "Invalid path. No upload in progress for the path "+remotePath)
} else {
cancelFunc(fmt.Errorf("upload canceled by user"))
cancelFunc(ErrCancelUpload)
}
return nil
}
Expand All @@ -3017,13 +3022,22 @@ func (a *Allocation) PauseUpload(remotePath string) error {
// CancelRepair cancels the repair operation for the allocation.
// It cancels the repair operation and returns an error if no repair is in progress for the allocation.
func (a *Allocation) CancelRepair() error {
a.mutex.Lock()
defer a.mutex.Unlock()
if a.repairRequestInProgress != nil {
a.repairRequestInProgress.isRepairCanceled = true
a.repairRequestInProgress.repairCtxCancel()
return nil
}
return errors.New("invalid_cancel_repair_request", "No repair in progress for the allocation")
}

func (a *Allocation) IsUnderRepair() bool {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.repairRequestInProgress != nil
}

func (a *Allocation) GetMaxWriteReadFromBlobbers(blobbers []*BlobberAllocation) (maxW float64, maxR float64, err error) {
if !a.isInitialized() {
return 0, 0, notInitialized
Expand Down
9 changes: 7 additions & 2 deletions zboxcore/sdk/allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ func TestAllocation_dispatchWork(t *testing.T) {
})
t.Run("Test_Cover_Repair_Request", func(t *testing.T) {
go a.dispatchWork(context.Background())
a.repairChan <- &RepairRequest{listDir: &ListResult{}}
repairCtx, repairCtxCancel := context.WithCancel(context.Background())
a.repairChan <- &RepairRequest{listDir: &ListResult{}, repairCtx: repairCtx, repairCtxCancel: repairCtxCancel}
})
}

Expand Down Expand Up @@ -2300,7 +2301,11 @@ func TestAllocation_CancelRepair(t *testing.T) {
{
name: "Test_Success",
setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) {
a.repairRequestInProgress = &RepairRequest{}
ctx, cancel := context.WithCancel(context.Background())
a.repairRequestInProgress = &RepairRequest{
repairCtx: ctx,
repairCtxCancel: cancel,
}
return nil
},
},
Expand Down
5 changes: 5 additions & 0 deletions zboxcore/sdk/listworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) {
consensusMap := make(map[string][]*blockchain.StorageNode)
var consensusHash string
errCnt := 0
if numList == 0 {
return nil, errors.New("no blobbers", "getlistFromBlobbers")
}
l.Logger.Debug("getListFromBlobbers: ", numList)
for i := 0; i < numList; i++ {
listInfos[i] = <-rspCh
if !req.forRepair {
Expand Down Expand Up @@ -212,6 +216,7 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) {
return listInfos, listInfos[0].err
}
req.listOnly = true
l.Logger.Debug("listInfos: ", len(listInfos))
listInfos = listInfos[:1]
listOnlyRespCh := make(chan *listResponse, 1)
for i := 0; i < listLen; i++ {
Expand Down
13 changes: 12 additions & 1 deletion zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,17 @@ type MultiOperationOption func(mo *MultiOperation)

func WithRepair() MultiOperationOption {
return func(mo *MultiOperation) {
mo.Consensus.consensusThresh = 0
mo.Consensus.consensusThresh = 1
mo.isRepair = true
}
}

func WithContext(ctx context.Context) MultiOperationOption {
return func(mo *MultiOperation) {
mo.ctx, mo.ctxCncl = context.WithCancelCause(ctx)
}
}

type Operationer interface {
Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error)
buildChange(refs []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange
Expand Down Expand Up @@ -321,6 +327,11 @@ func (mo *MultiOperation) Process() error {
l.Logger.Error("consensus not met", activeBlobbers, mo.consensusThresh)
return errors.New("consensus_not_met", fmt.Sprintf("Active blobbers %d is less than consensus threshold %d", activeBlobbers, mo.consensusThresh))
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if mo.allocationObj.StorageVersion == StorageV2 {
return mo.commitV2()
}
Expand Down
Loading
Loading