Skip to content

Commit

Permalink
Merge pull request #346 from pixlise/feature/archive-optimiser-tool
Browse files Browse the repository at this point in the history
Feature/archive optimiser tool
  • Loading branch information
pnemere authored Nov 7, 2024
2 parents 30633aa + 5a8c40d commit f9652e6
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 38 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,35 @@ func NewDatasetArchiveDownloader(
// Unzipped files path (archive zips unzipped here),
// How many zips loaded from archive
// Name of the last zip file loaded (empty string if none)
// Error (if any)
func (dl *DatasetArchiveDownloader) DownloadFromDatasetArchive(datasetID string, workingDir string) (string, string, int, error) {
func (dl *DatasetArchiveDownloader) DownloadFromDatasetArchive(datasetID string, workingDir string) (string, string, int, string, error) {
// Create a directories to process data in
dl.log.Debugf("Preparing to download archived dataset %v...", datasetID)

downloadPath, err := fileaccess.MakeEmptyLocalDirectory(workingDir, "download")
if err != nil {
err = fmt.Errorf("Failed to generate directory for importer downloads: %v", err)
//dl.log.Errorf("%v", err)
return "", "", 0, err
return "", "", 0, "", err
}
unzippedPath, err := fileaccess.MakeEmptyLocalDirectory(workingDir, "unzipped")
if err != nil {
err = fmt.Errorf("Failed to generate directory for importer unzips: %v", err)
//dl.log.Errorf("%v", err)
return "", "", 0, err
return "", "", 0, "", err
}

// Download all zip files from archive for this dataset ID, and extract them as required
dl.log.Debugf("Downloading archived zip files...")

zipCount, err := dl.downloadArchivedZipsForDataset(datasetID, downloadPath, unzippedPath)
zipCount, lastZipName, err := dl.downloadArchivedZipsForDataset(datasetID, downloadPath, unzippedPath)
if err != nil {
err = fmt.Errorf("Failed to download archived zip files for dataset ID: %v. Error: %v", datasetID, err)
//dl.log.Errorf("%v", err)
return downloadPath, unzippedPath, zipCount, err
return downloadPath, unzippedPath, zipCount, lastZipName, err
}

dl.log.Debugf("Dataset %v downloaded %v zip files from archive", datasetID, zipCount)
return downloadPath, unzippedPath, zipCount, nil
return downloadPath, unzippedPath, zipCount, lastZipName, nil
}

func (dl *DatasetArchiveDownloader) DownloadPseudoIntensityRangesFile(configBucket string, downloadPath string, version string) (string, error) {
Expand Down Expand Up @@ -121,45 +121,45 @@ func (dl *DatasetArchiveDownloader) fetchFile(bucketFrom string, pathFrom string
// Returns 3 things:
// Number of zips loaded
// Name of the last zip file loaded (empty string if none)
// Error if there was one
func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID string, downloadPath string, unzippedPath string) (int, error) {
func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID string, downloadPath string, unzippedPath string) (int, string, error) {
// Download all zip files that have the dataset ID prefixed in their file name
// Unzip them in timestamp order into downloadPath
archiveSearchPath := path.Join(filepaths.RootArchive, datasetID)
dl.log.Infof("Searching for archived files in: s3://%v/%v", dl.datasetBucket, archiveSearchPath)

archivedFiles, err := dl.remoteFS.ListObjects(dl.datasetBucket, archiveSearchPath)
if err != nil {
return 0, err
return 0, "", err
}

orderedArchivedFiles, err := getOrderedArchiveFiles(archivedFiles)

if err != nil {
// Stop here if we find a bad file
return 0, err
return 0, "", err
}

fileCount := 0

for _, filePath := range orderedArchivedFiles {
fileName := path.Base(filePath)
if !strings.HasSuffix(fileName, ".zip") {
return 0, errors.New("Expected zip file, got: " + fileName)
return 0, "", errors.New("Expected zip file, got: " + fileName)
}

savePath := filepath.Join(downloadPath, fileName)
err = dl.fetchFile(dl.datasetBucket, filePath, savePath)

if err != nil {
return 0, err
return 0, "", err
}

dl.log.Debugf("Unzipping: \"%v\"", savePath)

// Unzip the file
unzippedFileNames, err := utils.UnzipDirectory(savePath, unzippedPath, false)
if err != nil {
return 0, err
return 0, "", err
}

fileCount += len(unzippedFileNames)
Expand All @@ -175,8 +175,13 @@ func (dl *DatasetArchiveDownloader) downloadArchivedZipsForDataset(datasetID str
}
}

dl.log.Infof("Downloaded %v zip files, unzipped %v files", len(orderedArchivedFiles), fileCount)
return len(orderedArchivedFiles), nil
lastFileName := ""
if len(orderedArchivedFiles) > 0 {
lastFileName = orderedArchivedFiles[len(orderedArchivedFiles)-1]
}

dl.log.Infof("Downloaded %v zip files, unzipped %v files. Last file name: %v", len(orderedArchivedFiles), fileCount, lastFileName)
return len(orderedArchivedFiles), filepath.Base(lastFileName), nil
}

func (dl *DatasetArchiveDownloader) DownloadUserCustomisationsForDataset(datasetID string, downloadPath string) error {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion api/dataimport/for-trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"os"
"strings"

"github.com/pixlise/core/v4/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v4/api/dataimport/datasetArchive"
"github.com/pixlise/core/v4/api/dbCollections"
"github.com/pixlise/core/v4/api/job"
"github.com/pixlise/core/v4/api/specialUserIds"
Expand Down
4 changes: 2 additions & 2 deletions api/dataimport/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"strconv"
"time"

"github.com/pixlise/core/v4/api/dataimport/datasetArchive"
"github.com/pixlise/core/v4/api/dataimport/internal/converterSelector"
"github.com/pixlise/core/v4/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v4/api/dataimport/internal/output"
"github.com/pixlise/core/v4/api/filepaths"
"github.com/pixlise/core/v4/core/fileaccess"
Expand Down Expand Up @@ -81,7 +81,7 @@ func ImportDataset(

// Firstly, we download from the archive
archive := datasetArchive.NewDatasetArchiveDownloader(remoteFS, localFS, log, datasetBucket, manualUploadBucket)
localDownloadPath, localUnzippedPath, zipCount, err := archive.DownloadFromDatasetArchive(datasetID, workingDir)
localDownloadPath, localUnzippedPath, zipCount, lastZipName, err := archive.DownloadFromDatasetArchive(datasetID, workingDir)

Check failure on line 84 in api/dataimport/import.go

View workflow job for this annotation

GitHub Actions / prerelease / build

lastZipName declared and not used

Check failure on line 84 in api/dataimport/import.go

View workflow job for this annotation

GitHub Actions / prerelease / test

lastZipName declared and not used

Check failure on line 84 in api/dataimport/import.go

View workflow job for this annotation

GitHub Actions / release / build

lastZipName declared and not used

Check failure on line 84 in api/dataimport/import.go

View workflow job for this annotation

GitHub Actions / release / test

lastZipName declared and not used
if err != nil {
return workingDir, savedSummary, "", false, err
}
Expand Down
2 changes: 1 addition & 1 deletion api/dataimport/reprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/pixlise/core/v4/api/dataimport/internal/datasetArchive"
"github.com/pixlise/core/v4/api/dataimport/datasetArchive"
"github.com/pixlise/core/v4/core/awsutil"
"github.com/pixlise/core/v4/core/errorwithstatus"
"github.com/pixlise/core/v4/core/utils"
Expand Down
18 changes: 18 additions & 0 deletions api/piquant/configuration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package piquant

import "fmt"

// ExampleReadFieldFromPIQUANTConfigMSA demonstrates reading one named field
// out of a PIQUANT config MSA file body. NOTE: the name must be of the form
// ExampleXxx (no underscore before an exported identifier), otherwise go vet
// reports a malformed example suffix and godoc won't link it to the function.
func ExampleReadFieldFromPIQUANTConfigMSA() {
	piquantMSA := `##INCSR : 0.0152 Solid angle from source in steradians (can include normalization for optic file - use this for tuning 0.00355)
##INCANGLE : 90.00 Incident angle of primary X-ray beam in degrees (90 is normal incidence)
#ELEVANGLE : 48.03 Elevation angle of detector, in degrees (90 is normal to surface)
#AZIMANGLE : 180.0 Azimuth angle between incident beam plane and detected beam plane
##GEOMETRY : 1.0 Geometric correction factor
#SOLIDANGLE : 0.224 Solid angle collected by the detector in steradians`

	a, err := ReadFieldFromPIQUANTConfigMSA(piquantMSA, "ELEVANGLE")
	fmt.Printf("%v|%v", a, err)

	// Output:
	// 48.03|<nil>
}
8 changes: 6 additions & 2 deletions api/ws/wsHelpers/sync-mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (
)

func ResetLocalMongoBackupDir() error {
os.RemoveAll("./backup")
err := os.Mkdir("./backup", 0750)
err := os.RemoveAll("./backup")
if err != nil {
return fmt.Errorf("PIXLISE Backup failed to RemoveAll backup directory: %v", err)
}

err = os.Mkdir("./backup", 0750)
if err != nil {
return fmt.Errorf("PIXLISE Backup failed to create backup directory: %v", err)
}
Expand Down
32 changes: 18 additions & 14 deletions core/utils/zip-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,44 @@ import (
"time"
)

// AddFilesToZip recursively adds every file under basePath (including files in
// subdirectories) to the zip archive being written by w. Entries are stored
// under baseInZip within the archive, mirroring the on-disk directory layout.
// Returns the first error encountered; on error the archive may be partially
// written.
func AddFilesToZip(w *zip.Writer, basePath, baseInZip string) error {
	// Open the directory
	files, err := os.ReadDir(basePath)
	if err != nil {
		return err
	}

	for _, file := range files {
		readPath := filepath.Join(basePath, file.Name())
		fmt.Println(readPath)
		if !file.IsDir() {
			dat, err := os.ReadFile(readPath)
			if err != nil {
				return fmt.Errorf("Failed to read file %v: %v", readPath, err)
			}

			// Add the file to the archive. NOTE: zip entry paths always use
			// forward slashes, hence path.Join (not filepath.Join) here.
			writePath := path.Join(baseInZip, file.Name())
			f, err := w.Create(writePath)
			if err != nil {
				return fmt.Errorf("Failed to create file %v in zip: %v", writePath, err)
			}
			_, err = f.Write(dat)
			if err != nil {
				return fmt.Errorf("Failed to write file %v to zip: %v", writePath, err)
			}
		} else if file.IsDir() {
			// Recurse into the subdirectory
			fmt.Println("Recursing and Adding SubDir: " + readPath)

			err := AddFilesToZip(w, readPath, path.Join(baseInZip, file.Name()))
			if err != nil {
				return err
			}
		}
	}

	return nil
}

func UnzipDirectory(src string, dest string, flattenPaths bool) ([]string, error) {
Expand Down
32 changes: 28 additions & 4 deletions internal/cmd-line-tools/beam-geom-v3-csv-importer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func main() {
log.Fatalf("Failed to decode experiment: %v", err)
}

fmt.Println("Images found:")
for imgPMC, imgName := range pmcImageLookup {
fmt.Printf(" %v -> %v\n", imgPMC, imgName)
}

// Now construct and save beam location entry. NOTE: it should already exist!
coll = destDB.Collection(dbCollections.ImageBeamLocationsName)
for imgPMC, imgName := range pmcImageLookup {
Expand All @@ -187,10 +192,29 @@ func main() {
log.Fatalf("Failed to read beam locations for image: %v, scan: %v. Error: %v", imgName, scanItem.Id, err)
}

if len(imageBeamLocations.LocationPerScan) != 1 || imageBeamLocations.LocationPerScan[0].ScanId != scanId || imageBeamLocations.LocationPerScan[0].BeamVersion != 2 {
log.Fatalf("Read beams for image: %v, got unexpected entries, expected one entry for scan %v, v2", imgName, scanId)
if imageBeamLocations.LocationPerScan[0].ScanId != scanId {
log.Fatalf("Read beams for image: %v, got unexpected scan %v", imgName, imageBeamLocations.LocationPerScan[0].ScanId)
}

if len(imageBeamLocations.LocationPerScan) < 1 || len(imageBeamLocations.LocationPerScan) > 2 {
log.Fatalf("Read beams for image: %v, got unexpected entries, expected 1-2 entries for scan %v, got %v", imgName, scanId, len(imageBeamLocations.LocationPerScan))
}
if len(imageBeamLocations.LocationPerScan) == 1 {
if imageBeamLocations.LocationPerScan[0].BeamVersion != 2 {
log.Fatalf("Read beams for image: %v, got unexpected entries, expected one entry for scan %v, v2, got v%v", imgName, scanId, imageBeamLocations.LocationPerScan[0].BeamVersion)
}
} else {
// 2 entries, make sure 1 is v1 and 1 is v2
if imageBeamLocations.LocationPerScan[0].BeamVersion == imageBeamLocations.LocationPerScan[1].BeamVersion ||
imageBeamLocations.LocationPerScan[0].BeamVersion != 1 && imageBeamLocations.LocationPerScan[0].BeamVersion != 2 ||
imageBeamLocations.LocationPerScan[1].BeamVersion != 1 && imageBeamLocations.LocationPerScan[1].BeamVersion != 2 {
log.Fatalf("Read beams for image: %v, got unexpected entries, expected 2 entries, v1/2 each for scan %v, got versions: %v,%v", imgName, scanId, imageBeamLocations.LocationPerScan[0].BeamVersion, imageBeamLocations.LocationPerScan[1].BeamVersion)

}
}

preStoreLocations := len(imageBeamLocations.LocationPerScan)

// Now insert this new location set
ijs := []*protos.Coordinate2D{}

Expand Down Expand Up @@ -243,8 +267,8 @@ func main() {
log.Fatalf("Failed to read beam locations to confirm writing beams for image: %v, scan: %v. Error: %v", imgName, scanId, err)
}

if len(imageBeamLocations.LocationPerScan) != 2 {
log.Fatalf("Expected 2 stored beam locations for image: %v, scan: %v", imgName, scanId)
if len(imageBeamLocations.LocationPerScan) != preStoreLocations+1 {
log.Fatalf("Expected %v stored beam locations for image: %v, scan: %v. Got %v", preStoreLocations, imgName, scanId, len(imageBeamLocations.LocationPerScan))
}

found3 := false
Expand Down
Loading

0 comments on commit f9652e6

Please sign in to comment.