/*
 * Source: https://github.com/JuliaLang/julia  (threadgroup.c)
 * Tip revision: fd615943215e797c639678624c5cf99917a43521
 * authored by Keno Fischer, 25 December 2015 ("Address Further ORC review comments")
 */
// This file is a part of Julia. License is MIT: http://julialang.org/license
/*
threading infrastructure
. threadgroup abstraction
. fork/join/barrier
*/
#include <stdlib.h>
#include <string.h>
#include "julia.h"
#include "julia_internal.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "options.h"
#include "ia_misc.h"
#include "threadgroup.h"
// Allocate and initialize a thread group sized for the given topology
// (sockets x cores x threads-per-core). On success stores the new group
// in *newtg and returns 0. The caller owns the group and must release it
// with ti_threadgroup_destroy().
int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores,
                          uint8_t num_threads_per_core,
                          ti_threadgroup_t **newtg)
{
    int nthreads = num_sockets * num_cores * num_threads_per_core;
    ti_threadgroup_t *tg = (ti_threadgroup_t*)
        jl_malloc_aligned(sizeof(ti_threadgroup_t), 64);
    char *envval;
    int t;

    // external tid -> group tid map; -1 marks a slot with no thread yet
    tg->tid_map = (int16_t*)jl_malloc_aligned(nthreads * sizeof(int16_t), 64);
    for (t = 0; t < nthreads; t++)
        tg->tid_map[t] = -1;

    tg->num_sockets = num_sockets;
    tg->num_cores = num_cores;
    tg->num_threads_per_core = num_threads_per_core;
    tg->num_threads = nthreads;
    tg->added_threads = 0;

    // per-thread sense flags; filled in lazily by ti_threadgroup_initthread
    tg->thread_sense = (ti_thread_sense_t**)
        jl_malloc_aligned(nthreads * sizeof(ti_thread_sense_t*), 64);
    for (t = 0; t < nthreads; t++)
        tg->thread_sense[t] = NULL;
    tg->group_sense = 0;
    tg->forked = 0;

    uv_mutex_init(&tg->alarm_lock);
    uv_cond_init(&tg->alarm);

    // How long workers spin before going to sleep in fork(); can be
    // overridden via the environment. "infinite" (case-insensitive)
    // maps to 0, which disables sleeping entirely (spin forever).
    tg->sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
    envval = getenv(THREAD_SLEEP_THRESHOLD_NAME);
    if (envval != NULL) {
        if (strncasecmp(envval, "infinite", 8) == 0)
            tg->sleep_threshold = 0;
        else
            tg->sleep_threshold = (uint64_t)strtol(envval, NULL, 10);
    }

    *newtg = tg;
    return 0;
}
// Register the thread with external ID ext_tid as a member of the group,
// assigning it the next free group-internal ID. On success stores that ID
// in *tgtid (if non-NULL) and returns 0; returns a negative error code if
// ext_tid is out of range (-1), already a member (-2), or the group is
// full (-3).
int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid,
                             int16_t *tgtid)
{
    int16_t gid;

    if (ext_tid < 0 || ext_tid >= tg->num_threads)
        return -1;                       // external tid out of range
    if (tg->tid_map[ext_tid] != -1)
        return -2;                       // already added
    if (tg->added_threads == tg->num_threads)
        return -3;                       // no free slots left

    gid = tg->added_threads++;
    tg->tid_map[ext_tid] = gid;
    if (tgtid)
        *tgtid = gid;
    return 0;
}
// Allocate and install the per-thread sense flag for the thread with
// external ID ext_tid. Must be called after ti_threadgroup_addthread.
// Returns 0 on success, -1 if ext_tid is out of range, -2 if the sense
// flag was already installed, -3 if the group is empty.
int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid)
{
    ti_thread_sense_t *sense;

    if (ext_tid < 0 || ext_tid >= tg->num_threads)
        return -1;
    if (tg->thread_sense[tg->tid_map[ext_tid]] != NULL)
        return -2;                       // already initialized
    if (tg->num_threads == 0)
        return -3;

    // sense flags start at 1; group_sense starts at 0, so a fresh thread
    // is "not yet released" until the first fork flips group_sense
    sense = (ti_thread_sense_t*)
        jl_malloc_aligned(sizeof(ti_thread_sense_t), 64);
    sense->sense = 1;
    tg->thread_sense[tg->tid_map[ext_tid]] = sense;
    return 0;
}
// Look up the group-internal ID for the thread with external ID ext_tid.
// On success stores it in *tgtid (if non-NULL) and returns 0. On failure
// stores -1 in *tgtid (if non-NULL) and returns -2 if tg is NULL, -1 if
// ext_tid is out of range, or -3 if the thread is not a group member.
int ti_threadgroup_member(ti_threadgroup_t *tg, int16_t ext_tid,
                          int16_t *tgtid)
{
    // Check tg before touching it: the previous version read
    // tg->num_threads before the NULL check, crashing on a NULL tg
    // instead of returning -2 as intended.
    if (tg == NULL) {
        if (tgtid) *tgtid = -1;
        return -2;
    }
    if (ext_tid < 0 || ext_tid >= tg->num_threads) {
        if (tgtid) *tgtid = -1;
        return -1;
    }
    if (tg->tid_map[ext_tid] == -1) {
        if (tgtid) *tgtid = -1;
        return -3;
    }
    if (tgtid) *tgtid = tg->tid_map[ext_tid];
    return 0;
}
// Store the total number of threads this group was sized for in *tgsize.
// Always returns 0. NOTE(review): unlike the other accessors, tg and
// tgsize are not NULL-checked here — callers must pass valid pointers.
int ti_threadgroup_size(ti_threadgroup_t *tg, int16_t *tgsize)
{
    *tgsize = tg->num_threads;
    return 0;
}
// Fork point of the sense-reversing barrier. The group master (the thread
// whose group tid is 0) publishes *bcast_val to tg->envelope and releases
// the workers by setting group_sense equal to the threads' current sense
// flags. Workers spin on that condition — optionally sleeping on the
// alarm condition variable after tg->sleep_threshold cycles — and, once
// released, read the broadcast value back through bcast_val.
int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid,
                        void **bcast_val)
{
    if (tg->tid_map[ext_tid] == 0) {
        // master: publish the value to broadcast to all workers
        tg->envelope = bcast_val ? *bcast_val : NULL;
        // store fence: the envelope write must be visible before the
        // group_sense write that releases the workers
        cpu_sfence();
        tg->forked = 1;
        tg->group_sense = tg->thread_sense[0]->sense;

        // if it's possible that threads are sleeping, signal them
        if (tg->sleep_threshold) {
            uv_mutex_lock(&tg->alarm_lock);
            uv_cond_broadcast(&tg->alarm);
            uv_mutex_unlock(&tg->alarm_lock);
        }
    }
    else {
        // spin up to threshold cycles (count sheep), then sleep
        uint64_t spin_cycles, spin_start = rdtsc();
        while (tg->group_sense !=
               tg->thread_sense[tg->tid_map[ext_tid]]->sense) {
            if (tg->sleep_threshold) {
                spin_cycles = rdtsc() - spin_start;
                if (spin_cycles >= tg->sleep_threshold) {
                    uv_mutex_lock(&tg->alarm_lock);
                    // re-check the release condition under the lock so a
                    // broadcast sent between the spin check and the wait
                    // cannot be missed
                    if (tg->group_sense !=
                        tg->thread_sense[tg->tid_map[ext_tid]]->sense) {
                        uv_cond_wait(&tg->alarm, &tg->alarm_lock);
                    }
                    uv_mutex_unlock(&tg->alarm_lock);
                    // restart the spin budget after waking up
                    spin_start = rdtsc();
                    continue;
                }
            }
            cpu_pause();
        }
        // load fence: the envelope read must happen after observing the
        // release via group_sense
        cpu_lfence();
        if (bcast_val)
            *bcast_val = tg->envelope;
    }

    return 0;
}
// Join point of the sense-reversing barrier. Every thread flips its own
// sense flag to mark arrival (making it differ from group_sense, and
// priming it for the next fork). The master (group tid 0) additionally
// waits until every worker has flipped, then clears the forked flag.
int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid)
{
    int i;

    // flip this thread's private sense: it no longer matches group_sense,
    // which is how the master detects arrival below
    tg->thread_sense[tg->tid_map[ext_tid]]->sense
        = !tg->thread_sense[tg->tid_map[ext_tid]]->sense;
    if (tg->tid_map[ext_tid] == 0) {
        // master: spin until every worker's sense differs from group_sense
        for (i = 1; i < tg->num_threads; ++i) {
            while (tg->thread_sense[i]->sense == tg->group_sense)
                cpu_pause();
        }
        tg->forked = 0;
    }

    return 0;
}
// Full barrier: join the group and immediately re-fork with no broadcast
// value, so all members rendezvous and then proceed together.
void ti_threadgroup_barrier(ti_threadgroup_t *tg, int16_t ext_tid)
{
    int is_master = (tg->tid_map[ext_tid] == 0);

    // The master only participates while a fork is in flight; with no
    // outstanding fork there are no workers to synchronize with.
    if (is_master && !tg->forked)
        return;

    ti_threadgroup_join(tg, ext_tid);
    ti_threadgroup_fork(tg, ext_tid, NULL);
}
// Tear down a thread group created by ti_threadgroup_create: destroy the
// synchronization primitives, then free the per-thread sense flags, the
// arrays, and finally the group itself. Always returns 0.
int ti_threadgroup_destroy(ti_threadgroup_t *tg)
{
    int t;

    uv_mutex_destroy(&tg->alarm_lock);
    uv_cond_destroy(&tg->alarm);

    // sense entries may be NULL for slots never passed to initthread
    for (t = 0; t < tg->num_threads; t++)
        jl_free_aligned(tg->thread_sense[t]);
    jl_free_aligned(tg->thread_sense);
    jl_free_aligned(tg->tid_map);
    jl_free_aligned(tg);

    return 0;
}
#ifdef __cplusplus
}
#endif