Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed library & taxonomy download functions and added multiprocessing to speed up downloads #698

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 74 additions & 55 deletions scripts/k2
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3


import argparse
import bz2
import ftplib
Expand All @@ -23,6 +24,7 @@ import urllib
import urllib.error
import urllib.parse
import urllib.request
import multiprocessing

LOG = None
SCRIPT_PATHNAME = None
Expand Down Expand Up @@ -297,12 +299,15 @@ def find_kraken2_binary(name):
if os.path.exists(os.path.join(script_parent_directory, name)):
binary = os.path.join(script_parent_directory, name)
if binary is None:
project_root = get_parent_directory(script_parent_directory)
if "src" in os.listdir(project_root) and name in os.listdir(
os.path.join(project_root, "src")
):
binary = os.path.join(project_root, os.path.join("src", name))
if not binary:
try:
binary = str(shutil.which('kraken2').removesuffix('/bin/kraken2'))+ "/libexec/"+name
except FileNotFoundError :
project_root = get_parent_directory(script_parent_directory)
if "src" in os.listdir(project_root) and name in os.listdir(
os.path.join(project_root, "src")
):
binary = os.path.join(project_root, os.path.join("src", name))
else:
LOG.error("Unable to find {:s}, exiting\n".format(name))
sys.exit(1)
return binary
Expand Down Expand Up @@ -518,11 +523,11 @@ def add_to_library(args):
with open(filename, "r") as in_file:
scan_fasta_file(in_file, out_file, lenient=True)
shutil.copyfile(filename, destination)
if not args.no_masking:
mask_files(
[destination], destination + ".masked", protein=args.protein
)
shutil.move(destination + ".masked", destination)
#if not args.no_masking:
# mask_files(
# [destination], destination + ".masked", protein=args.protein
# )
# shutil.move(destination + ".masked", destination)
with open("added.md5", "a") as out_file:
out_file.write(filehash + "\t" + destination + "\n")
LOG.info("Added " + filename + " to library " + args.db + "\n")
Expand Down Expand Up @@ -564,7 +569,8 @@ def assign_taxid_to_sequences(manifest_to_taxid, is_protein):
max_out_chars = 0
for filepath in sorted(manifest_to_taxid):
taxid = manifest_to_taxid[filepath]
with gzip.open(filepath) as in_file:
filepath = os.path.basename(filepath.removesuffix('.gz'))
with open(filepath, 'rb+') as in_file:
while True:
line = in_file.readline()
if line == b"":
Expand Down Expand Up @@ -674,6 +680,11 @@ def download_file(url, local_name=None):
clear_console_line()
LOG.info("Saved {:s} to {:s}\n".format(local_name, os.getcwd()))

def download_multiple_files(url):
## download ~31654 files of the bacteria library in parallel, depends on the amount of allocated cpus
filename = os.path.basename(url).removesuffix('.gz')
if not os.path.isfile(filename):
subprocess.call(['wget', '-q', url])

def make_manifest_filter(file, regex):
def inner(listing):
Expand Down Expand Up @@ -701,6 +712,27 @@ def get_manifest(server, remote_path, regex):
ftp.retrlines("LIST", callback=make_manifest_filter(f, regex))
ftp.close()

def purge(pattern):
dir = os.getcwd()
for f in os.listdir(dir):
if re.search(pattern, f):
os.remove(os.path.join(dir, f))


def multiprocess_urls(remote_dir, filepaths):
server = 'https://ftp.ncbi.nih.gov'
pool = multiprocessing.Pool(processes=50)
url_list = []
for f in filepaths:
ftp_url = server+remote_dir+f
url_list.append(ftp_url)
## Download the files in the fna format
print("Downloading "+ str(len(url_list))+ " files.")
pool.map(download_multiple_files, url_list)
## unzip the files
pool.close()
pool.join()


