https://github.com/DixianZhu/deBWT
Raw File
Tip revision: 7ffb02aac0795747c5030bd06042cae555a67030 authored by Dixian Zhu on 04 May 2016, 03:19:06 UTC
Update main.c
Tip revision: 7ffb02a
mySort.c
#include "collect#$.h"
#define BUCKET_LENGTH 12 //need less than KMER_LENGTH_PlusOne
#define BUCKET_CAPACITY (1<<(BUCKET_LENGTH<<1))
/*
void swap(char *a,char *b, size_t size);
int cmp(const void *a, const void *b);
static void shortsort (char *lo, char *hi, size_t width, int comp(const void *, const void *));
void quickSort(void *base, uint64_t num, size_t width, int comp(const void *, const void *));
*/
void *multiThreadSort(void *arg);
//void decode(uint64_t obj);

void quickSort(uint64_t (*base)[2], uint64_t num, int comp(const void *, const void *));
void swapTwo(const void *a, const void *b);
int cmpKmer(const void *a, const void *b);
void *multiDistri(void *arg);
pthread_rwlock_t *rwlockBkt;
uint64_t *countKmer=NULL;
uint64_t (*hashKmer)[2]=NULL;
uint64_t trans[256];
uint64_t totalKmerNum;
uint64_t *segCount=NULL;
uint64_t *readBuf=NULL;
uint64_t readNum;
uint64_t tempMove=(32-BUCKET_LENGTH)<<1;
int mySort(void **arg)
{
	char *bin=(char *)arg[0];
	uint64_t THREAD_NUM=(uint64_t )arg[1];
	segCount=(uint64_t *)calloc(THREAD_NUM,sizeof(uint64_t));
	trans['A']=trans['a']=0;
    trans['C']=trans['c']=1;
    trans['N']=trans['G']=trans['g']=2;
    trans['T']=trans['t']=3;
    trans['#']=4;// patch for special characters
    trans['$']=5;
    uint64_t i=0;
    //////////////////////////////////////kmercounting///////////////////////////////////////////
	countKmer=(uint64_t *)calloc(BUCKET_CAPACITY,sizeof(uint64_t));
	char *seqBuf=(char *)calloc(KMER_LENGTH_PlusOne+1,sizeof(char));
	uint64_t bufferSize=BUFFERSIZE<<1;
	uint64_t occBuf;
	uint64_t *writeBuf=(uint64_t *)calloc(bufferSize,sizeof(uint64_t));
	uint64_t bufPoint=0;
	FILE *fpKmer=NULL, *fp1=NULL;
	char *outPath=getPath(bin,"/out");
	char *kmerPath=getPath(bin,"/kmerInfo");
	fp1=fopen(outPath,"r");
    if(fp1==NULL) printf("failed open out!\n"), exit(1);
	fpKmer=fopen(kmerPath,"wb");
    if(fpKmer==NULL) printf("failed open kmerInfo!\n"), exit(1);
	time_t countTime_start=time(0);
	printf("start kmercounting\n");
	while(fscanf(fp1,"%s%lu",seqBuf,&occBuf)!=EOF)
	{
        //if(dbgIO%1000==0) printf("dbgIO=%lu\n",dbgIO);
        //if(dbgIO>=3431775000) printf("%s: %lu\n",seqBuf,occBuf);
		int i,j;
		uint64_t seq=0,dimer;
		unsigned int move;
		for(i=0,j=31;i<BUCKET_LENGTH;i++,j--)
		{
			move=j<<1;
			dimer=trans[(int)seqBuf[i]];
			seq=seq|(dimer<<move);
		}
		uint64_t tempSeq=seq>>(tempMove);
      	countKmer[tempSeq]++;
		//Go on writing out the seq
		for(;i<KMER_LENGTH_PlusOne;i++,j--)
		{
			move=j<<1;
			dimer=trans[(int)seqBuf[i]];
			seq=seq|(dimer<<move);
		}
		writeBuf[bufPoint++]=seq;
		writeBuf[bufPoint++]=occBuf;
		if(bufPoint>=bufferSize)
		{
			fwrite(writeBuf,sizeof(uint64_t),bufferSize,fpKmer);
			bufPoint=0;
		}
	}
	if(bufPoint)
	{
		fwrite(writeBuf,sizeof(uint64_t),bufPoint,fpKmer);
	}
	remove(outPath);
	free(outPath);
    free(seqBuf);
	free(writeBuf);
    fclose(fpKmer);
	fclose(fp1);
	time_t countTime_end=time(0);
	fprintf(stderr, "read kmercounting results time (txt transfer): %ld\n",countTime_end-countTime_start );
	/////////////////////////////////////////kmer distributing///////////////////////////////////////////////
	//printf("countKmer[0]=%lu\n",countKmer[0]);
	for(i=1;i<BUCKET_CAPACITY;i++)
	{
		countKmer[i]=countKmer[i-1]+countKmer[i];//upper bound
		//printf("countKmer[%lu]=%lu\n",i,countKmer[i]);
	}
	totalKmerNum=countKmer[BUCKET_CAPACITY-1];
	segCount[0]=0;
	for(i=1;i<THREAD_NUM;i++)
	{
		uint64_t locateNum=i*(totalKmerNum/THREAD_NUM);
		uint64_t locate=BinarySearch(locateNum,countKmer,BUCKET_CAPACITY-1);
		segCount[i]=locate;
	}
	clock_t dis_start=clock();
	readBuf=(uint64_t *)calloc(bufferSize,sizeof(uint64_t));
	hashKmer=(uint64_t (*)[2])calloc(totalKmerNum<<1,sizeof(uint64_t));
	fpKmer=fopen(kmerPath,"rb");
	rwlockBkt=(pthread_rwlock_t*)calloc(BUCKET_CAPACITY,sizeof(pthread_rwlock_t));
	for(i=0;i<BUCKET_CAPACITY;i++)
	{
		if(pthread_rwlock_init(&rwlockBkt[i], NULL))
		{
			printf("fail to create rwlock %lu\n",i);
			exit(1);
		}	
	}
	pthread_t myThread[THREAD_NUM];
	printf("start kmer distributing\n");
	time_t multiDistri_start=time(0);
	while((readNum=fread(readBuf,sizeof(uint64_t),bufferSize,fpKmer))>0)
	{           
		for(i=0;i<THREAD_NUM;i++)
		{
			void **tt=(void **)calloc(2,sizeof(void *));
			tt[0]=(void *)i;
			tt[1]=(void *)THREAD_NUM;
			int check=pthread_create( &myThread[i], NULL, multiDistri, (void*)tt);
			if(check)
	   	 	{
	        	fprintf(stderr,"threadNum:%lu, Error - pthread_create() return code: %d\n",i,check);
	        	exit(EXIT_FAILURE);
	    	}	
		}
		for(i=0;i<THREAD_NUM;i++)
		{
			pthread_join( myThread[i], NULL);
		}
                
	}
	fclose(fpKmer);
	clock_t dis_end=clock();
	double dis_duration=(double)(dis_end-dis_start)/CLOCKS_PER_SEC;
	time_t multiDistri_end=time(0);
	long multiDistri_dur=(multiDistri_end-multiDistri_start);
	printf("distributing time = %lf (%ld)\n",dis_duration,multiDistri_dur);
	free(rwlockBkt);
	time_t start=time(0);
	printf("start sorting\n");
	clock_t mul_start=clock();
	//////////////////////////////////////multiThread//////////////////////////////////////////////////
	//pthread_attr_t attr[THREAD_NUM];
	for(i=0;i<THREAD_NUM;i++)
	{
		void **tt=(void **)calloc(2,sizeof(void *));
		tt[0]=(void *)i;
		tt[1]=(void *)THREAD_NUM;
		int check=pthread_create( &myThread[i], NULL, multiThreadSort, (void*) tt);
		if(check)
    	{
        	fprintf(stderr,"threadNum:%lu, Error - pthread_create() return code: %d\n",i,check);
        	exit(EXIT_FAILURE);
    	}
    	//pthread_join( myThread[i], NULL);	
	}
	
	for(i=0;i<THREAD_NUM;i++)
	{
		pthread_join( myThread[i], NULL);
	}
	
	time_t finish=time(0);
	clock_t mul_end=clock();
	long duration=(finish-start);
	double mul_dur=(double)(mul_end-mul_start)/CLOCKS_PER_SEC;
	printf("sort time is %lf (%ld)\n",mul_dur,duration );
	/////////////////////////////////////end///////////////////////////////////////////////////////////
	/*
	for (i = 0; i < totalKmerNum-1; ++i)
	{
		if(hashKmer[i][0]>hashKmer[i+1][0]) 
		{
			printf("hashKmer[%lu]=%lu, hashKmer[%lu]=%lu\n",i,hashKmer[i][0],i+1,hashKmer[i+1][0]);
		}
	}
	*/
	fpKmer=fopen(kmerPath,"wb");
	fwrite(hashKmer,sizeof(uint64_t)*2,totalKmerNum,fpKmer);
	fclose(fpKmer);
	free(kmerPath);
	free(readBuf);
	free(hashKmer);
	free(countKmer);
	return 0;
}

