import numpy as np
import pandas as pd
import re
import regex
import subprocess
from io import StringIO
from Bio import SeqIO
from Bio.Seq import MutableSeq
from Bio.Seq import Seq
import multiprocessing as mp
from pyfaidx import Fasta
import os
"""
Defining the Cas class. And defininh all functions to create Cas objects, and creating some set Cas objects
"""
[docs]
class Cas:
def __init__(self, name, pam_three_prime, pam_five_prime, pam_length,
exclusion_positions, is_five_prime, is_three_prime, is_multi_pam):
self.name = name
self.pam_three_prime = pam_three_prime
self.pam_five_prime = pam_five_prime
self.pam_length = pam_length
self.exclusion_positions = exclusion_positions
self.is_five_prime = is_five_prime
self.is_three_prime = is_three_prime
self.is_multi_pam = is_multi_pam
SpCas9 = Cas(
name='SpCas9',
pam_three_prime=r'(?P<p0>[ACTG]GG)',
pam_five_prime=r'(?P<p0>CC[ACTG])',
pam_length=3,
exclusion_positions=[[-1]],
is_five_prime=False,
is_three_prime=True,
is_multi_pam=False
)
enAsCas12a = Cas(
name='enAsCas12a',
pam_three_prime=r'(?P<p0>[ACGT][GA]AA)|(?P<p1>[TGC]AA[TGC])|(?P<p2>[TGC]A[TC]A)|(?P<p3>GG[ACGT][TG])',
pam_five_prime=r'(?P<p0>TT[CT][ACGT])|(?P<p1>[ACG]TT[ACG])|(?P<p2>T[AG]T[ACG])|(?P<p3>[AC][ACGT]CC)',
pam_length=4,
exclusion_positions=[[-1, 2], [-1, -4], [-1], [-3, 2]],
is_five_prime=True,
is_three_prime=False,
is_multi_pam=True
)
SpCas9_NG = Cas(
name='SpCas9-NG',
pam_three_prime= r'(?P<p0>[ACTG]G)',
pam_five_prime=r'(?P<p0>C[ACTG])',
pam_length=2,
exclusion_positions=[[-1]],
is_five_prime=False,
is_three_prime=True,
is_multi_pam=False
)
SaCas9 = Cas(
name='SaCas9',
pam_three_prime=r'(?P<p0>[ACTG][ACTG]G[AG][AG]T)',
pam_five_prime=r'(?P<p0>A[TC][TC]C[ACTG][ACTG])',
pam_length=6,
exclusion_positions=[[-1, -2]],
is_five_prime=False,
is_three_prime=True,
is_multi_pam=False
)
"""
function to output a regex pattern from IUPAC PAM code.
Arguments:
custom_pam_list: list of PAM sequences in IUPAC code
Returns:
custom_pam_list_regex: list of PAM sequences in regex format
"""
[docs]
def create_regex_pam(custom_pam_list):
replace_dict = {
'R': '[AG]',
'Y': '[CT]',
'S': '[GC]',
'W': '[AT]',
'K': '[GT]',
'M': '[AC]',
'B':'[CGT]',
'D': '[AGT]',
'H':'[ACT]',
'V': '[ACG]',
'N': '[ACGT]'
}
custom_pam_list_regex = []
for pam in custom_pam_list:
for char in pam:
if char in replace_dict.keys():
pam = pam.replace(char, replace_dict[char])
else:
pass
custom_pam_list_regex.append(pam)
return custom_pam_list_regex
"""
function to create the exclusion position set for a given custom PAM list, and orientation
Arguments:
custom_pam_list: list of PAM sequences in IUPAC code
orient: PAM orientation, '5prime' or '3prime'
Returns:
exclusion_pos: list of exclusion positions for each PAM
"""
[docs]
def create_exclusion_pos_set(custom_pam_list, orient):
exclusion_codes = 'BDHVN'
all_indices = []
pam_len = len(custom_pam_list[0])
for pam in custom_pam_list:
pamwise_indices = []
for index, code in enumerate(pam):
if code in exclusion_codes:
pamwise_indices.append(index)
all_indices.append(pamwise_indices)
exclusion_pos = []
if orient == '5prime':
for pamwise_indices in all_indices:
pamwise_exclusion_pos = []
for index in pamwise_indices:
pos = index - pam_len
pamwise_exclusion_pos.append(pos)
exclusion_pos.append(pamwise_exclusion_pos)
else:
for pamwise_indices in all_indices:
pamwise_exclusion_pos = []
for index in pamwise_indices:
pos = - 1 - index
pamwise_exclusion_pos.append(pos)
exclusion_pos.append(pamwise_exclusion_pos)
return exclusion_pos
"""
function to create a custom cas_obj
Arguments:
custom_pam: list of PAM sequences in IUPAC code
orient: PAM orientation, '5prime' or '3prime'
Returns:
custom_cas: custom Cas object
"""
[docs]
def create_custom_cas_obj(custom_pam, orient):
#custom_pam can have multiple values (multi-PAM)
pam_len = len(custom_pam[0])
custom_pam_rc = []
for pam in custom_pam:
pam_seq = Seq(pam)
pam_seq_rc = pam_seq.reverse_complement()
custom_pam_rc.append(str(pam_seq_rc))
custom_pam_regex = '|'.join(f"(?P<p{i}>{pam})" for i, pam in enumerate(create_regex_pam(custom_pam)))
custom_pam_rc_regex = '|'.join(f"(?P<p{i}>{pam})" for i, pam in enumerate(create_regex_pam(custom_pam_rc)))
if orient == '5prime':
pam_five_prime = custom_pam_regex
pam_three_prime = custom_pam_rc_regex
is_five_prime=True
is_three_prime=False
else:
pam_five_prime = custom_pam_rc_regex
pam_three_prime = custom_pam_regex
is_five_prime=False
is_three_prime=True
if len(custom_pam) > 1:
is_multi_pam = True
else:
is_multi_pam = False
exclusion_positions = create_exclusion_pos_set(custom_pam, orient)
custom_cas = Cas(
name='custom_cas',
pam_length=pam_len,
pam_three_prime=pam_three_prime,
pam_five_prime=pam_five_prime,
exclusion_positions=exclusion_positions,
is_five_prime=is_five_prime,
is_three_prime=is_three_prime,
is_multi_pam=is_multi_pam
)
return custom_cas
# ESSNTIAL FUNCTIONS: PREPROCESSING VCF FILES AND FASTA FILES INPUTTED.
"""
Import fasta file for required chromosome
Arguments:
fa_filename: string filename
Returns:
chseq: sequence of the chromosome
"""
[docs]
def makeseq(fa_filename):
file_path = fa_filename
with open(file_path, 'r') as fasta_file:
# Parse the FASTA file
for record in SeqIO.parse(fasta_file, 'fasta'):
sequence_id = record.id
sequence = record.seq
chseq = record.seq
return chseq
"""
Returns a table of genotypes at each SNP location, with SNPs filtered by genomic region, heterozygosity in a cell line, or allele frequency in a population database
Arguments:
vcf_file: string filename
locus: genomic region in format chr#:startpos-endpos
variants_db: 'population' or 'cell-line' to handle filtering of SNPs differently
af_threshold: default 0.1, select SNPs with minor allele frequency above this threshold
Returns:
gens_df: dataframe of genotypes at each SNP location
"""
[docs]
def create_gens(vcf_file, locus, variants_db, af_threshold = 0.1): #for snps, not indels rn
#get chrom formatting in vcf file
cmd = f"bcftools view {vcf_file} | grep -v '^#' | head -n 1 | cut -f1"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
chrom_formatting = result.stdout.strip()
if chrom_formatting.startswith('chr'):
pass
else:
locus = locus[3:]
buffer = 0.01
t_adj = max(0.0, af_threshold - buffer)
# handle population and cell-line variant databases differently because population will use allele frequency
if variants_db.startswith('population'):
cmd = (
f'bcftools view -m2 -M2 -v snps -g ^miss -q {t_adj}:minor -r {locus} {vcf_file} -Ou | bcftools query -f"%CHROM\t%POS\t%ID\t%REF\t%ALT\t%INFO/AF\n"'
)
col_names_sim = ["chrom", "pos", "snp_id", "ref", "alt", "alt AF"]
elif variants_db.startswith('cell-line'):
cmd = (
f'bcftools view -v snps -g ^miss -g het -r {locus} {vcf_file} -Ou | bcftools query -f"%CHROM\t%POS\t%ID\t%REF\t%ALT[\t%GT]\n"'
)
col_names_sim = ["chrom", "pos", "snp_id", "ref", "alt", "genotype"]
else:
raise ValueError(
"var_types must be either 'cell-line' or 'population' optionally followed by numbers if inputting multiple of each type"
)
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
)
# Check for bcftools errors
if result.returncode != 0:
raise RuntimeError(
f"bcftools command failed with error:\n{result.stderr}"
)
# Parse into a DataFrame
gens = pd.read_csv(StringIO(result.stdout), sep="\t", header=None)
gens.columns = col_names_sim
#add another column to gens to store the present alleles acccording to the genotype. Hence, for cell-line vcfs, if the file is phased, this is information about which alleles are phased together. If file is not phased, this helps store the alleles present in the cell-line in case any of the snps are multi-allelic, and the ref version isn't present. This prevents gRNA being made with a version of the snp that isn't present in the cell-line.
present_alleles_list = []
for i in range(len(gens)):
alleles = gens['ref'][i] + '(ref)' + ',' + gens['alt'][i]
alleles = alleles.split(',')
for a in range(len(alleles)):
if a > 0:
alleles[a] = alleles[a] + '(alt)'
else:
continue
if variants_db.startswith('cell-line'):
gt = gens['genotype'][i]
gt = regex.split(r'[\/|]', gt)
allele_index1 = int(gt[0])
allele_index2 = int(gt[1])
else:
#if its a population gensdf, I am choosing to not deal with multi-allelic snps at the moment. simpy use ref and the first alt allele as the present alleles.
#want to have a uniform present alleles column in cell-line and population gensdfs so we can extract allelic information downstream in the same way for everything
allele_index1 = 0
allele_index2 = 1
#using set allele_indices, extract the corresponding alleles to generate the present_alleles data
present_alleles = (alleles[allele_index1], alleles[allele_index2])
present_alleles_list.append(present_alleles)
gens['present alleles'] = present_alleles_list
return gens
"""
Returns an immutable sequence with either ref or alt forms of SNP targets of interest
Arguments:
gens_df: dataframe of genotypes at each SNP location
refseq: reference sequence to be mutated
snpform: 'allele1' or 'allele2', arbitrary identity of allele, assigned in create_gens function
Returns:
immutable_seq: Seq() object of an immutable sequence with either ref or alt (allele1 or allele2) forms of SNP targets of interest
"""
[docs]
def getaltseq(gens_df, refseq, snpform = 'allele1'):
mutable_seq = MutableSeq(refseq)
gens_df['pos'] = gens_df['pos'].astype(int)
for i in range(len(gens_df)):
if snpform == 'allele1':
#picks out ith eentry in present_alleles col (eg: (T(ref), C(alt))), 0th entry of the tuple (T(ref)), and 0th element of that string (T) to get the present allele in allele1
present_allele = gens_df['present alleles'][i][0][0]
mutable_seq[gens_df['pos'][i]-1] = present_allele
elif snpform == 'allele2':
#same thing for allele2, but 1th entry in the tuple to get the allele in allele2
present_allele = gens_df['present alleles'][i][1][0]
mutable_seq[gens_df['pos'][i]-1] = present_allele
immutable_seq = Seq(mutable_seq)
return immutable_seq
# MEAT OF THE PIPELINE: FIND GUIDES FUNCTION
"""
Find guides in given sequence around given loci for particular Cas species
Arguments:
snplist: dataframe of SNP locations
sequence: sequence to find guides in
cas_obj: Cas object
max_snp_pos_in_protospacer: maximum SNP position in protospacer
guide_len: length of guide
Returns:
guides_df: dataframe of guides found
"""
[docs]
def find_guides(snplist, sequence, cas_obj, max_snp_pos_in_protospacer, guide_len):
pam_len = cas_obj.pam_length
guidelist = []
PAMlist = []
offtarget_counts_list = []
chrom_list = []
start_list = []
end_list = []
SNPcoordinates = []
rsIDs = []
variation = []
strand = []
SNPallele = []
guidename = []
pos_in_protospacer_or_pam = [] #1,2,3... for pos in protospacer, or -1,-2,-3 for pos in PAM starting from N
for i in range(len(snplist)):
snppos_index = int(snplist['pos'][i]-1)
snpregion = str(sequence[snppos_index-(guide_len+(pam_len-1)):snppos_index+(guide_len+pam_len)]) #snp is at sequence[pos-1]
gc_index_conversion_num = snppos_index-(guide_len+(pam_len-1)) + 1 # index + gc_index_conversion_num = genomic coordinate index in string, +1 = genomic coordinate
#find downstream pams on plus strand
plus_starts = []
plus_matches = []
plus_matched_group_list = []
for pam in regex.finditer(cas_obj.pam_three_prime, snpregion, regex.IGNORECASE, overlapped=True):
plus_starts.append(pam.start())
plus_matches.append(pam.group(0))
matched_group = None
for name in pam.groupdict():
if pam.group(name):
matched_group = name
break
plus_matched_group_list.append(matched_group)
#find upstream pams on minus strand
minus_starts = []
minus_matches = []
minus_matched_group_list = []
for pam in regex.finditer(cas_obj.pam_five_prime, snpregion, regex.IGNORECASE, overlapped=True):
minus_starts.append(pam.start())
minus_matches.append(pam.group(0))
matched_group = None
for name in pam.groupdict():
if pam.group(name):
matched_group = name
break
minus_matched_group_list.append(matched_group)
snppos_in_snpregion = guide_len+pam_len-1
chrom_num = str(re.findall(r'\d+', str(snplist['chrom'][0]))[0])
#find guides on plus strand
for j in range(len(plus_starts)):
start = plus_starts[j]
PAM = plus_matches[j]
matched_group = plus_matched_group_list[j]
if ((start >= guide_len) and (start <= snppos_in_snpregion + max_snp_pos_in_protospacer)): #all the way up to the SNP being in the last position on the PAM - because that is always concrete, to N of PAM being max pos away from snp.
#compute position of SNP in protospacer or PAM, and discard guide if SNP is in a position that should be excluded
pos_in_protospacer_or_pam_num = start-snppos_in_snpregion
if pos_in_protospacer_or_pam_num <=0: #if position is in PAM, subtract 1 to follow convention of -1,-2,-3 being the positions of the PAM
pos_in_protospacer_or_pam_num = pos_in_protospacer_or_pam_num-1
else:
pass
exclusion_set = cas_obj.exclusion_positions[int(matched_group[-1])]
if pos_in_protospacer_or_pam_num in exclusion_set:
flag = True
else:
flag = False
if flag:
continue
else:
pos_in_protospacer_or_pam.append(pos_in_protospacer_or_pam_num)
#saving PAM and guide, differently handled based on 5 prime or 3 prime PAM
if cas_obj.is_five_prime:
PAMlist.append(str(Seq(PAM).reverse_complement()))
guide = Seq(snpregion[start-guide_len: start])
guidelist.append(str(guide.reverse_complement()))
else:
PAMlist.append(str(PAM))
guide = snpregion[start-guide_len: start]
guidelist.append(str(guide))
chrom = ('chr' + chrom_num)
chrom_list.append(chrom)
guide_start = start - guide_len + gc_index_conversion_num #pam_start - guide_len + conversion_num
start_list.append(guide_start)
guide_end = guide_start + guide_len - 1 # end of the guide, inclusive
end_list.append(guide_end)
SNPcoordinates.append(snplist['pos'][i])
if 'snp_id' in snplist.columns:
rsIDs.append(snplist['snp_id'][i])
else:
rsIDs.append('None')
variation.append((snplist['ref'][i] + '>' + snplist['alt'][i]))
if cas_obj.is_five_prime: #save guides as minus guides if cas PAM is five-prime, because that's how the output makes sense
guide_strand = "-"
else:
guide_strand = "+"
strand.append(guide_strand)
presentallele = sequence[snppos_index]
if presentallele == snplist['ref'][i]:
SNPallele.append(presentallele + ' (ref)') #go to the specific snp position in the given sequence and print that
else:
SNPallele.append(presentallele + ' (alt)') #go to the specific snp position in the given sequence and print that
guidename.append('chr' + chrom_num + '_' + str(guide_start) + '_' + guide_strand + '_' + presentallele + '_' + cas_obj.name + '_' + str(guide_len) + 'nt')
else:
pass
#find guides on minus strand
for j in range(len(minus_starts)):
start = minus_starts[j]
PAM = minus_matches[j]
matched_group = minus_matched_group_list[j]
if (start >= (snppos_in_snpregion - max_snp_pos_in_protospacer) and (start <= snppos_in_snpregion)): #all the way up to the SNP being in the last position on the PAM - because that is always concrete, to N of PAM being max pos away from snp
pos_in_protospacer_or_pam_num = -1*(start + pam_len-1 - snppos_in_snpregion) #such that 'N' is -1, protospacer positions are positive integers, and PAM positions are negative
if pos_in_protospacer_or_pam_num <=0:
pos_in_protospacer_or_pam_num = pos_in_protospacer_or_pam_num-1
else:
pass
exclusion_set = cas_obj.exclusion_positions[int(matched_group[-1])]
if pos_in_protospacer_or_pam_num in exclusion_set:
flag = True
else:
flag = False
if flag:
continue
else:
pos_in_protospacer_or_pam.append(pos_in_protospacer_or_pam_num)
if cas_obj.is_five_prime:
PAMlist.append(str(PAM))
guide = snpregion[start+pam_len : start+pam_len+guide_len]
guidelist.append(str(guide))
else:
PAMlist.append(str(Seq(PAM).reverse_complement()))
guide = Seq(snpregion[start+pam_len : start+pam_len+guide_len])
guidelist.append(str(guide.reverse_complement()))
chrom = ('chr' + chrom_num)
chrom_list.append(chrom)
guide_start = start + pam_len + gc_index_conversion_num
start_list.append(guide_start)
guide_end = guide_start + guide_len - 1 # end of the guide, inclusive
end_list.append(guide_end)
# guide_coords = (guide_start, (guide_start + guide_len-1)) #start inclusive
# guide_coords_list.append(guide_coords)
SNPcoordinates.append(snplist['pos'][i])
if 'snp_id' in snplist.columns:
rsIDs.append(snplist['snp_id'][i])
else:
rsIDs.append('None')
variation.append((snplist['ref'][i] + '>' + snplist['alt'][i]))
if cas_obj.is_five_prime: #save guides as minus guides if cas PAM is five-prime, because that's how the output makes sense
guide_strand = '+'
else:
guide_strand = '-'
strand.append(guide_strand)
presentallele = sequence[snppos_index]
if presentallele == snplist['ref'][i]:
SNPallele.append(presentallele + ' (ref)') #go to the specific snp position in the given sequence and print that
else:
SNPallele.append(presentallele + ' (alt)') #go to the specific snp position in the given sequence and print that
guidename.append('chr' + chrom_num + '_' + str(guide_start) + '_' + guide_strand + '_' + presentallele + '_' + cas_obj.name + '_' + str(guide_len) + 'nt')
else:
pass
guides_df = pd.DataFrame({
'chrom': chrom_list,
'start': start_list,
'end': end_list,
'rsID': rsIDs,
'SNP position': SNPcoordinates,
'gRNA': guidelist,
'PAM': PAMlist,
'strand': strand,
'variation': variation,
'present allele': SNPallele,
'guide ID': guidename,
'SNP position in protospacer or PAM': pos_in_protospacer_or_pam
})
return guides_df
# COMBINE GUIDESDFs and ANNOTATE VARIANT INFO
"""
Input combined all_guides_df, and annotate in a new column whether each SNP is present in each vcf file input. Save alt AF frequency in a separate new column if SNP is in a population database vcf inputted.
Arguments:
gensdict: dictionary of gens dataframes, keyed by vcf file name
guidesdf: dataframe of guides
Returns:
new_guidesdf: dataframe of guides with variant information annotated
"""
[docs]
def all_guides_var_info(gensdict, guidesdf):
guidesdf_copy = guidesdf.copy()
new_guidesdf = guidesdf_copy.drop_duplicates()
new_guidesdf = new_guidesdf.sort_values(by='start')
for key in gensdict:
in_vcf = new_guidesdf["SNP position"].isin(gensdict[key]["pos"])
df_output = in_vcf.map({True: ("Y"), False: "N"})
col_name = "In " + str(key)[8:]
new_guidesdf[col_name] = list(df_output)
alt_af_list = []
for index, row in new_guidesdf.iterrows():
snp_pos = row["SNP position"]
af_values = []
for vcf_name, vcf_df in gensdict.items():
if "alt AF" in vcf_df.columns:
matches = vcf_df[vcf_df["pos"] == snp_pos]
if not matches.empty:
af_values.extend(matches["alt AF"].astype(str).tolist())
alt_af_list.append(",".join(af_values) if af_values else "N/A")
new_guidesdf["alt allele frequency"] = alt_af_list
return new_guidesdf
# OFF-TARGET ANALYSIS FUNCTIONS
"""
3 functions to count exact matches in the genome for each guide in a guidesdf. Uses multi-processing for supposedly faster execution.
"""
genome = None
cas_obj = None
"""
Arguments:
genome_path: path to the genome fasta file
cas_params: Cas object
Returns:
None
"""
[docs]
def init_worker(genome_path, cas_params):
global genome, cas_obj
genome = Fasta(genome_path)
cas_obj = cas_params
"""
Arguments:
guide: guide sequence
Returns:
total: number of exact matches in the genome
total_rc: number of exact matches in the genome on the reverse strand
"""
[docs]
def search_pattern(guide):
total = 0
total_rc = 0
rc_guide = str(Seq(guide).reverse_complement())
# Lookahead for overlapping matches, instead of overlapped=True. Probably can use overlapped=True too
if cas_obj.is_three_prime:
fwd_pattern = regex.compile(f"(?=({guide}{cas_obj.pam_three_prime}))", regex.IGNORECASE)
rev_pattern = regex.compile(f"(?=({cas_obj.pam_five_prime}{rc_guide}))", regex.IGNORECASE)
elif cas_obj.is_five_prime:
fwd_pattern = regex.compile(f"(?=({cas_obj.pam_five_prime}{guide}))", regex.IGNORECASE)
rev_pattern = regex.compile(f"(?=({rc_guide}{cas_obj.pam_three_prime}))", regex.IGNORECASE)
else:
raise ValueError("pam_orientation not defined")
for chrom in genome.keys():
seq = str(genome[chrom])
total += len(fwd_pattern.findall(seq, overlapped=True))
total_rc += len(rev_pattern.findall(seq, overlapped=True))
return total + total_rc
def _default_processes():
"""
Gets appropriate processing units
"""
# 1) SGE allocation (works on Wynton)
nslots = os.environ.get("NSLOTS")
if nslots and nslots.isdigit():
return max(1, int(nslots))
# 2) CPU affinity (often respects scheduler cpuset on Linux)
try:
return max(1, len(os.sched_getaffinity(0)))
except Exception:
pass
# 3) Local fallback
return max(1, mp.cpu_count() - 1)
[docs]
def count_exact_matches(df, genome_fasta_path, cas_parameters, num_processes=None):
"""
Counts exact matches in the genome for each guide in a guidesdf. Uses multi-processing for supposedly faster execution.
Arguments:
df: dataframe of guides
genome_fasta_path: path to the genome fasta file
cas_parameters: Cas object
num_processes: number of processes to use
Returns:
df: dataframe of guides with exact matches in the genome annotated
"""
guides = df['gRNA'].tolist()
num_processes = num_processes or _default_processes()
# Start the multiprocessing pool
with mp.Pool(
processes=num_processes,
initializer=init_worker,
initargs=(genome_fasta_path, cas_parameters)
) as pool:
match_counts = pool.map(search_pattern, guides)
df["exact matches in genome"] = match_counts
return df
"""
Counts 1 bp mismatches for each guide and stores in a new column in the dataframe.
Arguments:
df: dataframe of guides
chseq: sequence of the chromosome
Returns:
df: dataframe of guides with 1 bp mismatches in the genome annotated
"""
[docs]
def one_mismatch(df, chseq):
counts = []
for idx in range(len(df)):
guide = df['gRNA'].iloc[idx]
guide_rc = str(Seq(guide).reverse_complement())
chseq_str = str(chseq)
#Generate mismatched guides. Only creates 1 bp mismatched guides, never searches for the exact guide. eg: [^T] means match any character except T
mismatch_guides = [guide[:j] + '[^' + guide[j] + ']' + guide[j+1:] for j in range(len(guide))]
mismatch_guides_rc = [guide_rc[:k] + '[^' + guide_rc[k] + ']' + guide_rc[k+1:] for k in range(len(guide_rc))] #create 1 bp mismatched guide set for reverse complemented guide
count_per_guide = 0
#Forward strand matches: guide + NGG
for pat in mismatch_guides:
regexp = regex.compile(pat + r'[ACGT]GG', regex.IGNORECASE)
matches = regexp.findall(chseq_str, overlapped=True)
count_per_guide += len(matches)
#Reverse strand matches: CCN + rc_guide
for pat_rc in mismatch_guides_rc:
regexp_rc = regex.compile(r'CC[ACGT]' + pat_rc, regex.IGNORECASE)
matches_rc = regexp_rc.findall(chseq_str, overlapped=True)
count_per_guide += len(matches_rc)
counts.append(count_per_guide)
df['1 bp mismatches in ch1'] = counts
return df
# SUMMARY STATS OF THE SINGLE-GUIDE LIBRARY
"""
Returns a list of variants that are targetable by your Cas enzyme (have PAM sites within 10 bp of the SNP), as well as the number of guides found for each SNP.
Also includes variants/SNPs that were attempted (e.g., from snplist) but where no guide was found, reporting 0 guides for these.
Arguments:
guidesdf: dataframe of guides
snplist: list or dataframe of all SNPs attempted (each item should at least include 'rsID', 'SNP position', 'alt allele frequency' or a superset)
Returns:
targetable_snps: dataframe of all attempted SNPs, reporting number of guides found for each
"""
[docs]
def targetable_vars(guidesdf, snplist):
# Make sure snplist is a dataframe; if not, convert it (assume it's a list of dicts or tuples)
if not isinstance(snplist, pd.DataFrame):
snplist = pd.DataFrame(snplist)
# Select necessary columns and drop duplicate positions from snplist
snps = snplist.loc[:, ['rsID', 'SNP position', 'alt allele frequency']].drop_duplicates(subset='SNP position')
# Count number of guides for each SNP position in guidesdf
guide_counts = guidesdf.groupby('SNP position').size().reset_index(name='no. of guides found (with ref or alt allele)')
# Merge counts onto snps: all snps, fill 0 where no guide
targetable_snps = pd.merge(
snps,
guide_counts,
on='SNP position',
how='left'
)
targetable_snps['no. of guides found (with ref or alt allele)'] = targetable_snps['no. of guides found (with ref or alt allele)'].fillna(0).astype(int)
# Reorder columns for output
targetable_snps = targetable_snps.reset_index(drop=True)
return targetable_snps
# PAIRING-RELATED FUNCTIONS
"""
Returns a list of two dataframes, split from the guides dataframe, by guides targeting each allele, to enable pairing guides on the same allele.
Arguments:
clean_guidesdf: dataframe of guides
phased_vcf: string filename of phased VCF file
locus: genomic region in format chr#:startpos-endpos
Returns:
[filtered_guidesdf_allele1, filtered_guidesdf_allele2]: list of two dataframes, split from the guides dataframe, by guides targeting each allele
"""
[docs]
def split_phased(clean_guidesdf, phased_vcf, locus):
phased_gensdf = create_gens(phased_vcf, locus, 'cell-line')
#filtering out guides targeting SNPs that aren't heterozygous in the given vcf file
filtered_guidesdf = clean_guidesdf[clean_guidesdf['SNP position'].isin(phased_gensdf['pos'])].copy()
phasing_list = []
for i in range(len(filtered_guidesdf)):
# for the current row in guides df, getting the SNP allele present in the guide, and the position of the SNP
allele = filtered_guidesdf['present allele'].iloc[i]
pos = filtered_guidesdf['SNP position'].iloc[i]
# extract row of VCF data of cell-line for that SNP
match = phased_gensdf[phased_gensdf['pos'] == pos]
#error handling if SNP is not found in VCF
if match.empty:
raise ValueError(
f"Phased pairing requested but SNP at position {pos} in the guides dataframe is not present in the cell-line's VCF region."
)
# extracting the genotype from the extracted row, and splitting the genotype to create a list from the genotype data for easy parsing.
gt = match['genotype'].iloc[0].split('|')
# if the genotype wasn't able to be split by | (because the gt is unphased eg: 0/1), i.e., the pre-split and post-split value is the same, store its phasing information as "unphased". These guides will be included in both allele dataframes.
if gt[0] == match['genotype'].iloc[0]:
phasing_list.append('unphased')
else:
# storing the phasing of this version of the guide (ref or alt version of the SNP present) in this row of guidesdf
if allele[3:6] == 'ref':
if gt[0] == '0':
phasing_list.append('allele 1')
elif gt[1] == '0':
phasing_list.append('allele 2')
else:
phasing_list.append('error')
else:
if gt[0] == '1':
phasing_list.append('allele 1')
elif gt[1] == '1':
phasing_list.append('allele 2')
else:
phasing_list.append('error')
filtered_guidesdf['phasing'] = phasing_list #adding phasing column to df
#using this phasing, splitting the df into two guides dfs of phased guides.
rows_allele1 = []
rows_allele2 = []
#for each row of the guidesdf
for i in range(len(filtered_guidesdf)):
phasing = filtered_guidesdf['phasing'].iloc[i] #get phasing
if phasing == 'allele 1':
rows_allele1.append(filtered_guidesdf.iloc[i])
elif phasing == 'allele 2':
rows_allele2.append(filtered_guidesdf.iloc[i])
else:
rows_allele1.append(filtered_guidesdf.iloc[i])
rows_allele2.append(filtered_guidesdf.iloc[i])
#cleaning up each df
filtered_guidesdf_allele1 = pd.DataFrame(rows_allele1)
filtered_guidesdf_allele1 = filtered_guidesdf_allele1.reset_index()
filtered_guidesdf_allele1 = filtered_guidesdf_allele1.drop(['index'], axis=1)
filtered_guidesdf_allele2 = pd.DataFrame(rows_allele2)
filtered_guidesdf_allele2 = filtered_guidesdf_allele2.reset_index()
filtered_guidesdf_allele2 = filtered_guidesdf_allele2.drop(['index'], axis=1)
return [filtered_guidesdf_allele1, filtered_guidesdf_allele2]
"""
Returns paired gRNA library with all possible guide pairs of a list of sgRNA
Arguments:
guidesdf: dataframe of guides
Returns:
pairings_df: dataframe of paired guides
"""
[docs]
def random_pair(guidesdf):
data = []
# iterate over unique combinations of gene sequences and IDs
for i in range(len(guidesdf)):
for j in range(i+1, len(guidesdf)):
if guidesdf['SNP position'].iloc[i] != guidesdf['SNP position'].iloc[j]:
data.append([guidesdf['SNP position'].iloc[i], guidesdf['alt allele frequency'].iloc[i], guidesdf['guide ID'].iloc[i], guidesdf['gRNA'].iloc[i], guidesdf['SNP position'].iloc[j], guidesdf['alt allele frequency'].iloc[j], guidesdf['guide ID'].iloc[j], guidesdf['gRNA'].iloc[j], (guidesdf['guide ID'].iloc[i] + '|' + guidesdf['guide ID'].iloc[j])])
# convert to dataframe
pairings_df = pd.DataFrame(data, columns=['SNP position 1', 'alternate allele frequency 1', 'guide 1 ID', 'guide 1', 'SNP position 2', 'alternate allele frequency 2', 'guide 2 ID', 'guide 2', 'pair ID'])
pairings_df = pairings_df.reset_index()
pairings_df = pairings_df.drop(['index'], axis=1)
#compute excision sizes for each guide pair and store in a new column
excision_sizes = []
for i in range(len(pairings_df)):
guide_start1 = int((pairings_df['guide 1 ID'][i]).split("_")[1])
guide_start2 = int((pairings_df['guide 2 ID'][i]).split("_")[1])
excision_size = abs(guide_start1 - guide_start2)
excision_sizes.append(excision_size)
pairings_df['excision size'] = excision_sizes
return pairings_df
"""
Returns paired gRNA library with guides paired to cause an excision about a fixed point. Takes as input a list of any number of such fixed points. Implemented to allow use cases such as excising certain exons or features of interest.
Arguments:
guidesdf: dataframe of guides
points_list: list of fixed points
Returns:
fp_pairings_df: dataframe of paired guides
"""
[docs]
def fixed_point_pair(guidesdf, points_list):
data = []
for point in points_list:
point = int(point)
for i in range(len(guidesdf)):
if int(guidesdf['SNP position'].iloc[i]) <= point: #for snps below or equal to point
for j in range(len(guidesdf)):
if int(guidesdf['SNP position'].iloc[j]) > point: #if next guide snp is above point
data.append([guidesdf['SNP position'].iloc[i], guidesdf['alt allele frequency'].iloc[i], guidesdf['guide ID'].iloc[i], guidesdf['gRNA'].iloc[i], guidesdf['SNP position'].iloc[j], guidesdf['alt allele frequency'].iloc[j], guidesdf['guide ID'].iloc[j], guidesdf['gRNA'].iloc[j], (guidesdf['guide ID'].iloc[i] + '|' + guidesdf['guide ID'].iloc[j])])
else:
'passing'
pass
elif int(guidesdf['SNP position'].iloc[i]) >= point: #for snps above or equal to point
for j in range(len(guidesdf)):
if int(guidesdf['SNP position'].iloc[j]) < point: #if next guide snp is below point
data.append([guidesdf['SNP position'].iloc[i], guidesdf['alt allele frequency'].iloc[i], guidesdf['guide ID'].iloc[i], guidesdf['gRNA'].iloc[i], guidesdf['SNP position'].iloc[j], guidesdf['alt allele frequency'].iloc[j], guidesdf['guide ID'].iloc[j], guidesdf['gRNA'].iloc[j], (guidesdf['guide ID'].iloc[i] + '|' + guidesdf['guide ID'].iloc[j])])
else:
'passing'
pass
# convert to dataframe
fp_pairings_df = pd.DataFrame(data, columns=['SNP position 1', 'alternate allele frequency 1', 'guide 1 ID', 'guide 1', 'SNP position 2', 'alternate allele frequency 2', 'guide 2 ID', 'guide 2', 'pair ID'])
fp_pairings_df = fp_pairings_df.drop_duplicates()
fp_pairings_df = fp_pairings_df.reset_index()
fp_pairings_df = fp_pairings_df.drop(['index'], axis=1)
#compute excision sizes for each guide pair and store in a new column
excision_sizes = []
for i in range(len(fp_pairings_df)):
guide_start1 = int((fp_pairings_df['guide 1 ID'][i]).split("_")[1])
guide_start2 = int((fp_pairings_df['guide 2 ID'][i]).split("_")[1])
excision_size = abs(guide_start1 - guide_start2)
excision_sizes.append(excision_size)
fp_pairings_df['excision size'] = excision_sizes
return fp_pairings_df
"""
Returns paired gRNA library with guides paired to target adjacent SNPs
Arguments:
guidesdf: dataframe of guides
Returns:
tiled_pairings_df: dataframe of paired guides
"""
[docs]
def tiling_pair(guidesdf):
guidesdf = guidesdf.sort_values(by=["SNP position","start"]).reset_index(drop=True)
data = []
for i in range(len(guidesdf)):
snp_i = guidesdf['SNP position'].iloc[i]
for j in range(i + 1, len(guidesdf)):
snp_j = guidesdf['SNP position'].iloc[j]
if snp_j == snp_i:
continue # same SNP, skip
elif snp_j > snp_i:
# First time SNP changes — pair i with all guides of the next SNP
data.append([
snp_i,
guidesdf['alt allele frequency'].iloc[i],
guidesdf['guide ID'].iloc[i],
guidesdf['gRNA'].iloc[i],
snp_j,
guidesdf['alt allele frequency'].iloc[j],
guidesdf['guide ID'].iloc[j],
guidesdf['gRNA'].iloc[j],
guidesdf['guide ID'].iloc[i] + '|' + guidesdf['guide ID'].iloc[j]
])
else:
break # if sorted, this won't happen — just for safety
# now that we've reached the first different SNP, we continue checking if the next rows are also from that same SNP
# and keep pairing i with them
k = j + 1
while k < len(guidesdf) and guidesdf['SNP position'].iloc[k] == snp_j:
data.append([
snp_i,
guidesdf['alt allele frequency'].iloc[i],
guidesdf['guide ID'].iloc[i],
guidesdf['gRNA'].iloc[i],
guidesdf['SNP position'].iloc[k],
guidesdf['alt allele frequency'].iloc[k],
guidesdf['guide ID'].iloc[k],
guidesdf['gRNA'].iloc[k],
guidesdf['guide ID'].iloc[i] + '|' + guidesdf['guide ID'].iloc[k]
])
k += 1
break # prevent pairing with SNPs beyond the next one
# remove duplicates regardless of order (A|B == B|A)
seen = set()
unique_data = []
for row in data:
gid1, gid2 = row[2], row[6]
pair_id = '|'.join(sorted([gid1, gid2]))
if pair_id not in seen:
seen.add(pair_id)
row[8] = pair_id # replace with ordered version
unique_data.append(row)
return pd.DataFrame(data, columns=[
'SNP position 1', 'alternate allele frequency 1', 'guide 1 ID', 'guide 1',
'SNP position 2', 'alternate allele frequency 2', 'guide 2 ID', 'guide 2',
'pair ID'
])
# OUTPUT SGRNA LIBRARY IN BED FORMAT FOR UCSC GENOME BROWSER VIEWING
"""
Outputs a dataframe of guides in bed format for UCSC genome browser viewing
Arguments:
guidesdf: dataframe of guides
Returns:
guides_bed: dataframe of guides in bed format
"""