def download_files_from_manifest(
server,
Expand All @@ -717,26 +749,16 @@ def download_files_from_manifest(
i = 0
for filepath in f:
LOG.info(
"Checking if manifest files exist on server {:s}\r".format(
"Generating file list from manifest {:s}\r".format(
spinner[i % 4]
)
)
i += 1
filepath = filepath.strip()
if not ftp.exists(urllib.parse.urljoin(remote_dir, filepath)):
if filepath_to_taxid_table:
del filepath_to_taxid_table[filepath]
LOG.warning(
"{:s} does not exist on server, skipping\n".format(
remote_dir + filepath
)
)
continue
filepaths.append(filepath)
ftp.download(remote_dir, filepaths)
ftp.close()
multiprocess_urls(remote_dir, filepaths)
if decompress:
decompress_files(filepaths, out_filename)
subprocess.call("gunzip -f *.gz",shell=True)


def download_taxonomy(args):
Expand All @@ -747,29 +769,23 @@ def download_taxonomy(args):
if not args.skip_maps:
if not args.protein:
for subsection in ["gb", "wgs"]:
LOG.info(
"Downloading nucleotide {:s} accession to taxon map\n".format(
subsection
)
)
filename = "nucl_" + subsection + ".accession2taxid.gz"
ftp.download("/pub/taxonomy/accession2taxid/", filename)
uncompressed_file = "nucl_" + subsection + ".accession2taxid"
if not os.path.exists(uncompressed_file) and not os.path.exists(filename):
LOG.info("Downloading nucleotide {:s} accession to taxon map\n".format(subsection))

subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/accession2taxid/{filename}", shell=True)
else:
LOG.info("Downloading protein accession to taxon map\n")
ftp.download(
"/pub/taxonomy/accession2taxid", "prot.accession2taxid.gz"
)
subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/accession2taxid/prot.accession2taxid.gz", shell=True)
LOG.info("Downloaded accession to taxon map(s)\n")
LOG.info("Downloading taxonomy tree data\n")
ftp.download("/pub/taxonomy", "taxdump.tar.gz")
ftp.close()
subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/taxdump.tar.gz", shell=True)
LOG.info("Decompressing taxonomy data\n")
decompress_files(glob.glob("*accession2taxid.gz"))
LOG.info("Untarring taxonomy tree data\n")
with tarfile.open("taxdump.tar.gz", "r:gz") as tar:
tar.extractall()
remove_files(glob.glob("*.gz"))

subprocess.call("tar xzf taxdump.tar.gz", shell=True)
LOG.info("Decompressing nucleotides\n")
subprocess.call("gunzip -f *.gz",shell=True)
purge('taxdump.tar')

def download_genomic_library(args):
library_filename = "library.faa" if args.protein else "library.fna"
Expand All @@ -796,10 +812,10 @@ def download_genomic_library(args):
if args.library == "human":
remote_dir_name = "vertebrate_mammalian/Homo_sapiens"
try:
url = "ftp://{:s}/genomes/refseq/{:s}/assembly_summary.txt".format(
url = "https://{:s}/genomes/refseq/{:s}/assembly_summary.txt".format(
NCBI_SERVER, remote_dir_name
)
download_file(url)
subprocess.call(f"wget -q {url}", shell=True)
except urllib.error.URLError:
LOG.error(
"Error downloading assembly summary file for {:s}, exiting\n".format(
Expand All @@ -821,6 +837,7 @@ def download_genomic_library(args):
download_files_from_manifest(
NCBI_SERVER,
"/genomes/",
decompress=True,
filepath_to_taxid_table=filepath_to_taxid_table,
)
assign_taxid_to_sequences(filepath_to_taxid_table, args.protein)
Expand Down Expand Up @@ -872,8 +889,9 @@ def download_genomic_library(args):
library_pathname = os.path.join(library_pathname, args.library)
os.makedirs(library_pathname, exist_ok=True)
os.chdir(library_pathname)
ftp = FTP(NCBI_SERVER)
ftp.download("pub/UniVec", args.library)
## UniVec_Core
print('Downloading UniVec_Core'+'\n')
subprocess.call('wget https://ftp.ncbi.nlm.nih.gov/pub/UniVec/UniVec_Core', shell=True)
special_taxid = 28384
LOG.info(
"Adding taxonomy ID of {:d} to all sequences\n".format(
Expand All @@ -894,11 +912,11 @@ def download_genomic_library(args):
with open("prelim_map.txt", "w") as out_file:
scan_fasta_file(in_file, out_file)
files_to_remove = [args.library]
if not args.no_masking:
mask_files(
[library_filename], library_filename + ".masked", args.protein
)
shutil.move(library_filename + ".masked", library_filename)
#if not args.no_masking:
# mask_files(
# [library_filename], library_filename + ".masked", args.protein
# )
# shutil.move(library_filename + ".masked", library_filename)
LOG.info("Added {:s} to {:s}\n".format(args.library, args.db))
if files_to_remove:
clean_up(files_to_remove)
Expand Down Expand Up @@ -987,7 +1005,8 @@ def build_kraken2_db(args):
os.chdir(args.db)
if not os.path.isdir("taxonomy"):
LOG.error("Cannot find taxonomy subdirectory in database\n")
sys.exit(1)
LOG.info("Downloading taxonomy...\n")
download_taxonomy(args)
if not os.path.isdir("library"):
LOG.error("Cannot find library subdirectory in database\n")
sys.exit(1)
Expand Down Expand Up @@ -1042,7 +1061,7 @@ def build_kraken2_db(args):
)
else:
LOG.error(
"Accession to taxid map files are required to build this database.\n"
"Accession to taxid map files are required to build th database.\n"
)
LOG.error(
"Run k2 download-taxonomy --db {:s} again".format(args.db)
Expand All @@ -1055,7 +1074,7 @@ def build_kraken2_db(args):
argv = [estimate_capacity_binary, "-S", construct_seed_template(args)]
if args.protein:
argv.append("-X")
wrapper_args_to_binary_args(
wrapper_args_to_binary_args(
args, argv, get_binary_options(estimate_capacity_binary)
)
fasta_filenames = glob.glob(
Expand Down Expand Up @@ -1371,7 +1390,7 @@ def build_gg_taxonomy(in_file):
display_name = genus + " " + species
else:
match = re.search("([a-z])__([^;]+)$", node)
if match:
if match:
rank = rank_codes[match.group(1)]
display_name = match.group(2)
rank = rank or "no rank"
Expand Down