void *multiThreadSort(void *arg)
{
	clock_t start=clock();
	void **argT=(void **)arg;
	uint64_t num=(uint64_t )argT[0];
	uint64_t THREAD_NUM=(uint64_t)argT[1];
	free(argT);
	uint64_t i,j;
	uint64_t low,up;
	low=segCount[num];
	if(num<THREAD_NUM-1)
	{
		up=segCount[num+1];
	}
	else//num==THREAD_NUM-1
	{
		up=BUCKET_CAPACITY-1;
	}
	//printf("thread:%lu  [%lu,%lu]\n",num,low,up);
	for(i=low;i<up;i++)
	{
		//quickSort(&hashKmer[countKmer[i]],countKmer[i+1]-countKmer[i],cmp);
		//quickSort((void *)&hashKmer[countKmer[i]],countKmer[i+1]-countKmer[i],2*sizeof(uint64_t),cmp);
		qsort(hashKmer[countKmer[i]],(countKmer[i+1]-countKmer[i]),2*sizeof(uint64_t),cmpKmer);
	}
	if(i==BUCKET_CAPACITY-1)
	{
		//quickSort(&hashKmer[countKmer[i]],totalKmerNum-countKmer[i],cmp);
		//quickSort((void *)&hashKmer[countKmer[i]],totalKmerNum-countKmer[i],2*sizeof(uint64_t),cmp);
		qsort(hashKmer[countKmer[i]],(totalKmerNum-countKmer[i]),2*sizeof(uint64_t),cmpKmer);
	}
	
	clock_t finish=clock();
	double duration=(double)(finish-start)/CLOCKS_PER_SEC;
	return (void *)NULL;
}
///////////////////////////////////not used yet////////////////////////////////////////////////

