storage: Define start-stop row scan for deleting files from archive table. #TASK-6756
j-coll committed Aug 22, 2024
1 parent c636e41 commit be6da9c
Showing 5 changed files with 119 additions and 2 deletions.
@@ -81,10 +81,15 @@ public void setupJob(Job job, String table) throws IOException {

         List<Scan> scans;
         if (!regions.isEmpty()) {
-            scans = new ArrayList<>(regions.size() / 2);
+            LOGGER.info("Delete rows from {} table ranges (start - end)", regions.size());
+            scans = new ArrayList<>(regions.size());
             for (Pair<byte[], byte[]> region : regions) {
                 Scan scan = new Scan(templateScan);
                 scans.add(scan);
+                LOGGER.info(" - [ '"
+                        + Bytes.toStringBinary(region.getFirst()) + "' , '"
+                        + Bytes.toStringBinary(region.getSecond())
+                        + "' )");
                 if (region.getFirst() != null && region.getFirst().length != 0) {
                     scan.setStartRow(region.getFirst());
                 }
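Note: each (start, end) pair becomes one bounded scan. HBase treats a scan's start row as inclusive and its stop row as exclusive, which is exactly the [start, end) notation printed by the new log line. A minimal standalone sketch of that wiring (plain HBase client code, assuming the stop-row branch folded out of the diff mirrors the visible start-row branch):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BoundedScanSketch {
        // Limit a scan to the half-open row range [start, stop).
        static Scan boundedScan(Scan scan, byte[] start, byte[] stop) {
            if (start != null && start.length != 0) {
                scan.setStartRow(start); // inclusive lower bound
            }
            if (stop != null && stop.length != 0) {
                scan.setStopRow(stop);   // exclusive upper bound
            }
            return scan;
        }

        public static void main(String[] args) {
            // Rows >= "00005_" and < "00006_": every archive row of file batch 5.
            System.out.println(boundedScan(new Scan(), Bytes.toBytes("00005_"), Bytes.toBytes("00006_")));
        }
    }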
@@ -77,6 +77,7 @@
 import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchemaManager;
 import org.opencb.opencga.storage.hadoop.variant.adaptors.sample.HBaseVariantSampleDataManager;
 import org.opencb.opencga.storage.hadoop.variant.annotation.HadoopDefaultVariantAnnotationManager;
+import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask;
 import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveTableHelper;
 import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor;
 import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutorFactory;
@@ -826,7 +827,10 @@ private void remove(String study, List<String> files, List<String> samples, URI
             archiveColumns.add(family + ':' + ArchiveTableHelper.getRefColumnName(fileId));
             archiveColumns.add(family + ':' + ArchiveTableHelper.getNonRefColumnName(fileId));
         }
-        String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, options);
+        ObjectMap thisOptions = new ObjectMap(options);
+        ArchiveDeleteHBaseColumnTask.configureTask(thisOptions, fileIds);
+
+        String[] deleteFromArchiveArgs = DeleteHBaseColumnDriver.buildArgs(archiveTable, archiveColumns, thisOptions);
         getMRExecutor().run(DeleteHBaseColumnDriver.class, deleteFromArchiveArgs, "Delete from archive table");
         return stopWatch.now(TimeUnit.MILLISECONDS);
     });
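Note: the options are first copied into a fresh ObjectMap, so configureTask can add task-specific entries without mutating the caller's shared options. A hypothetical illustration of what ends up in the map (file ids and batches mirror the unit test further down, assuming the default file-batch size):

    import java.util.Arrays;
    import org.opencb.commons.datastore.core.ObjectMap;
    import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask;

    public class ConfigureTaskDemo {
        public static void main(String[] args) {
            ObjectMap thisOptions = new ObjectMap();
            ArchiveDeleteHBaseColumnTask.configureTask(thisOptions, Arrays.asList(5300, 6053, 9032));
            // thisOptions now holds two entries:
            //   DELETE_HBASE_COLUMN_TASK_CLASS -> ArchiveDeleteHBaseColumnTask.class.getName()
            //   FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX -> [5, 6, 9]
            System.out.println(thisOptions);
        }
    }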
@@ -0,0 +1,42 @@
package org.opencb.opencga.storage.hadoop.variant.archive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.storage.hadoop.utils.DeleteHBaseColumnDriver;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ArchiveDeleteHBaseColumnTask extends DeleteHBaseColumnDriver.DeleteHBaseColumnTask {

    public static final String FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX = "fileBatchesWithFilesToDeleteFromArchiveIndex";

    public static void configureTask(ObjectMap options, List<Integer> fileIds) {
        ArchiveRowKeyFactory keyFactory = new ArchiveRowKeyFactory(options);
        // Collect the distinct archive file batches that contain the files to delete.
        Set<Integer> fileBatchesSet = new HashSet<>();
        for (Integer fileId : fileIds) {
            fileBatchesSet.add(keyFactory.getFileBatch(fileId));
        }
        List<Integer> fileBatches = new ArrayList<>(fileBatchesSet);
        fileBatches.sort(Integer::compareTo);
        options.put(DeleteHBaseColumnDriver.DELETE_HBASE_COLUMN_TASK_CLASS, ArchiveDeleteHBaseColumnTask.class.getName());
        options.put(ArchiveDeleteHBaseColumnTask.FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX, fileBatches);
    }

    @Override
    public List<Pair<byte[], byte[]>> getRegionsToDelete(Configuration configuration) {
        int[] fileBatches = configuration.getInts(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX);
        List<Pair<byte[], byte[]>> regions = new ArrayList<>();
        ArchiveRowKeyFactory archiveRowKeyFactory = new ArchiveRowKeyFactory(configuration);
        for (int fileBatch : fileBatches) {
            // One half-open row range per file batch: [batch prefix, next batch prefix).
            regions.add(new Pair<>(
                    Bytes.toBytes(archiveRowKeyFactory.generateBlockIdFromBatch(fileBatch)),
                    Bytes.toBytes(archiveRowKeyFactory.generateBlockIdFromBatch(fileBatch + 1))));
        }
        return regions;
    }
}
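Note: the reason batch boundaries make valid scan ranges is that every archive row key starts with the zero-padded file batch (the tests below expect five digits and a '_' separator, e.g. a key shaped like "00005_1_0000012345" for batch 5), so all rows of all files in a batch sort inside the half-open range from that batch's prefix to the next one. Deduplicating and sorting the batches keeps the resulting ranges ordered and non-overlapping. A small sketch of the prefix arithmetic, under those padding assumptions:

    import org.apache.commons.lang3.StringUtils;

    public class BatchPrefixDemo {
        // Stand-alone rendition of the batch prefix; the real logic lives in
        // ArchiveRowKeyFactory.generateBlockIdFromBatch (next file). Assumes
        // a pad of 5 and '_' as separator, as the unit tests expect.
        static String prefix(int fileBatch) {
            return StringUtils.leftPad(String.valueOf(fileBatch), 5, '0') + '_';
        }

        public static void main(String[] args) {
            // "00005_..." keys fall inside [prefix(5), prefix(6)).
            System.out.println(prefix(5)); // 00005_
            System.out.println(prefix(6)); // 00006_
        }
    }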
@@ -147,6 +147,13 @@ public String generateBlockIdFromSlice(int fileId, String chrom, long slice) {
         return generateBlockIdFromSliceAndBatch(getFileBatch(fileId), chrom, slice);
     }

+    public String generateBlockIdFromBatch(int fileBatch) {
+        StringBuilder sb = new StringBuilder(FILE_BATCH_PAD + 1);
+        sb.append(StringUtils.leftPad(String.valueOf(fileBatch), FILE_BATCH_PAD, '0'));
+        sb.append(getSeparator());
+        return sb.toString();
+    }
+
     public String generateBlockIdFromSliceAndBatch(int fileBatch, String chrom, long slice) {
         String chromosome = Region.normalizeChromosome(chrom);
         StringBuilder sb = new StringBuilder(FILE_BATCH_PAD + 1 + chromosome.length() + 1 + POSITION_PAD);
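Note: a hedged usage sketch of the new method, assuming a default-configured key factory (five-digit pad, '_' separator, consistent with the tests below):

    import org.apache.hadoop.conf.Configuration;
    import org.opencb.opencga.storage.hadoop.variant.archive.ArchiveRowKeyFactory;

    public class BlockIdFromBatchDemo {
        public static void main(String[] args) {
            ArchiveRowKeyFactory keyFactory = new ArchiveRowKeyFactory(new Configuration());
            String start = keyFactory.generateBlockIdFromBatch(5);     // "00005_"
            String stop = keyFactory.generateBlockIdFromBatch(5 + 1);  // "00006_"
            // Any block id generateBlockIdFromSliceAndBatch produces for batch 5
            // starts with "00005_", so it sorts inside [start, stop).
            System.out.println(start + " .. " + stop);
        }
    }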
@@ -0,0 +1,59 @@
package org.opencb.opencga.storage.hadoop.variant.archive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.opencb.commons.datastore.core.ObjectMap;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.opencb.opencga.storage.hadoop.variant.archive.ArchiveDeleteHBaseColumnTask.*;

public class ArchiveDeleteHBaseColumnTaskTest {

    private ArchiveDeleteHBaseColumnTask task;

    @Test
    public void testConfigureTask() {
        ObjectMap options = new ObjectMap();
        task = new ArchiveDeleteHBaseColumnTask();
        configureTask(options, Arrays.asList(1, 2, 3));
        assertEquals(Arrays.asList(0), options.getAsIntegerList(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX));
        List<Pair<byte[], byte[]>> regionsToDelete = task.getRegionsToDelete(toConf(options));
        assertEquals(1, regionsToDelete.size());

        assertArrayEquals(Bytes.toBytes("00000_"), regionsToDelete.get(0).getFirst());
        assertArrayEquals(Bytes.toBytes("00001_"), regionsToDelete.get(0).getSecond());
    }

    @Test
    public void testConfigureTaskMultiRegions() {
        ObjectMap options = new ObjectMap();
        task = new ArchiveDeleteHBaseColumnTask();
        configureTask(options, Arrays.asList(5300, 6053, 9032));
        assertEquals(Arrays.asList(5, 6, 9), options.getAsIntegerList(FILE_BATCHES_WITH_FILES_TO_DELETE_FROM_ARCHIVE_INDEX));
        List<Pair<byte[], byte[]>> regionsToDelete = task.getRegionsToDelete(toConf(options));
        assertEquals(3, regionsToDelete.size());

        assertArrayEquals(Bytes.toBytes("00005_"), regionsToDelete.get(0).getFirst());
        assertArrayEquals(Bytes.toBytes("00006_"), regionsToDelete.get(0).getSecond());
        assertArrayEquals(Bytes.toBytes("00006_"), regionsToDelete.get(1).getFirst());
        assertArrayEquals(Bytes.toBytes("00007_"), regionsToDelete.get(1).getSecond());
        assertArrayEquals(Bytes.toBytes("00009_"), regionsToDelete.get(2).getFirst());
        assertArrayEquals(Bytes.toBytes("00010_"), regionsToDelete.get(2).getSecond());
    }

    private static Configuration toConf(ObjectMap options) {
        Configuration conf = new Configuration();
        for (String key : options.keySet()) {
            conf.set(key, options.getString(key));
        }
        return conf;
    }

}
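Note: the toConf helper mirrors a subtlety of the production path: configureTask stores the batches as a List<Integer> in the ObjectMap, while getRegionsToDelete reads them back through Configuration.getInts, which parses a comma-separated string. The round-trip therefore relies on ObjectMap.getString rendering the Integer list as comma-separated values. In isolation (with a hypothetical key name):

    import org.apache.hadoop.conf.Configuration;

    public class ConfRoundTripDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("batches", "5,6,9");            // what ObjectMap.getString yields
            int[] batches = conf.getInts("batches"); // -> {5, 6, 9}
            for (int b : batches) {
                System.out.println(b);
            }
        }
    }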
