Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AbstractVCFCodec thread-safe, allowing for multi-threaded VariantContext genotype decoding #1636

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;

public abstract class AbstractVCFCodec extends AsciiFeatureCodec<VariantContext> implements NameAwareCodec {
Expand All @@ -60,24 +61,25 @@ public abstract class AbstractVCFCodec extends AsciiFeatureCodec<VariantContext>
private VCFTextTransformer vcfTextTransformer = passThruTextTransformer;

// a mapping of the allele
protected Map<String, List<Allele>> alleleMap = new HashMap<String, List<Allele>>(3);

protected ThreadLocal<Map<String, List<Allele>>> alleleMap =
ThreadLocal.withInitial(() -> new HashMap<String, List<Allele>>(3));

// for performance testing purposes
public static boolean validate = true;

// a key optimization -- we need a per thread string parts array, so we don't allocate a big array over and over
// todo: make this thread safe?
protected String[] parts = null;
protected String[] genotypeParts = null;
protected final String[] locParts = new String[6];
// a key optimization -- we need a per thread string parts array, so we don't allocate a big
// array
// over and over
protected ThreadLocal<String[]> parts = ThreadLocal.withInitial(() -> null);
protected ThreadLocal<String[]> genotypeParts = ThreadLocal.withInitial(() -> null);

// for performance we cache the hashmap of filter encodings for quick lookup
protected HashMap<String,List<String>> filterHash = new HashMap<String,List<String>>();
protected HashMap<String, List<String>> filterHash = new HashMap<String, List<String>>();

// we store a name to give to each of the variant contexts we emit
protected String name = "Unknown";

protected int lineNo = 0;
protected AtomicInteger lineNo = new AtomicInteger(0);

protected Map<String, String> stringCache = new HashMap<String, String>();

Expand Down Expand Up @@ -108,17 +110,19 @@ class LazyVCFGenotypesParser implements LazyGenotypesContext.LazyParser {
final List<Allele> alleles;
final String contig;
final int start;
final int lineNum;

LazyVCFGenotypesParser(final List<Allele> alleles, final String contig, final int start) {
LazyVCFGenotypesParser(final List<Allele> alleles, final String contig, final int start, final int lineNum) {
this.alleles = alleles;
this.contig = contig;
this.start = start;
this.lineNum = lineNum;
}

@Override
public LazyGenotypesContext.LazyData parse(final Object data) {
//System.out.printf("Loading genotypes... %s:%d%n", contig, start);
return createGenotypeMap((String) data, alleles, contig, start);
return createGenotypeMap((String) data, alleles, contig, start, lineNum);
}
}

Expand Down Expand Up @@ -315,7 +319,7 @@ public VCFSampleHeaderLine getSampleHeaderLine(final String headerLineString, fi
* @return a feature, (not guaranteed complete) that has the correct start and stop
*/
public Feature decodeLoc(String line) {
return decodeLine(line, false);
return decodeLine(line, false, lineNo.getAndIncrement());
}

/**
Expand All @@ -325,7 +329,7 @@ public Feature decodeLoc(String line) {
*/
@Override
public VariantContext decode(String line) {
return decodeLine(line, true);
return decodeLine(line, true, lineNo.getAndIncrement());
}

/**
Expand Down Expand Up @@ -362,26 +366,29 @@ private VCFTextTransformer getTextTransformerForVCFVersion(final VCFHeaderVersio
passThruTextTransformer;
}

private VariantContext decodeLine(final String line, final boolean includeGenotypes) {
private VariantContext decodeLine(final String line, final boolean includeGenotypes, final int lineNum) {
// the same line reader is not used for parsing the header and parsing lines, if we see a #, we've seen a header line
if (line.startsWith(VCFHeader.HEADER_INDICATOR)) return null;

// our header cannot be null, we need the genotype sample names and counts
if (header == null) throw new TribbleException("VCF Header cannot be null when decoding a record");

if (parts == null)
parts = new String[Math.min(header.getColumnCount(), NUM_STANDARD_FIELDS+1)];
String[] myParts = parts.get();
if (myParts == null) {
parts.set(new String[Math.min(header.getColumnCount(), NUM_STANDARD_FIELDS + 1)]);
myParts = parts.get();
}

final int nParts = ParsingUtils.split(line, parts, VCFConstants.FIELD_SEPARATOR_CHAR, true);
final int nParts = ParsingUtils.split(line, myParts, VCFConstants.FIELD_SEPARATOR_CHAR, true);

// if we have don't have a header, or we have a header with no genotyping data check that we
// have eight columns. Otherwise check that we have nine (normal columns + genotyping data)
if (( (header == null || !header.hasGenotypingData()) && nParts != NUM_STANDARD_FIELDS) ||
(header != null && header.hasGenotypingData() && nParts != (NUM_STANDARD_FIELDS + 1)) )
throw new TribbleException("Line " + lineNo + ": there aren't enough columns for line " + line + " (we expected " + (header == null ? NUM_STANDARD_FIELDS : NUM_STANDARD_FIELDS + 1) +
throw new TribbleException("Line " + lineNum + ": there aren't enough columns for line " + line + " (we expected " + (header == null ? NUM_STANDARD_FIELDS : NUM_STANDARD_FIELDS + 1) +
" tokens, and saw " + nParts + " )");

return parseVCFLine(parts, includeGenotypes);
return parseVCFLine(myParts, includeGenotypes, lineNum);
}

/**
Expand All @@ -390,15 +397,10 @@ private VariantContext decodeLine(final String line, final boolean includeGenoty
* @param parts the parts split up
* @return a variant context object
*/
private VariantContext parseVCFLine(final String[] parts, final boolean includeGenotypes) {
private VariantContext parseVCFLine(final String[] parts, final boolean includeGenotypes, final int lineNum) {
VariantContextBuilder builder = new VariantContextBuilder();
builder.source(getName());

// increment the line count
// TODO -- because of the way the engine utilizes Tribble, we can parse a line multiple times (especially when
// TODO -- the first record is far along the contig) and the line counter can get out of sync
lineNo++;

// parse out the required fields
final String chr = getCachedString(parts[0]);
builder.chr(chr);
Expand Down Expand Up @@ -440,12 +442,12 @@ else if ( parts[2].equals(VCFConstants.EMPTY_ID_FIELD) )
}

// get our alleles, filters, and setup an attribute map
final List<Allele> alleles = parseAlleles(ref, alts, lineNo);
final List<Allele> alleles = parseAlleles(ref, alts, lineNum);
builder.alleles(alleles);

// do we have genotyping data
if (parts.length > NUM_STANDARD_FIELDS && includeGenotypes) {
final LazyGenotypesContext.LazyParser lazyParser = new LazyVCFGenotypesParser(alleles, chr, pos);
final LazyGenotypesContext.LazyParser lazyParser = new LazyVCFGenotypesParser(alleles, chr, pos, lineNum);
final int nGenotypes = header.getNGenotypeSamples();
LazyGenotypesContext lazy = new LazyGenotypesContext(lazyParser, parts[8], nGenotypes);

Expand Down Expand Up @@ -750,37 +752,41 @@ private static boolean isVCFStream(final InputStream stream, final String MAGIC_
public LazyGenotypesContext.LazyData createGenotypeMap(final String str,
final List<Allele> alleles,
final String chr,
final int pos) {
if (genotypeParts == null)
genotypeParts = new String[header.getColumnCount() - NUM_STANDARD_FIELDS];
final int pos,
final int lineNum) {
String[] myGenotypeParts = genotypeParts.get();
if (myGenotypeParts == null) {
genotypeParts.set(new String[header.getColumnCount() - NUM_STANDARD_FIELDS]);
myGenotypeParts = genotypeParts.get();
}

int nParts = ParsingUtils.split(str, genotypeParts, VCFConstants.FIELD_SEPARATOR_CHAR);
if ( nParts != genotypeParts.length )
generateException("there are " + (nParts-1) + " genotypes while the header requires that " + (genotypeParts.length-1) + " genotypes be present for all records at " + chr + ":" + pos, lineNo);
int nParts = ParsingUtils.split(str, myGenotypeParts, VCFConstants.FIELD_SEPARATOR_CHAR);
if ( nParts != myGenotypeParts.length )
generateException("there are " + (nParts-1) + " genotypes while the header requires that " + (myGenotypeParts.length-1) + " genotypes be present for all records at " + chr + ":" + pos, lineNum);

ArrayList<Genotype> genotypes = new ArrayList<Genotype>(nParts);

// get the format keys
List<String> genotypeKeys = ParsingUtils.split(genotypeParts[0], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR);
List<String> genotypeKeys = ParsingUtils.split(myGenotypeParts[0], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR);

// cycle through the sample names
Iterator<String> sampleNameIterator = header.getGenotypeSamples().iterator();

// clear out our allele mapping
alleleMap.clear();
alleleMap.get().clear();

// cycle through the genotype strings
boolean PlIsSet = false;
for (int genotypeOffset = 1; genotypeOffset < nParts; genotypeOffset++) {
List<String> genotypeValues = ParsingUtils.split(genotypeParts[genotypeOffset], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR);
List<String> genotypeValues = ParsingUtils.split(myGenotypeParts[genotypeOffset], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR);
genotypeValues = vcfTextTransformer.decodeText(genotypeValues);

final String sampleName = sampleNameIterator.next();
final GenotypeBuilder gb = new GenotypeBuilder(sampleName);

// check to see if the value list is longer than the key list, which is a problem
if (genotypeKeys.size() < genotypeValues.size())
generateException("There are too many keys for the sample " + sampleName + ", keys = " + parts[8] + ", values = " + parts[genotypeOffset]);
generateException("There are too many keys for the sample " + sampleName + ", keys = " + parts.get()[8] + ", values = " + parts.get()[genotypeOffset]);

int genotypeAlleleLocation = -1;
if (!genotypeKeys.isEmpty()) {
Expand Down Expand Up @@ -831,7 +837,7 @@ public LazyGenotypesContext.LazyData createGenotypeMap(final String str,
if ( genotypeAlleleLocation > 0 )
generateException("Saw GT field at position " + genotypeAlleleLocation + ", but it must be at the first position for genotypes when present");

final List<Allele> GTalleles = (genotypeAlleleLocation == -1 ? new ArrayList<Allele>(0) : parseGenotypeAlleles(genotypeValues.get(genotypeAlleleLocation), alleles, alleleMap));
final List<Allele> GTalleles = (genotypeAlleleLocation == -1 ? new ArrayList<Allele>(0) : parseGenotypeAlleles(genotypeValues.get(genotypeAlleleLocation), alleles, alleleMap.get()));
gb.alleles(GTalleles);
gb.phased(genotypeAlleleLocation != -1 && genotypeValues.get(genotypeAlleleLocation).indexOf(VCFConstants.PHASED) != -1);

Expand Down Expand Up @@ -869,9 +875,9 @@ public final void disableOnTheFlyModifications() {
}

/**
* Replaces the sample name read from the VCF header with the remappedSampleName. Works
* only for single-sample VCFs -- attempting to perform sample name remapping for multi-sample
* VCFs will produce an Exception.
* Replaces the sample name read from the VCF header with the remappedSampleName. Works only for
* single-sample VCFs -- attempting to perform sample name remapping for multi-sample VCFs will
* produce an Exception.
*
* @param remappedSampleName replacement sample name for the sample specified in the VCF header
*/
Expand All @@ -880,7 +886,7 @@ public void setRemappedSampleName( final String remappedSampleName ) {
}

protected void generateException(String message) {
throw new TribbleException(String.format("The provided VCF file is malformed at approximately line number %d: %s", lineNo, message));
throw new TribbleException(String.format("The provided VCF file is malformed at approximately line number %d: %s", lineNo.get(), message));
}

protected static void generateException(String message, int lineNo) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/htsjdk/variant/vcf/VCF3Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Object readActualHeader(final LineIterator reader) {
VCFHeaderVersion version = null;
boolean foundHeaderVersion = false;
while (reader.hasNext()) {
lineNo++;
lineNo.incrementAndGet();
final String line = reader.peek();
if (line.startsWith(VCFHeader.METADATA_INDICATOR)) {
final String[] lineFields = line.substring(2).split("=");
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/htsjdk/variant/vcf/VCFCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Object readActualHeader(final LineIterator lineIterator) {
boolean foundHeaderVersion = false;
while (lineIterator.hasNext()) {
line = lineIterator.peek();
lineNo++;
lineNo.incrementAndGet();
if (line.startsWith(VCFHeader.METADATA_INDICATOR)) {
final String[] lineFields = line.substring(2).split("=");
if (lineFields.length == 2 && VCFHeaderVersion.isFormatString(lineFields[0]) ) {
Expand Down Expand Up @@ -134,9 +134,9 @@ protected List<String> parseFilters(final String filterString) {
if ( filterString.equals(VCFConstants.PASSES_FILTERS_v4) )
return Collections.emptyList();
if ( filterString.equals(VCFConstants.PASSES_FILTERS_v3) )
generateException(VCFConstants.PASSES_FILTERS_v3 + " is an invalid filter name in vcf4", lineNo);
generateException(VCFConstants.PASSES_FILTERS_v3 + " is an invalid filter name in vcf4", lineNo.get());
if (filterString.isEmpty())
generateException("The VCF specification requires a valid filter status: filter was " + filterString, lineNo);
generateException("The VCF specification requires a valid filter status: filter was " + filterString, lineNo.get());

// do we have the filter string cached?
if ( filterHash.containsKey(filterString) )
Expand Down