https://github.com/dkondor/matching
Raw File
Tip revision: b63a836e4eb4fe841659c0508f3f74a8e35d5308 authored by Kondor Dániel on 25 October 2017, 08:16:23 UTC
added work on Bayesian estimates of matching probabilities
Tip revision: b63a836
matchingtestp4_all.cpp
/*
 * matchingtestp4_all.cpp
 * 
 * match transportation and CDR data
 * use the crecords class for storing CDR data and searching
 * read LTA records from text file, iterate over it, store possible / impossible matches in hash sets
 * 	(will probably use a lot of memory)
 * 
 * parallel implementation
 * 
 * improved version, use two crecords classes (one for CDR and one for LTA), perform matching in both directions (if needed)
 * 
 * further improved version, use copies of both dataset sorted by uids, first generate possible matches then check for impossible matches among these
 *  + extra option to test if old and new versions of functions give the same result
 * 
 * read data from binary storage files
 * 
 * new possibilities:
 * 	count impossible matches (i.e.~total number of temporal matches), write out statistics about that too
 * 	keep track of all points, allow only one match per point in both datasets
 * 
 * Copyright 2016 Kondor Dániel <dkondor@mit.edu>
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 * 
 * * Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following disclaimer
 *   in the documentation and/or other materials provided with the
 *   distribution.
 * * Neither the name of the  nor the names of its
 *   contributors may be used to endorse or promote products derived from
 *   this software without specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 * 
 * 
 */

#ifndef USECEREAL
#define USECEREAL
#endif

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <vector>
#include <unordered_map> // use C++11 STL hashmaps instead of google::dense_hash_*
#include <unordered_set>
#include <set>
#include <algorithm> // std::sort for determining the top matches
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include "cdrrecords.h"
#include "popen_noshell.h"


// shared data by the threads (records to process) -- note: this could be implemented as a derived class of crecords
struct rdata {
	//distinct user ids and pointers in the above vectors to the start of the data associated with them
	std::vector<int64_t> uids_uniq;
	//~ std::vector<uint64_t> uids_off;
	
	//number of users processed (a thread should choose it's next user to process as uids_uniq[nusers]
	unsigned int nusers;
	//~ pthread_mutex_t nusers_mutex; //mutex to use to choose the next user to process
	
	//files to write the output to (possible matches and summary statistics in separate files)
	FILE* out_possible;
	FILE* out_summary;
	
	//~ bool out_impossible_brief; //only write one record per user to out_impossible, the number of matches found
	//~ bool quick; //do not keep track of the number of matches for impossible combinations
	unsigned int dt1; // time window to use for walking
	unsigned int dt2; // time window to use for transit
	
	// 1. store data sorted by cell_id and timestamp for searching for possible matches
	crecords* cr; //store cdr records here (only read by the threads, not modified)
	// 2. store data to search for sorted by user_id and timestamp + use the same data for checking impossible matches
	crecords* tr; //store transportation data here (readonly too)
	// 3. store the CDR data again sorted by uid and timestamp to check for impossible matches
	crecords* cru;
	bool searchreverse; // if true, cr and cru store transportation records and tr store CDR records; the compare_users*() functions need to be called on cru
	
	unsigned int out_p_max; //output the first this many possible matches only
	
	// 4. optional: store the CDR data again, sorted by timestamp to do the original kind of processing to check if the new version is working well
	//~ crecords* crc; -- note: removed these so as to simplify the process_records() function
	
	//if set to true, failed tests do not result in aborted run, so that these can be better inspected from the debugger
	//~ bool debugrun;
	
	bool skiperrors; //if true, errors from search_records_possible() (i.e. missing place ID) do not result in an aborted run
		//when searching in reverse (searching for CDR records among the LTA data), there can be cells with no stops in or around them,
		//that is not an error
	bool statall; //if true, result (output) is a statistic of the number of all temporal matches, separated to possible and impossible
	bool statall_possible; //if true, the summary file will contain only the statistic of possible matches (i.e. all temporal matches need not be calculated)
	bool onematch; //if true, each point can result in only one match also in the searched dataset
	bool error; // signal an error during processing
	