void quickSort(uint64_t (*base)[2], uint64_t num, int comp(const void *, const void *))
{
	uint64_t lo=0, hi=num-1, mid;
	int64_t loguy, higuy;
	uint64_t lostk[STKSIZ], histk[STKSIZ];
	int stkptr=0;
	size_t size;
	if(num<2) return;
	while(1)
	{
		size=hi-lo+1;
		mid=lo+(size>>1);
		if(comp((void *)base[lo],(void *)base[mid]) > 0)
		{
			swapTwo((void *)base[lo],(void *)base[mid]);
		}
		if(comp((void *)base[lo],(void *)base[hi]) > 0)
		{
			swapTwo((void *)base[lo],(void *)base[hi]);
		}
		if(comp((void *)base[mid],(void *)base[hi]) > 0)
		{
			swapTwo((void *)base[mid],(void *)base[hi]);	
		}
		loguy=lo;
		higuy=hi;
		while(1)
		{
			do
			{
				loguy++;
			}while(loguy<=hi && comp((void *)base[loguy],(void *)base[mid])<=0);
			do
			{
				higuy--;
			}while(higuy>mid && comp((void *)base[higuy],(void *)base[mid])>0);
			if(higuy<loguy) 
			{
				break;
			}
			swapTwo((void *)base[loguy],(void *)base[higuy]);
			if(mid==higuy) mid=loguy;
		}
		while(higuy>lo&&comp((void *)base[higuy],(void *)base[mid])==0)
		{
			higuy--;
		}
		if ( higuy - lo >= hi - loguy )//deal with the smaller block first
        {
        	if (lo < higuy)
        	{
              lostk[stkptr] = lo;  
              histk[stkptr] = higuy;  
              ++stkptr;  
            }                           // save big recursion for later   
            if (loguy < hi) 
            {  
              lo = loguy;  
              continue;           // do small recursion  
            }
        }  
        else
        {
        	if (loguy < hi) 
        	{  
              lostk[stkptr] = loguy;  
              histk[stkptr] = hi;  
              ++stkptr;               // save big recursion for later   
        	} 
        	if (lo < higuy) 
           	{  
              hi = higuy;  
              continue;           // do small recursion  
           	} 
        }
        --stkptr;  
    	if (stkptr >= 0) 
    	{  
    	  lo = lostk[stkptr];  
   	      hi = histk[stkptr];  
   	      continue;           // pop subarray from stack  
   		}  
   		else  return; 
	}
}
void swapTwo(const void *a, const void *b)
{
	uint64_t temp;
	uint64_t (*va)[2]=((uint64_t (*)[2])a),(*vb)[2]=((uint64_t (*)[2])b);
	temp=va[0][0];
	va[0][0]=vb[0][0];
	vb[0][0]=temp;
	temp=va[0][1];
	va[0][1]=vb[0][1];
	vb[0][1]=temp;
}

