From 3408227af6066cffc057ac5b8fa4b08f0e2cf955 Mon Sep 17 00:00:00 2001
From: Peter Nemere
Date: Fri, 8 Nov 2024 10:47:53 +1000
Subject: [PATCH 1/2] Added archive optimiser update mode, so we can copy back
 to the bucket. This is temporary

---
 .../dataset-archive-optimiser/main.go         | 137 ++++++++++++++++--
 1 file changed, 128 insertions(+), 9 deletions(-)

diff --git a/internal/cmd-line-tools/dataset-archive-optimiser/main.go b/internal/cmd-line-tools/dataset-archive-optimiser/main.go
index 7d81e75e..09a20d80 100644
--- a/internal/cmd-line-tools/dataset-archive-optimiser/main.go
+++ b/internal/cmd-line-tools/dataset-archive-optimiser/main.go
@@ -3,14 +3,17 @@ package main
 import (
 	"archive/zip"
 	"context"
+	"errors"
 	"flag"
 	"fmt"
 	"log"
 	"os"
 	"path"
 	"path/filepath"
+	"strings"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/pixlise/core/v4/api/dataimport/datasetArchive"
 	"github.com/pixlise/core/v4/api/dbCollections"
 	"github.com/pixlise/core/v4/core/awsutil"
@@ -25,18 +28,22 @@
 
 var t0 = time.Now().UnixMilli()
 
+var destMongoSecret string
+var dbName string
+var sourceDataBucket string
+var destDataBucket string
+var updateScript string
+
 func main() {
 	fmt.Printf("Started: %v\n", time.Now().String())
 
-	var destMongoSecret string
-	var dbName string
-	var sourceDataBucket string
-	var destDataBucket string
+	var mode string
 
 	flag.StringVar(&destMongoSecret, "destMongoSecret", "", "Destination mongo DB secret")
 	flag.StringVar(&dbName, "dbName", "", "DB name we're importing to")
 	flag.StringVar(&sourceDataBucket, "sourceDataBucket", "", "Data bucket so we can read archive zips")
 	flag.StringVar(&destDataBucket, "destDataBucket", "", "Data bucket we're writing optimised archives to")
+	flag.StringVar(&mode, "mode", "", "combine (to download zips and combine them into one) or update (to put the combined zips back to the source)")
 
 	flag.Parse()
 
@@ -44,13 +51,16 @@ func main() {
 	// Check they're not empty
 	checkNotEmpty := []string{
 		sourceDataBucket,
		destDataBucket,
-		dbName,
 	}
 	checkNotEmptyName := []string{
 		"sourceDataBucket",
 		"destDataBucket",
-		"dbName",
 	}
+	if mode == "combine" {
+		checkNotEmptyName = append(checkNotEmptyName, "dbName")
+		checkNotEmpty = append(checkNotEmpty, dbName)
+	}
+
 	for c, s := range checkNotEmpty {
 		if len(s) <= 0 {
 			log.Fatalf("Parameter: %v was empty", checkNotEmptyName[c])
@@ -75,6 +85,114 @@ func main() {
 	iLog := &logger.StdOutLogger{}
 	iLog.SetLogLevel(logger.LogInfo)
 
+	if mode == "combine" {
+		combine(sess, iLog, remoteFS, &localFS)
+	} else if mode == "update" {
+		update(remoteFS, &localFS)
+	} else {
+		fatalError(errors.New("Unknown mode: " + mode))
+	}
+
+	printFinishStats()
+}
+
+func update(remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
+	archivePrefix := "Archive/"
+	archived, err := remoteFS.ListObjects(destDataBucket, archivePrefix)
+	if err != nil {
+		fatalError(err)
+	}
+
+	rttToArchiveFiles := map[string][]string{}
+	for _, f := range archived {
+		f = f[len(archivePrefix):]
+		pos := strings.Index(f, "-")
+		if pos > 0 {
+			rtt := f[0:pos]
+			if files, ok := rttToArchiveFiles[rtt]; !ok {
+				rttToArchiveFiles[rtt] = []string{f}
+			} else {
+				files = append(files, f)
+				rttToArchiveFiles[rtt] = files
+			}
+		}
+	}
+
+	// Loop through and delete any with just 1 zip file; we don't need to optimise those
+	for rtt, files := range rttToArchiveFiles {
+		if len(files) < 2 {
+			delete(rttToArchiveFiles, rtt)
+		}
+	}
+
+	// Handle each file
+	for rtt, files := range rttToArchiveFiles {
+		updateForRTT(rtt, files, remoteFS)
+	}
+
+	fmt.Println(updateScript)
+}
+
+func updateForRTT(rtt string, allZips []string, remoteFS fileaccess.FileAccess) {
+	// See if we have any optimised files for this rtt
+	archOptPrefix := "Archive-Optimised/"
+	files, err := remoteFS.ListObjects(sourceDataBucket, archOptPrefix+rtt+"-")
+	if err != nil {
+		fmt.Printf("Failed to read archived files for rtt: %v, err: %v\n", rtt, err)
+		return
+	}
+
+	if len(files) < 1 {
+		fmt.Printf("No optimised archive for rtt: %v\n", rtt)
+		return
+	} else if len(files) > 1 {
+		fmt.Printf("Too many optimised archives for rtt: %v\n", rtt)
+		return
+	}
+
+	archivedZipName := files[0][len(archOptPrefix):]
+
+	// Make sure the optimised file matches one in the list
+	if !utils.ItemInSlice(archivedZipName, allZips) {
+		fmt.Printf("Optimised archive name doesn't match any source files for: %v\n", rtt)
+		return
+	}
+
+	// Delete all files older than or equal to this
+	optimisedTS, err := getTimestamp(archivedZipName)
+	if err != nil {
+		fmt.Printf("Failed to read timestamp for: %v\n", archivedZipName)
+		return
+	}
+
+	for _, f := range allZips {
+		needUpdate := false
+		if f == archivedZipName {
+			needUpdate = true
+		} else {
+			ts, err := getTimestamp(f)
+			if err != nil {
+				fmt.Printf("Failed to read timestamp for: %v\n", f)
+				return
+			}
+			needUpdate = ts < optimisedTS
+		}
+
+		if needUpdate {
+			updateScript += fmt.Sprintf("aws s3 rm s3://%v/%v\n", destDataBucket, "Archive/"+f)
+		}
+	}
+
+	// Copy the optimised file back
+	updateScript += fmt.Sprintf("aws s3 cp s3://%v/%v s3://%v/%v\n", sourceDataBucket, "Archive-Optimised/"+archivedZipName, destDataBucket, "Archive/")
+}
+
+func getTimestamp(fileName string) (int, error) {
+	_ /*expecting this to match already due to dir listing*/, timeStamp, err := datasetArchive.DecodeArchiveFileName(fileName)
+	return timeStamp, err
+}
+
+func combine(sess *session.Session, iLog logger.ILogger, remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
 	// Connect to mongo
 	destMongoClient, _, err := mongoDBConnection.Connect(sess, destMongoSecret, iLog)
 	if err != nil {
 		fatalError(err)
 	}
@@ -101,6 +219,9 @@ func main() {
 
 	// Loop through all scans and read each archive one-by-one
 	workingDir, err := os.MkdirTemp("", "archive-fix-")
+	if err != nil {
+		log.Fatalln(err)
+	}
 
 	l := &logger.StdOutLogger{}
 
@@ -116,7 +237,7 @@ func main() {
 			l.Infof("")
 			l.Infof("============================================================")
 			l.Infof(">>> Downloading archives for scan: %v (%v)", scan.Title, scan.Id)
 
-			archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, &localFS, l, sourceDataBucket, "" /* not needed */)
+			archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, localFS, l, sourceDataBucket, "" /* not needed */)
 			_ /*localDownloadPath*/, localUnzippedPath, zipCount, lastZipName, err := archive.DownloadFromDatasetArchive(scan.Id, workingDir)
 			if err != nil {
 				log.Fatalf("Failed to download archive for scan %v: %v", scan.Id, err)
@@ -182,8 +303,6 @@ func main() {
 			// Delete all downloaded files and created zip
 		}
 	}
-
-	printFinishStats()
 }
 
 func getWithoutVersion(fileName string) string {
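(Editor's note, between patches: everything both patches do hinges on the archive zip naming convention "<datasetID>-DD-MM-YYYY-hh-mm-ss.zip" — patch 1's updateForRTT compares encoded timestamps via getTimestamp, and patch 2 reuses the newest zip's name for the combined archive. The sketch below shows that decode-and-order step in isolation. It is illustrative only: decodeName is a hypothetical stand-in for datasetArchive.DecodeArchiveFileName, whose exact behaviour isn't shown in these patches beyond getTimestamp's use of its (name, timestamp, error) results, and the RTT and timestamps are made up. The name layout is taken from the commented-out zipName format string in patch 2.)

	package main

	import (
		"fmt"
		"sort"
		"strings"
		"time"
	)

	// decodeName is a hypothetical stand-in for datasetArchive.DecodeArchiveFileName.
	// It splits "<datasetID>-DD-MM-YYYY-hh-mm-ss.zip" into the dataset ID and a
	// unix timestamp.
	func decodeName(fileName string) (string, int, error) {
		base := strings.TrimSuffix(fileName, ".zip")
		pos := strings.Index(base, "-")
		if pos <= 0 {
			return "", 0, fmt.Errorf("unexpected archive file name: %v", fileName)
		}

		t, err := time.Parse("02-01-2006-15-04-05", base[pos+1:])
		if err != nil {
			return "", 0, err
		}
		return base[0:pos], int(t.Unix()), nil
	}

	func main() {
		// Hypothetical RTT and timestamps, just to demonstrate the ordering
		zips := []string{
			"261721600-09-11-2024-04-10-00.zip",
			"261721600-08-11-2024-23-59-59.zip",
		}

		// Oldest first - the last element is then "the latest zip", whose name
		// is preserved when the combined archive is written back
		sort.Slice(zips, func(i, j int) bool {
			_, ti, _ := decodeName(zips[i])
			_, tj, _ := decodeName(zips[j])
			return ti < tj
		})

		fmt.Println("oldest:", zips[0], "newest:", zips[len(zips)-1])
	}

(Keeping the newest existing name, rather than minting a new timestamp, is what lets the tool run against a live bucket without making the combined zip look newer than downlinks that arrived in the meantime.)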
From 7a0c9225c502f7b0c91864772c7fd174f69236c7 Mon Sep 17 00:00:00 2001
From: Peter Nemere
Date: Mon, 11 Nov 2024 09:52:01 +1000
Subject: [PATCH 2/2] Archive tool fixed to run in place, with fewer options:
 it now just reads all zips for a scan and, if there is more than one, zips
 them up, uploads the combined zip (overwriting the latest file), then
 deletes the others

---
 api/dataimport/datasetArchive/download.go     |  28 +-
 api/dataimport/import.go                      |   6 +-
 .../dataset-archive-optimiser/main.go         | 326 ++++++++----------
 3 files changed, 161 insertions(+), 199 deletions(-)

diff --git a/api/dataimport/datasetArchive/download.go b/api/dataimport/datasetArchive/download.go
index 0e7ed325..122da83f 100644
--- a/api/dataimport/datasetArchive/download.go
+++ b/api/dataimport/datasetArchive/download.go
@@ -59,7 +59,7 @@ func NewDatasetArchiveDownloader(
 // Unzipped files path (archive zips unzipped here),
 // How many zips loaded from archive
 // Error (if any)
-func (dl *DatasetArchiveDownloader) DownloadFromDatasetArchive(datasetID string, workingDir string) (string, string, int, string, error) {
+func (dl *DatasetArchiveDownloader) DownloadFromDatasetArchive(datasetID string, workingDir string) (string, string, []string, error) {
 	// Create a directories to process data in
 	dl.log.Debugf("Preparing to download archived dataset %v...", datasetID)
 
 	downloadPath, err := fileaccess.MakeEmptyLocalDirectory(workingDir, "download")
 	if err != nil {
 		err = fmt.Errorf("Failed to generate directory for importer downloads: %v", err)
 		//dl.log.Errorf("%v", err)
-		return "", "", 0, "", err
+		return "", "", []string{}, err
 	}
 
 	unzippedPath, err := fileaccess.MakeEmptyLocalDirectory(workingDir, "unzipped")
 	if err != nil {
 		err = fmt.Errorf("Failed to generate directory for importer unzips: %v", err)
 		//dl.log.Errorf("%v", err)
-		return "", "", 0, "", err
+		return "", "", []string{}, err
 	}
 
 	// Download all zip files from archive for this dataset ID, and extract them as required
 	dl.log.Debugf("Downloading archived zip files...")
 
-	zipCount, lastZipName, err := dl.downloadArchivedZipsForDataset(datasetID, downloadPath, unzippedPath)
+	zipFilesOrdered, err := dl.downloadArchivedZipsForDataset(datasetID, downloadPath, unzippedPath)
 	if err != nil {
 		err = fmt.Errorf("Failed to download archived zip files for dataset ID: %v. Error: %v", datasetID, err)
Error: %v", datasetID, err) //dl.log.Errorf("%v", err) - return downloadPath, unzippedPath, zipCount, lastZipName, err + return downloadPath, unzippedPath, zipFilesOrdered, err } - dl.log.Debugf("Dataset %v downloaded %v zip files from archive", datasetID, zipCount) - return downloadPath, unzippedPath, zipCount, lastZipName, nil + dl.log.Debugf("Dataset %v downloaded %v zip files from archive", datasetID, len(zipFilesOrdered)) + return downloadPath, unzippedPath, zipFilesOrdered, nil } func (dl *DatasetArchiveDownloader) DownloadPseudoIntensityRangesFile(configBucket string, downloadPath string, version string) (string, error) { @@ -121,7 +121,7 @@ func (dl *DatasetArchiveDownloader) fetchFile(bucketFrom string, pathFrom string // Returns 2 things: // Number of zips loaded // Error if there was one -func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID string, downloadPath string, unzippedPath string) (int, string, error) { +func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID string, downloadPath string, unzippedPath string) ([]string, error) { // Download all zip files that have the dataset ID prefixed in their file name // Unzip them in timestamp order into downloadPath archiveSearchPath := path.Join(filepaths.RootArchive, datasetID) @@ -129,14 +129,14 @@ func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID str archivedFiles, err := dl.remoteFS.ListObjects(dl.datasetBucket, archiveSearchPath) if err != nil { - return 0, "", err + return []string{}, err } orderedArchivedFiles, err := getOrderedArchiveFiles(archivedFiles) if err != nil { // Stop here if we find a bad file - return 0, "", err + return []string{}, err } fileCount := 0 @@ -144,14 +144,14 @@ func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID str for _, filePath := range orderedArchivedFiles { fileName := path.Base(filePath) if !strings.HasSuffix(fileName, ".zip") { - return 0, "", errors.New("Expected zip file, got: " + fileName) + return []string{}, errors.New("Expected zip file, got: " + fileName) } savePath := filepath.Join(downloadPath, fileName) err = dl.fetchFile(dl.datasetBucket, filePath, savePath) if err != nil { - return 0, "", err + return []string{}, err } dl.log.Debugf("Unzipping: \"%v\"", savePath) @@ -159,7 +159,7 @@ func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID str // Unzip the file unzippedFileNames, err := utils.UnzipDirectory(savePath, unzippedPath, false) if err != nil { - return 0, "", err + return []string{}, err } fileCount += len(unzippedFileNames) @@ -181,7 +181,7 @@ func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID str } dl.log.Infof("Downloaded %v zip files, unzipped %v files. 
Last file name: %v", len(orderedArchivedFiles), fileCount, lastFileName) - return len(orderedArchivedFiles), filepath.Base(lastFileName), nil + return orderedArchivedFiles, nil } func (dl *DatasetArchiveDownloader) DownloadUserCustomisationsForDataset(datasetID string, downloadPath string) error { diff --git a/api/dataimport/import.go b/api/dataimport/import.go index 53c63f33..73a07aae 100644 --- a/api/dataimport/import.go +++ b/api/dataimport/import.go @@ -81,13 +81,13 @@ func ImportDataset( // Firstly, we download from the archive archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, localFS, log, datasetBucket, manualUploadBucket) - localDownloadPath, localUnzippedPath, zipCount, _, err := archive.DownloadFromDatasetArchive(datasetID, workingDir) + localDownloadPath, localUnzippedPath, zipFiles, err := archive.DownloadFromDatasetArchive(datasetID, workingDir) if err != nil { return workingDir, savedSummary, "", false, err } // If no zip files were loaded, maybe this dataset is a manually uploaded one, try to import from there instead - if zipCount == 0 { + if len(zipFiles) == 0 { log.Infof("No zip files found in archive, dataset may have been manually uploaded. Trying to download...") localDownloadPath, localUnzippedPath, err = archive.DownloadFromDatasetUploads(datasetID, workingDir) if err != nil { @@ -154,7 +154,7 @@ func ImportDataset( } } - return workingDir, savedSummary, updatenotificationtype, !justArchived && zipCount > 1, err + return workingDir, savedSummary, updatenotificationtype, !justArchived && len(zipFiles) > 1, err } // ImportFromLocalFileSystem - As the name says, imports from directory on local file system diff --git a/internal/cmd-line-tools/dataset-archive-optimiser/main.go b/internal/cmd-line-tools/dataset-archive-optimiser/main.go index 09a20d80..071f4d77 100644 --- a/internal/cmd-line-tools/dataset-archive-optimiser/main.go +++ b/internal/cmd-line-tools/dataset-archive-optimiser/main.go @@ -3,7 +3,6 @@ package main import ( "archive/zip" "context" - "errors" "flag" "fmt" "log" @@ -30,35 +29,25 @@ var t0 = time.Now().UnixMilli() var destMongoSecret string var dbName string -var sourceDataBucket string -var destDataBucket string -var updateScript string +var dataBucket string func main() { fmt.Printf("Started: %v\n", time.Now().String()) - var mode string - flag.StringVar(&destMongoSecret, "destMongoSecret", "", "Destination mongo DB secret") flag.StringVar(&dbName, "dbName", "", "DB name we're importing to") - flag.StringVar(&sourceDataBucket, "sourceDataBucket", "", "Data bucket so we can read archive zips") - flag.StringVar(&destDataBucket, "destDataBucket", "", "Data bucket we're writing optimised archives to") - flag.StringVar(&mode, "mode", "", "combine (to download zips and combine them into one) or update (to put the combined zips back to the source)") + flag.StringVar(&dataBucket, "dataBucket", "", "Data bucket so we can read archive zips and write back optimised zips") flag.Parse() // Check they're not empty checkNotEmpty := []string{ - sourceDataBucket, - destDataBucket, + dataBucket, + dbName, } checkNotEmptyName := []string{ - "sourceDataBucket", - "destDataBucket", - } - if mode == "combine" { - checkNotEmptyName = append(checkNotEmptyName, "dbName") - checkNotEmpty = append(checkNotEmpty, dbName) + "dataBucket", + "dbName", } for c, s := range checkNotEmpty { @@ -79,26 +68,61 @@ func main() { } remoteFS := fileaccess.MakeS3Access(s3svc) - localFS := fileaccess.FSAccess{} // Init logger - this used to be local=stdout, cloud env=cloudwatch, 
 	iLog := &logger.StdOutLogger{}
 	iLog.SetLogLevel(logger.LogInfo)
 
-	if mode == "combine" {
-		combine(sess, iLog, remoteFS, &localFS)
-	} else if mode == "update" {
-		update(remoteFS, &localFS)
-	} else {
-		fatalError(errors.New("Unknown mode: " + mode))
-	}
+	rtts := readRTTs(sess, iLog)
+	optimise(rtts, &remoteFS, iLog)
 
 	printFinishStats()
 }
 
-func update(remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
+func readRTTs(sess *session.Session, iLog logger.ILogger) map[string]string {
+	// Connect to mongo
+	destMongoClient, _, err := mongoDBConnection.Connect(sess, destMongoSecret, iLog)
+	if err != nil {
+		fatalError(err)
+	}
+
+	// Destination DB is the new pixlise one
+	destDB := destMongoClient.Database(dbName) //mongoDBConnection.GetDatabaseName("pixlise", destEnvName))
+
+	// Verify the dataset is valid
+	ctx := context.TODO()
+	coll := destDB.Collection(dbCollections.ScansName)
+
+	cursor, err := coll.Find(ctx, bson.M{}, options.Find())
+	if err != nil {
+		fatalError(err)
+	}
+
+	scans := []*protos.ScanItem{}
+	err = cursor.All(ctx, &scans)
+	if err != nil {
+		fatalError(err)
+	}
+
+	rtts := map[string]string{}
+	skip := []string{}
+	for _, scan := range scans {
+		if utils.ItemInSlice(scan.Id, skip) {
+			iLog.Infof("Skipping scan id: %v", scan.Id)
+			continue
+		}
+
+		if scan.Instrument == protos.ScanInstrument_PIXL_FM {
+			rtts[scan.Id] = scan.Title
+		}
+	}
+
+	return rtts
+}
+
+func optimise(rtts map[string]string, remoteFS fileaccess.FileAccess, iLog logger.ILogger) {
 	archivePrefix := "Archive/"
-	archived, err := remoteFS.ListObjects(destDataBucket, archivePrefix)
+	archived, err := remoteFS.ListObjects(dataBucket, archivePrefix)
 	if err != nil {
 		fatalError(err)
 	}
@@ -109,11 +133,15 @@ func update(remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
 		pos := strings.Index(f, "-")
 		if pos > 0 {
 			rtt := f[0:pos]
-			if files, ok := rttToArchiveFiles[rtt]; !ok {
-				rttToArchiveFiles[rtt] = []string{f}
-			} else {
-				files = append(files, f)
-				rttToArchiveFiles[rtt] = files
+
+			// Make sure it's one of the ones we're interested in optimising (it's for an actual PIXL scan)
+			if _, ok := rtts[rtt]; ok {
+				if files, ok := rttToArchiveFiles[rtt]; !ok {
+					rttToArchiveFiles[rtt] = []string{f}
+				} else {
+					files = append(files, f)
+					rttToArchiveFiles[rtt] = files
+				}
 			}
 		}
 	}
@@ -125,188 +153,122 @@ func update(remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
 	// Loop through and delete any with just 1 zip file; we don't need to optimise those
 	for rtt, files := range rttToArchiveFiles {
 		if len(files) < 2 {
 			delete(rttToArchiveFiles, rtt)
 		}
 	}
 
-	// Handle each file
-	for rtt, files := range rttToArchiveFiles {
-		updateForRTT(rtt, files, remoteFS)
-	}
-
-	fmt.Println(updateScript)
-}
-
-func updateForRTT(rtt string, allZips []string, remoteFS fileaccess.FileAccess) {
-	// See if we have any optimised files for this rtt
-	archOptPrefix := "Archive-Optimised/"
-	files, err := remoteFS.ListObjects(sourceDataBucket, archOptPrefix+rtt+"-")
-	if err != nil {
-		fmt.Printf("Failed to read archived files for rtt: %v, err: %v\n", rtt, err)
-		return
-	}
-
-	if len(files) < 1 {
-		fmt.Printf("No optimised archive for rtt: %v\n", rtt)
-		return
-	} else if len(files) > 1 {
-		fmt.Printf("Too many optimised archives for rtt: %v\n", rtt)
-		return
-	}
-
-	archivedZipName := files[0][len(archOptPrefix):]
-
-	// Make sure the optimised file matches one in the list
-	if !utils.ItemInSlice(archivedZipName, allZips) {
-		fmt.Printf("Optimised archive name doesn't match any source files for: %v\n", rtt)
-		return
-	}
-
-	// Delete all files older than or equal to this
-	optimisedTS, err := getTimestamp(archivedZipName)
+	// Loop through all scans and read each archive one-by-one
+	workingDir, err := os.MkdirTemp("", "archive-fix-")
 	if err != nil {
-		fmt.Printf("Failed to read timestamp for: %v\n", archivedZipName)
-		return
+		fatalError(err)
 	}
 
-	for _, f := range allZips {
-		needUpdate := false
-		if f == archivedZipName {
-			needUpdate = true
+	// Handle each file
+	for rtt := range rttToArchiveFiles {
+		localArchivePath, zipFilesOptimised, err := makeOptimisedArchive(rtt, rtts[rtt], remoteFS, workingDir, iLog)
+		if err != nil {
+			fmt.Printf("Error creating optimised archive for %v: %v\n", rtt, err)
 		} else {
-			ts, err := getTimestamp(f)
+			// Upload the optimised file (should be overwriting the latest one)
+			err = upload(localArchivePath, "Archive", remoteFS, iLog)
 			if err != nil {
-				fmt.Printf("Failed to read timestamp for: %v\n", f)
-				return
+				fmt.Printf("FAILED TO UPLOAD archive file %v: %v\n", localArchivePath, err)
 			}
-			needUpdate = ts < optimisedTS
-		}
 
-		if needUpdate {
-			updateScript += fmt.Sprintf("aws s3 rm s3://%v/%v\n", destDataBucket, "Archive/"+f)
+			// Delete the zips that we are replacing
+			for _, zipFile := range zipFilesOptimised {
+
+				// Don't delete what we just uploaded!
+				if !strings.HasSuffix(localArchivePath, zipFile) {
+					zipPath := path.Join("Archive", zipFile)
+					err = remoteFS.DeleteObject(dataBucket, zipPath)
+					if err != nil {
+						fmt.Printf("Error deleting archive file %v: %v\n", zipPath, err)
+					}
+				}
+			}
 		}
 	}
-
-	// Copy the optimised file back
-	updateScript += fmt.Sprintf("aws s3 cp s3://%v/%v s3://%v/%v\n", sourceDataBucket, "Archive-Optimised/"+archivedZipName, destDataBucket, "Archive/")
 }
 
-func getTimestamp(fileName string) (int, error) {
-	_ /*expecting this to match already due to dir listing*/, timeStamp, err := datasetArchive.DecodeArchiveFileName(fileName)
-	return timeStamp, err
-}
-
-func combine(sess *session.Session, iLog logger.ILogger, remoteFS fileaccess.FileAccess, localFS fileaccess.FileAccess) {
-	// Connect to mongo
-	destMongoClient, _, err := mongoDBConnection.Connect(sess, destMongoSecret, iLog)
-	if err != nil {
-		fatalError(err)
-	}
-
-	// Destination DB is the new pixlise one
-	destDB := destMongoClient.Database(dbName) //mongoDBConnection.GetDatabaseName("pixlise", destEnvName))
+func makeOptimisedArchive(rtt string, scanTitle string, remoteFS fileaccess.FileAccess, workingDir string, iLog logger.ILogger) (string, []string, error) {
+	l := &logger.StdOutLogger{}
+	localFS := fileaccess.FSAccess{}
 
-	// Verify the dataset is valid
-	ctx := context.TODO()
-	coll := destDB.Collection(dbCollections.ScansName)
+	l.Infof("")
+	l.Infof("============================================================")
+	l.Infof(">>> Downloading archives for scan: %v (%v)", scanTitle, rtt)
 
-	cursor, err := coll.Find(ctx, bson.M{}, options.Find())
+	archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, &localFS, l, dataBucket, "" /* not needed */)
+	_ /*localDownloadPath*/, localUnzippedPath, zipFilesOrdered, err := archive.DownloadFromDatasetArchive(rtt, workingDir)
 	if err != nil {
-		log.Fatalln(err)
+		return "", []string{}, fmt.Errorf("Failed to download archive for scan %v: %v", rtt, err)
 	}
 
-	scans := []*protos.ScanItem{}
-	err = cursor.All(ctx, &scans)
-	if err != nil {
-		log.Fatalln(err)
+	zipCount := len(zipFilesOrdered)
+	if zipCount == 1 {
+		l.Infof("Only one zip was loaded, nothing to optimise...")
 	}
-
-	// Loop through all scans and read each archive one-by-one
-	workingDir, err := os.MkdirTemp("", "archive-fix-")
-	if err != nil {
-		log.Fatalln(err)
+	if zipCount <= 1 {
+		// Stuff already logged... l.Infof("No archive zip files found for scan %v\n", scan.Id)
+		return "", zipFilesOrdered, nil
 	}
 
-	l := &logger.StdOutLogger{}
-
-	skip := []string{}
-	for _, scan := range scans {
-		if utils.ItemInSlice(scan.Id, skip) {
-			l.Infof("Skipping scan id: %v", scan.Id)
-			continue
-		}
-
-		if scan.Instrument == protos.ScanInstrument_PIXL_FM {
-			l.Infof("")
-			l.Infof("============================================================")
-			l.Infof(">>> Downloading archives for scan: %v (%v)", scan.Title, scan.Id)
-
-			archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, localFS, l, sourceDataBucket, "" /* not needed */)
-			_ /*localDownloadPath*/, localUnzippedPath, zipCount, lastZipName, err := archive.DownloadFromDatasetArchive(scan.Id, workingDir)
-			if err != nil {
-				log.Fatalf("Failed to download archive for scan %v: %v", scan.Id, err)
-			}
+	lastZipName := zipFilesOrdered[len(zipFilesOrdered)-1]
 
-			if zipCount == 1 {
-				l.Infof("Only one zip was loaded, nothing to optimise...")
-			}
-			if zipCount <= 1 {
-				// Stuff already logged... l.Infof("No archive zip files found for scan %v\n", scan.Id)
-				continue
-			}
+	// Remove any paths...
+	lastZipName = filepath.Base(lastZipName)
+	for c, p := range zipFilesOrdered {
+		zipFilesOrdered[c] = filepath.Base(p)
+	}
 
-			l.Infof("Zipping optimised archive %v...", lastZipName)
+	l.Infof("Zipping optimised archive %v...", lastZipName)
 
-			// Now we zip up everything that's there
-			//tm := time.Now()
-			//zipName := fmt.Sprintf("%v-%02d-%02d-%v-%02d-%02d-%02d.zip", scan.Id, tm.Day(), int(tm.Month()), tm.Year(), tm.Hour(), tm.Minute(), tm.Second())
-			// Zip with the latest zip name so if newer downlinks happened since we dont invent a time newer than them. This is to run on prod v3 but v4 has
-			// been collecting its own archives for months. This also makes sense if we run against prodv4 in future.
+	// Now we zip up everything that's there
+	//tm := time.Now()
+	//zipName := fmt.Sprintf("%v-%02d-%02d-%v-%02d-%02d-%02d.zip", scan.Id, tm.Day(), int(tm.Month()), tm.Year(), tm.Hour(), tm.Minute(), tm.Second())
+	// Zip with the latest zip name so if newer downlinks happened since, we don't invent a time newer than them. This is to run on prod v3, but v4 has
+	// been collecting its own archives for months. This also makes sense if we run against prod v4 in future.
+	zipName := lastZipName
+	zipPath := filepath.Join(workingDir, zipName)
 	zipFile, err := os.Create(zipPath)
 	if err != nil {
-		log.Fatalf("Failed to create zip output file for scan %v: %v", scan.Id, err)
+		return "", zipFilesOrdered, fmt.Errorf("Failed to create zip output file for scan %v: %v", rtt, err)
 	}
 
 	zipWriter := zip.NewWriter(zipFile)
 	/*_, err = zipWriter.Create(zipPath)
 	if err != nil {
-		log.Fatalf("Failed to create zip output file for scan %v: %v", scan.Id, err)
+		return "", fmt.Errorf("Failed to create zip output file for scan %v: %v", scan.Id, err)
 	}*/
 
 	err = utils.AddFilesToZip(zipWriter, localUnzippedPath, "")
 	if err != nil {
-		log.Fatalf("Failed to create optimised zip %v for scan %v: %v", zipPath, scan.Id, err)
+		return "", zipFilesOrdered, fmt.Errorf("Failed to create optimised zip %v for scan %v: %v", zipPath, rtt, err)
 	}
 
 	err = zipWriter.Close()
 	if err != nil {
-		log.Fatalf("Failed to close written zip %v for scan %v: %v", zipPath, scan.Id, err)
+		return "", zipFilesOrdered, fmt.Errorf("Failed to close written zip %v for scan %v: %v", zipPath, rtt, err)
 	}
 
-	// Upload the zip to S3
-	uploadPath := path.Join("Archive-Optimised", zipName)
-	l.Infof("Uploading optimised archive to s3://%v/%v", destDataBucket, uploadPath)
+	return zipPath, zipFilesOrdered, nil
+}
 
-	zipData, err := os.ReadFile(zipPath)
+func upload(localArchivePath string, remotePath string, remoteFS fileaccess.FileAccess, iLog logger.ILogger) error {
+	zipName := filepath.Base(localArchivePath)
+
+	// Upload the zip to S3
+	uploadPath := path.Join(remotePath, zipName)
+	iLog.Infof("Uploading optimised archive to s3://%v/%v", dataBucket, uploadPath)
+
+	zipData, err := os.ReadFile(localArchivePath)
 	if err != nil {
-		log.Fatalf("Failed to read created zip output file %v for scan %v: %v", zipPath, scan.Id, err)
+		return fmt.Errorf("Failed to read created zip output file %v: %v", localArchivePath, err)
 	}
 
 	if len(zipData) <= 0 {
-		l.Infof("Created optimised zip archive %v for scan %v was 0 bytes, skipping upload\n", zipPath, scan.Id)
-		continue
+		iLog.Infof("Created optimised zip archive %v was 0 bytes, skipping upload\n", localArchivePath)
+		return nil
 	}
 
-	err = remoteFS.WriteObject(destDataBucket, uploadPath, zipData)
-	if err != nil {
-		log.Fatalf("Failed to upload zip output file for scan %v: %v", scan.Id, err)
-	}
-
-	// Delete all downloaded files and created zip
-	}
-}
-
-func getWithoutVersion(fileName string) string {
-	return fileName[0:len(fileName)-6] + "__" + fileName[len(fileName)-4:]
+	return remoteFS.WriteObject(dataBucket, uploadPath, zipData)
 }
 
 func fatalError(err error) {
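(Editor's note: the optimise step works because DownloadFromDatasetArchive unzips every archive zip into one directory in timestamp order, so newer downlinks overwrite older copies of the same files on disk; re-zipping that directory under the newest zip's name then yields the single combined archive. Below is a minimal, self-contained sketch of that re-zip step using only the Go standard library. It approximates what utils.AddFilesToZip is used for in the patch — the real helper's signature differs — and the directory and zip names are hypothetical.)

	package main

	import (
		"archive/zip"
		"fmt"
		"io"
		"io/fs"
		"os"
		"path/filepath"
	)

	// zipDirectory packs everything under srcDir into a single zip at zipPath,
	// preserving relative paths. Because the unzip step already applied newer
	// files over older ones, the result is the "optimised" archive.
	func zipDirectory(srcDir string, zipPath string) error {
		zipFile, err := os.Create(zipPath)
		if err != nil {
			return err
		}
		defer zipFile.Close()

		zw := zip.NewWriter(zipFile)

		err = filepath.WalkDir(srcDir, func(p string, d fs.DirEntry, walkErr error) error {
			if walkErr != nil || d.IsDir() {
				return walkErr
			}

			rel, err := filepath.Rel(srcDir, p)
			if err != nil {
				return err
			}

			// Zip entries use forward slashes regardless of OS
			w, err := zw.Create(filepath.ToSlash(rel))
			if err != nil {
				return err
			}

			f, err := os.Open(p)
			if err != nil {
				return err
			}
			defer f.Close()

			_, err = io.Copy(w, f)
			return err
		})
		if err != nil {
			return err
		}

		// Close flushes the central directory; an error here means a corrupt zip
		return zw.Close()
	}

	func main() {
		// Hypothetical unzipped working dir and output name matching the
		// "<rtt>-DD-MM-YYYY-hh-mm-ss.zip" convention
		if err := zipDirectory("unzipped", "261721600-09-11-2024-04-10-00.zip"); err != nil {
			fmt.Println("zip failed:", err)
		}
	})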