	uint32_t tmin; // if > 0, limit searches to this range
	uint32_t* tmax; // if ntmax > 0 use several search ranges
	int ntmax;
	
	// limit the number of running threads and / or inform the main thread about the number of running threads
	// (need to be set to the number of threads before start, thread decrement these when exiting)
	unsigned int nthreads;
	unsigned int running_threads;
	unsigned int tid;
};


// run this function on multiple threads, do the actual processing here
void* process_records(void* p0) {
	
	// set thread signal mask to ignore SIGINT (that should be handled in the main thread)
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, SIGQUIT);
	sigaddset(&set, SIGINT);
	pthread_sigmask(SIG_BLOCK, &set, 0);
	
	rdata* rd = (rdata*)p0;
	
	// sets used during normal processing
	std::unordered_set<int64_t> s_candidates; //use this to store all / any matches which are later checked 
	
	std::vector<uint32_t>* h_possible_matches = new std::vector<uint32_t>[rd->ntmax]; //used as temporary storage to create histograms of the number of matches
	std::vector<uint32_t>* h_impossible_matches = new std::vector<uint32_t>[rd->ntmax]; //note: these are resized dynamically as needed
	unsigned int* matches_possible = 0;
	unsigned int* matches_temporal = 0;
	if(rd->ntmax > 1) {
		matches_possible = new unsigned int[rd->ntmax];
		matches_temporal = new unsigned int[rd->ntmax];
	}
	
	// assign thread ID
	unsigned int tid1 = __atomic_fetch_add(&(rd->tid),1,__ATOMIC_ACQ_REL);
	bool running = true;
	
	while(1) {
		// check for error or quit request
		if( rd->error || rd->nthreads == 0 ) break;
		if( tid1 >= rd->nthreads ) {
			running = false;
			unsigned int nt2 = __atomic_fetch_sub(&(rd->running_threads),1,__ATOMIC_ACQ_REL);
			if(nt2 == 0) { // subtracting 1 from unsigned 0 is undefined behavior, this is an error then
				rd->running_threads = 0;
				rd->error = true;
				break;
			}
			
			do sleep(5);
			while( tid1 >= rd->nthreads && !(rd->error) && rd->nusers < rd->uids_uniq.size() );
			if(rd->error) break;
			__atomic_fetch_add(&(rd->running_threads),1,__ATOMIC_ACQ_REL);
			running = true;
		}
		
		unsigned int nusers = __atomic_fetch_add(&(rd->nusers),1,__ATOMIC_ACQ_REL);
		if(nusers >= rd->uids_uniq.size()) break;
		int64_t uid0 = rd->uids_uniq[nusers];
		
		//process the given user
		unsigned int ne = 0;
		for(crecords::iterator cit = rd->tr->get_uid_iterator_begin(uid0); cit != rd->tr->get_uid_iterator_end(uid0); ++cit) {
			const record& r = *cit;
			if( r.ts < rd->tmin || ( rd->tmax[rd->ntmax-1] > 0 && r.ts >= rd->tmax[rd->ntmax-1] ) ) continue; // leave out records outside the given time range
			
			uint32_t tmin1 = r.ts - rd->dt1;
			uint32_t tmax1 = r.ts + rd->dt1;
			if(r.startstop == 0) tmax1 = r.ts + rd->dt2; // start of a trip, use the transit-related time interval
			if(r.startstop == 1) tmin1 = r.ts - rd->dt2; // end of a trip, use the transit-related time interval
			if(tmin1 < rd->tmin) tmin1 = rd->tmin;
			if(rd->tmax[rd->ntmax-1] > 0 && tmax1 >= rd->tmax[rd->ntmax-1]) tmax1 = rd->tmax[rd->ntmax-1] - 1;
			
			if(rd->cr->search_records(r.cell_id, tmin1, tmax1, s_candidates, s_candidates, true)) {
				if(!rd->skiperrors) {
					fprintf(stderr,"Error searching for record with the new method: (%ld, %u, %d)!\n",r.uid,r.ts,r.cell_id);
					rd->error = true;
					break;
				}
			}
			ne++;
		} //for(i) -- process all records by one user
		if(rd->error) break;
		
		//test for impossible matches in the set, write out results
		if(ne > 0) {
			for(int i=0;i<rd->ntmax;i++) {
				h_possible_matches[i].resize(ne,0); //note: new elements are added with the specified value (0)
				h_impossible_matches[i].resize(ne,0); //the clear() at the end ensures that the vectors at this point are empty, so all elements become zero
			}
			
			for(auto it = s_candidates.cbegin(); it != s_candidates.cend(); ++it) {
				// check for possible matches first
				unsigned int ni = 0;
				int ni2 = 0;
				
				if(rd->ntmax > 1) {
					for(int i=0;i<rd->ntmax;i++) { matches_possible[i] = 0; matches_temporal[i] = 0; }
					if(rd->searchreverse) ni2 = rd->cru->compare_users_onematch_tmulti(*it,*(rd->tr),uid0,rd->ntmax,matches_temporal,matches_possible,
						rd->tmin,rd->tmax,false,true);
					else ni2 = rd->tr->compare_users_onematch_tmulti(uid0,*(rd->cru),*it,rd->ntmax,matches_temporal,matches_possible,
						rd->tmin,rd->tmax,false,true);
					if(ni2 == -1) {
						fprintf(stderr,"Error comparing users %ld and %ld!\n",uid0,*it);
						rd->error = true;
						break;
					}
					for(int i=0;i<rd->ntmax;i++) {
						if(matches_possible[i]) h_possible_matches[i][ matches_possible[i]-1 ]++;
						if(matches_temporal[i]) h_impossible_matches[i][ matches_temporal[i]-1 ]++;
					}
				}
				else {
					if(rd->searchreverse) {
						if(rd->onematch) ni2 = rd->cru->compare_users2_onematch(*it,*(rd->tr),uid0,&ni,0,false,rd->skiperrors,true,rd->tmin,rd->tmax[0]);
						else ni2 = rd->cru->compare_users2(*it,*(rd->tr),uid0,&ni,0,false,rd->skiperrors,true,rd->tmin,rd->tmax[0]);
					}
					else {
						if(rd->onematch) ni2 = rd->tr->compare_users2_onematch(uid0,*(rd->cru),*it,&ni,0,false,rd->skiperrors,true,rd->tmin,rd->tmax[0]);
						else ni2 = rd->tr->compare_users2(uid0,*(rd->cru),*it,&ni,0,false,rd->skiperrors,true,rd->tmin,rd->tmax[0]);
					}
					
					if(ni2 == -1) {
						fprintf(stderr,"Error comparing users %ld and %ld!\n",uid0,*it);
						rd->error = true;
						break;
					}
					
					if(ni2 == 0) { // impossible, ni contains the number of temporal matches
						// add to the histogram
						if(ni > 0) h_impossible_matches[0][ni-1]++;
					}
					else { // possible
						// just add to the histogram
						if(ni > 0) h_possible_matches[0][ni-1]++;
					}
				}
			}
			
			// output histograms (note: do not output matches, that can be done with the original matchingtest4.cpp program which does the search
			//			for only possible matches more efficiently)
			flockfile(rd->out_summary);
			
			for(uint32_t i=0;i<ne;i++) {
				if(rd->ntmax > 1) {
					for(int j=0;j<rd->ntmax;j++) if(h_possible_matches[j][i] || h_impossible_matches[j][i]) 
						fprintf(rd->out_summary,"%u\t%u\t%ld\t%u\t%u\t%u\t%u\n",rd->tmin,rd->tmax[j],uid0,ne,i+1,
							h_possible_matches[j][i],h_impossible_matches[j][i]);
				}
				else if(h_possible_matches[0][i] || h_impossible_matches[0][i])
					fprintf(rd->out_summary,"%ld\t%u\t%u\t%u\t%u\n",uid0,ne,i+1,h_possible_matches[0][i],h_impossible_matches[0][i]);
			}
			
			funlockfile(rd->out_summary);
			
			for(int i=0;i<rd->ntmax;i++) {
				h_possible_matches[i].clear(); //this ensures that all counts will be zero the next time
				h_impossible_matches[i].clear(); //this ensures that all counts will be zero the next time
			}
				
			s_candidates.clear();
				
		} //if(ne > 0)
	
	} //while( ! error ) -- process all users
	
	// decrement the counter with the number of running threads
	tid1 = __atomic_fetch_sub(&(rd->running_threads),1,__ATOMIC_ACQ_REL);
	if(tid1 == 0) { // subtracting 1 from unsigned 0 is undefined behavior ?
		rd->running_threads = 0;
		rd->error = true;
	}
	
	delete[]h_possible_matches;
	delete[]h_impossible_matches;
	if(matches_possible) delete[]matches_possible;
	if(matches_temporal) delete[]matches_temporal;
	
	return 0;
}