int cmpKmer(const void *a, const void *b)
{
	uint64_t va=((uint64_t (*)[2])a)[0][0],vb=((uint64_t (*)[2])b)[0][0];
	//printf("va=%lu; vb=%lu\n",va,vb );
	if(va<vb) return -1;
	else if(va==vb) return 0;
	else return 1;
}
/*
void decode(uint64_t obj)
{
	int i,j;
	uint64_t eliminate=3;
	for(i=0,j=31;i<KMER_LENGTH;i++,j--)
	{
		unsigned int move=j<<1;
		printf("%c","ACGT"[(obj>>move)&eliminate]);
	}
	putchar('\n');
}
uint64_t BinarySearch(uint64_t mk, uint64_t *target, int64_t up)//return the upper bound
{
  int64_t low=0,mid=0;
  while(low<=up)
  {
     mid=(low+up)>>1;
     if(mk<target[mid])   up=mid-1;
     else if(mk>target[mid])   low=mid+1;
     if(mk==target[mid])    return mid;
  }
  return low;
}
*/
void *multiDistri(void *arg)
{
	void **argT=(void **)arg;
	uint64_t num=(uint64_t )argT[0];
	uint64_t THREAD_NUM=(uint64_t)argT[1];
	free(argT);
	uint64_t segment=(readNum>>1)/THREAD_NUM;
	uint64_t start=(num*segment)<<1,i;
	uint64_t end;
	if(num<THREAD_NUM-1)
	{
		end=((num+1)*segment)<<1;
	}
	else//num==THREAD_NUM-1
	{
		end=readNum;
	}
        //printf("thread:%d  (%lu, %lu)\n",num,start,end);
	for(i=start;i<end;i+=2)
	{
		uint64_t tempSeq=readBuf[i]>>(tempMove);
		uint64_t tempOcc=readBuf[i+1];
		//countKmer[tempSeq] stands for the position
		pthread_rwlock_wrlock(&rwlockBkt[tempSeq]);
		countKmer[tempSeq]--;//because it is upper bound(cannot reach)
		hashKmer[countKmer[tempSeq]][0]=readBuf[i];
		hashKmer[countKmer[tempSeq]][1]=readBuf[i+1];
		pthread_rwlock_unlock(&rwlockBkt[tempSeq]);
	}
    return (void *)NULL;
}
back to top