Hi All,
I'm trying to speed up a BLASTP call as part of a bigger RBH workflow to detect orthologs, and I'm testing different approaches with a 100K-sequence database and a 1107-sequence query (the real database will be 350K sequences, and queries will differ in size). My function splits the query into separate files and processes them in parallel using Python multiprocessing (Process or Pool), and I'm also looking at combining that with BLASTP's -num_threads parameter to increase speed further. I'm very new to parallelisation in general (both threading and multiprocessing) but am keen to know more!
I posted these questions together as they all relate to the same code, generally follow on from each other, and I can accept multiple answers (unlike Stack Overflow), but please let me know if you'd suggest posting them separately. I'd be grateful for any answers - they don't have to cover every point in one go :D
Question 1 - I'm running local BLASTP and was wondering about the details of the num_threads parameter. Am I right in thinking that (as the name suggests) it spreads the workload across multiple threads within a single process, so it's kind of analogous to Python's threading module (as opposed to the multiprocessing module, which spreads tasks across separate processes that can run on separate CPU cores)? I've heard BLAST only goes above 1 thread when it 'needs to', but I'm not clear on what this actually means - what determines whether it needs more threads? Does it depend on the input query size? Are threads spawned at a specific step in the BLASTP program?
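For context, the call each of my processes ends up making is roughly like this - a simplified sketch of my blastP_subprocess wrapper (the real version has extra diagnostic error handling, and the -outfmt value is just what I happen to use):

import os
import subprocess

def blastP_subprocess(evalue_user, query_path, blastp_exe_path, out_path, db_path, thread_num):
    #rough sketch of the wrapper - the real version has more error management
    cmd = [blastp_exe_path,
           '-query', query_path,
           '-db', db_path,
           '-out', out_path,
           '-evalue', str(evalue_user),
           '-outfmt', '5',                    #XML output
           '-num_threads', str(thread_num)]
    subprocess.run(cmd,
                   check=True,
                   env={**os.environ, 'BLASTDB_LMDB_MAP_SIZE': '1000000'})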
Question 2 - To check I have the right idea conceptually: if the above is correct, would I be right to say that BLAST itself is I/O bound (hence the threading), which makes sense as it's processing thousands of sequences in the query, so there's lots of input? But if you call BLAST from a workflow script (e.g. using Python's subprocess module), then the call becomes CPU bound if you set num_threads to a high number, since the work is spread across multiple threads and eats a lot of CPU? Or does the fact that BLASTP doesn't take full advantage of the threading mean the CPU never gets fully utilised, so a call will still be I/O bound independent of num_threads? If that's correct, then maybe I could use threading rather than multiprocessing to process the split queries in parallel...
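Concretely, something like the sketch below is what I have in mind (untested - it reuses my existing blastP_subprocess wrapper, and the function name is just for illustration). Since each worker mostly just waits on an external blastp.exe process, the GIL shouldn't be the limiting factor:

from concurrent.futures import ThreadPoolExecutor

def blastP_paralellised_threads(evalue_user, queries, blastp_exe_path, outs, databases, thread_num):
    #one thread per split query file; each thread just blocks on its blastp subprocess
    with ThreadPoolExecutor(max_workers=len(queries)) as executor:
        futures = [executor.submit(blastP_subprocess,
                                   evalue_user, query, blastp_exe_path, out, db, thread_num)
                   for query, out, db in zip(queries, outs, databases)]
        for future in futures:
            future.result()  #re-raise any exceptions from the workers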
Question 3 - Are there any suggestions for how to pick good core and thread parameters for general use across different machines without relying on individual benchmarking? I want it to work on other people's machines with as little tuning and optimisation as possible. Is it just cores = as many cores as you have (i.e. multiprocessing.cpu_count()) and threads = cores + 1 (set via the BLASTP num_threads parameter)? Would this still hold on machines with more/fewer cores?
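For reference, the naive defaults I currently have in mind look like this (just the above restated as code - I honestly don't know whether the cores + 1 rule holds):

import os

total_cores = os.cpu_count() or 1   #logical cores reported by the OS (may count hyperthreads)
num_cores = total_cores             #one BLASTP process per reported core
thread_num = total_cores + 1        #the 'cores + 1' rule for num_threads mentioned above
#note: this launches num_cores * thread_num threads overall, i.e. the machine is oversubscribed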
Question 4 - For benchmarking, how do external programs affect multiprocessing speed? Would browsing the web with 100 tabs open slow things down by increasing the load on one of the CPUs, taking resources away from one of the processes running my script? If the answer is yes, what's the best way to benchmark this kind of thing? I'm including this question to give context on my benchmarking questions below (i.e. the numbers I'm throwing around may be crap). I tried to include graphs of the numbers but they won't copy in; I found a post explaining how to add pics, so if they would be helpful I can add them.
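For what it's worth, each configuration is currently only timed once with time.time(); something like the sketch below (repeat runs, median, time.perf_counter) is what I'm considering, so that a one-off spike from other programs doesn't skew a single measurement:

import statistics
import time

def time_config(func, args, repeats=3):
    #run the same configuration several times and report the median,
    #so background load from other programs doesn't dominate one measurement
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args)
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)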
Question 5 - Perhaps a more general question: I'm only splitting my query into 4 processes, so I would have thought multiprocessing.Process would be better (vs multiprocessing.Pool, which seems to be the preferred choice when you have lots of processes). But this isn't the case in my benchmarks of blastP_paralellised_process and blastP_paralellised_pool - any idea why? Time-wise, the process-to-pool 'seconds' ratio hovers around 1 with no obvious pattern across all num_threads (1-9) and core (1-5) combinations.
Question 6 - Why does increasing the number of cores (and correspondingly the number of split BLASTP query files) not give obvious speed improvements? I would expect this with cores set >4, as my PC is a 4-core machine, but there seems to be little difference between processing 1/4 of the query on each of 4 cores vs 1/2 of the query on each of 2 cores. Is my assumption from Question 2 incorrect? There is a little slowdown when running on a single core, and a dramatic increase for 1 core with 2 and 1 threads (1618 seconds and 2297 seconds), but for 2-5 cores with 1-9 threads each BLASTP run takes around 1000 seconds with some small random fluctuations (e.g. 4 cores / 1 thread is 1323 seconds, but the other multi-core single-thread runs are normal time-wise relative to the baseline of the other values).
I've copied my code below. I've not included functions like fasta_split etc., as both they and BLASTP seem to be working (in the sense that I'm getting XML results files that I haven't started parsing yet, but they look OK when I open them in Notepad) and I don't want to add 100 lines of unnecessary code and comments. Also, they're used in the same way for both blastP_paralellised_process and blastP_paralellised_pool, so I don't think they're causing the time differences. Please let me know if including them would help though!
import os
import time
from multiprocessing import Process, Pool

#fasta_split, build_split_filename, copy_dir, blastP_subprocess and
#makeblastdb_subprocess are my own helpers, defined elsewhere (not shown).

def blastP_paralellised_process(evalue_user, query_in_path, blastp_exe_path, results_out_path, db_user, num_cores, thread_num):
    #function to split the fasta query into 1 txt file per core
    filenames_query_in_split = fasta_split(query_in_path, num_cores)
    #function to construct result names for the blastp parameter 'out'
    filenames_results_out_split = build_split_filename(results_out_path, num_cores)
    #copy a makeblastdb database given as input - generate one database per core.
    #Change the name of each copy to include 'copy' and keep the original database directory for quality control.
    delim = db_user.rindex('\\')
    db_name = db_user[delim:]
    db_base = db_user[:delim]
    databases = copy_dir(db_base, num_cores)  #1 db per process or you get a lock
    #split blastp params across processes.
    processes = []
    for file_in_real, file_out_name, database in zip(filenames_query_in_split, filenames_results_out_split, databases):
        #'blastP_subprocess' is a blast-specific subprocess call that sets the environment to
        #env={'BLASTDB_LMDB_MAP_SIZE':'1000000'} and has some diagnostic error management.
        blastP_process = Process(target=blastP_subprocess,
                                 args=(evalue_user,
                                       file_in_real,
                                       blastp_exe_path,
                                       file_out_name,
                                       database + db_name,
                                       thread_num))
        blastP_process.start()
        processes.append(blastP_process)
    #let all processes finish
    for blastP_process in processes:
        blastP_process.join()
def blastP_paralellised_pool(evalue_user, query_in_path, blastp_exe_path, results_out_path, db_user, num_cores, thread_num):
    ####as above####
    filenames_query_in_split = fasta_split(query_in_path, num_cores)
    filenames_results_out_split = build_split_filename(results_out_path, num_cores)
    delim = db_user.rindex('\\')
    db_name = db_user[delim:]
    db_base = db_user[:delim]
    databases = copy_dir(db_base, num_cores)
    ################
    #build params for blast
    params_new = list(zip(
        [evalue_user]*num_cores,
        filenames_query_in_split,
        [blastp_exe_path]*num_cores,
        filenames_results_out_split,
        [database + db_name for database in databases],
        [thread_num]*num_cores))
    #feed each param tuple to a worker in the pool
    with Pool(num_cores) as pool:
        blastP_process = pool.starmap(blastP_subprocess, params_new)
if __name__ == '__main__':
    #make blast db
    makeblastdb_exe_path = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Executables\NCBI\blast-2.10.1+\bin\makeblastdb.exe'
    input_fasta_path = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Precomputed_files\fasta_sequences_SMCOG_efetch_only.txt'
    db_outpath = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_queries\DEMgenome_old\database\smcog_db'
    db_type_str = 'prot'
    start_time = time.time()
    makeblastdb_subprocess(makeblastdb_exe_path, input_fasta_path, db_type_str, db_outpath)
    print("--- makeblastdb %s seconds ---" % (time.time() - start_time))
    #get blast settings
    evalue_user = 0.001
    query_user = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_queries\DEMgenome_old\genome_1_vicky_3.txt'
    blastp_exe_path = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Executables\NCBI\blast-2.10.1+\bin\blastp.exe'
    out_path = r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_results\blastresults_genome_1_vicky_3.xml'
    num_cores = os.cpu_count()
    #benchmarking - sweep core and thread counts from high to low
    for num_cores in range(1, 6)[::-1]:
        print()
        for num_threads in range(1, 10)[::-1]:
            start_time = time.time()
            blastP_paralellised_process(evalue_user, query_user, blastp_exe_path, out_path, db_outpath, num_cores, num_threads)
            end_time = time.time()
            print(f"blastP process\t{end_time - start_time} seconds\t{num_cores} cores\t{num_threads} threads")
            start_time = time.time()
            blastP_paralellised_pool(evalue_user, query_user, blastp_exe_path, out_path, db_outpath, num_cores, num_threads)
            end_time = time.time()
            print(f"blastP pool\t{end_time - start_time} seconds\t{num_cores} cores\t{num_threads} threads")
        print()
Hi Xonq,
Thanks for your reply - it managed to cover every question! I agree it's probably not absolutely necessary here, but I'm quite keen to get more familiar with Python's parallelisation libraries, so it seemed like a good opportunity :P I'll see how it goes - you've definitely given me some food for thought!