Revision 469689991e7dbce2be4c4f618584304d91841c49 authored by Chase W. Nelson on 01 October 2020, 17:39:04 UTC, committed by Chase W. Nelson on 01 October 2020, 17:39:04 UTC
1 parent 0b9f8cb
generate_seqs_from_VCF.py
#! /usr/bin/env python
###############################################################################
## LICENSE
##
## Copyright (C) 2020
##
## This program is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
# DATE: 2020
# AUTHOR: Chase W. Nelson
# AFFILIATION: Biodiversity Research Center, Academia Sinica, Taipei, Taiwan
# CONTACT: cnelson@gate.sinica.edu.tw
# FUNDING: Academia Sinica Postdoctoral Research Fellowship, P.I. Wen-Hsiung Li
# CITATION: https://github.com/chasewnelson/SARS-CoV-2-ORF3d
from Bio import SeqIO
import os
import random
import re
import sys
Usage = "Script to generate a FASTA with randomly interspersed variants (from VCF)"
# Example:
# generate_seqs_from_VCF.py reference.fasta variants.vcf 1000
# Check argument number
if not len(sys.argv) == 4:
raise SystemExit("\n### TERMINATED: need 3 unnamed arguments:\n" + \
" # (1) FASTA reference (1 sequence)\n" + \
" # (2) .VCF SNP report\n" + \
" # (3) number of sequences to generate\n\n")
infile_FASTA = sys.argv[1]
infile_VCF = sys.argv[2]
num_seqs = int(sys.argv[3])
print("FASTA reference: " + str(infile_FASTA))
print("VCF with variants: " + str(infile_VCF))
print("Num sequences to generate: " + str(num_seqs))
# Check infiles exist
if not os.path.isfile(infile_FASTA):
raise SystemExit("\n### TERMINATED: infile 1 (FASTA reference) does not exist\n")
if not os.path.isfile(infile_VCF):
raise SystemExit("\n### TERMINATED: infile 2 (.VCF SNP report) does not exist\n")
# Create outfile_prefix name
outfile_prefix = str(infile_VCF)
outfile_prefix = outfile_prefix.replace(".vcf", "") # TODO: learn regex
outfile_prefix = outfile_prefix.replace(".fasta", "") # TODO: learn regex
outfile_prefix = outfile_prefix.replace(".txt", "") # TODO: learn regex
outfile_prefix = outfile_prefix.replace(".tsv", "") # TODO: learn regex
outfile_name = outfile_prefix + "_nSeqs" + str(num_seqs) + ".fasta"
print("Will write output to: " + outfile_name + "\n")
###############################################################################
### Extract SNP data from VCF
# Build a dictionary of site(int)->nt(str)->freqs(list)
# This will later be used to find mean frequency across all samples
site_nt_freqs = {}
site_nt_counts = {}
# Prepare regex
VCF_AF_pattern = r"AF=[\d\.]+" # AF=0.015038
VCF_AF_regex = re.compile(VCF_AF_pattern)
#for this_VCF in VCF_filenames:
if os.path.isfile(infile_VCF): # this_VCF
with open(infile_VCF, "r") as f:
lines = (line.rstrip() for line in f)
this_VCF_ID = str(infile_VCF).rstrip(".vcf")
variant_count = 0
for line in lines:
if not line.startswith("#"): # skip headers
variant_count += 1
line_list = line.split("\t")
# Extract information for this SNP
this_POS = int(line_list[1])
this_ALT = str(line_list[4])
this_INFO = line_list[7]
this_AF = VCF_AF_regex.search(this_INFO)
this_AF = this_INFO[this_AF.start():this_AF.end()] # AF=0.367003
this_AF = float(this_AF.replace("AF=", ""))
# Add data for this record
if this_POS not in site_nt_freqs.keys():
site_nt_freqs[this_POS] = {}
site_nt_counts[this_POS] = {}
site_nt_freqs[this_POS][this_ALT] = this_AF
site_nt_counts[this_POS][this_ALT] = 1
elif this_ALT not in site_nt_freqs[this_POS]:
site_nt_freqs[this_POS][this_ALT] = {}
site_nt_counts[this_POS][this_ALT] = {}
site_nt_freqs[this_POS][this_ALT] = this_AF
site_nt_counts[this_POS][this_ALT] = 1
else:
site_nt_freqs[this_POS][this_ALT] += this_AF
site_nt_counts[this_POS][this_ALT] += 1
else:
raise SystemExit("\n### TERMINATED: file doesn't exist: " + str(infile_VCF) + "\n")
###############################################################################
### Record reference sequence
# Open FASTA for reading
rec = SeqIO.read(infile_FASTA, "fasta") # will throw error if more than one record!
ref_seq = str(rec.seq)
ref_seq_len = len(ref_seq)
###############################################################################
### Build lists of nucleotides to observe at each site, from which we will draw without replacement
site_nt_lists = {}
for this_site in site_nt_freqs.keys():
this_nt_REF = ref_seq[this_site - 1]
for this_nt_ALT in site_nt_freqs[this_site].keys():
this_nt_ALT_freq = float(site_nt_freqs[this_site][this_nt_ALT]) / float(site_nt_counts[this_site][this_nt_ALT])
num_ALT = int(round(this_nt_ALT_freq * num_seqs)) # rounded to nearest int
num_REF = num_seqs - num_ALT
if num_REF < 0:
raise SystemExit("\n### TERMINATED: negative number of nucleotides implied at site: " + str(this_site) + "\n")
else:
# Add reference nucleotides
for ref_i in range(num_REF):
if this_site in site_nt_lists.keys():
site_nt_lists[this_site].append(str(this_nt_REF))
else:
site_nt_lists[this_site] = [str(this_nt_REF)]
# Add alternate nucleotides
for alt_i in range(num_ALT):
if this_site in site_nt_lists:
site_nt_lists[this_site].append(str(this_nt_ALT))
else:
site_nt_lists[this_site] = [str(this_nt_ALT)]
###############################################################################
### Randomize the order of nucleotides at each site
this_seed = random.randrange(sys.maxsize)
random.seed(this_seed)
print("\nRandom number seed for nucleotide incorporation: " + str(this_seed) + "\n")
for this_site in site_nt_lists.keys():
random.shuffle(site_nt_lists[this_site])
###############################################################################
### Build FASTA alignment with variants!
# Open FASTA for writing
outfile_hdl = open(outfile_name, "w")
for this_seq_idx in range(num_seqs):
this_seq = str(ref_seq)
this_seq_ID = str(outfile_prefix) + "_pseudoseq_" + str(this_seq_idx + 1)
# Make sure we used all the nucleotides
for this_site_idx in range(len(this_seq)):
this_site_num = this_site_idx + 1
if this_site_num in site_nt_lists:
this_seq = this_seq[:this_site_idx] + site_nt_lists[this_site_num].pop() + this_seq[(this_site_idx + 1):]
# else it's the REF for sure; leave it
# Write to FASTA file
outfile_hdl.write(">" + this_seq_ID + "\n")
outfile_hdl.write(str(this_seq) + "\n")
# Close FASTA output file
outfile_hdl.close()
# Make sure we used all the nucleotides were used
for this_seq_idx in range(num_seqs):
this_site_num = this_seq_idx + 1
if this_site_num in site_nt_lists and len(site_nt_lists[this_site_num]) > 0:
raise SystemExit("\n### WARNING: sequences remain at site " + str(this_site_num) + ". BIG PROBLEM.\n")
###############################################################################
### FINISH
print("\n")
raise SystemExit("\n### All done; we stopped here, dear researcher.\n")
Computing file changes ...