From 7fdf5966b061f7399f9d69b89440d8d39695435e Mon Sep 17 00:00:00 2001 From: Ben Cole Date: Wed, 16 Nov 2022 11:27:56 -0600 Subject: [PATCH 1/2] Make AbstractVCFCodec thread-safe; allows for multi-threaded VCF reading --- .../htsjdk/variant/vcf/AbstractVCFCodec.java | 88 ++++++++++--------- .../java/htsjdk/variant/vcf/VCF3Codec.java | 2 +- .../java/htsjdk/variant/vcf/VCFCodec.java | 6 +- 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java b/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java index bfa718453e..26407c2798 100644 --- a/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java +++ b/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java @@ -43,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPInputStream; public abstract class AbstractVCFCodec extends AsciiFeatureCodec implements NameAwareCodec { @@ -60,24 +61,25 @@ public abstract class AbstractVCFCodec extends AsciiFeatureCodec private VCFTextTransformer vcfTextTransformer = passThruTextTransformer; // a mapping of the allele - protected Map> alleleMap = new HashMap>(3); - + protected ThreadLocal>> alleleMap = + ThreadLocal.withInitial(() -> new HashMap>(3)); + // for performance testing purposes public static boolean validate = true; - // a key optimization -- we need a per thread string parts array, so we don't allocate a big array over and over - // todo: make this thread safe? - protected String[] parts = null; - protected String[] genotypeParts = null; - protected final String[] locParts = new String[6]; + // a key optimization -- we need a per thread string parts array, so we don't allocate a big + // array + // over and over + protected ThreadLocal parts = ThreadLocal.withInitial(() -> null); + protected ThreadLocal genotypeParts = ThreadLocal.withInitial(() -> null); // for performance we cache the hashmap of filter encodings for quick lookup - protected HashMap> filterHash = new HashMap>(); + protected HashMap> filterHash = new HashMap>(); // we store a name to give to each of the variant contexts we emit protected String name = "Unknown"; - protected int lineNo = 0; + protected AtomicInteger lineNo = new AtomicInteger(0); protected Map stringCache = new HashMap(); @@ -108,17 +110,19 @@ class LazyVCFGenotypesParser implements LazyGenotypesContext.LazyParser { final List alleles; final String contig; final int start; + final int lineNum; - LazyVCFGenotypesParser(final List alleles, final String contig, final int start) { + LazyVCFGenotypesParser(final List alleles, final String contig, final int start, final int lineNum) { this.alleles = alleles; this.contig = contig; this.start = start; + this.lineNum = lineNum; } @Override public LazyGenotypesContext.LazyData parse(final Object data) { //System.out.printf("Loading genotypes... %s:%d%n", contig, start); - return createGenotypeMap((String) data, alleles, contig, start); + return createGenotypeMap((String) data, alleles, contig, start, lineNum); } } @@ -315,7 +319,7 @@ public VCFSampleHeaderLine getSampleHeaderLine(final String headerLineString, fi * @return a feature, (not guaranteed complete) that has the correct start and stop */ public Feature decodeLoc(String line) { - return decodeLine(line, false); + return decodeLine(line, false, lineNo.getAndIncrement()); } /** @@ -325,7 +329,7 @@ public Feature decodeLoc(String line) { */ @Override public VariantContext decode(String line) { - return decodeLine(line, true); + return decodeLine(line, true, lineNo.getAndIncrement()); } /** @@ -362,26 +366,29 @@ private VCFTextTransformer getTextTransformerForVCFVersion(final VCFHeaderVersio passThruTextTransformer; } - private VariantContext decodeLine(final String line, final boolean includeGenotypes) { + private VariantContext decodeLine(final String line, final boolean includeGenotypes, final int lineNum) { // the same line reader is not used for parsing the header and parsing lines, if we see a #, we've seen a header line if (line.startsWith(VCFHeader.HEADER_INDICATOR)) return null; // our header cannot be null, we need the genotype sample names and counts if (header == null) throw new TribbleException("VCF Header cannot be null when decoding a record"); - if (parts == null) - parts = new String[Math.min(header.getColumnCount(), NUM_STANDARD_FIELDS+1)]; + String[] myParts = parts.get(); + if (myParts == null) { + parts.set(new String[Math.min(header.getColumnCount(), NUM_STANDARD_FIELDS + 1)]); + myParts = parts.get(); + } - final int nParts = ParsingUtils.split(line, parts, VCFConstants.FIELD_SEPARATOR_CHAR, true); + final int nParts = ParsingUtils.split(line, myParts, VCFConstants.FIELD_SEPARATOR_CHAR, true); // if we have don't have a header, or we have a header with no genotyping data check that we // have eight columns. Otherwise check that we have nine (normal columns + genotyping data) if (( (header == null || !header.hasGenotypingData()) && nParts != NUM_STANDARD_FIELDS) || (header != null && header.hasGenotypingData() && nParts != (NUM_STANDARD_FIELDS + 1)) ) - throw new TribbleException("Line " + lineNo + ": there aren't enough columns for line " + line + " (we expected " + (header == null ? NUM_STANDARD_FIELDS : NUM_STANDARD_FIELDS + 1) + + throw new TribbleException("Line " + lineNum + ": there aren't enough columns for line " + line + " (we expected " + (header == null ? NUM_STANDARD_FIELDS : NUM_STANDARD_FIELDS + 1) + " tokens, and saw " + nParts + " )"); - return parseVCFLine(parts, includeGenotypes); + return parseVCFLine(myParts, includeGenotypes, lineNum); } /** @@ -390,15 +397,10 @@ private VariantContext decodeLine(final String line, final boolean includeGenoty * @param parts the parts split up * @return a variant context object */ - private VariantContext parseVCFLine(final String[] parts, final boolean includeGenotypes) { + private VariantContext parseVCFLine(final String[] parts, final boolean includeGenotypes, final int lineNum) { VariantContextBuilder builder = new VariantContextBuilder(); builder.source(getName()); - // increment the line count - // TODO -- because of the way the engine utilizes Tribble, we can parse a line multiple times (especially when - // TODO -- the first record is far along the contig) and the line counter can get out of sync - lineNo++; - // parse out the required fields final String chr = getCachedString(parts[0]); builder.chr(chr); @@ -440,12 +442,12 @@ else if ( parts[2].equals(VCFConstants.EMPTY_ID_FIELD) ) } // get our alleles, filters, and setup an attribute map - final List alleles = parseAlleles(ref, alts, lineNo); + final List alleles = parseAlleles(ref, alts, lineNum); builder.alleles(alleles); // do we have genotyping data if (parts.length > NUM_STANDARD_FIELDS && includeGenotypes) { - final LazyGenotypesContext.LazyParser lazyParser = new LazyVCFGenotypesParser(alleles, chr, pos); + final LazyGenotypesContext.LazyParser lazyParser = new LazyVCFGenotypesParser(alleles, chr, pos, lineNum); final int nGenotypes = header.getNGenotypeSamples(); LazyGenotypesContext lazy = new LazyGenotypesContext(lazyParser, parts[8], nGenotypes); @@ -750,29 +752,33 @@ private static boolean isVCFStream(final InputStream stream, final String MAGIC_ public LazyGenotypesContext.LazyData createGenotypeMap(final String str, final List alleles, final String chr, - final int pos) { - if (genotypeParts == null) - genotypeParts = new String[header.getColumnCount() - NUM_STANDARD_FIELDS]; + final int pos, + final int lineNum) { + String[] myGenotypeParts = genotypeParts.get(); + if (myGenotypeParts == null) { + genotypeParts.set(new String[header.getColumnCount() - NUM_STANDARD_FIELDS]); + myGenotypeParts = genotypeParts.get(); + } - int nParts = ParsingUtils.split(str, genotypeParts, VCFConstants.FIELD_SEPARATOR_CHAR); - if ( nParts != genotypeParts.length ) - generateException("there are " + (nParts-1) + " genotypes while the header requires that " + (genotypeParts.length-1) + " genotypes be present for all records at " + chr + ":" + pos, lineNo); + int nParts = ParsingUtils.split(str, myGenotypeParts, VCFConstants.FIELD_SEPARATOR_CHAR); + if ( nParts != myGenotypeParts.length ) + generateException("there are " + (nParts-1) + " genotypes while the header requires that " + (myGenotypeParts.length-1) + " genotypes be present for all records at " + chr + ":" + pos, lineNum); ArrayList genotypes = new ArrayList(nParts); // get the format keys - List genotypeKeys = ParsingUtils.split(genotypeParts[0], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR); + List genotypeKeys = ParsingUtils.split(myGenotypeParts[0], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR); // cycle through the sample names Iterator sampleNameIterator = header.getGenotypeSamples().iterator(); // clear out our allele mapping - alleleMap.clear(); + alleleMap.get().clear(); // cycle through the genotype strings boolean PlIsSet = false; for (int genotypeOffset = 1; genotypeOffset < nParts; genotypeOffset++) { - List genotypeValues = ParsingUtils.split(genotypeParts[genotypeOffset], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR); + List genotypeValues = ParsingUtils.split(myGenotypeParts[genotypeOffset], VCFConstants.GENOTYPE_FIELD_SEPARATOR_CHAR); genotypeValues = vcfTextTransformer.decodeText(genotypeValues); final String sampleName = sampleNameIterator.next(); @@ -780,7 +786,7 @@ public LazyGenotypesContext.LazyData createGenotypeMap(final String str, // check to see if the value list is longer than the key list, which is a problem if (genotypeKeys.size() < genotypeValues.size()) - generateException("There are too many keys for the sample " + sampleName + ", keys = " + parts[8] + ", values = " + parts[genotypeOffset]); + generateException("There are too many keys for the sample " + sampleName + ", keys = " + parts.get()[8] + ", values = " + parts.get()[genotypeOffset]); int genotypeAlleleLocation = -1; if (!genotypeKeys.isEmpty()) { @@ -831,7 +837,7 @@ public LazyGenotypesContext.LazyData createGenotypeMap(final String str, if ( genotypeAlleleLocation > 0 ) generateException("Saw GT field at position " + genotypeAlleleLocation + ", but it must be at the first position for genotypes when present"); - final List GTalleles = (genotypeAlleleLocation == -1 ? new ArrayList(0) : parseGenotypeAlleles(genotypeValues.get(genotypeAlleleLocation), alleles, alleleMap)); + final List GTalleles = (genotypeAlleleLocation == -1 ? new ArrayList(0) : parseGenotypeAlleles(genotypeValues.get(genotypeAlleleLocation), alleles, alleleMap.get())); gb.alleles(GTalleles); gb.phased(genotypeAlleleLocation != -1 && genotypeValues.get(genotypeAlleleLocation).indexOf(VCFConstants.PHASED) != -1); @@ -869,9 +875,9 @@ public final void disableOnTheFlyModifications() { } /** - * Replaces the sample name read from the VCF header with the remappedSampleName. Works - * only for single-sample VCFs -- attempting to perform sample name remapping for multi-sample - * VCFs will produce an Exception. + * Replaces the sample name read from the VCF header with the remappedSampleName. Works only for + * single-sample VCFs -- attempting to perform sample name remapping for multi-sample VCFs will + * produce an Exception. * * @param remappedSampleName replacement sample name for the sample specified in the VCF header */ diff --git a/src/main/java/htsjdk/variant/vcf/VCF3Codec.java b/src/main/java/htsjdk/variant/vcf/VCF3Codec.java index e9ca3abdf7..0e6a70ed76 100644 --- a/src/main/java/htsjdk/variant/vcf/VCF3Codec.java +++ b/src/main/java/htsjdk/variant/vcf/VCF3Codec.java @@ -63,7 +63,7 @@ public Object readActualHeader(final LineIterator reader) { VCFHeaderVersion version = null; boolean foundHeaderVersion = false; while (reader.hasNext()) { - lineNo++; + lineNo.incrementAndGet(); final String line = reader.peek(); if (line.startsWith(VCFHeader.METADATA_INDICATOR)) { final String[] lineFields = line.substring(2).split("="); diff --git a/src/main/java/htsjdk/variant/vcf/VCFCodec.java b/src/main/java/htsjdk/variant/vcf/VCFCodec.java index 42f07150d1..27f804bb9b 100644 --- a/src/main/java/htsjdk/variant/vcf/VCFCodec.java +++ b/src/main/java/htsjdk/variant/vcf/VCFCodec.java @@ -88,7 +88,7 @@ public Object readActualHeader(final LineIterator lineIterator) { boolean foundHeaderVersion = false; while (lineIterator.hasNext()) { line = lineIterator.peek(); - lineNo++; + lineNo.incrementAndGet(); if (line.startsWith(VCFHeader.METADATA_INDICATOR)) { final String[] lineFields = line.substring(2).split("="); if (lineFields.length == 2 && VCFHeaderVersion.isFormatString(lineFields[0]) ) { @@ -134,9 +134,9 @@ protected List parseFilters(final String filterString) { if ( filterString.equals(VCFConstants.PASSES_FILTERS_v4) ) return Collections.emptyList(); if ( filterString.equals(VCFConstants.PASSES_FILTERS_v3) ) - generateException(VCFConstants.PASSES_FILTERS_v3 + " is an invalid filter name in vcf4", lineNo); + generateException(VCFConstants.PASSES_FILTERS_v3 + " is an invalid filter name in vcf4", lineNo.get()); if (filterString.isEmpty()) - generateException("The VCF specification requires a valid filter status: filter was " + filterString, lineNo); + generateException("The VCF specification requires a valid filter status: filter was " + filterString, lineNo.get()); // do we have the filter string cached? if ( filterHash.containsKey(filterString) ) From 65b94af59686fe9aa0eb5ab2749e865f921383f3 Mon Sep 17 00:00:00 2001 From: Ben Cole Date: Wed, 16 Nov 2022 17:13:10 -0600 Subject: [PATCH 2/2] Fix generateException usage of lineNo variable --- src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java b/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java index 26407c2798..27c742e199 100644 --- a/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java +++ b/src/main/java/htsjdk/variant/vcf/AbstractVCFCodec.java @@ -886,7 +886,7 @@ public void setRemappedSampleName( final String remappedSampleName ) { } protected void generateException(String message) { - throw new TribbleException(String.format("The provided VCF file is malformed at approximately line number %d: %s", lineNo, message)); + throw new TribbleException(String.format("The provided VCF file is malformed at approximately line number %d: %s", lineNo.get(), message)); } protected static void generateException(String message, int lineNo) {