Skip to content

Commit

Permalink
Merge branch 'release-3.2.x' into TASK-5448
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Sep 2, 2024
2 parents ce3d6c4 + e3565be commit 2c63795
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class VariantDeleteOperationManager extends OperationManager {

private final Logger logger = LoggerFactory.getLogger(VariantDeleteOperationManager.class);

/**
 * Creates a delete operation manager; both arguments are handed unchanged to the
 * {@code OperationManager} superclass constructor.
 *
 * @param variantStorageManager variant storage manager forwarded to the superclass
 * @param engine                variant storage engine forwarded to the superclass
 */
public VariantDeleteOperationManager(VariantStorageManager variantStorageManager, VariantStorageEngine engine) {
super(variantStorageManager, engine);
}
Expand Down Expand Up @@ -63,7 +67,14 @@ public void removeFile(String study, List<String> inputFiles, URI outdir, String
String catalogIndexStatus = file.getInternal().getVariant().getIndex().getStatus().getId();
if (!catalogIndexStatus.equals(VariantIndexStatus.READY)) {
// Might be partially loaded in VariantStorage. Check FileMetadata
FileMetadata fileMetadata = variantStorageEngine.getMetadataManager().getFileMetadata(studyMetadata.getId(), fileStr);
FileMetadata fileMetadata = variantStorageEngine.getMetadataManager()
.getFileMetadata(studyMetadata.getId(), file.getName());
if (fileMetadata != null && !fileMetadata.getPath().equals(file.getUri().getPath())) {
// FileMetadata path does not match the catalog path. This file is not registered in the storage.
throw new CatalogException("Unable to remove variants from file '" + file.getPath() + "'. "
+ "File is not registered in the storage. "
+ "Instead, found file with same name but different path '" + fileMetadata.getPath() + "'");
}
boolean canBeRemoved;
if (force) {
// When forcing remove, just require the file to be registered in the storage
Expand All @@ -73,17 +84,18 @@ public void removeFile(String study, List<String> inputFiles, URI outdir, String
canBeRemoved = fileMetadata != null && fileMetadata.getIndexStatus() != TaskMetadata.Status.NONE;
}
if (!canBeRemoved) {
throw new CatalogException("Unable to remove variants from file " + file.getName() + ". "
+ "IndexStatus = " + catalogIndexStatus);
throw new CatalogException("Unable to remove variants from file '" + file.getPath() + "'. "
+ "IndexStatus = " + catalogIndexStatus + "."
+ (fileMetadata == null ? " File not found in storage." : ""));
}
}
fileNames.add(file.getName());
// filePaths.add(file.getPath());
}

if (fileNames.isEmpty()) {
throw new CatalogException("Nothing to do!");
}
}
if (fileNames.isEmpty()) {
throw new CatalogException("Nothing to do!");
}

variantStorageEngine.removeFiles(study, fileNames, outdir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ protected File getSmallFile() throws IOException, CatalogException {
}

protected File create(String resourceName) throws IOException, CatalogException {
return create(studyId, getResourceUri(resourceName));
return create(resourceName, "data/vcfs/");
}

/**
 * Resolves the URI of a test resource and creates the file for it in the
 * default study ({@code studyId}) under the given path.
 *
 * @param resourceName name of the test resource to resolve
 * @param path         path passed through to the three-argument {@code create}
 * @return the created file
 * @throws IOException      if thrown while resolving or creating the file
 * @throws CatalogException if thrown by the underlying {@code create} call
 */
protected File create(String resourceName, String path) throws IOException, CatalogException {
    URI resourceUri = getResourceUri(resourceName);
    return create(studyId, resourceUri, path);
}

protected File create(String studyId, URI uri) throws IOException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.opencb.opencga.core.models.sample.Sample;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.testclassification.duration.MediumTests;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -44,6 +46,7 @@
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.opencb.opencga.storage.core.variant.VariantStorageBaseTest.getResourceUri;

/**
* Created on 10/07/17.
Expand Down Expand Up @@ -83,6 +86,42 @@ public void testLoadAndRemoveOneWithOtherLoaded() throws Exception {
testLoadAndRemoveOne();
}

/**
 * Checks variant removal with and without the {@code FORCE} option.
 * <p>
 * Scenario: two files are indexed and one of them is removed. Then:
 * <ol>
 *   <li>removing the same file again without force must be rejected;</li>
 *   <li>removing it again with force must succeed;</li>
 *   <li>force-removing a file that was never indexed must be rejected;</li>
 *   <li>force-removing a catalog file with the same name but a different path must be rejected.</li>
 * </ol>
 * Each "must be rejected" step calls {@code fail(...)} right after the removal, so the test
 * cannot pass silently when no exception is thrown. {@code fail} raises an
 * {@link AssertionError}, which the {@code catch (Exception e)} blocks do not intercept.
 */
@Test
public void testLoadAndRemoveForce() throws Exception {
    File file77 = create("platinum/1K.end.platinum-genomes-vcf-NA12877_S1.genome.vcf.gz");
    File file78 = create("platinum/1K.end.platinum-genomes-vcf-NA12878_S1.genome.vcf.gz");
    File file79 = create("platinum/1K.end.platinum-genomes-vcf-NA12879_S1.genome.vcf.gz");
    indexFile(file77, new QueryOptions(), outputId);
    indexFile(file78, new QueryOptions(), outputId);

    removeFile(file77, new QueryOptions());

    // file77 was already removed above: a second, non-forced removal is expected to be rejected.
    try {
        removeFile(file77, new QueryOptions());
        fail("Expected the non-forced removal of an already removed file to fail");
    } catch (Exception e) {
        assertTrue(e.getMessage(), e.getMessage().contains("Unable to remove variants from file"));
    }
    // The same removal is expected to go through when forced.
    removeFile(file77, new QueryOptions(VariantStorageOptions.FORCE.key(), true));

    // file79 was created in catalog but never indexed: even a forced removal is expected to be rejected.
    try {
        removeFile(file79, new QueryOptions(VariantStorageOptions.FORCE.key(), true));
        fail("Expected the forced removal of a file that was never indexed to fail");
    } catch (Exception e) {
        assertTrue(e.getMessage(), e.getMessage().contains("File not found in storage."));
    }

    // Register a copy of file77 under a different directory: same file name, different path.
    Path file77Path = Paths.get(file77.getUri());
    Path otherDir = file77Path.getParent().resolve("other_dir");
    Files.createDirectory(otherDir);
    Path otherFile = Files.copy(file77Path, otherDir.resolve(file77Path.getFileName()));
    File file77_2 = create(studyFqn, otherFile.toUri(), "other_dir");

    // The copy shares its name with file77 but is a different file: removal is expected to be rejected.
    try {
        removeFile(studyFqn, Collections.singletonList(file77_2.getPath()), new QueryOptions(VariantStorageOptions.FORCE.key(), true));
        fail("Expected the removal of a same-name file registered under a different path to fail");
    } catch (Exception e) {
        assertTrue(e.getMessage(), e.getMessage().contains("Unable to remove variants from file"));
        assertTrue(e.getMessage(), e.getMessage().contains("Instead, found file with same name but different path"));
    }
}


@Test
public void testLoadAndRemoveMany() throws Exception {
Expand Down Expand Up @@ -127,8 +166,13 @@ private void removeFile(List<File> files, QueryOptions options) throws Exception
Study study = catalogManager.getFileManager().getStudy(ORGANIZATION, files.get(0), sessionId);
String studyId = study.getFqn();

removeFile(studyId, fileIds, options);
}

private void removeFile(String studyId, List<String> fileIds, QueryOptions options) throws Exception {

Path outdir = Paths.get(opencga.createTmpOutdir(studyId, "_REMOVE_", sessionId));
variantManager.removeFile(studyId, fileIds, new QueryOptions(), outdir.toUri(), sessionId);
variantManager.removeFile(studyId, fileIds, new QueryOptions(options), outdir.toUri(), sessionId);
// assertEquals(files.size(), removedFiles.size());

Cohort all = catalogManager.getCohortManager().search(studyId, new Query(CohortDBAdaptor.QueryParams.ID.key(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,13 @@ public class DeleteVariantCommandOptions {
@Parameter(names = {"--study", "-s"}, description = "Study [[organization@]project:]study where study and project can be either the ID or UUID", required = false, arity = 1)
public String study;

@Parameter(names = {"--file"}, description = "The body web service file parameter", required = false, arity = 1)
@Parameter(names = {"--file"}, description = "List of file ids to delete. Use 'all' to remove the whole study", required = false, arity = 1)
public String file;

@Parameter(names = {"--resume"}, description = "The body web service resume parameter", required = false, help = true, arity = 0)
@Parameter(names = {"--resume"}, description = "Resume failed delete operation.", required = false, help = true, arity = 0)
public boolean resume = false;

@Parameter(names = {"--force"}, description = "The body web service force parameter", required = false, help = true, arity = 0)
@Parameter(names = {"--force"}, description = "Force delete operation. This would allow deleting partially loaded files.", required = false, help = true, arity = 0)
public boolean force = false;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -149,7 +150,7 @@ private CellBaseConfiguration validate(boolean autoComplete) throws IOException
String inputVersion = getVersion();
CellBaseDataResponse<SpeciesProperties> species;
try {
species = cellBaseClient.getMetaClient().species();
species = retryMetaSpecies();
} catch (RuntimeException e) {
throw new IllegalArgumentException("Unable to access cellbase url '" + getURL() + "', version '" + inputVersion + "'", e);
}
Expand All @@ -158,7 +159,7 @@ private CellBaseConfiguration validate(boolean autoComplete) throws IOException
// Version might be missing the starting "v"
cellBaseConfiguration.setVersion("v" + cellBaseConfiguration.getVersion());
cellBaseClient = newCellBaseClient(cellBaseConfiguration, getSpecies(), getAssembly());
species = cellBaseClient.getMetaClient().species();
species = retryMetaSpecies();
}
}
if (species == null || species.firstResult() == null) {
Expand Down Expand Up @@ -308,7 +309,7 @@ private static String majorMinor(String version) {
public String getVersionFromServer() throws IOException {
if (serverVersion == null) {
synchronized (this) {
ObjectMap result = retryMetaAbout(3);
ObjectMap result = retryMetaAbout();
if (result == null) {
throw new IOException("Unable to get version from server for cellbase " + toString());
}
Expand All @@ -322,12 +323,43 @@ public String getVersionFromServer() throws IOException {
return serverVersion;
}

private ObjectMap retryMetaAbout(int retries) throws IOException {
ObjectMap result = cellBaseClient.getMetaClient().about().firstResult();
if (result == null && retries > 0) {
// Retry
logger.warn("Unable to get version from server for cellbase " + toString() + ". Retrying...");
result = retryMetaAbout(retries - 1);
/**
 * Fetches the first result of the CellBase meta "about" endpoint through {@code retry},
 * allowing up to three attempts.
 *
 * @return the first "about" result, or whatever {@code retry} yields when no attempt succeeds
 * @throws IOException if propagated by {@code retry}
 */
private ObjectMap retryMetaAbout() throws IOException {
    Callable<ObjectMap> aboutCall = () -> cellBaseClient.getMetaClient().about().firstResult();
    return retry(3, aboutCall);
}

/**
 * Queries the CellBase meta "species" endpoint through {@code retry},
 * allowing up to three attempts.
 *
 * @return the species response, or whatever {@code retry} yields when no attempt succeeds
 * @throws IOException if propagated by {@code retry}
 */
private CellBaseDataResponse<SpeciesProperties> retryMetaSpecies() throws IOException {
    Callable<CellBaseDataResponse<SpeciesProperties>> speciesCall = () -> cellBaseClient.getMetaClient().species();
    return retry(3, speciesCall);
}

/**
 * Invokes {@code function} up to {@code retries} times and returns the first non-null result.
 * <p>
 * An attempt fails when it returns {@code null} or throws. The first exception is kept as the
 * primary failure and later ones are attached to it as suppressed exceptions. If all attempts
 * fail and at least one threw, the failure is rethrown rather than reported as {@code null},
 * so callers can see why CellBase could not be reached.
 *
 * @param retries  maximum number of attempts; if zero or negative the function is never invoked
 * @param function call to execute
 * @param <T>      type of the result
 * @return the first non-null result; {@code null} if {@code retries <= 0} or if every attempt
 *         returned {@code null} without throwing
 * @throws IOException if no attempt produced a result and at least one attempt threw; an
 *                     {@code IOException} is rethrown as is, any other exception is wrapped
 */
private <T> T retry(int retries, Callable<T> function) throws IOException {
    Exception firstError = null;
    for (int attempt = 1; attempt <= retries; attempt++) {
        try {
            T result = function.call();
            if (result != null) {
                return result;
            }
        } catch (Exception e) {
            if (firstError == null) {
                firstError = e;
            } else if (firstError != e) {
                // Throwable.addSuppressed rejects self-suppression, hence the identity check.
                firstError.addSuppressed(e);
            }
        }
        // Only announce a retry when another attempt will actually be made.
        if (attempt < retries) {
            logger.warn("Unable to get reach cellbase " + toString() + ". Retrying...");
        }
    }
    if (firstError == null) {
        // Either no attempt was allowed or every attempt returned null without an error.
        return null;
    }
    if (firstError instanceof IOException) {
        throw (IOException) firstError;
    }
    throw new IOException("Error reading from cellbase " + toString(), firstError);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.opencb.opencga.core.models.operations.variant;

import org.opencb.commons.annotations.DataField;
import org.opencb.opencga.core.tools.ToolParams;

import java.util.List;
Expand All @@ -32,8 +33,13 @@ public VariantFileDeleteParams(List<String> file, boolean resume) {
this.resume = resume;
}

@DataField(description = "List of file ids to delete. Use 'all' to remove the whole study", required = true)
private List<String> file;

@DataField(description = "Resume failed delete operation.", defaultValue = "false")
private boolean resume;

@DataField(description = "Force delete operation. This would allow deleting partially loaded files.", defaultValue = "false")
private boolean force;

public List<String> getFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ public class ExecutionDaemon extends MonitorParentDaemon implements Closeable {
put(RvtestsWrapperAnalysis.ID, "variant " + RvtestsWrapperAnalysis.ID + "-run");
put(GatkWrapperAnalysis.ID, "variant " + GatkWrapperAnalysis.ID + "-run");
put(ExomiserWrapperAnalysis.ID, "variant " + ExomiserWrapperAnalysis.ID + "-run");
put(VariantFileDeleteOperationTool.ID, "variant file-delete");
put(VariantSecondaryAnnotationIndexOperationTool.ID, "variant secondary-index");
put(VariantSecondaryIndexSamplesDeleteOperationTool.ID, "variant secondary-index-delete");
put(VariantScoreDeleteOperationTool.ID, "variant score-delete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ public static class GenericVariantDeleteOptions {
splitter = CommaParameterSplitter.class, required = true)
public List<String> file = null;

@Parameter(names = {"--resume"}, description = "Resume a previously failed indexation")
@Parameter(names = {"--force"}, description = "Force delete operation. This would allow deleting partially loaded files.")
public boolean force;

@Parameter(names = {"--resume"}, description = "Resume failed delete operation.")
public boolean resume;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,15 @@ public void setupJob(Job job, String table) throws IOException {

List<Scan> scans;
if (!regions.isEmpty()) {
scans = new ArrayList<>(regions.size() / 2);
LOGGER.info("Delete rows from {} table ranges (start - end)", regions.size());
scans = new ArrayList<>(regions.size());
for (Pair<byte[], byte[]> region : regions) {
Scan scan = new Scan(templateScan);
scans.add(scan);
LOGGER.info(" - [ '"
+ Bytes.toStringBinary(region.getFirst()) + "' , '"
+ Bytes.toStringBinary(region.getSecond())
+ "' )");
if (region.getFirst() != null && region.getFirst().length != 0) {
scan.setStartRow(region.getFirst());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchemaManager;
import org.opencb.opencga.storage.hadoop.variant.adaptors.sample.HBaseVariantSampleDataManager;
import org.opencb.opencga.storage.hadoop.variant.annotation.HadoopDefaultVariantAnnotationManager;
import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask;
import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveTableHelper;
import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutorFactory;
Expand Down Expand Up @@ -828,7 +829,10 @@ private void remove(String study, List<String> files, List<String> samples, URI
archiveColumns.add(family + ':' + ArchiveTableHelper.getRefColumnName(fileId));
archiveColumns.add(family + ':' + ArchiveTableHelper.getNonRefColumnName(fileId));
}
String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, options);
ObjectMap thisOptions = new ObjectMap(options);
ArchiveDeleteHBaseColumnTask.configureTask(thisOptions, fileIds);

String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, thisOptions);
getMRExecutor().run(DeleteHBaseColumnDriver.class, deleteFromArchiveArgs, "Delete from archive table");
return stopWatch.now(TimeUnit.MILLISECONDS);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.opencb.opencga.storage.hadoop.variant.archive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.storage.hadoop.utils.DeleteHBaseColumnDriver;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * {@link DeleteHBaseColumnDriver.DeleteHBaseColumnTask} for the archive table. It supplies one
 * row-key range per file batch that contains files to delete.
 */
public class ArchiveDeleteHBaseColumnTask extends DeleteHBaseColumnDriver.DeleteHBaseColumnTask {

    /** Option/configuration key holding the file batches that contain the files to delete. */
    public static final String FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX = "fileBatchesWithFilesToDeleteFromArchiveIndex";

    /**
     * Registers this task class in {@code options}, together with the sorted list of distinct
     * file batches that the given files belong to.
     *
     * @param options options to update; also used to build the {@link ArchiveRowKeyFactory}
     * @param fileIds ids of the files to delete
     */
    public static void configureTask(ObjectMap options, List<Integer> fileIds) {
        ArchiveRowKeyFactory rowKeyFactory = new ArchiveRowKeyFactory(options);

        // Several files may share a batch: collect each batch only once.
        Set<Integer> distinctBatches = new HashSet<>();
        fileIds.forEach(id -> distinctBatches.add(rowKeyFactory.getFileBatch(id)));

        List<Integer> sortedBatches = new ArrayList<>(distinctBatches);
        sortedBatches.sort(Integer::compareTo);

        options.put(DeleteHBaseColumnDriver.DELETE_HBASE_COLUMN_TASK_CLASS, ArchiveDeleteHBaseColumnTask.class.getName());
        options.put(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX, sortedBatches);
    }

    /**
     * Builds one (start, end) row-key pair per configured file batch: the start is the block id
     * of the batch and the end is the block id of the following batch.
     *
     * @param configuration configuration holding the batches stored by {@link #configureTask}
     * @return list of row-key pairs, in the order the batches appear in the configuration
     */
    @Override
    public List<Pair<byte[], byte[]>> getRegionsToDelete(Configuration configuration) {
        int[] batches = configuration.getInts(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX);
        List<Pair<byte[], byte[]>> ranges = new ArrayList<>();
        ArchiveRowKeyFactory rowKeyFactory = new ArchiveRowKeyFactory(configuration);
        for (int batch : batches) {
            byte[] startRow = Bytes.toBytes(rowKeyFactory.generateBlockIdFromBatch(batch));
            byte[] endRow = Bytes.toBytes(rowKeyFactory.generateBlockIdFromBatch(batch + 1));
            ranges.add(new Pair<>(startRow, endRow));
        }
        return ranges;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ public String generateBlockIdFromSlice(int fileId, String chrom, long slice) {
return generateBlockIdFromSliceAndBatch(getFileBatch(fileId), chrom, slice);
}

/**
 * Builds the block id prefix of a file batch: the batch number left-padded with zeros to
 * {@code FILE_BATCH_PAD} characters, followed by the separator.
 *
 * @param fileBatch file batch number
 * @return the padded batch number followed by the separator
 */
public String generateBlockIdFromBatch(int fileBatch) {
    String paddedBatch = StringUtils.leftPad(Integer.toString(fileBatch), FILE_BATCH_PAD, '0');
    return new StringBuilder(FILE_BATCH_PAD + 1)
            .append(paddedBatch)
            .append(getSeparator())
            .toString();
}

public String generateBlockIdFromSliceAndBatch(int fileBatch, String chrom, long slice) {
String chromosome = Region.normalizeChromosome(chrom);
StringBuilder sb = new StringBuilder(FILE_BATCH_PAD + 1 + chromosome.length() + 1 + POSITION_PAD);
Expand Down
Loading

0 comments on commit 2c63795

Please sign in to comment.