https://github.com/rainwoodman/MP-sort
Tip revision: fb201bd5f3d6d4458d46dafce5301b2f1e188649 authored by Yu Feng on 09 October 2023, 03:06:36 UTC
Merge pull request #15 from rainwoodman/aarch64-fix
mpsort-omp.c
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <omp.h>

#include "internal.h"

#include "internal-parallel.h"
#include "mpsort.h"

/* shared state for the whole OpenMP thread team; a single instance is
 * accessed by every thread in the parallel region */
struct crompstruct {
    struct piter pi;
    unsigned char * P;
    unsigned char * Pmax;
    unsigned char * Pmin;
    ptrdiff_t * C; /* expected counts */
    ptrdiff_t * CLT; /* counts of less than P */
    ptrdiff_t * CLE; /* counts of less than or equal to P */
    ptrdiff_t * GL_CLT; /* per-task counts of less than P */
    ptrdiff_t * GL_CLE; /* per-task counts of less than or equal to P */
    ptrdiff_t * GL_C; /* the solved communication layout, GL_C[sender * (NTask + 1) + recver] */
};


/* OPENMP version of radix sort;
 * this version is truly parallel,
 * but it is usually slower than the
 * simple radix sort if the number of processors is small.
 *
 * some benchmarks on Coma at CMU show the best performance at 16
 * CPUs; still faster than the serial version with 8 CPUs, and
 * comparable with qsort (non-radix) at 8 CPUs.
 *
 * the code is more of a prototype of the MPI radix sort;
 * it is thus poorly written by the standards of an OPENMP program.
 * */
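
/* A minimal usage sketch (illustrative only; `struct particle' and
 * `radix_u64' are hypothetical names, not part of this library).
 * The radix callback copies the rsize-byte key of an item into the
 * supplied buffer; here the key is assumed to be a uint64_t that is
 * copied directly, following the usual mpsort convention:
 *
 *     struct particle { uint64_t key; double pos[3]; };
 *
 *     static void radix_u64(const void * ptr, void * radix, void * arg) {
 *         *(uint64_t *) radix = ((const struct particle *) ptr)->key;
 *     }
 *
 *     // sort n particles by key on all available OpenMP threads
 *     mpsort_omp(particles, n, sizeof(struct particle),
 *                radix_u64, sizeof(uint64_t), NULL);
 * */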

static void _setup_mpsort_omp(struct crompstruct * o, struct crstruct * d);
static void _cleanup_mpsort_omp(struct crompstruct * o, struct crstruct * d);

static void mpsort_omp_single(void * base, size_t nmemb,
        struct crstruct * d, struct crompstruct * o);

void mpsort_omp(void * base, size_t nmemb, size_t size,
        void (*radix)(const void * ptr, void * radix, void * arg),
        size_t rsize,
        void * arg) {
    if(nmemb == 0) return;

    struct crstruct d;
    struct crompstruct o;
    _setup_radix_sort(&d, base, nmemb, size, radix, rsize, arg);

    _setup_mpsort_omp(&o, &d);

    /*
     * first solve for P such that CLT[i] < C[i] <= CLE[i]
     *
     * Then calculate a communication layout.
     *
     * Then alltoall.
     * Then local sort again.
     * */
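
    /* each OpenMP thread below plays the role of one rank in the MPI version */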

#pragma omp parallel
    {
        mpsort_omp_single (base, nmemb, &d, &o);
    }

    _cleanup_mpsort_omp(&o, &d);
}

static void _setup_mpsort_omp(struct crompstruct * o, struct crstruct * d) {
    int NTaskMax = omp_get_max_threads();

    o->P = calloc(NTaskMax, d->rsize);

    int NTaskMax1 = NTaskMax + 1;
    size_t NTaskMax12 = (size_t) NTaskMax1 * NTaskMax1;

    /* the following arrays hold cumulative counts indexed by
     * [task * (NTaskMax + 1) + i]; entry i + 1 corresponds to split point i */
    o->GL_CLT = calloc(NTaskMax12, sizeof(ptrdiff_t));
    o->GL_CLE = calloc(NTaskMax12, sizeof(ptrdiff_t));
    o->GL_C = calloc(NTaskMax12, sizeof(ptrdiff_t));

    o->C = calloc(NTaskMax1, sizeof(ptrdiff_t)); /* expected counts */
    o->CLT = calloc(NTaskMax1, sizeof(ptrdiff_t)); /* counts of less than P */
    o->CLE = calloc(NTaskMax1, sizeof(ptrdiff_t)); /* counts of less than or equal to P */

    o->Pmin = calloc(1, d->rsize);
    o->Pmax = calloc(1, d->rsize);
    /* start Pmin at the largest possible radix (all bits set) and Pmax at
     * the smallest (all zero) so the first comparison always updates them */
    memset(o->Pmin, -1, d->rsize);
    memset(o->Pmax, 0, d->rsize);
}
static void _cleanup_mpsort_omp(struct crompstruct * o, struct crstruct * d) {
    free(o->CLE);
    free(o->CLT);
    free(o->C);
    free(o->GL_C);
    free(o->GL_CLE);
    free(o->GL_CLT);
    free(o->Pmin);
    free(o->Pmax);
    free(o->P);
}

/* sum `send' (count entries per thread) into the shared `recv';
 * emulates MPI_Allreduce with MPI_SUM across the OpenMP team.
 * the omp single zeroes recv (with an implicit barrier at its end),
 * then every thread adds its contribution inside the critical section. */
static void _reduce_sum(ptrdiff_t * send, ptrdiff_t * recv, size_t count) {
    ptrdiff_t i;
#pragma omp single
    for(i = 0; i < count; i ++) {
        recv[i] = 0;
    }
#pragma omp critical
    for(i = 0; i < count; i ++) {
        recv[i] += send[i];
    }
#pragma omp barrier
}
/* each thread copies its sendcount1 items of itemsize bytes into its own
 * slot of the shared recvbuf; emulates MPI_Allgather across the OpenMP team. */
static void _gather(void * sendbuf, int sendcount1, void * recvbuf, size_t itemsize) {
    int ThisTask = omp_get_thread_num();
    memcpy((char*) recvbuf + ThisTask * sendcount1 * itemsize,
            sendbuf, sendcount1 * itemsize);
#pragma omp barrier
}

/*
 * solve for the communication layout based on
 *
 * C: the desired number of items per task
 * GL_CLT[t,i+1]: the offset of lt P[i] in task t
 * GL_CLE[t,i+1]: the offset of le P[i] in task t
 *
 * the result is saved in
 *
 * GL_C[t, i]: the offset of sending to task i in task t.
 *
 * this routine requires the GL_ arrays to scale as NTask * NTask;
 * it won't work with 1,000+ ranks.
 * */
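
/* An illustrative, made-up example with NTask = 2 (so NTask1 = 3 and one
 * split point P[0]).  Task 0 holds 6 items, 3 of them below P[0] and 5 at
 * or below it; task 1 holds 4 items, 1 below and 2 at or below.  The
 * desired counts are C = {0, 5, 10}:
 *
 *     GL_CLT = { 0, 3, 6,        GL_CLE = { 0, 5, 6,
 *                0, 1, 4 }                  0, 2, 4 }
 *
 * Starting from GL_C = GL_CLT, receiver 0 is sure of 3 + 1 = 4 items, a
 * deficit of 1, which task 0 covers from its 5 - 3 = 2 items equal to
 * P[0].  The solved layout is
 *
 *     GL_C = { 0, 4, 6,
 *              0, 1, 4 }
 *
 * i.e. task 0 sends items [0, 4) to receiver 0 and [4, 6) to receiver 1,
 * task 1 sends [0, 1) to receiver 0 and [1, 4) to receiver 1, and each
 * receiver ends up with exactly 5 items. */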
static void _solve_for_layout (
        int NTask,
        ptrdiff_t * C,
        ptrdiff_t * GL_CLT,
        ptrdiff_t * GL_CLE,
        ptrdiff_t * GL_C) {
    int NTask1 = NTask + 1;
    int i, j;
    /* first assume we just send according to GL_CLT */
    for(i = 0; i < NTask + 1; i ++) {
        for(j = 0; j < NTask; j ++) {
            GL_C[j * NTask1 + i] = GL_CLT[j * NTask1 + i];
        }
    }

    /* Solve for each receiving task i
     *
     * this solves for GL_C[..., i + 1], which depends on GL_C[..., i]
     *
     * and we have GL_C[..., 0] == 0 by definition.
     *
     * this cannot be done in parallel wrt i because of the dependency.
     *
     *  a solution is guaranteed because GL_CLE and GL_CLT
     *  brackes the total counts C (we've found it with the
     *  iterative counting.
     *
     * */

    for(i = 0; i < NTask; i ++) {
        ptrdiff_t sure = 0;

        /* how many will I surely receive? */
        for(j = 0; j < NTask; j ++) {
            ptrdiff_t sendcount = GL_C[j * NTask1 + i + 1] - GL_C[j * NTask1 + i];
            sure += sendcount;
        }
        /* let's see if we have enough */
        ptrdiff_t deficit = C[i + 1] - C[i] - sure;

        for(j = 0; j < NTask; j ++) {
            /* deficit solved */
            if(deficit == 0) break;
            if(deficit < 0) {
                fprintf(stderr, "serious bug: more items than there should be: deficit=%ld\n", deficit);
                abort();
            }
            /* how much task j can supply ? */
            ptrdiff_t supply = GL_CLE[j * NTask1 + i + 1] - GL_C[j * NTask1 + i + 1];
            if(supply < 0) {
                fprintf(stderr, "serious bug: less items than there should be: supply =%ld\n", supply);
                abort();
            }
            if(supply <= deficit) {
                GL_C[j * NTask1 + i + 1] += supply;
                deficit -= supply;
            } else {
                GL_C[j * NTask1 + i + 1] += deficit;
                deficit = 0;
            }
        }
    }

#if 0
    for(i = 0; i < NTask; i ++) {
        for(j = 0; j < NTask + 1; j ++) {
            printf("%d %d %d, ",
                    GL_CLT[i * NTask1 + j],
                    GL_C[i * NTask1 + j],
                    GL_CLE[i * NTask1 + j]);
        }
        printf("\n");
    }
#endif

}

static void mpsort_omp_single(void * base, size_t nmemb,
        struct crstruct * d, struct crompstruct * o) {
    int NTask = omp_get_num_threads();
    int ThisTask = omp_get_thread_num();

    ptrdiff_t myCLT[NTask + 1]; /* counts of less than P */
    ptrdiff_t myCLE[NTask + 1]; /* counts of less than or equal to P */

    int i;

#pragma omp single
    {
        o->C[0] = 0;
        for(i = 0; i < NTask; i ++) {
            /* how many items are desired per thread */
            o->C[i + 1] = nmemb * (i + 1) / NTask;
        }
    }

    double t0 = omp_get_wtime();

    /* distribute the array evenly */
    char * mybase = (char*) base + nmemb * ThisTask / NTask * d->size;
    size_t mynmemb = nmemb * (ThisTask + 1) / NTask - nmemb * ThisTask / NTask;


    /* and sort the local array */
    radix_sort(mybase, mynmemb, d->size, d->radix, d->rsize, d->arg);


    /* find the max radix and min radix of all */
    if(mynmemb > 0) {
        unsigned char myPmax[d->rsize];
        unsigned char myPmin[d->rsize];
        d->radix(mybase + (mynmemb - 1) * d->size, myPmax, d->arg);
        d->radix(mybase, myPmin, d->arg);
#pragma omp critical
        {
            if(d->compar(myPmax, o->Pmax, d->rsize) > 0) {
                memcpy(o->Pmax, myPmax, d->rsize);
            }
            if(d->compar(myPmin, o->Pmin, d->rsize) < 0) {
                memcpy(o->Pmin, myPmin, d->rsize);
            }
        }
    }
#pragma omp barrier

    double t1 = omp_get_wtime();
    printf("Initial sort took %g\n", t1 - t0);

    /* now do the radix counting iterations:
     * bisect the split points P until CLT[i] < C[i] <= CLE[i] holds for every split */

#pragma omp single
    piter_init(&o->pi, o->Pmin, o->Pmax, NTask - 1, d);

    int iter = 0;

    int done = 0;

    while(!done) {
        iter ++;
#pragma omp barrier
#pragma omp single
        piter_bisect(&o->pi, o->P);

        _histogram(o->P, NTask - 1, mybase, mynmemb, myCLT, myCLE, d);

        _reduce_sum(myCLT, o->CLT, NTask + 1);
        _reduce_sum(myCLE, o->CLE, NTask + 1);

#pragma omp single
        piter_accept(&o->pi, o->P, o->C, o->CLT, o->CLE);

        done = piter_all_done(&o->pi);
    }
#pragma omp barrier

#pragma omp single
    piter_destroy(&o->pi);

    double t2 = omp_get_wtime();
    printf("counting took %g\n", t2 - t1);

#if 0
#pragma omp single
    {
        printf("AfterIteration: split , CLT, C, CLE\n");
        int i;
        for(i = 0; i < NTask + 1; i ++) {
            printf("%d %ld %ld %ld\n", i, o->CLT[i], o->C[i], o->CLE[i]);
        }
    }
#endif

    _histogram(o->P, NTask - 1, mybase, mynmemb, myCLT, myCLE, d);

    /* gather the per-thread counts to all (used only by the omp single block below) */
    _gather(myCLT, NTask + 1, o->GL_CLT, sizeof(ptrdiff_t));
    _gather(myCLE, NTask + 1, o->GL_CLE, sizeof(ptrdiff_t));

    /* indexing is
     * GL_C[Sender * NTask1 + Recver] */
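    /* e.g. GL_C[2 * NTask1 + 1] is the offset, within thread 2's sorted
     * segment, of the first item destined for thread 1 */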
    /* at this point we know o->GL_CLT and o->GL_CLE;
     * the first NTask - 1 entries of CLT and CLE give the bounds of the
     * split points ( CLT < split <= CLE )
     * */

    /* solve for the communication layout, stored in o->GL_C */
#pragma omp single
    _solve_for_layout(NTask, o->C, o->GL_CLT, o->GL_CLE, o->GL_C);

    double t3 = omp_get_wtime();
    printf("find split took %g\n", t3 - t2);

    /* exchange data */
    char * buffer = malloc(d->size * mynmemb);
    int NTask1 = NTask + 1;

#if 0
#pragma omp critical
    {
        printf("%d contains %d items ", ThisTask, mynmemb);
        int k;
        int * ptr = mybase;
        for(k = 0; k < mynmemb; k ++) {
            printf("%d ", ((int *) ptr)[k]);
        }
        printf("\n");
    }
#pragma omp barrier
#endif

    /* we can't do this with a single alltoall because it is difficult to sync
     * the sendbuf pointers with omp; instead each thread pulls its own slice
     * out of every sender's sorted segment */
    char * recv = buffer;
    for(i = 0; i < NTask; i ++) {
        char * sendbuf = (char*) base + d->size * (i * nmemb / NTask);
        size_t size = (o->GL_C[i * NTask1 + ThisTask + 1]
                - o->GL_C[i * NTask1 + ThisTask]) * d->size;
        char * ptr = &sendbuf[d->size * o->GL_C[i * NTask1 + ThisTask]];

        memcpy(recv, ptr, size);
        recv += size;
    }

#pragma omp barrier
    memcpy(mybase, buffer, mynmemb * d->size);
    free(buffer);

#if 0
#pragma omp critical
    {
        printf("%d after exchange %d items ", ThisTask, mynmemb);
        int k;
        int * ptr = mybase;
        for(k = 0; k < mynmemb; k ++) {
            printf("%d ", ((int *) ptr)[k]);
        }
        printf("\n");
    }
#pragma omp barrier
#endif

    double t4 = omp_get_wtime();
    printf("exchange took %g\n", t4 - t3);

    radix_sort(mybase, mynmemb, d->size, d->radix, d->rsize, d->arg);

    double t5 = omp_get_wtime();
    printf("final sort %g\n", t5 - t4);

#pragma omp barrier
}
