https://github.com/splatlab/squeakr
Tip revision: 0d5813442b49454d35507ec4445170f1645ded6d authored by prashantpandey on 18 December 2020, 05:05:14 UTC
Merge branch 'development'
Merge branch 'development'
Tip revision: 0d58134
count.cc
/*
* ============================================================================
*
* Authors: Prashant Pandey <ppandey@cs.stonybrook.edu>
* Rob Johnson <robj@vmware.com>
* Rob Patro (rob.patro@cs.stonybrook.edu)
*
* ============================================================================
*/
#include <iostream>
#include <algorithm>
#include <cstring>
#include <vector>
#include <set>
#include <unordered_set>
#include <bitset>
#include <cassert>
#include <fstream>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/atomic.hpp>
#include <time.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/mman.h>
#include "clipp.h"
#include "ProgOpts.h"
#include "SqueakrFS.h"
#include "squeakrconfig.h"
#include "gqf_cpp.h"
#include "chunk.h"
#include "kmer.h"
#include "reader.h"
#include "util.h"
#define BITMASK(nbits) ((nbits) == 64 ? 0xffffffffffffffff : (1ULL << (nbits)) \
- 1ULL)
#define QBITS_LOCAL_QF 16
#define MAX_NUM_THREADS 64
typedef struct {
CQF<KeyObject> *local_cqf;
CQF<KeyObject> *main_cqf;
uint32_t count {0};
uint32_t ksize {28};
bool exact;
spdlog::logger* console{nullptr};
} flush_object;
/*create a multi-prod multi-cons queue for storing the chunk of fastq file.*/
boost::lockfree::queue<file_pointer*, boost::lockfree::fixed_sized<true> > ip_files(64);
boost::atomic<int> num_files {0};
/* dump the contents of a local QF into the main QF */
static bool dump_local_qf_to_main(flush_object *obj)
{
CQF<KeyObject>::Iterator it = obj->local_cqf->begin();
do {
KeyObject hash = it.get_cur_hash();
int ret = obj->main_cqf->insert(hash, QF_WAIT_FOR_LOCK | QF_KEY_IS_HASH);
if (ret == QF_NO_SPACE) {
obj->console->error("The CQF is full. Please rerun the with a larger size.");
return false;
}
++it;
} while (!it.done());
obj->local_cqf->reset();
return true;
}
/* convert a chunk of the fastq file into kmers */
bool reads_to_kmers(chunk &c, flush_object *obj)
{
auto fs = c.get_reads();
auto fe = c.get_reads();
auto end = fs + c.get_size();
while (fs && fs!=end) {
fs = static_cast<char*>(memchr(fs, '\n', end-fs)); // ignore the first line
fs++; // increment the pointer
fe = static_cast<char*>(memchr(fs, '\n', end-fs)); // read the read
std::string read(fs, fe-fs);
start_read:
if (read.length() < obj->ksize) // start with the next read if length is smaller than K
goto next_read;
{
__int128_t first = 0;
__int128_t first_rev = 0;
__int128_t item = 0;
for(uint32_t i = 0; i < obj->ksize; i++) { //First kmer
uint8_t curr = Kmer::map_base(read[i]);
if (curr > DNA_MAP::G) { // 'N' is encountered
if (i + 1 < read.length())
read = read.substr(i + 1, read.length());
else
goto next_read;
goto start_read;
}
first = first | curr;
first = first << 2;
}
first = first >> 2;
first_rev = Kmer::reverse_complement(first, obj->ksize);
if (Kmer::compare_kmers(first, first_rev))
item = first;
else
item = first_rev;
/*
* first try and insert in the main QF.
* If lock can't be acquired in the first attempt then
* insert the item in the local QF.
*/
KeyObject k(item, 0, 1);
int ret = obj->main_cqf->insert(k, QF_TRY_ONCE_LOCK);
if (ret == QF_NO_SPACE) {
obj->console->error("The CQF is full. Please rerun the with a larger size.");
exit(1);
} else if (ret == -2) {
obj->local_cqf->insert(k, QF_NO_LOCK);
obj->count++;
// check of the load factor of the local QF is more than 50%
if (obj->count > 1ULL<<(QBITS_LOCAL_QF-1)) {
if (!dump_local_qf_to_main(obj))
return false;
obj->count = 0;
}
}
uint64_t next = (first << 2) & BITMASK(2 * obj->ksize);
uint64_t next_rev = first_rev >> 2;
for(uint32_t i = obj->ksize; i < read.length(); i++) { //next kmers
uint8_t curr = Kmer::map_base(read[i]);
if (curr > DNA_MAP::G) { // 'N' is encountered
if (i + 1 < read.length())
read = read.substr(i + 1, read.length());
else
goto next_read;
goto start_read;
}
next |= curr;
uint64_t tmp = Kmer::reverse_complement_base(curr);
tmp <<= (obj->ksize * 2 - 2);
next_rev = next_rev | tmp;
if (Kmer::compare_kmers(next, next_rev))
item = next;
else
item = next_rev;
/*
* first try and insert in the main QF.
* If lock can't be accuired in the first attempt then
* insert the item in the local QF.
*/
KeyObject k(item, 0, 1);
ret = obj->main_cqf->insert(k, QF_TRY_ONCE_LOCK);
if (ret == QF_NO_SPACE) {
obj->console->error("The CQF is full. Please rerun the with a larger size.");
exit(1);
} else if (ret == -2) {
obj->local_cqf->insert(k, QF_NO_LOCK);
obj->count++;
// check of the load factor of the local QF is more than 50%
if (obj->count > 1ULL<<(QBITS_LOCAL_QF-1)) {
if (!dump_local_qf_to_main(obj))
return false;
obj->count = 0;
}
}
next = (next << 2) & BITMASK(2*obj->ksize);
next_rev = next_rev >> 2;
}
}
next_read:
fs = ++fe; // increment the pointer
fs = static_cast<char*>(memchr(fs, '\n', end-fs)); // ignore one line
fs++; // increment the pointer
fs = static_cast<char*>(memchr(fs, '\n', end-fs)); // ignore one more line
fs++; // increment the pointer
}
free(c.get_reads());
return true;
}
/* read a part of the fastq file, parse it, convert the reads to kmers, and
* insert them in the CQF
*/
static bool fastq_to_uint64kmers_prod(flush_object* obj)
{
file_pointer* fp;
while (num_files) {
while (ip_files.pop(fp)) {
if (reader::fastq_read_parts(fp->mode, fp)) {
ip_files.push(fp);
chunk c(fp->part, fp->size);
if (!reads_to_kmers(c, obj)) {
obj->console->error("Insertion in the CQF failed.");
abort();
}
} else {
/* close the file */
if (fp->mode == 0)
fclose(fp->freader->in);
else if (fp->mode == 1)
gzclose(fp->freader->in_gzip);
else if (fp->mode == 2)
if (fp->freader->in) {
BZ2_bzReadClose(&(fp->freader->bzerror), fp->freader->in_bzip2);
fclose(fp->freader->in);
}
delete[] fp->part_buffer;
delete fp;
num_files--;
}
}
}
if (obj->count) {
if (!dump_local_qf_to_main(obj)) {
obj->console->error("Insertion in the CQF failed.");
abort();
}
obj->count = 0;
}
return true;
}
/* main method */
int count_main(CountOpts &opts)
{
int mode = 0;
spdlog::logger* console = opts.console.get();
if (opts.exact && opts.ksize > 32) {
console->error("Does not support k-mer size > 32 for squeakr-exact.");
return 1;
}
if (opts.numthreads == 0)
opts.numthreads = std::thread::hardware_concurrency();
enum qf_hashmode hash = QF_HASH_DEFAULT;
int num_hash_bits = opts.qbits+8; // we use 8 bits for remainders in the main QF
if (opts.exact) {
num_hash_bits = 2*opts.ksize; // Each base 2 bits.
hash = QF_HASH_INVERTIBLE;
}
std::string ser_ext(".squeakr");
std::string log_ext(".log");
std::string cluster_ext(".cluster");
std::string freq_ext(".freq");
struct timeval start1, start2, end1, end2;
struct timezone tzp;
uint32_t OVERHEAD_SIZE = 65535;
std::string filepath(opts.filenames.front());
auto const pos = filepath.find_last_of('.');
std::string input_ext = filepath.substr(pos + 1);
if (input_ext == std::string("fastq") || input_ext == std::string("fq"))
mode = 0;
else if (input_ext == std::string("gz"))
mode = 1;
else if (input_ext == std::string("bz2"))
mode = 2;
else {
console->error("Does not support this input file type.");
return 1;
}
for( auto& fn : opts.filenames ) {
auto* fr = new reader;
if (reader::getFileReader(mode, fn.c_str(), fr)) {
file_pointer* fp = new file_pointer;
fp->mode = mode;
fp->freader.reset(fr);
fp->part_buffer = new char[OVERHEAD_SIZE];
ip_files.push(fp);
num_files++;
} else {
delete fr;
}
}
std::string ds_file = opts.output_file;
//Initialize the main QF
CQF<KeyObject> cqf(opts.qbits, num_hash_bits, hash, SEED);
if (opts.numthreads == 1)
cqf.set_auto_resize();
CQF<KeyObject> *local_cqfs = (CQF<KeyObject>*)calloc(MAX_NUM_THREADS,
sizeof(CQF<KeyObject>));
boost::thread_group prod_threads;
for (int i = 0; i < opts.numthreads; i++) {
local_cqfs[i] = CQF<KeyObject>(QBITS_LOCAL_QF, num_hash_bits, hash, SEED);
flush_object* obj = (flush_object*)malloc(sizeof(flush_object));
obj->local_cqf = &local_cqfs[i];
obj->main_cqf = &cqf;
obj->ksize = opts.ksize;
obj->exact = opts.exact;
obj->count = 0;
obj->console = console;
prod_threads.add_thread(new boost::thread(fastq_to_uint64kmers_prod,
obj));
}
console->info("Reading from the fastq file and inserting in the CQF.");
gettimeofday(&start1, &tzp);
prod_threads.join_all();
// Resize the CQF if:
// there is cutoff value greater than 1
// the final CQF doesn't need counts
// the default size if used
if (opts.cutoff > 1 || opts.contains_counts == 0 || !opts.setqbits) {
if (opts.cutoff > 1)
console->info("Filtering k-mers based on the cutoff.");
else if (opts.contains_counts == 0)
console->info("Removing counts from the CQF.");
else if (!opts.setqbits)
console->info("Trying to compress the final CQF.");
uint64_t num_kmers{0}, estimated_size{0}, log_estimated_size{0};
CQF<KeyObject>::Iterator it = cqf.begin();
while (!it.done()) {
KeyObject hash = it.get_cur_hash();
if (hash.count >= (uint32_t)opts.cutoff)
num_kmers++;
//console->info("hash fraction: {}", k.key / (float)cqf.range());
if (cqf.get_unique_index(hash, QF_KEY_IS_HASH) / (float)(1ULL <<
opts.qbits) >
0.05) {
estimated_size = num_kmers * (cqf.range() / static_cast<__uint128_t>(hash.key));
//console->info("estimated size: {}", estimated_size);
if (opts.contains_counts == 1)
estimated_size *= 3; // to account for counts.
log_estimated_size = ceil(log2(estimated_size));
uint64_t total_slots = 1ULL << log_estimated_size;
//console->info("estimated size after ceiling: {}", 1ULL << log_estimated_size);
if ((total_slots-estimated_size)/(float)estimated_size < 0.1)
log_estimated_size += 1;
//console->info("estimated size after ceiling: {}", 1ULL << log_estimated_size);
break;
}
++it;
}
console->info("Estimated size of the final CQF: {}", log_estimated_size);
if (opts.cutoff > 1 || opts.contains_counts == 0 || cqf.numslots() > (1ULL
<<
log_estimated_size))
{
CQF<KeyObject> filtered_cqf(log_estimated_size, num_hash_bits, hash, SEED);
filtered_cqf.set_auto_resize();
it = cqf.begin();
uint64_t max_cnt = 0;
while (!it.done()) {
KeyObject hash = it.get_cur_hash();
if (hash.count >= (uint32_t)opts.cutoff) {
int ret;
if (opts.contains_counts == 1)
ret = filtered_cqf.insert(hash, QF_NO_LOCK | QF_KEY_IS_HASH);
else {
hash.count = 1;
ret = filtered_cqf.insert(hash, QF_NO_LOCK | QF_KEY_IS_HASH);
}
if (ret == QF_NO_SPACE) {
console->error("The CQF is full. Estimated size of the final CQF is wrong.");
exit(1);
}
}
if (max_cnt < hash.count)
max_cnt = hash.count;
++it;
}
cqf = filtered_cqf;
}
}
console->info("Calculating frequency distribution:");
gettimeofday(&start2, &tzp);
uint64_t max_cnt = 0;
CQF<KeyObject>::Iterator it = cqf.begin();
while (!it.done()) {
KeyObject hash = it.get_cur_hash();
if (max_cnt < hash.count)
max_cnt = hash.count;
++it;
}
gettimeofday(&end2, &tzp);
print_time_elapsed("Iteration:", &start2, &end2, console);
// serialize the CQF to disk.
cqf.serialize(ds_file);
gettimeofday(&end1, &tzp);
print_time_elapsed("Counting:", &start1, &end1, console);
console->info("Maximum freq: {}", max_cnt);
console->info("Num distinct elem: {}", cqf.dist_elts());
console->info("Total num elems: {}", cqf.total_elts());
// seek to the end of the file and write the k-mer size
std::ofstream squeakr_file(ds_file, std::ofstream::out |
std::ofstream::app | std::ofstream::binary);
squeakr_file.seekp(0, squeakr_file.end);
squeakrconfig config;
config.kmer_size = opts.ksize;
config.cutoff = opts.cutoff;
config.contains_counts = opts.contains_counts;
squeakr_file.write((const char*)&config, sizeof(config));
squeakr_file.close();
return 0;
}