Hi guys, I am very new and dummy with python and I try to write a script to filter some alignments. The filter works removing the sequences for every alignment with a chi2 < 0.05 and writing a new fasta files without those sequences, but I have a problem. I have two terminals in some of the alignments that are my out-groups and I dont want to remove them, buy my script remove those sequences because some of them dont pass the chi2 test, so maybe some one can help with that. I will appreciate a lot. Thanks
Here is the part of the code that do the chi2 test:
# Chi2 analysis para folders v1
import sys
import pandas as pd
from Bio import AlignIO
from scipy import stats
import numpy as np
import os
import datetime
import csv
# Obtener la fecha y hora actual para el registro de la ejecución
current_time = datetime.datetime.now()
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
def compositionMatrix(aln):
compDict = {}
fixedCharacters = ["-","A","C","G","T"]
for record in aln:
header = record.id
seq = record.seq.upper() # Convert sequence to uppercase
currentSeqMat = [0]*5
for i in range(len(seq)):
nt = seq[i]
try:
characterPos = fixedCharacters.index(nt)
currentSeqMat[characterPos] += 1
except:
print("ERROR:", header, "contains character ("+nt+") not in the list:",fixedCharacters)
compDict[header] = currentSeqMat
compDF = pd.DataFrame.from_dict(compDict, orient='index', columns=fixedCharacters)
return compDF
def chi2test(compDF):
seqTotals = compDF.sum(axis=1)
gaps = compDF["-"]
gapsPerSeq = gaps/seqTotals
nonGap = compDF.loc[:, 'A':'T']
nonGapTotals = nonGap.sum().to_frame()
nonGapSeqTotals = nonGap.sum(axis=1).to_frame()
numCharacters = nonGapTotals.sum()
expectedFreq = nonGapTotals / numCharacters
expectedCountArray = np.dot(nonGapSeqTotals, expectedFreq.transpose())
expectedCountDF = pd.DataFrame(expectedCountArray, columns=nonGap.columns, index=nonGap.index.values )
chi2DF = ((expectedCountDF - nonGap)**2)/expectedCountDF
chi2Sum = chi2DF.sum(axis=1)
pValueDf = 1 - stats.chi2.cdf(chi2Sum, 3)
outDF = pd.DataFrame({"Gap/Ambiguity":gapsPerSeq,"p-value":pValueDf})
outDF.index.name = 'header'
return outDF
###############################################################################################################
# Obtener una lista de todos los archivos en la carpeta de entrada
input_folder = sys.argv[1]
print(f"Carpeta de entrada: {input_folder}")
input_files = os.listdir(input_folder)
# Crear la carpeta de salida si no existe
output_folder = os.path.join(input_folder, "filtered_alignments")
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# Procesar cada archivo individualmente
for input_file in input_files:
# Saltar cualquier archivo que no sea un archivo fasta
if not input_file.endswith(".fasta"):
continue
# Leer el archivo fasta de entrada
input_file_path = os.path.join(input_folder, input_file)
alignment = AlignIO.read(open(input_file_path), "fasta")
# Obtener el dataframe de la composición de aminoácidos
compDF = compositionMatrix(alignment)
# Aplicar la prueba de chi2 y filtrar las secuencias que no pasan la prueba
outDF = chi2test(compDF)
removed_seqs = outDF.index[outDF['p-value'] < 0.05].tolist()
# Crear un nuevo objeto de alineamiento sin las secuencias eliminadas
filtered_alignment = AlignIO.MultipleSeqAlignment([seq for seq in alignment if seq.id not in removed_seqs])
# Escribir el archivo de salida con los valores de p
output_file = os.path.join(output_folder, input_file.replace(".fasta", "_filtered.tsv"))
outDF.to_csv(output_file, sep='\t')
# Escribir el archivo fasta sin las secuencias eliminadas
output_fasta_file = os.path.join(output_folder, input_file.replace(".fasta", "_filtered.fasta"))
with open(output_fasta_file, 'w') as f:
AlignIO.write(filtered_alignment, f, 'fasta')
###############################################################################################################
# Registro de la ejecución y resultados
log_file = os.path.join(input_folder, "filtered_log.txt")
csv_file = os.path.join(input_folder, "filtered_sequences.csv")
with open(log_file, 'w') as f:
f.write(f"Programa iniciado: {formatted_time}\n")
f.write(f"Carpeta de entrada: {input_folder}\n")
f.write(f"Carpeta de salida: {output_folder}\n\n")
results = []
for input_file in input_files:
if input_file.endswith(".fasta"):
input_file_path = os.path.join(input_folder, input_file)
f.write(f"Leyendo archivo fasta de entrada: {input_file}\n")
alignment = AlignIO.read(open(input_file_path), "fasta")
compDF = compositionMatrix(alignment)
outDF = chi2test(compDF)
removed_seqs = outDF.index[outDF['p-value'] < 0.05].tolist()
if removed_seqs:
f.write(f"Secuencias removidas en {input_file}:\n")
for seq in removed_seqs:
f.write(f"- {seq}\n")
results.append((input_file, seq, outDF.loc[seq]['p-value']))
filtered_alignment = AlignIO.MultipleSeqAlignment([seq for seq in alignment if seq.id not in removed_seqs])
output_file = os.path.join(output_folder, input_file.replace(".fasta", "_filtered.tsv"))
outDF.to_csv(output_file, sep='\t')
output_fasta_file = os.path.join(output_folder, input_file.replace(".fasta", "_filtered.fasta"))
with open(output_fasta_file, 'w') as f2:
AlignIO.write(filtered_alignment, f2, 'fasta')
f.write(f"Archivos de salida guardados en {output_folder}\n")
else:
f.write(f"No se removieron secuencias en {input_file}\n")
f.write("Programa finalizado.")
# Guardar archivo CSV con los resultados
with open(csv_file, 'w') as f:
writer = csv.writer(f)
writer.writerow(["Archivo", "Secuencia", "Valor de p"])
for result in results:
writer.writerow(result)
print("Se han generado los archivos de salida y el archivo CSV con los resultados.")
Thanks for you're answer I am going to test it and I let you know how works
Best