int main(int argc, char **argv)
{
	char* cdrfile = 0; //file with CDR records
	char* ltafile = 0; //file with transportation data
	
	char** placesmapfiles = 0; //place-cell mapping files
	unsigned int nplacesmapfiles = 0; // number of files to read
	
	unsigned int dt1 = 600; // time window for walking (default: 10 min)
	unsigned int dt2 = 300; // time window for transit (default: 5 min)
	double radius1p = 500.0; // possible radius for walking
	double radius1i = 500.0; // impossible radius for walking
	double radius2p = 1000.0; // possible radius for transit
	double radius2i = 2000.0; // impossible radius for transit
	
	char* outf_summary = 0; //write out the number of impossible matches and excluded users for each input user separately instead of in the main output file
	
	uint32_t tmin = 0; // minimum and maximum timestamp
	uint32_t* tmax = 0; // to limit searches in time
	uint32_t tmaxnull = 0; // to use as a limit if none is given
	int ntmax = 0; // several tmax values to get statistics of increasing intervals
	
	//Voronoi tessellation
	char* v_vertex_coords = 0;
	char* v_vertex_to_cell = 0;
	char* v_edges = 0;
	char* v_cell_unique = 0;
	
	unsigned int nthreads = 1;
	
	bool searchreverse = false; //if set, search for CDR events among LTA records
	bool onematch = false; //if true, each point can result in only one match also in the searched dataset
	bool outzip = false; //compress output files (if not writing to stdout)
	bool inzip = false; //read compressed binary files
	
	// auxilliary data for the custom popen() implementation used to save gziped files (note: the glibc / stdlib.h popen() will probably request
	//	to reserve memory 2x the size of the calling process due to using fork() which might fail)
	struct popen_noshell_pass_to_pclose spc,spc2;
	const char* popen_args[4];
	popen_args[0] = "/bin/gzip";
	popen_args[1] = "-c";
	popen_args[2] = 0;
	popen_args[3] = 0;
	
	
	//!! TODO: more rigorous processing of arguments, protect against invalid arguments
	for(int i=1;i<argc;i++) if(argv[i][0] == '-') switch(argv[i][1]) {
		case 'p':
			int j;
			for(j=i+1;j<argc;j++) if(argv[j][0] == '-') break;
			nplacesmapfiles = j-i-1;
			if(nplacesmapfiles > 0) placesmapfiles = argv+i+1;
			else fprintf(stderr,"Missing file names after %s!\n",argv[i]);
			break;
		case 'r':
			cdrfile = argv[i+1];
			break;
		case 'l':
			ltafile = argv[i+1];
			break;
		case 'z':
			inzip = true;
			break;
		case 't':
			if(argv[i][2] == 'i') { // time intervals for the search; expected as tmin, tmax1, tmax2, ...
				int j;
				for(j=i+1;j<argc;j++) if(argv[j][0] == '-') break;
				if(j-i-1 < 2) {
					fprintf(stderr,"Invalid parameter: %s (-ti requires at least two timestamps)!\n",argv[i]);
					break;
				}
				ntmax = j-i-2;
				tmax = (uint32_t*)malloc(sizeof(uint32_t)*ntmax);
				if(tmax == 0) {
					fprintf(stderr,"Error allocating memory!\n");
					return 1;
				}
				tmin = atoi(argv[i+1]);
				for(int k=i+2;k<j;k++) tmax[k-i-2] = atoi(argv[k]);
				break;
			}
			dt1 = atoi(argv[i+1]);
			if(i+2 < argc && argv[i+2][0] != '-') { dt2 = atoi(argv[i+2]); i++; }
			else dt2 = dt1;
			i++;
			break;
		case 'd':
			if(argv[i][2] == '1') {
				radius1p = atof(argv[i+1]);
				if(i+2 < argc && argv[i+2][0] != '-') { radius1i = atof(argv[i+2]); i++; }
				else radius1i = radius1p;
				i++;
				break;
			}
			if(argv[i][2] == '2') {
				radius2p = atof(argv[i+1]);
				if(i+2 < argc && argv[i+2][0] != '-') { radius2i = atof(argv[i+2]); i++; }
				else radius2i = radius2p;
				i++;
				break;
			}
			fprintf(stderr,"Unknown parameter: %s!\n",argv[i]);
			break;
		case 'T':
			nthreads = atoi(argv[i+1]);
			break;
		case 'R':
			searchreverse = true;
			break;
		case '1':
			onematch = true;
			break;
		case 'o':
			if(argv[i][2] == 'z') { outzip = true; break; }
			outf_summary = argv[i+1];
			break;
		case 'v':
			if(argv[i][2] == 'c') { v_vertex_coords = argv[i+1]; break; }
			if(argv[i][2] == 'v') { v_vertex_to_cell = argv[i+1]; break; }
			if(argv[i][2] == 'e') { v_edges = argv[i+1]; break; }
			if(argv[i][2] == 'u') { v_cell_unique = argv[i+1]; break; }
		default:
			fprintf(stderr,"Unknown parameter: %s!\n",argv[i]);
			break;
	}
	
	if(cdrfile == 0 || ltafile == 0 || placesmapfiles == 0) {
		fprintf(stderr,"Error: missing input files!\n");
		return 1;
	}
	
	if(ntmax > 1 && onematch == false) {
		fprintf(stderr,"Error: searching with multiple time windows without limiting to one match per point is not supported (use the -1 switch)!\n");
		free(tmax);
		return 1;
	}
		
	rdata rd;
	crecords cr;
	crecords tr;
	cr.use_combined_sort = true;
	tr.use_combined_sort = true;
	if(searchreverse) {
		rd.cr = &tr;
		rd.tr = &cr;
		rd.searchreverse = true;
	}
	else {
		rd.cr = &cr;
		rd.tr = &tr;
		rd.searchreverse = false;
	}
	
	rd.dt1 = dt1;
	rd.dt2 = dt2;
	rd.skiperrors = true;
	rd.onematch = onematch;
	rd.tmin = tmin;
	if(ntmax > 0) {
		rd.tmax = tmax;
		rd.ntmax = ntmax;
	}
	else {
		rd.tmax = &tmaxnull;
		rd.ntmax = 1;
	}
	rd.error = false;
	
	//read place-cell mapping
	for(unsigned int i=0;i<nplacesmapfiles;i++) {
		FILE* pmf = fopen(placesmapfiles[i],"r");
		if(!pmf) {
			fprintf(stderr,"Error opening place-cell mapping file %s!\n",placesmapfiles[i]);
			free(tmax);
			return 1;
		}
		fprintf(stderr,"Reading place-cell mapping from file %s -- ",placesmapfiles[i]);
		if(cr.read_places_map(pmf) != 0) {
			fprintf(stderr,"\nError reading place-cell mapping from file %s!\n",placesmapfiles[i]);
			free(tmax);
			fclose(pmf);
			return 1;
		}
		fclose(pmf);
	}
	tr.copy_places_map_reverse(cr); // copy everything reversed to the transportation records

	// read CDR and LTA records first
	time_t t1 = time(0);
	char* ctimec = ctime(&t1);
	for(char* c1 = ctimec;*c1;c1++) if(*c1 == '\n') *c1 = 0;
	fprintf(stderr,"%s, reading CDR and transportation data\n",ctimec);
	if(crecords::read_crecords_serialized(cr,cdrfile,inzip)) {
		fprintf(stderr,"Error reading data from file %s!\n",cdrfile);
		free(tmax);
		return 2;
	}
	t1 = time(0);
	ctimec = ctime(&t1);
	for(char* c1 = ctimec;*c1;c1++) if(*c1 == '\n') *c1 = 0;
	fprintf(stderr,"%s, %lu CDR records read from input file %s\n",ctimec,cr.getnrecords(),cdrfile);
	if(crecords::read_crecords_serialized(tr,ltafile,inzip)) {
		fprintf(stderr,"Error reading data from file %s!\n",ltafile);
		free(tmax);
		return 2;
	}
	t1 = time(0);
	ctimec = ctime(&t1);
	for(char* c1 = ctimec;*c1;c1++) if(*c1 == '\n') *c1 = 0;
	fprintf(stderr,"%s, %lu transportation records read from input file %s\n",ctimec,tr.getnrecords(),ltafile);
	uint64_t lines2 = 0;
	if(searchreverse) lines2 = cr.getnrecords();
	else lines2 = tr.getnrecords();
	
	
	// open output files
	if(outf_summary) {
		if(outzip) {
			popen_args[1] = "-c";
			popen_args[2] = 0;
			rd.out_summary = popen_noshell_redirect(popen_args[0],popen_args,"w",&spc,0,outf_summary,0);
		}
		else rd.out_summary = fopen(outf_summary,"w");
		if(!rd.out_summary) {
			fprintf(stderr,"Error opening output file %s!\n",outf_summary);
			free(tmax);
			return 1;
		}
	}
	else rd.out_summary = stdout;
	
	
	// create copies
	crecords* cru = 0;
	double radiusp = radius1p;
	if(radius2p > radius1p) radiusp = radius2p;
	if(searchreverse) {
		cru = new crecords(tr);
		cru->set_compare_params(dt1,dt2,radius1p,radius1i,radius2p,radius2i); // create place -- cell mappings for user comparisons
		tr.set_default_places_map(radiusp);
	}
	else {
		cru = new crecords(cr);
		tr.set_compare_params(dt1,dt2,radius1p,radius1i,radius2p,radius2i,true); // create place -- cell mappings for user comparisons
			// note: last param: places_map is created in "reverse", i.e. cell -> place as it is needed in this case
		cr.set_default_places_map(radiusp);
	}
	cru->use_combined_sort = true;
	rd.cru = cru;
	
	//explicit sort step is needed
	//do this on separate threads per sort step
	//note: nested functions is probably a gcc extension
	if(nthreads > 1) {
		struct pthread_sort {
			static void* pthread_sort_by_cellid_time(void* p) { crecords* c = (crecords*)p; c->sort_by_cellid_time(); return 0; }
			static void* pthread_sort_by_uid_time(void* p) { crecords* c = (crecords*)p; c->sort_by_uid_time(); return 0; }
			static void* pthread_sort_by_time(void* p) { crecords* c = (crecords*)p; c->sort_by_time(); return 0; }
		};
		
		pthread_t sort_threads[3];
		
		if(pthread_create(sort_threads,0,&pthread_sort::pthread_sort_by_time,(void*)(rd.cr) )) { fprintf(stderr,"Error creating sort threads!\n"); return 6; }
		if(pthread_create(sort_threads+1,0,&pthread_sort::pthread_sort_by_uid_time,(void*)(rd.tr) )) { fprintf(stderr,"Error creating sort threads!\n"); return 6; }
		if(pthread_create(sort_threads+2,0,&pthread_sort::pthread_sort_by_uid_time,(void*)(rd.cru) )) { fprintf(stderr,"Error creating sort threads!\n"); return 6; }
		
		if(pthread_join(sort_threads[0],0)) { fprintf(stderr,"Error runnning sort threads!\n"); return 7; }
		if(pthread_join(sort_threads[1],0)) { fprintf(stderr,"Error runnning sort threads!\n"); return 7; }
		if(pthread_join(sort_threads[2],0)) { fprintf(stderr,"Error runnning sort threads!\n"); return 7; }
	}
	else {
		//do sort only using the current thread
		rd.cr->sort_by_time();
		rd.tr->sort_by_uid_time();
		rd.cru->sort_by_uid_time();
	}
	
	//fill out start offsets for all users
	int64_t uid0 = 0;
	int64_t uid1 = 0;
	
	crecords* crutmp = rd.tr;
	rd.uids_uniq = crutmp->get_uids(false);
	
	t1 = time(0);
	ctimec = ctime(&t1);
	for(char* c1 = ctimec;*c1;c1++) if(*c1 == '\n') *c1 = 0;
	fprintf(stderr,"%s, done preprocessing, starting search\n",ctimec);
	
	
	rd.nusers = 0;
	rd.running_threads = nthreads;
	rd.nthreads = nthreads;
	rd.tid = 0;
	if(nthreads > 1) {
		pthread_t* threads = (pthread_t*)malloc(sizeof(pthread_t)*nthreads);
		if(!threads) {
			fprintf(stderr,"Error allocating memory!\n");
			free(tmax);
			return 4;
		}
		
		//create threads, wait until they run
		unsigned int i=0;
		for(i=0;i<nthreads;i++) {
			if(pthread_create(threads + i,0,&process_records,(void*)&rd)) {
				fprintf(stderr,"Error creating thread %u!\n",i);
				rd.error = true;
			}
		}
		rd.running_threads = i;
		rd.nthreads = i;
		
		// wait until the threads are finished or SIGINT is received in which case the user is asked to lower the number of running threads
		sigset_t set;
		sigemptyset(&set);
		sigaddset(&set, SIGQUIT);
		sigaddset(&set, SIGINT);
		pthread_sigmask(SIG_BLOCK,&set,0);
		while( rd.running_threads && !rd.error ) {
			struct timespec ts;
			ts.tv_sec = 5;
			ts.tv_nsec = 0;
			errno = 0;
			int r = sigtimedwait(&set,0,&ts);
			if(r > 0) {
				pthread_sigmask(SIG_UNBLOCK,&set,0);
				fprintf(stderr,"Number of running threads: %u, possible range: 1 -- %u\nSpecify the new value in this range or 0 to quit: ",
					rd.running_threads,i);
				fflush(stderr);
				unsigned int nt2;
				r = fscanf(stdin,"%u",&nt2);
				if(r != 1) fprintf(stderr,"No number given, not adjusting\n");
				else {
					if(nt2 > i) fprintf(stderr,"Error: out of range, not adjusting\n");
					else {
						if(nt2 == 0) fprintf(stderr,"Exiting...\n");
						else fprintf(stderr,"Limiting the number of running threads to %u\n",nt2);
						rd.nthreads = nt2;
					}
				}
				pthread_sigmask(SIG_BLOCK,&set,0);
			}
			else if(errno != EAGAIN) {
				fprintf(stderr,"Error calling sigtimedwait(), aborting...\n");
				rd.error = true;
				break;
			}
		}
		pthread_sigmask(SIG_UNBLOCK,&set,0);
		
		for(unsigned int j=0;j<i;j++) {
			if(pthread_join(threads[j],0)) {
				fprintf(stderr,"Error waiting for thread %u!\n",i);
			}
		}
		
		free(threads);
	}
	else process_records((void*)&rd); // run everything in the current thread if we are not using > 1 threads to make debugging simpler
	
	t1 = time(0);
	ctimec = ctime(&t1);
	for(char* c1 = ctimec;*c1;c1++) if(*c1 == '\n') *c1 = 0;
	unsigned int nusers1 = rd.nusers;
	if(nusers1 > rd.uids_uniq.size()) nusers1 = rd.uids_uniq.size();
	fprintf(stderr,"%s, %u users processed\n",ctimec,nusers1);
	if(rd.out_summary != stdout) {
		if(outzip) pclose_noshell(&spc);
		else fclose(rd.out_summary);
	}
	
	delete cru;
	free(tmax);
	
	return 0;
}

back to top