// nonpareil_mating - Part of the nonpareil package // @author Luis M. Rodriguez-R // @license artistic 2.0 // @version 1.0 #define _MULTI_THREADED #include #include #include #include #include #include #include #include "universal.h" #include "multinode.h" #include "sequence.h" #include "nonpareil_mating.h" extern int processID; extern int processes; #define LARGEST_PATH 4096 using namespace std; size_t nonpareil_mate(int *&result, char *file, int threads, unsigned int lines_in_ram, unsigned int total_seqs, unsigned int largest_seq, matepar_t matepar){ // Use `file` for both query and subject sequences return nonpareil_mate(result, file, file, threads, lines_in_ram, total_seqs, largest_seq, largest_seq, matepar); } size_t nonpareil_mate(int *&result, char *file, char *q_file, int threads, unsigned int lines_in_ram, unsigned int total_seqs, unsigned int largest_seq, unsigned int q_largest_seq, matepar_t matepar){ // Vars int no_blocks_sbj=0, no_blocks_qry=0, no_seqs_block_qry=0, no_seqs_block_sbj=0, tmp_ram, result_i=0, size_blockA, size_blockB; size_t qry_seqs=0; char **blockA, **blockB, *sampleFile; // Set subsampling //sampleFile = (char *)malloc(LARGEST_PATH * (sizeof *sampleFile)); sampleFile = new char[LARGEST_PATH]; if(processID==0){ sprintf(sampleFile, "%s.subsample.%d", q_file, getpid()); say("3ss$", "Building query set at ", sampleFile); qry_seqs = sub_sample_seqs(q_file, sampleFile, matepar.qryportion, (char *)"enveomics-seq"); say("4sus$", "Query set built with ", qry_seqs, " sequences"); if(qry_seqs==0) error("Impossible to create the query set. Is the -X/-x value too small?"); } sampleFile = broadcast_char(sampleFile, LARGEST_PATH); qry_seqs = broadcast_int(qry_seqs); // Blank results result = new int[qry_seqs]; for(size_t a=0; a1) say("3sis$", "Silencing log in slave processes (", processes-1, ")"); for(int i=0; i1){ // DEBUG //char *cntfile = new char[123]; //sprintf(cntfile, "T.c%d", processID); //nonpareil_save_mates(result, qry_seqs, cntfile); // END DEBUG int *result_sum = new int[qry_seqs]; reduce_sum_int(result, result_sum, qry_seqs); if(processID==0) for(size_t a=0; a (size_t)sizeBlockA ? sizeBlockA-matejob[thr].from : mates_per_thr); // How many qry sequences to process matejob[thr].from_in_result = from_in_result; // Where to start saving results (in zero-count) matejob[thr].par = matepar; // It's cheap to create multiple copies of this, and it's safer than passing a reference. matejob[thr].result = &result; // This is only used with mutex (to save RAM) matejob[thr].mutex = &mutex; // And this is the mutex, it MUST be the same matejob[thr].blockA = &blockA; matejob[thr].blockB = &blockB; matejob[thr].size_blockA = sizeBlockA; matejob[thr].size_blockB = sizeBlockB; if(matejob[thr].number==0) error("Unexpectedly, the thread contains zero load", thr); if((rc=pthread_create(&thread[thr], NULL, &nonpareil_count_mates_thr, (void *)&matejob[thr] ))) error("Thread creation failed", (char)rc); } // Gather jobs for(int thr=0; thrnumber]; if(!result_cp) error("Impossible to allocate memory for the results of the new thread", matejob->id); // Run comparisons for(size_t a=0; anumber; a++) result_cp[a] = 0; nonpareil_count_mates(result_cp, *matejob->blockA, *matejob->blockB, matejob->from, matejob->number, 0, matejob->size_blockB, (matejob->id==0?(int)ceil((double)matejob->number/100.0):0), matejob->par); // Transfer the results to the external array pthread_mutex_lock( matejob->mutex ); if(processID==0) say("4sisis>", "Thread ", matejob->id, " completed ", matejob->number," comparisons, joining results"); int *&result_ref = *matejob->result; // v--> position + first of the block + first of the thread for(size_t i=0; inumber; i++) result_ref[i + matejob->from_in_result + matejob->from] += result_cp[i]; pthread_mutex_unlock( matejob->mutex ); return (void *)0; } void nonpareil_count_mates(int *&result, char **&blockA, char **&blockB, int fromA, int numberA, int fromB, int numberB, int talk, matepar_t matepar){ // Finally, this is the core of the per-block-per-thread comparisons for(int i=0; i0 && i%talk==0) say("4sfs^", "Searching sequences: ", (double)i*100/(double)numberA, "% of the block"); for(int j=0; jmax_errors)) goto next_i; // Cannot be 'continue', because it's a nested loop } return true; next_i: ; // To avoid problems with the nested loop. } // Compare strands C-W if(matepar.revcom){ char *rc = new char[lenA+1]; bool out; reverse_complement(rc, seqA); matepar.revcom = false; out = nonpareil_compare_reads_shortfirst(rc, seqB, lenA, lenB, matepar); matepar.revcom = true; // Not really necesary, just in case it's passed as reference or something like that free(rc); return out; } return false; } void nonpareil_save_mates(int *&result, int no_results, char *file){ // Vars ofstream fileh; fileh.open(file, ios::out); if(!fileh.is_open()) error("Cannot open the file", file); for(int a=0; a