// nonpareil - Calculation of nonpareil curves // @author Luis M. Rodriguez-R // @author Santosh Kumar G. // @license Artistic-2.0 #include #include #include "enveomics/universal.h" #include "enveomics/multinode.h" #include "enveomics/sequence.h" #include "enveomics/nonpareil_mating.h" #include "enveomics/nonpareil_sampling.h" #include "enveomics/References.h" #include "enveomics/KmerCounter.h" #include #define LARGEST_PATH 4096 #define NP_VERSION 3.401 using namespace std; int processID; int processes; void help(const char *msg){ if(processID==0 && msg!=NULL && strlen(msg) != 0) cerr < : Path to the (input) file containing the sequences" < : Nonpareil algorithm, 'kmer' or 'alignment' accepted" < : Path to the prefix for all the output files" << endl <<" -f : The format of the sequence. Can be 'fasta' or fastq'"< : Maximum number of reads to use as query." < : kmer length. By default: 24" < : Number of sub-samples to generate per point." < : Minimum overlapping percentage of the aligned region" < : Maximum RAM usage in Mib. Ideally this value should" < : Number of threads. By default: 2" < : Verbosity level. By default 7" < : Random seed to make runs reproducible. Currently only"< 1)) help("Bad argument for -m option, accepted range: [0, 1]"); if ((max < 0) | (max > 1)) help("Bad argument for -M option, accepted range: [0, 1]"); if ((itv <= 0) | (itv > 1)) help("Bad argument for -i option, accepted range: (0, 1]"); if ((ovl <= 0.0) | (ovl > 1.0)) help("Bad argument for -L option, accepted range: (0, 100]"); if (thr <= 0) help("Bad argumement for -t option, accepted: positive non-zero integers"); if (n <= 0) help("Bad argument for -n option, accepted: positive non-zero integers"); if (ram <= 0) help("Bad argument for -R option, accepted: positive non-zero integers"); if (thr <= 0) help("Bad argument for -t option, accepted: positive non-zero integers"); if ((min_sim <= 0) | (min_sim > 1)) help("Bad argument for -S option, accepted range: (0, 1]"); if ((qry_portion < 0) | (qry_portion > 1)) help("Bad argument for -x option, accepted range: (0, 1]"); if ((divide < 0) | (divide >= 1)) help("Bad argument for -d option, accepted range: (0, 1)"); if ((k < 1) | (k > 32)) help("Bad argument for -k option, accepted range: [1, 32]"); char alldataTmp[LARGEST_PATH], outfileTmp[LARGEST_PATH], cntfileTmp[LARGEST_PATH]; if (baseout && (strlen(baseout) > 0)){ if (!alldata || (strlen(alldata) <= 0)){ sprintf(alldataTmp, "%s.npa", baseout); alldata=alldataTmp; } if (!cntfile || (strlen(cntfile) <= 0)){ sprintf(cntfileTmp, "%s.npc", baseout); cntfile=cntfileTmp; } if (!outfile || (strcmp(outfile, "-") == 0)){ sprintf(outfileTmp, "%s.npo", baseout); outfile=outfileTmp; } if (processID == 0 && !log_is_open()) { char logfile[LARGEST_PATH]; sprintf(logfile, "%s.npl", baseout); open_log(logfile); } } rseed = broadcast_int(rseed); barrier_multinode(); srand(rseed + processID); // file checking int count = 0; int limit = 10 * hX; //metagenome should have 10 times more than query reads if (strcmp(nonpareiltype, "alignment") == 0) limit = hX; Sequence test_temp; ifstream testifs((string(file))); if (strcmp(format,"fasta") == 0) { FastaReader testfastaReader(testifs); while(testfastaReader.readNextSeq(test_temp) != (size_t)(-1)) { count++; if (count > limit) break; } } else if(strcmp(format,"fastq") == 0) { FastqReader testfastqReader(testifs); while(testfastqReader.readNextSeq(test_temp) != (size_t)(-1)) { count++; if (count > limit) break; } } if (count == 0) { error("No reads found, probably the input file doesn't exist or it's unreadable"); } else if (count < limit) { if (strcmp(nonpareiltype, "alignment") == 0) error("Reduce the number of query reads (-X) to fit total reads"); else error("Reduce the number of query reads (-X) to ≤ 10\% of total reads"); } if(alldata && (strlen(alldata) > 0)) remove(alldata); if(cntfile && (strlen(cntfile) > 0)) remove(cntfile); if(outfile && (strlen(outfile) > 0) & (strcmp(outfile, "-") != 0)) remove(outfile); if(strcmp(nonpareiltype, "kmer") == 0) { if(alt_query) { if(strcmp(format,"fasta") == 0) { say("1ss$","WARNING: The kmer kernel implements an error correction ", "function only compatible with FastQ"); ifstream qifs((string(qfile))); say("1ss$","reading query", file); FastaReader qfastaReader(qifs); References references = References(qfastaReader, k, alt_query); say("1s$","Started counting"); ifstream ifs((string(file))); FastaReader fastaReader(ifs); KmerCounter counter = KmerCounter(references, fastaReader, string(cntfile)); mates = new int[counter.getTotalSeqs()]; counter.getCounts(mates); avg_seq_len = counter.getAvgLen(); total_seqs = counter.getTotalSeqs(); qry_seqs_no = counter.getTotalQSeqs(); adj_avg_seq_len = avg_seq_len - k + 1; say("1sus$", "Read file with ", total_seqs, " sequences"); say("1sfs$", "Average read length is ", avg_seq_len, "bp"); goto restart_samples; } } else { if(strcmp(format,"fastq") == 0) { ifstream ifs((string(file))); say("1ss$","reading ", file); FastqReader fastqReader(ifs); say("1sus$","Picking ", hX, " random sequences"); References references = References(fastqReader, hX, k); say("1s$","Started counting"); KmerCounter counter = KmerCounter(references, fastqReader, string(cntfile)); mates = new int[hX]; counter.getCounts(mates); avg_seq_len = counter.getAvgLen(); total_seqs = counter.getTotalSeqs(); qry_seqs_no = counter.getTotalQSeqs(); adj_avg_seq_len = avg_seq_len - k + 1; say("1sus$", "Read file with ", total_seqs, " sequences"); say("1sfs$", "Average read length is ", avg_seq_len, "bp"); goto restart_samples; } else if(strcmp(format,"fasta") == 0) { say("1ss$","WARNING: The kmer kernel implements an error correction ", "function only compatible with FastQ"); ifstream ifs((string(file))); say("1ss$","reading ", file); FastaReader fastaReader(ifs); say("1sus$","Picking ", hX, " random sequences"); References references = References(fastaReader, hX, k); say("1s$","Started counting"); KmerCounter counter = KmerCounter(references, fastaReader, string(cntfile)); mates = new int[hX]; counter.getCounts(mates); avg_seq_len = counter.getAvgLen(); total_seqs = counter.getTotalSeqs(); qry_seqs_no = counter.getTotalQSeqs(); adj_avg_seq_len = avg_seq_len - k + 1; say("1sus$", "Read file with ", total_seqs, " sequences"); say("1sfs$", "Average read length is ", avg_seq_len, "bp"); goto restart_samples; } } } say("9si$", "Hello from worker ", processID); barrier_multinode(); // Parse file if(processID==0) say("1s$", "Counting sequences"); namFile = (char *)malloc(LARGEST_PATH * (sizeof *namFile)); seqFile = (char *)malloc(LARGEST_PATH * (sizeof *seqFile)); if(processID==0){ total_seqs = build_index(file, format, namFile, seqFile, largest_seq, avg_seq_len); if(largest_seq<1) error("Empty sequences or internal error. Largest sequence: ", largest_seq); say("2sss$", "The file ", seqFile, " was just created"); say("4sis$", "Longest sequence has ", largest_seq, " characters"); say("4sfs$", "Average read length is ", avg_seq_len, " bp"); if(total_seqs==0) error("No input sequences. Before re-running please delete the file ", seqFile); say("1sus$", "Reading file with ", total_seqs, " sequences"); say("9s$", "Broadcasting"); } total_seqs = broadcast_int(total_seqs); namFile = broadcast_char(namFile, LARGEST_PATH); seqFile = broadcast_char(seqFile, LARGEST_PATH); largest_seq = broadcast_int(largest_seq); avg_seq_len = broadcast_double(avg_seq_len); barrier_multinode(); // Parse Q-File qNamFile = (char *)malloc(LARGEST_PATH * (sizeof *qNamFile)); qSeqFile = (char *)malloc(LARGEST_PATH * (sizeof *qSeqFile)); if(processID==0){ say("1s$", "Counting query sequences"); if(alt_query){ q_total_seqs = build_index(qfile, format, qNamFile, qSeqFile, q_largest_seq, q_avg_seq_len); if(q_largest_seq<1) error("No input sequences or internal error. Largest sequence: ", q_largest_seq); say("2sss$", "The file ", qSeqFile, " was just created"); say("4sis$", "Longest query sequence has ", q_largest_seq, " characters"); say("4sfs$", "Average query read length is ", q_avg_seq_len, " bp"); if(q_total_seqs==0) error("No input sequences. Before re-running please delete the file ", qSeqFile); say("1sus$", "Reading query file with ", q_total_seqs, " sequences"); }else{ q_total_seqs=0; qNamFile=(char *)""; qSeqFile=(char *)""; q_largest_seq = 0; q_avg_seq_len = 0.0; } say("9s$", "Broadcasting"); } q_total_seqs = broadcast_int(q_total_seqs); qNamFile = broadcast_char(qNamFile, LARGEST_PATH); qSeqFile = broadcast_char(qSeqFile, LARGEST_PATH); q_largest_seq = broadcast_int(q_largest_seq); q_avg_seq_len = broadcast_double(q_avg_seq_len); barrier_multinode(); restart_vars: say("9sis$", "Worker ", processID, " @start_vars."); if (processID == 0) { // Re-wire query portion if (qry_portion != 0) hX = (size_t)total_seqs*qry_portion; qry_portion = (double)hX/(alt_query ? q_total_seqs : total_seqs); // Prepare memory arguments if((size_t)ram > UINT_MAX/1024) error("The memory to allocate is too large, reduce -R", ram); ram_Kb = ram * 1024; required_ram_Kb = 2 * (int)hX * sizeof(int) * thr / 1024 + 2048; if(ram_Kb < required_ram_Kb) error("The amount of memory allowed is too small, increase -R to over ", (double)required_ram_Kb/1024); lines_in_ram = (ram_Kb - required_ram_Kb)/(largest_seq + 3); if(lines_in_ram > UINT_MAX/1024){ say("1sfs$", "WARNING: Unable to represent RAM in bits, lowering to ", (double)UINT_MAX/(1024*1024), "Mb"); lines_in_ram = UINT_MAX; }else lines_in_ram *= 1024; // <- Rounding down to the Kibi. say("3sfsisfs$", "Sequences to store in ", (double)(ram_Kb - required_ram_Kb)/1024, "Mb free: ", lines_in_ram, " (", (double)lines_in_ram*100.0/total_seqs, "%)"); } hX = broadcast_int(hX); qry_portion = broadcast_double(qry_portion); ram_Kb = broadcast_int(ram_Kb); required_ram_Kb = broadcast_int(required_ram_Kb); lines_in_ram = broadcast_int(lines_in_ram); barrier_multinode(); // Run comparisons restart_mates: say("9sis$", "Worker ", processID, " @start_mates."); matepar.overlap = ovl; matepar.similarity = min_sim; matepar.qryportion = qry_portion; matepar.revcom = revcom; matepar.n_as_mismatch = n_as_mismatch; matepar.k = k; if (processID == 0) say("1sfsis$", "Querying library with ", qry_portion, " times the total size (", hX," seqs)"); if (alt_query) { qry_seqs_no = nonpareil_mate(mates, seqFile, qSeqFile, thr, lines_in_ram, total_seqs, largest_seq, q_largest_seq, matepar); } else { qry_seqs_no = nonpareil_mate(mates, seqFile, thr, lines_in_ram, total_seqs, largest_seq, matepar); } if (processID == 0) { if (alt_query) for (size_t a=0; a 0)) nonpareil_save_mates(mates, qry_seqs_no, cntfile); } barrier_multinode(); // Sampling restart_samples: say("9sis$", "Worker ", processID, " @start_samples."); sampling_points = (divide==0) ? ((int)ceil((max-min)/itv)+1) : ((int)ceil( (log(2) - log(total_seqs))/log(divide) )+2); sample_t sample_summary[sampling_points]; size_t dummy=0; if (processID == 0) { sample_i = sample_after_20 = 0; samplepar.np_version = NP_VERSION; samplepar.replicates = n; samplepar.mates = &mates; samplepar.mates_size = qry_seqs_no; samplepar.portion_min = min; samplepar.portion_max = max; samplepar.portion_itv = itv; samplepar.seq_overlap = ovl; samplepar.total_reads = total_seqs; samplepar.max_read_len = largest_seq; samplepar.avg_read_len = avg_seq_len; samplepar.portion_as_label = portion_label; samplepar.divide = divide; if (strcmp(nonpareiltype, "kmer") != 0) { samplepar.type = 1; } else { samplepar.k = k; samplepar.adj_avg_read_len = adj_avg_seq_len; samplepar.type = 2; //kmer } say("1s$", "Sub-sampling library"); while(sample_i < sampling_points){ if (divide == 0) { samplepar.portion = min + itv*sample_i; } else { samplepar.portion = sample_i==0 ? 0 : pow(divide, sampling_points-sample_i-1); } samplepar.replicates = n; samples_no = nonpareil_sample_portion(sample_result, thr, samplepar); sample_summary[sample_i++] = nonpareil_sample_summary(sample_result, samples_no, alldata, outfile, samplepar); if (samplepar.portion <= 0.2) sample_after_20 = sample_i; } dummy = 1; } dummy = broadcast_int(dummy); barrier_multinode(); goto restart_checkings; // Check results restart_checkings: say("9sis$", "Worker ", processID, " @start_checkings."); if(processID==0){ say("1s>", "Evaluating consistency"); ok = true; // Low sequencing depth if(sample_after_20<=sample_i && sample_summary[sample_after_20].q2==0.0){ say("1ss$", "WARNING: The estimation at 20% has median zero, ", "possibly reflecting inaccurate estimations"); if(qry_portion<1.0 && hX<3000){ if(autoadjust){ hX = 0; qry_portion *= 2.0; if(qry_portion >= 1.0) qry_portion = 1.0; say("1sf$", "AUTOADJUST: -x ", qry_portion); goto restart_vars; } else say("1sf$", "To increase the sensitivity increase -X, currently set at ", hX); } else if(ovl > 0.25) { if(autoadjust){ if(ovl>1.0) ovl = 1.0;// This should never happen else if(ovl>0.75) ovl = 0.75; else if(ovl>0.5) ovl = 0.5; else if(ovl>0.25) ovl = 0.25; else error("Impossible to reduce -L further, sequencing depth under detection level"); say("1sf$", "AUTOADJUST: -L ", ovl*100); goto restart_mates; } else say("1sf$", "To increase sensitivity, decrease -L, currently set at ", ovl*100); } else { say("1ss$", "The portion used as query (-x) is currently set to the maximum, ", "and the overlap (-L) is set to the minimum"); say("1s$", "The dataset is probably too small for reliable estimations"); if(min_sim>0.75) say("1s$", "You could decrease the -S but values other than 0.95 are untested"); error("Sequencing depth under detection limit."); } ok = false; } // High sequencing depth if(sample_summary[sample_i-1].avg >= 0.95){ if(ovl<1.0){ if(autoadjust){ if(ovl<0.25) ovl = 0.25; else if(ovl<0.5) ovl = 0.5; else if(ovl<0.75) ovl = 0.75; else if(ovl<1.0) ovl = 1.0; say("1sf$", "AUTOADJUST: -L ", ovl*100.0); goto restart_mates; } } else { say("1ss$", "The overlap (-L) is currently set to the maximum, ", "meaning that the actual coverage is probably above 100X"); if(min_sim<1.0) say("1s$", "You could increase -S but values other than 0.95 are untested"); error("Sequencing depth above detection limit."); } ok = false; } // Low resolution if(sample_i>5 && sample_summary[5].avg >= 0.95){ say("1ss$", "WARNING: The curve reached near-saturation in 6 or less points, ", "hence diversity estimations could be unreliable"); if(autoadjust){ itv *= 0.5; say("1sf$", "AUTOADJUST: -i ", itv); goto restart_samples; } else say("1ssf$", "To increase the resolution of the curve increase the -i parameter, ", "currently set at ", itv); ok = false; } if(ok) say("1s$", "Everything seems correct"); dummy = 2; } dummy = broadcast_int(dummy); barrier_multinode(); goto exit; exit: say("9sis$", "Worker ", processID, " @exit."); // Clean temporals if (processID == 0) { remove(namFile); remove(seqFile); close_log(); } barrier_multinode(); finalize_multinode(); return 0; }