diff --git a/ds3_integration/utils/testUtils.go b/ds3_integration/utils/testUtils.go
index 5b7c388..a30b521 100644
--- a/ds3_integration/utils/testUtils.go
+++ b/ds3_integration/utils/testUtils.go
@@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) {
 	verifyContent(t, expected, actual)
 }
 
-func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) {
+func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) {
 	f, err := os.Open(filePath)
 	ds3Testing.AssertNilError(t, err)
 
@@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64
 	verifyPartialContent(t, *expected, actual, length)
 }
 
-func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) {
+func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) {
 	content, err := getNBytesFromReader(actual, length)
 	ds3Testing.AssertNilError(t, err)
 
diff --git a/helpers/getProducer.go b/helpers/getProducer.go
index 0591b2f..bb4d31b 100644
--- a/helpers/getProducer.go
+++ b/helpers/getProducer.go
@@ -12,6 +12,8 @@ import (
 	"time"
 )
 
+const timesToRetryGettingPartialBlob = 5
+
 type getProducer struct {
 	JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation
 	GetObjects          *[]helperModels.GetObject
@@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 			return
 		}
 		if bytesWritten != info.blob.Length() {
-			err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
-			producer.strategy.Listeners.Errored(info.blob.Name(), err)
-			info.channelBuilder.SetFatalError(err)
-			producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
+			err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger)
+			if err != nil {
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
+				info.channelBuilder.SetFatalError(err)
+				producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			}
 		}
 		return
 	}
@@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 	}
 }
 
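+// GetRemainingBlob resumes a partially transferred blob: it re-requests the
+// byte range that has not been written yet and appends it to the destination
+// writer, attempting at most timesToRetryGettingPartialBlob times.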
+func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error {
+	logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length()-amountAlreadyRetrieved)
+	bytesRetrievedSoFar := amountAlreadyRetrieved
+	timesRetried := 0
+	rangeEnd := blob.Offset() + blob.Length() - 1
+	for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob {
+		rangeStart := blob.Offset() + bytesRetrievedSoFar
+		bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger)
+		if err != nil {
+			logger.Errorf("failed to get object '%s' at offset '%d', range %d-%d attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error())
+		}
+		bytesRetrievedSoFar += bytesRetrievedThisRound
+		timesRetried++
+	}
+
+	if bytesRetrievedSoFar < blob.Length() {
+		return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length())
+	}
+	return nil
+}
+
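+// RetryGettingBlobRange performs a single ranged GET for the missing portion
+// of a blob and streams the response body to the destination writer, returning
+// the number of bytes written.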
+func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) {
+	// perform a naked get call for the rest of the blob that we originally failed to get
+	partOfBlobToFetch := ds3Models.Range{
+		Start: rangeStart,
+		End:   rangeEnd,
+	}
+	getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName).
+		WithOffset(blobOffset).
+		WithRanges(partOfBlobToFetch)
+
+	getObjResponse, err := client.GetObject(getObjRequest)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		err := getObjResponse.Content.Close()
+		if err != nil {
+			logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err)
+		}
+	}()
+
+	bytesWritten, err := io.Copy(writer, getObjResponse.Content) // copy all content from response reader to destination writer
+	return bytesWritten, err
+}
+
 // Writes a range of a blob to its destination channel
 func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error {
 	writer, err := channelBuilder.GetChannel(blobRange.Start)
diff --git a/helpers_integration/helpersImpl_test.go b/helpers_integration/helpersImpl_test.go
index d93155c..65abf88 100644
--- a/helpers_integration/helpersImpl_test.go
+++ b/helpers_integration/helpersImpl_test.go
@@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) {
 		t.Errorf("expected to get a BP job ID, but instead got nothing")
 	}
 }
+
+func TestRetryGettingBlobRange(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get a range of the blob
+				startRange := blob.Offset + 10 // retrieve subset of blob
+				endRange := blob.Length + blob.Offset - 1
+				bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+				ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := endRange - startRange + 1 // the range is inclusive on both ends
+				testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
+
+func TestGetRemainingBlob(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get the remainder of the blob after skipping some bytes
+				blobDescription := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length)
+				var amountToSkip int64 = 10
+				err = helpers.GetRemainingBlob(client, testBucket, &blobDescription, amountToSkip, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := blobDescription.Length() - amountToSkip
+				testutils.VerifyPartialFile(t, bigFilePath, length, blobDescription.Offset()+amountToSkip, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
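
The retry loop in GetRemainingBlob resumes from the first byte the previous
attempt failed to deliver. Below is a minimal, standalone sketch of that range
arithmetic; the example values are invented, and only the rangeStart/rangeEnd
formulas come from the patch itself:

package main

import "fmt"

func main() {
	// A hypothetical 4 KiB blob at offset 1 MiB of the object, where the first
	// attempt delivered only 1000 bytes before the connection dropped.
	var blobOffset, blobLength, alreadyRetrieved int64 = 1 << 20, 4096, 1000

	// The last byte of the blob is fixed across attempts.
	rangeEnd := blobOffset + blobLength - 1

	// Each retry restarts at the first byte that has not been written yet.
	rangeStart := blobOffset + alreadyRetrieved

	// Prints: retry range 1049576-1052671 (3096 bytes remaining)
	fmt.Printf("retry range %d-%d (%d bytes remaining)\n",
		rangeStart, rangeEnd, rangeEnd-rangeStart+1)
}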