Skip to content

Commit

Permalink
Remove duplicate functions in gatk_preprocessing (resolves #330)
Browse files Browse the repository at this point in the history
  • Loading branch information
jvivian committed Jul 9, 2016
1 parent ec30323 commit 82a22ec
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions src/toil_scripts/tools/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def run_picard_create_sequence_dictionary(job, ref_id):
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.dict'))


def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, mem='10G'):
def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, mem='10G', unsafe=False):
"""
Convenience method for grouping together GATK preprocessing
Expand All @@ -115,13 +115,14 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill
:param str mills: Mills VCF FileStoreID
:param str dbsnp: DBSNP VCF FileStoreID
:param str mem: Memory value to be passed to children. Needed for CI tests
:param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY"
:return: BAM and BAI FileStoreIDs from Print Reads
:rtype: tuple(str, str)
"""
rtc = job.wrapJobFn(run_realigner_target_creator, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem)
ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase, mills, mem)
br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai, dbsnp, mem)
pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai, mem)
rtc = job.wrapJobFn(run_realigner_target_creator, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe)
ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe)
br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai, dbsnp, mem, unsafe)
pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai, mem, unsafe)
# Wiring
job.addChild(rtc)
rtc.addChild(ir)
Expand All @@ -130,7 +131,7 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill
return pr.rv(0), pr.rv(1)


def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem='10G'):
def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False):
"""
Creates intervals file needed for indel realignment
Expand All @@ -144,13 +145,14 @@ def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase
:param str phase: Phase VCF FileStoreID
:param str mills: Mills VCF FileStoreID
:param str mem: Memory value to be passed to children. Needed for CI tests
:param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY"
:return: FileStoreID for the processed bam
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
file_ids = [ref, fai, ref_dict, bam, bai, phase, mills]
file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf']
for file_store_id, name in zip(file_ids, file_names):
inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf']
for file_store_id, name in zip(file_ids, inputs):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))
# Call: GATK -- RealignerTargetCreator
parameters = ['-T', 'RealignerTargetCreator',
Expand All @@ -161,13 +163,17 @@ def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase
'-known', '/data/mills.vcf',
'--downsampling_type', 'NONE',
'-o', '/data/sample.intervals']
if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
inputs=inputs,
outputs={'sample.intervals': None},
work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem)))
# Write to fileStore
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals'))


def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, mills, mem):
def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, mills, mem, unsafe=False):
"""
Creates realigned bams using the intervals file from previous step
Expand All @@ -181,14 +187,15 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, m
:param str phase: Phase VCF FileStoreID
:param str mills: Mills VCF FileStoreID
:param str mem: Memory value to be passed to children. Needed for CI tests
:param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY"
:return: FileStoreID for the processed bam
:rtype: tuple(str, str)
"""
work_dir = job.fileStore.getLocalTempDir()
file_ids = [ref, fai, ref_dict, intervals, bam, bai, phase, mills]
file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.intervals',
'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf']
for file_store_id, name in zip(file_ids, file_names):
inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.intervals',
'sample.bam', 'sample.bam.bai', 'phase.vcf', 'mills.vcf']
for file_store_id, name in zip(file_ids, inputs):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))
# Call: GATK -- IndelRealigner
parameters = ['-T', 'IndelRealigner',
Expand All @@ -201,15 +208,19 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, phase, m
'-maxReads', str(720000),
'-maxInMemory', str(5400000),
'-o', '/data/sample.indel.bam']
if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
inputs=inputs,
outputs={'sample.indel.bam': None, 'sample.indel.bai': None},
work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem)))
# Write to fileStore
indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.indel.bam'))
indel_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.indel.bai'))
return indel_bam, indel_bai


def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, dbsnp, mem):
def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai, dbsnp, mem, unsafe=False):
"""
Creates recal table used in Base Quality Score Recalibration
Expand All @@ -222,13 +233,14 @@ def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai,
:param str fai: Reference index FileStoreID
:param str dbsnp: DBSNP VCF FileStoreID
:param str mem: Memory value to be passed to children. Needed for CI tests
:param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY"
:return: FileStoreID for the processed bam
:rtype: str
"""
work_dir = job.fileStore.getLocalTempDir()
file_ids = [ref, fai, ref_dict, indel_bam, indel_bai, dbsnp]
file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.indel.bam', 'sample.indel.bai', 'dbsnp.vcf']
for file_store_id, name in zip(file_ids, file_names):
inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.indel.bam', 'sample.indel.bai', 'dbsnp.vcf']
for file_store_id, name in zip(file_ids, inputs):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))
# Call: GATK -- IndelRealigner
parameters = ['-T', 'BaseRecalibrator',
Expand All @@ -237,13 +249,17 @@ def run_base_recalibration(job, cores, indel_bam, indel_bai, ref, ref_dict, fai,
'-I', '/data/sample.indel.bam',
'-knownSites', '/data/dbsnp.vcf',
'-o', '/data/sample.recal.table']
if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
inputs=inputs,
outputs={'sample.recal.table': None},
work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem)))
# Write output to file store
return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.recal.table'))


def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, mem):
def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai, mem, unsafe=False):
"""
Creates BAM that has had the base quality scores recalibrated
Expand All @@ -256,14 +272,15 @@ def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai,
:param str ref_dict: Reference dictionary FileStoreID
:param str fai: Reference index FileStoreID
:param str mem: Memory value to be passed to children. Needed for CI tests
:param bool unsafe: If True, runs gatk UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY"
:return: FileStoreID for the processed bam
:rtype: tuple(str, str)
"""
work_dir = job.fileStore.getLocalTempDir()
file_ids = [ref, fai, ref_dict, table, indel_bam, indel_bai]
file_names = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.recal.table',
'sample.indel.bam', 'sample.indel.bai']
for file_store_id, name in zip(file_ids, file_names):
inputs = ['ref.fasta', 'ref.fasta.fai', 'ref.dict', 'sample.recal.table',
'sample.indel.bam', 'sample.indel.bai']
for file_store_id, name in zip(file_ids, inputs):
job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))
# Call: GATK -- PrintReads
parameters = ['-T', 'PrintReads',
Expand All @@ -273,7 +290,11 @@ def run_print_reads(job, cores, table, indel_bam, indel_bai, ref, ref_dict, fai,
'-I', '/data/sample.indel.bam',
'-BQSR', '/data/sample.recal.table',
'-o', '/data/sample.bqsr.bam']
if unsafe:
parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY'])
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
inputs=inputs,
outputs={'sample.bqsr.bam': None, 'sample.bqsr.bai': None},
work_dir=work_dir, parameters=parameters, env=dict(JAVA_OPTS='-Xmx{}'.format(mem)))
# Write ouptut to file store
bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bqsr.bam'))
Expand Down

0 comments on commit 82a22ec

Please sign in to comment.