diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/DeleteHBaseColumnDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/DeleteHBaseColumnDriver.java index 52879f01eed..53473a977b2 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/DeleteHBaseColumnDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/DeleteHBaseColumnDriver.java @@ -81,10 +81,15 @@ public void setupJob(Job job, String table) throws IOException { List<Scan> scans; if (!regions.isEmpty()) { - scans = new ArrayList<>(regions.size() / 2); + LOGGER.info("Delete rows from {} table ranges (start - end)", regions.size()); + scans = new ArrayList<>(regions.size()); for (Pair<byte[], byte[]> region : regions) { Scan scan = new Scan(templateScan); scans.add(scan); + LOGGER.info(" - [ '" + + Bytes.toStringBinary(region.getFirst()) + "' , '" + + Bytes.toStringBinary(region.getSecond()) + + "' )"); if (region.getFirst() != null && region.getFirst().length != 0) { scan.setStartRow(region.getFirst()); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java index e0cbbfb3963..aa183adb007 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java @@ -77,6 +77,7 @@ import 
org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchemaManager; import org.opencb.opencga.storage.hadoop.variant.adaptors.sample.HBaseVariantSampleDataManager; import org.opencb.opencga.storage.hadoop.variant.annotation.HadoopDefaultVariantAnnotationManager; +import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask; import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveTableHelper; import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor; import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutorFactory; @@ -826,7 +827,10 @@ private void remove(String study, List<String> files, List<String> samples, URI archiveColumns.add(family + ':' + ArchiveTableHelper.getRefColumnName(fileId)); archiveColumns.add(family + ':' + ArchiveTableHelper.getNonRefColumnName(fileId)); } - String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, options); + ObjectMap thisOptions = new ObjectMap(options); + ArchiveDeleteHBaseColumnTask.configureTask(thisOptions, fileIds); + + String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, thisOptions); getMRExecutor().run(DeleteHBaseColumnDriver.class, deleteFromArchiveArgs, "Delete from archive table"); return stopWatch.now(TimeUnit.MILLISECONDS); }); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTask.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTask.java new file mode 100644 index 00000000000..0fb9fb0adbf --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTask.java @@ -0,0 +1,42 @@ +package 
org.opencb.opencga.storage.hadoop.variant.archive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.opencga.storage.hadoop.utils.DeleteHBaseColumnDriver; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ArchiveDeleteHBaseColumnTask extends DeleteHBaseColumnDriver.DeleteHBaseColumnTask { + + public static final String FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX = "fileBatchesWithFilesToDeleteFromArchiveIndex"; + + public static void configureTask(ObjectMap options, List<Integer> fileIds) { + ArchiveRowKeyFactory keyFactory = new ArchiveRowKeyFactory(options); + Set<Integer> fileBatchesSet = new HashSet<>(); + for (Integer fileId : fileIds) { + fileBatchesSet.add(keyFactory.getFileBatch(fileId)); + } + List<Integer> fileBatches = new ArrayList<>(fileBatchesSet); + fileBatches.sort(Integer::compareTo); + options.put(DeleteHBaseColumnDriver.DELETE_HBASE_COLUMN_TASK_CLASS, ArchiveDeleteHBaseColumnTask.class.getName()); + options.put(ArchiveDeleteHBaseColumnTask.FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX, fileBatches); + } + + @Override + public List<Pair<byte[], byte[]>> getRegionsToDelete(Configuration configuration) { + int[] fileBatches = configuration.getInts(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX); + List<Pair<byte[], byte[]>> regions = new ArrayList<>(); + ArchiveRowKeyFactory archiveRowKeyFactory = new ArchiveRowKeyFactory(configuration); + for (int fileBatch : fileBatches) { + regions.add(new Pair<>( + Bytes.toBytes(archiveRowKeyFactory.generateBlockIdFromBatch(fileBatch)), + Bytes.toBytes(archiveRowKeyFactory.generateBlockIdFromBatch(fileBatch + 1)))); + } + return regions; + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveRowKeyFactory.java 
b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveRowKeyFactory.java index 43e9cf97cf7..0f660bf4032 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveRowKeyFactory.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveRowKeyFactory.java @@ -147,6 +147,13 @@ public String generateBlockIdFromSlice(int fileId, String chrom, long slice) { return generateBlockIdFromSliceAndBatch(getFileBatch(fileId), chrom, slice); } + public String generateBlockIdFromBatch(int fileBatch) { + StringBuilder sb = new StringBuilder(FILE_BATCH_PAD + 1); + sb.append(StringUtils.leftPad(String.valueOf(fileBatch), FILE_BATCH_PAD, '0')); + sb.append(getSeparator()); + return sb.toString(); + } + public String generateBlockIdFromSliceAndBatch(int fileBatch, String chrom, long slice) { String chromosome = Region.normalizeChromosome(chrom); StringBuilder sb = new StringBuilder(FILE_BATCH_PAD + 1 + chromosome.length() + 1 + POSITION_PAD); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTaskTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTaskTest.java new file mode 100644 index 00000000000..5513bcdcb7a --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/archive/ArchiveDeleteHBaseColumnTaskTest.java @@ -0,0 +1,59 @@ +package org.opencb.opencga.storage.hadoop.variant.archive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.hadoop.hbase.util.Pair; +import org.junit.Assert; +import org.junit.Test; +import org.opencb.commons.datastore.core.ObjectMap; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask.*; + +public class ArchiveDeleteHBaseColumnTaskTest { + + private ArchiveDeleteHBaseColumnTask task; + + @Test + public void testConfigureTask() { + ObjectMap options = new ObjectMap(); + task = new ArchiveDeleteHBaseColumnTask(); + configureTask(options, Arrays.asList(1, 2, 3)); + assertEquals(Arrays.asList(0), options.getAsIntegerList(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX)); + List<Pair<byte[], byte[]>> regionsToDelete = task.getRegionsToDelete(toConf(options)); + assertEquals(1, regionsToDelete.size()); + + assertArrayEquals(Bytes.toBytes("00000_"), regionsToDelete.get(0).getFirst()); + assertArrayEquals(Bytes.toBytes("00001_"), regionsToDelete.get(0).getSecond()); + } + + @Test + public void testConfigureTaskMultiRegions() { + ObjectMap options = new ObjectMap(); + task = new ArchiveDeleteHBaseColumnTask(); + configureTask(options, Arrays.asList(5300, 6053, 9032)); + assertEquals(Arrays.asList(5, 6, 9), options.getAsIntegerList(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX)); + List<Pair<byte[], byte[]>> regionsToDelete = task.getRegionsToDelete(toConf(options)); + assertEquals(3, regionsToDelete.size()); + + assertArrayEquals(Bytes.toBytes("00005_"), regionsToDelete.get(0).getFirst()); + assertArrayEquals(Bytes.toBytes("00006_"), regionsToDelete.get(0).getSecond()); + assertArrayEquals(Bytes.toBytes("00006_"), regionsToDelete.get(1).getFirst()); + assertArrayEquals(Bytes.toBytes("00007_"), regionsToDelete.get(1).getSecond()); + assertArrayEquals(Bytes.toBytes("00009_"), regionsToDelete.get(2).getFirst()); + assertArrayEquals(Bytes.toBytes("00010_"), regionsToDelete.get(2).getSecond()); + } + + private 
static Configuration toConf(ObjectMap options) { + Configuration conf = new Configuration(); + for (String key : options.keySet()) { + conf.set(key, options.getString(key)); + } + return conf; + } + +} \ No newline at end of file