Skip to content

Commit

Permalink
updating script to select for genomes with bacterial hosts added
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Oct 18, 2023
1 parent 290cf40 commit 81bcd29
Showing 1 changed file with 65 additions and 14 deletions.
79 changes: 65 additions & 14 deletions benchmarking/scripts/viral_genome_composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
Usage:
python benchmarking/scripts/viral_genome_composition.py examples/genomes/viral_genome_dir/
To copy the genome files belonging to the 15 most common genera into subselected/:
python benchmarking/scripts/viral_genome_composition.py examples/genomes/viral_genome_dir/ subselected/ 15
"""
from pathlib import Path
import sys
from typing import List
from typing import List, Dict, Set
import os
from collections import Counter

import shutil



Expand Down Expand Up @@ -43,25 +46,31 @@ def get_virus_name(fasta_string_list : List[str]):
fasta_string_list[index] = " ".join(fasta_string.split(",")[0].split(" ")[1:])
return fasta_string_list

def get_viral_names_from_directory(genome_directory_path : Path) -> Dict[Path, List[str]]:
    """
    Obtain the parsed viral names from every genome file in a directory.

    Args:
        genome_directory_path (Path) : directory holding the genome files.
    Returns:
        path2viral_names (Dict[Path, List[str]]) : mapping from file path to parsed viral names.
            one name if one entry (in the case of complete genomes.)
    """
    path2viral_names = {}
    # os.listdir returns entries in arbitrary order; sorting keeps the mapping
    # order (and any tie-breaking done on it later) reproducible across runs.
    for genome_file in sorted(os.listdir(genome_directory_path)):
        genome_file_path = Path(genome_directory_path) / genome_file
        # Sub-directories are not genome files and cannot be parsed; skip them.
        if not genome_file_path.is_file():
            continue
        preparsed_viral_names = get_fasta_names(genome_file_path)
        path2viral_names[genome_file_path] = get_virus_name(preparsed_viral_names)
    return path2viral_names

def genus_count(path2viral_names : Dict[Path, List[str]]) -> Counter:
    """
    Count how often each genus occurs across all parsed viral names.

    This method assumes the first space-separated word of a viral name
    is the genus.

    Args:
        path2viral_names (Dict[Path, List[str]]) : mapping from genome file path
            to the viral names parsed from that file.
    Returns:
        genus_counts (Counter) : number of viral names observed per genus.
    """
    # Named differently from the function so the local does not shadow it.
    genus_counts = Counter()
    # Only the names matter here; the file paths are not needed for counting.
    for viral_names in path2viral_names.values():
        genus_counts.update(viral_name.split(" ")[0] for viral_name in viral_names)
    return genus_counts

def print_names_as_csv(genus_counts : Counter):
Expand All @@ -71,10 +80,52 @@ def print_names_as_csv(genus_counts : Counter):
for name, count in genus_counts.most_common():
print(f"{name},{count}")

def get_viralgenus2paths(path2viral_names : Dict[Path, List[str]],
                         genus_name_counter : Counter,
                         top_n : int) -> Dict[Path, str]:
    """
    Select the genome files whose viral names belong to the top N most
    frequent genera.

    Args:
        path2viral_names (Dict[Path, List[str]]) : mapping from genome file path
            to the viral names parsed from that file.
        genus_name_counter (Counter) : occurrence count per genus name.
        top_n (int) : number of most frequent genera to keep; values <= 0 select nothing.
    Returns:
        viralgenus2paths (Dict[Path, str]) : mapping from genome file path to the
            top genus it matched. Files without a name in a top genus are omitted.
            If several names in one file match, the last matching genus is kept.
    """
    # most_common(n) yields at most n (name, count) pairs, highest count first.
    # Clamp at zero so a negative top_n selects no genera.
    top_genus_names = {
        name for name, _ in genus_name_counter.most_common(max(top_n, 0))
    }

    # get paths to genomes in the top n genera
    viralgenus2paths = {}
    for genome_path, viral_names in path2viral_names.items():
        for viral_name in viral_names:
            # The genus is taken to be the first space-separated word of the name.
            genus_name = viral_name.split(" ")[0]
            if genus_name in top_genus_names:
                viralgenus2paths[genome_path] = genus_name

    return viralgenus2paths

if __name__ == "__main__":
    # The output directory is only usable together with the number of top
    # genera, so the two optional arguments must be given together.
    if len(sys.argv) < 2 or len(sys.argv) == 3:
        sys.exit(
            "usage: viral_genome_composition.py <genome_directory> "
            "[<new_directory> <number_of_top_genera>]"
        )

    genome_directory = Path(sys.argv[1])
    new_directory = None
    if len(sys.argv) > 2:
        new_directory = Path(sys.argv[2])
        number_of_hosts = int(sys.argv[3])

    # parse genomes and count genus occurrences
    path2viral_names = get_viral_names_from_directory(genome_directory)
    genus_name_count = genus_count(path2viral_names)
    print_names_as_csv(genus_name_count)

    if new_directory is not None:
        # get the genome files belonging to the top N genera.
        paths2genus = get_viralgenus2paths(path2viral_names, genus_name_count, number_of_hosts)

        # create the new directory (and any missing parents) if it does not exist.
        new_directory.mkdir(parents=True, exist_ok=True)

        # copy files over and record which genus each copied file matched.
        # The CSV is written to the current working directory, named after
        # the final component of the new directory.
        with open(new_directory.name + ".csv", "w") as outfile_csv:
            for viral_path, genus in paths2genus.items():
                shutil.copyfile(viral_path, new_directory / viral_path.name)
                outfile_csv.write(f"{viral_path.name},{genus}\n")

0 comments on commit 81bcd29

Please sign in to comment.