// This file is a part of Julia. License is MIT: http://julialang.org/license

/*
  threading infrastructure
  . thread and threadgroup creation
  . thread function
  . invoke Julia function from multiple threads

TODO:
  . fix interface to properly support thread groups
  . add queue per thread for tasks
  . add reduction; reduce values returned from thread function
  . make code generation thread-safe and remove the lock
*/


#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef _MSC_VER
#include <unistd.h>
#include <sched.h>
#else
#define sleep(x) Sleep(1000 * (x))
#endif

#include "julia.h"
#include "julia_internal.h"

#ifdef __cplusplus
extern "C" {
#endif

#include "ia_misc.h"
#include "threadgroup.h"
#include "threading.h"

#if !defined(_CPU_X86_64_) && !defined(_CPU_X86_) && defined(__linux__)
// on non-x86 Linux there is no rdtsc instruction; approximate it by
// reading the hardware cycle counter through the perf_event interface
#include <sys/syscall.h>
#include <linux/perf_event.h>

static int _get_perf_fd(void)
{
    static int fd = -1;
    if (fd < 0) {
        static struct perf_event_attr attr;  // static => zero-initialized
        attr.size = sizeof(attr);
        attr.type = PERF_TYPE_HARDWARE;
        attr.config = PERF_COUNT_HW_CPU_CYCLES;
        // count cycles for this process on any CPU; flags = 0
        fd = syscall(__NR_perf_event_open, &attr, 0, -1, -1, 0);
    }
    return fd;
}

__attribute__((destructor)) static void
_close_perf_fd(void)
{
    int fd = _get_perf_fd();
    if (fd >= 0)  // don't close(-1) if the counter was never opened
        close(fd);
}

long long
rdtsc(void)
{
    long long result = 0;
    // read() returns ssize_t; compare against a signed size so that a -1
    // error return is not promoted to a large unsigned value
    if (read(_get_perf_fd(), &result, sizeof(result)) != (ssize_t)sizeof(result))
        return 0;
    return result;
}
#endif
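
/*
  Illustrative sketch (not part of the original file): the profiling code
  below consumes rdtsc()-style cycle counts in exactly this way, pairing
  two reads around a region and converting via the cpu_ghz calibration
  done in jl_init_threading(). Disabled from compilation on purpose.
*/
#if 0
static double example_time_region_secs(void (*region)(void))
{
    uint64_t t0 = rdtsc();                         // cycles before
    region();
    uint64_t t1 = rdtsc();                         // cycles after
    return ((double)(t1 - t0)) / (cpu_ghz * 1e9);  // cycles -> seconds
}
#endif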

// utility
JL_DLLEXPORT void jl_cpu_pause(void)
{
    cpu_pause();
}

#ifdef JULIA_ENABLE_THREADING
// fallback provided for embedding
static JL_CONST_FUNC jl_tls_states_t *jl_get_ptls_states_fallback(void)
{
#  if !defined(_COMPILER_MICROSOFT_)
    static __thread jl_tls_states_t tls_states;
#  else
    static __declspec(thread) jl_tls_states_t tls_states;
#  endif
    return &tls_states;
}
static jl_tls_states_t *jl_get_ptls_states_init(void);
static jl_get_ptls_states_func jl_tls_states_cb = jl_get_ptls_states_init;
static jl_tls_states_t *jl_get_ptls_states_init(void)
{
    // This 2-step initialization is used to detect calls to
    // `jl_set_ptls_states_getter` made after the address of the TLS
    // variables has already been used. Since the address of TLS variables
    // should be constant, changing the getter address can result in weird
    // crashes.

    // This is clearly not thread-safe, but that is fine since the TLS
    // states callback is finalized before any additional threads are
    // created.
    jl_tls_states_cb = jl_get_ptls_states_fallback;
    return jl_get_ptls_states_fallback();
}
JL_DLLEXPORT JL_CONST_FUNC jl_tls_states_t *(jl_get_ptls_states)(void)
{
    return (*jl_tls_states_cb)();
}
JL_DLLEXPORT void jl_set_ptls_states_getter(jl_get_ptls_states_func f)
{
    // only allow setting this once
    if (f && f != jl_get_ptls_states_init &&
        jl_tls_states_cb == jl_get_ptls_states_init) {
        jl_tls_states_cb = f;
    }
    else {
        jl_safe_printf("ERROR: Attempt to change TLS address.\n");
        exit(1);
    }
}
jl_get_ptls_states_func jl_get_ptls_states_getter(void)
{
    if (jl_tls_states_cb == jl_get_ptls_states_init)
        jl_get_ptls_states_init();
    // for codegen
    return jl_tls_states_cb;
}
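
/*
  Illustrative sketch (not part of the original file): an embedder that
  manages its own TLS placement would install a getter before any Julia
  TLS address is taken, i.e. before jl_init(). The names below are
  hypothetical. Disabled from compilation on purpose.
*/
#if 0
static __thread jl_tls_states_t embedder_tls_states;
static jl_tls_states_t *embedder_get_tls(void)
{
    return &embedder_tls_states;
}
// at startup, before jl_init():
//     jl_set_ptls_states_getter(embedder_get_tls);
#endif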
#else
JL_DLLEXPORT jl_tls_states_t jl_tls_states;
JL_DLLEXPORT JL_CONST_FUNC jl_tls_states_t *(jl_get_ptls_states)(void)
{
    return &jl_tls_states;
}
#endif

// thread ID
JL_DLLEXPORT int jl_n_threads;     // # threads we're actually using
jl_thread_task_state_t *jl_all_task_states;

// return calling thread's ID
JL_DLLEXPORT int16_t jl_threadid(void) { return ti_tid; }

struct _jl_thread_heap_t *jl_mk_thread_heap(void);

// must be called by each thread at startup
static void ti_initthread(int16_t tid)
{
    ti_tid = tid;
    jl_pgcstack = NULL;
#ifdef JULIA_ENABLE_THREADING
    jl_all_heaps[tid] = jl_mk_thread_heap();
#else
    jl_mk_thread_heap();
#endif

    jl_all_task_states[tid].ptls = jl_get_ptls_states();
    jl_all_task_states[tid].signal_stack = jl_install_thread_signal_handler();
}

static void ti_init_master_thread(void)
{
#ifdef _OS_WINDOWS_
    if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
                         GetCurrentProcess(), &hMainThread, 0,
                         TRUE, DUPLICATE_SAME_ACCESS)) {
        jl_printf(JL_STDERR, "WARNING: failed to access handle to main thread\n");
        hMainThread = INVALID_HANDLE_VALUE;
    }
    jl_all_task_states[0].system_id = hMainThread;
#else
    jl_all_task_states[0].system_id = pthread_self();
#endif
    ti_initthread(0);
}

// all threads call this function to run user code
static jl_value_t *ti_run_fun(jl_function_t *f, jl_svec_t *args)
{
    JL_TRY {
        jl_apply(f, jl_svec_data(args), jl_svec_len(args));
    }
    JL_CATCH {
        return jl_exception_in_transit;
    }
    return jl_nothing;
}
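
/*
  Illustrative sketch (not part of the original file): since ti_run_fun()
  returns jl_exception_in_transit when the user function throws, a caller
  can distinguish the two outcomes by comparing the result against
  jl_nothing. Disabled from compilation on purpose.
*/
#if 0
static void example_check_result(jl_function_t *f, jl_svec_t *args)
{
    jl_value_t *ret = ti_run_fun(f, args);
    if (ret != jl_nothing)
        jl_static_show(JL_STDERR, ret);  // `ret` is the escaped exception
}
#endif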


#ifdef JULIA_ENABLE_THREADING

// lock for code generation
JL_DEFINE_MUTEX(codegen);
JL_DEFINE_MUTEX(typecache);

// thread heap
struct _jl_thread_heap_t **jl_all_heaps;

// only one thread group for now
ti_threadgroup_t *tgworld;

// for broadcasting work to threads
ti_threadwork_t threadwork;

#if PROFILE_JL_THREADING
double cpu_ghz;
uint64_t prep_ticks;
uint64_t *fork_ticks;
uint64_t *user_ticks;
uint64_t *join_ticks;
#endif

static uv_barrier_t thread_init_done;

// thread function: used by all except the main thread
void ti_threadfun(void *arg)
{
    ti_threadarg_t *ta = (ti_threadarg_t *)arg;
    ti_threadgroup_t *tg;
    ti_threadwork_t *work;

    // initialize this thread (set tid, create heap, etc.)
    ti_initthread(ta->tid);
    jl_init_stack_limits();

    // set up tasking
    jl_init_root_task(jl_stack_lo, jl_stack_hi - jl_stack_lo);
#ifdef COPY_STACKS
    jl_set_base_ctx((char*)&arg);
#endif

    // wait until we are assigned to a thread group
    while (ta->state == TI_THREAD_INIT)
        cpu_pause();
    cpu_lfence();
    uv_barrier_wait(&thread_init_done);
    // initialize this thread in the thread group
    tg = ta->tg;
    ti_threadgroup_initthread(tg, ti_tid);

    // free the thread argument here
    free(ta);

    // work loop
    for (;;) {
#if PROFILE_JL_THREADING
        uint64_t tstart = rdtsc();
#endif

        ti_threadgroup_fork(tg, ti_tid, (void **)&work);

#if PROFILE_JL_THREADING
        uint64_t tfork = rdtsc();
        fork_ticks[ti_tid] += tfork - tstart;
#endif

        if (work) {
            if (work->command == TI_THREADWORK_DONE)
                break;
            else if (work->command == TI_THREADWORK_RUN)
                // TODO: return value? reduction?
                ti_run_fun(work->fun, work->args);
        }

#if PROFILE_JL_THREADING
        uint64_t tuser = rdtsc();
        user_ticks[ti_tid] += tuser - tfork;
#endif

        ti_threadgroup_join(tg, ti_tid);

#if PROFILE_JL_THREADING
        uint64_t tjoin = rdtsc();
        join_ticks[ti_tid] += tjoin - tuser;
#endif

        // TODO:
        // nowait should skip the join, but confirm that fork is reentrant
    }
}
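
/*
  Summary of the handshake above (added commentary): the master publishes
  a ti_threadwork_t and enters ti_threadgroup_fork(); each worker's fork
  call returns with a pointer to that work, the worker runs it, and both
  sides then meet in ti_threadgroup_join() before the next round. A
  TI_THREADWORK_DONE command breaks workers out of the loop.
*/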

#if PROFILE_JL_THREADING
void ti_reset_timings(void);
#endif

// interface to Julia; sets up to make the runtime thread-safe
void jl_init_threading(void)
{
    char *cp;

    // how many threads are available and how many to use
    int max_threads = jl_cpu_cores();
    jl_n_threads = DEFAULT_NUM_THREADS;
    cp = getenv(NUM_THREADS_NAME);
    if (cp) {
        // strtol returns long; clamp below at 1 before narrowing to int
        long nt = strtol(cp, NULL, 10);
        jl_n_threads = nt < 1 ? 1 : (int)nt;
    }
    if (jl_n_threads > max_threads)
        jl_n_threads = max_threads;

    // set up space for the per-thread heaps and task states
    jl_all_heaps = (struct _jl_thread_heap_t **)malloc(jl_n_threads * sizeof(void*));
    jl_all_task_states = (jl_thread_task_state_t *)malloc(jl_n_threads * sizeof(jl_thread_task_state_t));

#if PROFILE_JL_THREADING
    // estimate CPU speed
    uint64_t cpu_tim = rdtsc();
    sleep(1);
    cpu_ghz = ((double)(rdtsc() - cpu_tim)) / 1e9;

    // set up space for profiling information
    fork_ticks = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64);
    user_ticks = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64);
    join_ticks = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64);
    ti_reset_timings();
#endif

    // initialize this master thread (set tid, create heap, etc.)
    ti_init_master_thread();
}
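
/*
  Usage note (added commentary): the thread count comes from the
  environment variable named by NUM_THREADS_NAME (presumably
  "JULIA_NUM_THREADS"), capped at the number of CPU cores, e.g.:

      $ JULIA_NUM_THREADS=4 ./julia
*/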

void jl_start_threads(void)
{
    char *cp, mask[UV_CPU_SETSIZE];
    int i, exclusive;
    uv_thread_t uvtid;
    ti_threadarg_t **targs;

    // do we have exclusive use of the machine? default is no
    exclusive = DEFAULT_MACHINE_EXCLUSIVE;
    cp = getenv(MACHINE_EXCLUSIVE_NAME);
    if (cp)
        exclusive = strtol(cp, NULL, 10);

    // exclusive use: pin the master thread to proc 0 and the remaining
    // threads to subsequent procs according to a 'compact' policy
    // non-exclusive: no affinity settings; let the kernel migrate threads
    if (exclusive) {
        memset(mask, 0, UV_CPU_SETSIZE);
        mask[0] = 1;
        uvtid = (uv_thread_t)uv_thread_self();
        uv_thread_setaffinity(&uvtid, mask, NULL, UV_CPU_SETSIZE);
    }

    // create threads
    targs = (ti_threadarg_t **)malloc((jl_n_threads - 1) * sizeof (ti_threadarg_t *));

    uv_barrier_init(&thread_init_done, jl_n_threads);

    for (i = 0;  i < jl_n_threads - 1;  ++i) {
        targs[i] = (ti_threadarg_t *)malloc(sizeof (ti_threadarg_t));
        targs[i]->state = TI_THREAD_INIT;
        targs[i]->tid = i + 1;
        uv_thread_create(&uvtid, ti_threadfun, targs[i]);
        if (exclusive) {
            memset(mask, 0, UV_CPU_SETSIZE);
            mask[i+1] = 1;
            uv_thread_setaffinity(&uvtid, mask, NULL, UV_CPU_SETSIZE);
        }
        uv_thread_detach(&uvtid);
        jl_all_task_states[i + 1].system_id = uvtid;
    }

    // set up the world thread group
    ti_threadgroup_create(1, jl_n_threads, 1, &tgworld);
    for (i = 0;  i < jl_n_threads;  ++i)
        ti_threadgroup_addthread(tgworld, i, NULL);
    ti_threadgroup_initthread(tgworld, ti_tid);

    // give the threads the world thread group; they will block waiting for fork
    for (i = 0;  i < jl_n_threads - 1;  ++i) {
        targs[i]->tg = tgworld;
        cpu_sfence();
        targs[i]->state = TI_THREAD_WORK;
    }

    uv_barrier_wait(&thread_init_done);

    // free the argument array; the threads will free their arguments
    free(targs);
}
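
/*
  Startup sequence summary (added commentary):
  1. optionally pin the master thread to proc 0 (exclusive mode);
  2. spawn jl_n_threads-1 detached workers, each starting in TI_THREAD_INIT;
  3. build the world thread group `tgworld` and register every tid in it;
  4. hand each worker the group and flip its state to TI_THREAD_WORK;
  5. rendezvous on `thread_init_done` so every thread is initialized
     before any work is forked.
*/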

// TODO: is this needed? where/when/how to call it?
void jl_shutdown_threading(void)
{
    // stop the spinning threads by sending them a command
    ti_threadwork_t *work = &threadwork;

    work->command = TI_THREADWORK_DONE;
    ti_threadgroup_fork(tgworld, ti_tid, (void **)&work);

    sleep(1);

    // destroy the world thread group
    ti_threadgroup_destroy(tgworld);

    // TODO: clean up and free the per-thread heaps

#if PROFILE_JL_THREADING
    jl_free_aligned(join_ticks);
    jl_free_aligned(user_ticks);
    jl_free_aligned(fork_ticks);
    fork_ticks = user_ticks = join_ticks = NULL;
#endif
}

// return thread's thread group
JL_DLLEXPORT void *jl_threadgroup(void) { return (void *)tgworld; }

// interface to user code: specialize and compile the user thread function
// and run it in all threads
JL_DLLEXPORT jl_value_t *jl_threading_run(jl_function_t *f, jl_svec_t *args)
{
#if PROFILE_JL_THREADING
    uint64_t tstart = rdtsc();
#endif

    jl_tupletype_t *argtypes = NULL;
    jl_function_t *fun = NULL;
    if ((jl_value_t*)args == jl_emptytuple)
        args = jl_emptysvec;
    JL_TYPECHK(jl_threading_run, function, (jl_value_t*)f);
    JL_TYPECHK(jl_threading_run, simplevector, (jl_value_t*)args);

    JL_GC_PUSH2(&argtypes, &fun);
    if (jl_svec_len(args) == 0)
        argtypes = (jl_tupletype_t*)jl_typeof(jl_emptytuple);
    else
        argtypes = arg_type_tuple(jl_svec_data(args), jl_svec_len(args));
    fun = jl_get_specialization(f, argtypes, NULL);
    if (fun == NULL)
        fun = f;
    jl_generate_fptr(fun);

    threadwork.command = TI_THREADWORK_RUN;
    threadwork.fun = fun;
    threadwork.args = args;
    threadwork.ret = jl_nothing;

#if PROFILE_JL_THREADING
    uint64_t tcompile = rdtsc();
    prep_ticks += (tcompile - tstart);
#endif

    // fork the world thread group
    ti_threadwork_t *tw = (ti_threadwork_t *)&threadwork;
    ti_threadgroup_fork(tgworld, ti_tid, (void **)&tw);

#if PROFILE_JL_THREADING
    uint64_t tfork = rdtsc();
    fork_ticks[ti_tid] += (tfork - tcompile);
#endif

    // this thread must do work too (TODO: reduction?)
    tw->ret = ti_run_fun(fun, args);

#if PROFILE_JL_THREADING
    uint64_t trun = rdtsc();
    user_ticks[ti_tid] += (trun - tfork);
#endif

    // wait for completion (TODO: nowait?)
    ti_threadgroup_join(tgworld, ti_tid);

#if PROFILE_JL_THREADING
    uint64_t tjoin = rdtsc();
    join_ticks[ti_tid] += (tjoin - trun);
#endif

    JL_GC_POP();

    return tw->ret;
}
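
/*
  Illustrative sketch (not part of the original file): invoking
  jl_threading_run() from embedding code with a zero-argument Julia
  function; the function name "work" is hypothetical. Disabled from
  compilation on purpose.
*/
#if 0
static void example_threading_run(void)
{
    jl_function_t *f = jl_get_function(jl_main_module, "work");
    jl_value_t *ret = jl_threading_run(f, jl_emptysvec);
    (void)ret;  // jl_nothing on success, exception object otherwise
}
#endif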

#if PROFILE_JL_THREADING

void ti_reset_timings(void)
{
    int i;
    prep_ticks = 0;
    for (i = 0;  i < jl_n_threads;  i++)
        fork_ticks[i] = user_ticks[i] = join_ticks[i] = 0;
}

void ti_timings(uint64_t *times, uint64_t *min, uint64_t *max, uint64_t *avg)
{
    int i;
    *min = UINT64_MAX;
    *max = *avg = 0;
    for (i = 0;  i < jl_n_threads;  i++) {
        if (times[i] < *min)
            *min = times[i];
        if (times[i] > *max)
            *max = times[i];
        *avg += times[i];
    }
    *avg /= jl_n_threads;
}

#define TICKS_TO_SECS(t)        (((double)(t)) / (cpu_ghz * 1e9))

JL_DLLEXPORT void jl_threading_profile(void)
{
    if (!fork_ticks) return;

    printf("\nti profile:\n");
    printf("prep: %g (%llu)\n", TICKS_TO_SECS(prep_ticks), (unsigned long long)prep_ticks);

    uint64_t min, max, avg;
    ti_timings(fork_ticks, &min, &max, &avg);
    printf("fork: %g (%g - %g)\n", TICKS_TO_SECS(min), TICKS_TO_SECS(max),
            TICKS_TO_SECS(avg));
    ti_timings(user_ticks, &min, &max, &avg);
    printf("user: %g (%g - %g)\n", TICKS_TO_SECS(min), TICKS_TO_SECS(max),
            TICKS_TO_SECS(avg));
    ti_timings(join_ticks, &min, &max, &avg);
    printf("join: %g (%g - %g)\n", TICKS_TO_SECS(min), TICKS_TO_SECS(max),
            TICKS_TO_SECS(avg));
}

#else //!PROFILE_JL_THREADING

JL_DLLEXPORT void jl_threading_profile(void)
{
}

#endif //!PROFILE_JL_THREADING

#else // !JULIA_ENABLE_THREADING

JL_DLLEXPORT jl_value_t *jl_threading_run(jl_function_t *f, jl_svec_t *args)
{
    if ((jl_value_t*)args == jl_emptytuple)
        args = jl_emptysvec;
    JL_TYPECHK(jl_threading_run, function, (jl_value_t*)f);
    JL_TYPECHK(jl_threading_run, simplevector, (jl_value_t*)args);
    return ti_run_fun(f, args);
}

void jl_init_threading(void)
{
    static jl_thread_task_state_t _jl_all_task_states;
    jl_all_task_states = &_jl_all_task_states;
    jl_n_threads = 1;

#if defined(__linux__) && defined(JL_USE_INTEL_JITEVENTS)
    // Intel VTune Amplifier needs at least 64k for the alternate signal stack
    if (jl_using_intel_jitevents && SIGSTKSZ < (1 << 16))
        sig_stack_size = 1 << 16;
#endif

    ti_init_master_thread();
}

void jl_start_threads(void) { }

#endif // !JULIA_ENABLE_THREADING

#ifdef __cplusplus
}
#endif