
Commit

Remove duplicate functions in gatk_preprocessing (resolves #330)
jvivian committed Jun 29, 2016
1 parent 0bbdcd7 commit 8cd5eed
Showing 2 changed files with 70 additions and 197 deletions.
208 changes: 30 additions & 178 deletions src/toil_scripts/gatk_processing/gatk_preprocessing.py
@@ -30,10 +30,12 @@
import logging
import errno
from toil.job import Job
from toil_scripts.lib.files import move_files

from toil_scripts import download_from_s3_url
from toil_scripts.lib.urls import s3am_upload
from toil_scripts.lib.programs import docker_call
from toil_scripts.tools.preprocessing import run_gatk_preprocessing

_log = logging.getLogger(__name__)

@@ -54,9 +56,7 @@ def build_parser():
parser.add_argument('-3', '--s3_dir', default=None, help='S3 Directory, starting with bucket name. e.g.: '
'cgl-driver-projects/ckcc/rna-seq-samples/')
parser.add_argument('-x', '--suffix', default=".bqsr", help='additional suffix, if any')
parser.add_argument('-u', '--sudo', dest='sudo', action='store_true', help='Docker usually needs sudo to execute '
'locally, but not''when running Mesos '
'or when a member of a Docker group.')
parser.add_argument('-u', '--unsafe', action='store_true', help='Runs GATK in unsafe mode.')
return parser

# Convenience functions used in the pipeline
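For orientation, a minimal standalone sketch of the reworked flag (illustrative only, not part of this diff): --unsafe is a plain store_true switch that replaces the old --sudo option on the same -u short flag, and main() forwards the resulting boolean through input_args['unsafe'].

# Illustrative sketch only -- not from this repository.
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('-u', '--unsafe', action='store_true', help='Runs GATK in unsafe mode.')

print(parser.parse_args([]).unsafe)            # False: store_true flags default to False
print(parser.parse_args(['--unsafe']).unsafe)  # True when the flag is supplied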
@@ -166,23 +166,6 @@ def copy_to_output_dir(work_dir, output_dir, uuid=None, files=None):
else:
shutil.copy(os.path.join(work_dir, fname), os.path.join(output_dir, '{}.{}'.format(uuid, fname)))

def upload_or_move(job, work_dir, input_args, output):

# are we moving this into a local dir, or up to s3?
if input_args['output_dir']:
# get output path and
output_dir = input_args['output_dir']
# FIXME: undefined function
make_directory(output_dir)
move_to_output_dir(work_dir, output_dir, output)

elif input_args['s3_dir']:
s3am_upload(fpath=os.path.join(work_dir, output),
s3_dir=input_args['s3_dir'],
s3_key_path=input_args['ssec'])

else:
raise ValueError('No output_directory or s3_dir defined. Cannot determine where to store %s' % output)

def download_from_url_gatk(job, url, name):
"""
@@ -471,166 +454,34 @@ def mark_dups_sample(job, shared_ids, input_args):
# picard writes the index for file.bam at file.bai, not file.bam.bai
_move_bai(outpath)
shared_ids['sample.mkdups.bam.bai'] = job.fileStore.writeGlobalFile(outpath + ".bai")
job.addChildJobFn(realigner_target_creator, shared_ids, input_args, cores = input_args['cpu_count'])
# job.addChildJobFn(realigner_target_creator, shared_ids, input_args, cores = input_args['cpu_count'])
gatk_preprocessing = job.wrapJobFn(run_gatk_preprocessing, input_args['cpu_count'],
bam=shared_ids['sample.mkdups.bam'],
bai=shared_ids['sample.mkdups.bam.bai'],
ref=shared_ids['ref.fa'],
ref_dict=shared_ids['ref.dict'],
fai=shared_ids['ref.fa.fai'],
phase=shared_ids['phase.vcf'],
mills=shared_ids['mills.vcf'],
dbsnp=shared_ids['dbsnp.vcf'],
unsafe=input_args['unsafe'])
job.addChild(gatk_preprocessing)
job.addFollowOnJobFn(upload_or_move, input_args, gatk_preprocessing.rv(0), gatk_preprocessing.rv(1))


# FIXME: Can be replaced with s3am_upload_job and move_file_job once #324 is merged
def upload_or_move(job, input_args, bam_id, bai_id):
work_dir = job.fileStore.getLocalTempDir()
output_files = [os.path.join(work_dir, x) for x in [input_args['uuid'] + '.bam', input_args['uuid'] + '.bai']]
job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, input_args['uuid'] + '.bam'))
job.fileStore.readGlobalFile(bai_id, os.path.join(work_dir, input_args['uuid'] + '.bai'))
if input_args['output_dir']:
move_files(file_paths=output_files, output_dir=input_args['output_dir'])
if input_args['s3_dir']:
for output in output_files:
s3am_upload(fpath=output, s3_dir=input_args['s3_dir'],
num_cores=input_args['cpu_count'], s3_key_path=input_args['ssec'])


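For context, the added block above relies on Toil promises: job.wrapJobFn turns run_gatk_preprocessing into a job, and rv(0)/rv(1) are placeholders for the two values that job will return (the processed BAM and its index), which the follow-on upload job receives once they resolve. Below is a minimal, self-contained sketch of the same pattern; the names preprocess, deliver, and the jobstore path are illustrative, not from this repository.

# Illustrative sketch of the wrapJobFn / rv() promise pattern -- not part of this commit.
import os
from toil.job import Job


def preprocess(job):
    # Stand-in for run_gatk_preprocessing: produce two files and return their file-store IDs.
    work_dir = job.fileStore.getLocalTempDir()
    bam = os.path.join(work_dir, 'sample.bam')
    bai = os.path.join(work_dir, 'sample.bai')
    for path in (bam, bai):
        with open(path, 'w') as f:
            f.write('placeholder')
    return job.fileStore.writeGlobalFile(bam), job.fileStore.writeGlobalFile(bai)


def deliver(job, bam_id, bai_id):
    # Stand-in for upload_or_move: by the time this runs, the promises have
    # resolved to real file-store IDs.
    work_dir = job.fileStore.getLocalTempDir()
    job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'out.bam'))
    job.fileStore.readGlobalFile(bai_id, os.path.join(work_dir, 'out.bai'))


def root(job):
    child = job.wrapJobFn(preprocess)
    job.addChild(child)
    # child.rv(0) and child.rv(1) index into preprocess's return tuple and are
    # resolved only after the child job has actually run.
    job.addFollowOnJobFn(deliver, child.rv(0), child.rv(1))


if __name__ == '__main__':
    options = Job.Runner.getDefaultOptions('./promise-demo-jobstore')
    Job.Runner.startToil(Job.wrapJobFn(root), options)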
def realigner_target_creator(job, shared_ids, input_args):
"""
Creates <type>.intervals file needed for indel realignment
job_vars: tuple Contains the input_args and ids dictionaries
sample: str Either "normal" or "tumor" to track which one is which
"""
work_dir = job.fileStore.getLocalTempDir()
sudo = input_args['sudo']
# Retrieve input file paths
read_from_filestore(job, work_dir, shared_ids, 'ref.fa',
'sample.mkdups.bam', 'ref.fa.fai', 'ref.dict',
'sample.mkdups.bam.bai', 'phase.vcf', 'mills.vcf')

# Output file path
output = os.path.join(work_dir, 'sample.intervals')
# Call: GATK -- RealignerTargetCreator
parameters = ['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY', # RISKY! (?) See #189
'-T', 'RealignerTargetCreator',
'-nt', str(input_args['cpu_count']),
'-R', 'ref.fa',
'-I', 'sample.mkdups.bam',
'-known', 'phase.vcf',
'-known', 'mills.vcf',
'--downsampling_type', 'NONE',
'-o', 'sample.intervals']

docker_call(work_dir=work_dir, parameters=parameters,
tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
inputs=['ref.fa','sample.mkdups.bam', 'ref.fa.fai', 'ref.dict',
'sample.mkdups.bam.bai', 'phase.vcf', 'mills.vcf'],
outputs={'sample.intervals': None},
env={'JAVA_OPTS':'-Xmx%sg' % input_args['memory']},
sudo=sudo)
shared_ids['sample.intervals'] = job.fileStore.writeGlobalFile(output)
job.addChildJobFn(indel_realignment, shared_ids, input_args)


def indel_realignment(job, shared_ids, input_args):
"""
Creates realigned bams using <sample>.intervals file from previous step
job_vars: tuple Contains the input_args and ids dictionaries
sample: str Either "normal" or "tumor" to track which one is which
"""
# Unpack convenience variables for job
work_dir = job.fileStore.getLocalTempDir()
sudo = input_args['sudo']
# Retrieve input file paths
return_input_paths(job, work_dir, shared_ids, 'ref.fa',
'sample.mkdups.bam', 'phase.vcf', 'mills.vcf',
'sample.intervals', 'ref.fa.fai', 'ref.dict',
'sample.mkdups.bam.bai')
# Output file path
outpath = os.path.join(work_dir, 'sample.indel.bam')
# Call: GATK -- IndelRealigner
parameters = ['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY', # RISKY! (?) See #189
'-T', 'IndelRealigner',
'-R', 'ref.fa',
'-I', 'sample.mkdups.bam',
'-known', 'phase.vcf',
'-known', 'mills.vcf',
'-targetIntervals', 'sample.intervals',
'--downsampling_type', 'NONE',
'-maxReads', str(720000),
'-maxInMemory', str(5400000),
'-o', 'sample.indel.bam']

docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
work_dir=work_dir, parameters=parameters,
inputs=['ref.fa', 'sample.mkdups.bam', 'phase.vcf', 'mills.vcf',
'sample.intervals', 'ref.fa.fai', 'ref.dict',
'sample.mkdups.bam.bai'],
outputs={'sample.indel.bam': None, 'sample.indel.bai': None},
env={'JAVA_OPTS':'-Xmx10g'})

# Write to fileStore
shared_ids['sample.indel.bam'] = job.fileStore.writeGlobalFile(outpath)
_move_bai(outpath)
shared_ids['sample.indel.bam.bai'] = job.fileStore.writeGlobalFile(outpath + ".bai")
job.addChildJobFn(base_recalibration, shared_ids, input_args, cores = input_args['cpu_count'])


def base_recalibration(job, shared_ids, input_args):
"""
Creates recal table to perform Base Quality Score Recalibration
job_vars: tuple Contains the input_args and ids dictionaries
sample: str Either "normal" or "tumor" to track which one is which
"""
# Unpack convenience variables for job
work_dir = job.fileStore.getLocalTempDir()
sudo = input_args['sudo']
# Retrieve input file paths
return_input_paths(job, work_dir, shared_ids, 'ref.fa', 'sample.indel.bam',
'dbsnp.vcf', 'ref.fa.fai',
'ref.dict', 'sample.indel.bam.bai')
# Output file path
output = os.path.join(work_dir, 'sample.recal.table')
# Call: GATK -- IndelRealigner
parameters = ['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY', # RISKY! (?) See #189
'-T', 'BaseRecalibrator',
'-nct', str(input_args['cpu_count']),
'-R', 'ref.fa',
'-I', 'sample.indel.bam',
'-knownSites', 'dbsnp.vcf',
'-o', 'sample.recal.table']
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
work_dir=work_dir, parameters=parameters,
inputs=['ref.fa', 'sample.indel.bam', 'dbsnp.vcf', 'ref.fa.fai',
'ref.dict', 'sample.indel.bam.bai'],
outputs={'sample.recal.table': None},
env={'JAVA_OPTS':'-Xmx%sg' % input_args['memory']}, sudo=sudo)
# Write to fileStore
shared_ids['sample.recal.table'] = job.fileStore.writeGlobalFile(output)
job.addChildJobFn(print_reads, shared_ids, input_args, cores = input_args['cpu_count'])


def print_reads(job, shared_ids, input_args):
"""
Create bam that has undergone Base Quality Score Recalibration (BQSR)

job_vars: tuple Contains the input_args and ids dictionaries
sample: str Either "normal" or "tumor" to track which one is which
"""
# Unpack convenience variables for job
uuid = input_args['uuid']
suffix = input_args['suffix']
work_dir = job.fileStore.getLocalTempDir()
sudo = input_args['sudo']
# Retrieve input file paths
return_input_paths(job, work_dir, shared_ids, 'ref.fa', 'sample.indel.bam',
'ref.fa.fai', 'ref.dict', 'sample.indel.bam.bai', 'sample.recal.table')
# Output file
outfile = '{}{}.bam'.format(uuid, suffix)
gatk_outfile_idx = '{}{}.bai'.format(uuid, suffix)
outfile_idx = '{}{}.bam.bai'.format(uuid, suffix)
outpath = os.path.join(work_dir, outfile)
# Call: GATK -- PrintReads
parameters = ['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY', # RISKY! (?) See #189
'-T', 'PrintReads',
'-nct', str(input_args['cpu_count']),
'-R', 'ref.fa',
'--emit_original_quals',
'-I', 'sample.indel.bam',
'-BQSR', 'sample.recal.table',
'-o', outfile]
docker_call(tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2',
work_dir=work_dir, parameters=parameters,
inputs=['ref.fa', 'sample.indel.bam', 'ref.fa.fai', 'ref.dict',
'sample.indel.bam.bai', 'sample.recal.table'],
outputs={outfile: None, gatk_outfile_idx: None},
env={'JAVA_OPTS':'-Xmx%sg' % input_args['memory']})

upload_or_move(job, work_dir, input_args, outfile)
_move_bai(outpath)
upload_or_move(job, work_dir, input_args, outfile_idx)

def main():
"""
@@ -652,6 +503,7 @@ def main():
'ssec': pargs.ssec,
'suffix': pargs.suffix,
'cpu_count': multiprocessing.cpu_count(), # FIXME: should not be called from toil-leader, see #186
'unsafe': pargs.unsafe,
'memory': '15'}

# Launch Pipeline
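On the FIXME above: multiprocessing.cpu_count() is evaluated on the Toil leader, not on the worker that eventually runs each job. A common alternative, sketched below purely for illustration (function and jobstore names are hypothetical, and this is not what the commit implements), is to declare per-job resource requirements and let the batch system handle placement.

# Illustrative sketch only -- per-job resource requests instead of sampling
# cpu_count() on the leader.
from toil.job import Job


def preprocess_sample(job, uuid):
    # job.cores reflects the cores actually granted to this job.
    job.fileStore.logToMaster('preprocessing %s with %s cores' % (uuid, job.cores))


def root(job):
    for uuid in ('sample-a', 'sample-b'):
        job.addChildJobFn(preprocess_sample, uuid, cores=4, memory='15G', disk='50G')


if __name__ == '__main__':
    options = Job.Runner.getDefaultOptions('./resources-demo-jobstore')
    Job.Runner.startToil(Job.wrapJobFn(root), options)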
