https://github.com/JuliaLang/julia
Raw File
Tip revision: 42c2abf4283ab6e7149c1b552a34a0def2ab9227 authored by gbaraldi on 15 March 2024, 21:46:50 UTC
Fix test!
Tip revision: 42c2abf
safepoint.c
// This file is a part of Julia. License is MIT: https://julialang.org/license

#include "julia.h"
#include "julia_internal.h"
#include "threading.h"
#ifndef _OS_WINDOWS_
#include <sys/mman.h>
#if defined(_OS_DARWIN_) && !defined(MAP_ANONYMOUS)
#define MAP_ANONYMOUS MAP_ANON
#endif
#endif
#include "julia_assert.h"

#ifdef __cplusplus
extern "C" {
#endif

// 0: no sigint is pending
// 1: at least one sigint is pending, only the sigint page is enabled.
// 2: at least one sigint is pending, both safepoint pages are enabled.
JL_DLLEXPORT sig_atomic_t jl_signal_pending = 0;
_Atomic(uint32_t) jl_gc_running = 0;
char *jl_safepoint_pages = NULL;
// The number of safepoints enabled on the three pages.
// The first page, is the SIGINT page, only used by the master thread.
// The second page, is the GC page for the master thread, this is where
// the `safepoint` tls pointer points to for the master thread.
// The third page is the GC page for the other threads. The thread's
// `safepoint` tls pointer points the beginning of this page + `sizeof(size_t)`
// so that both safepoint load and pending signal load falls in this page.
// The initialization of the `safepoint` pointer is done `ti_initthread`
// in `threading.c`.
// The fourth page is the count of suspended threads
uint16_t jl_safepoint_enable_cnt[4] = {0, 0, 0, 0};

// This lock should be acquired before enabling/disabling the safepoint
// or accessing one of the following variables:
//
// * jl_gc_running
// * jl_signal_pending
// * jl_safepoint_enable_cnt
//
// Additionally accessing `jl_gc_running` should use acquire/release
// load/store so that threads waiting for the GC doesn't have to also
// fight on the safepoint lock...
uv_mutex_t safepoint_lock;
uv_cond_t safepoint_cond_begin;
uv_cond_t safepoint_cond_end;

static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
{
    // safepoint_lock should be held
    assert(0 <= idx && idx <= 3);
    if (jl_safepoint_enable_cnt[idx]++ != 0) {
        // We expect this to be enabled at most twice
        // one for the GC, one for SIGINT.
        // Update this if this is not the case anymore in the future.
        assert(jl_safepoint_enable_cnt[idx] <= (idx == 3 ? INT16_MAX : 2));
        return;
    }
    // Now that we are requested to mprotect the page and it wasn't already.
    char *pageaddr = jl_safepoint_pages + jl_page_size * idx;
#ifdef _OS_WINDOWS_
    DWORD old_prot;
    VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot);
#else
    int r = mprotect(pageaddr, jl_page_size, PROT_NONE);
    (void)r; //if (r) perror("mprotect");
#endif
}

static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT
{
    // safepoint_lock should be held
    assert(0 <= idx && idx <= 3);
    if (--jl_safepoint_enable_cnt[idx] != 0) {
        assert(jl_safepoint_enable_cnt[idx] > 0);
        return;
    }
    // Now that we are requested to un-mprotect the page and no one else
    // want it to be kept protected.
    char *pageaddr = jl_safepoint_pages + jl_page_size * idx;
#ifdef _OS_WINDOWS_
    DWORD old_prot;
    VirtualProtect(pageaddr, jl_page_size, PAGE_READONLY, &old_prot);
#else
    int r = mprotect(pageaddr, jl_page_size, PROT_READ);
    (void)r; //if (r) perror("mprotect");
#endif
}

void jl_safepoint_init(void)
{
    uv_mutex_init(&safepoint_lock);
    uv_cond_init(&safepoint_cond_begin);
    uv_cond_init(&safepoint_cond_end);
    // jl_page_size isn't available yet.
    size_t pgsz = jl_getpagesize();
#ifdef _OS_WINDOWS_
    char *addr = (char*)VirtualAlloc(NULL, pgsz * 4, MEM_COMMIT, PAGE_READONLY);
#else
    char *addr = (char*)mmap(0, pgsz * 4, PROT_READ,
                             MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (addr == MAP_FAILED)
        addr = NULL;
#endif
    if (addr == NULL) {
        jl_printf(JL_STDERR, "could not allocate GC synchronization page\n");
        jl_gc_debug_critical_error();
        abort();
    }
//    // If we able to skip past the faulting safepoint instruction conditionally,
//    // then we can make this safepoint page unconditional. But otherwise we
//    // only enable this page when required, though it gives us less
//    // fine-grained control over individual resume.
//    char *pageaddr = addr + pgsz * 3;
//#ifdef _OS_WINDOWS_
//    DWORD old_prot;
//    VirtualProtect(pageaddr, pgsz, PAGE_NOACCESS, &old_prot);
//#else
//    int r = mprotect(pageaddr, pgsz, PROT_NONE);
//    (void)r; //if (r) perror("mprotect");
//#endif
    // The signal page is for the gc safepoint.
    // The page before it is the sigint pending flag.
    jl_safepoint_pages = addr;
}

void jl_gc_wait_for_the_world(jl_ptls_t* gc_all_tls_states, int gc_n_threads)
{
    JL_TIMING(GC, GC_Stop);
#ifdef USE_TRACY
    TracyCZoneCtx ctx = JL_TIMING_DEFAULT_BLOCK->tracy_ctx;
    TracyCZoneColor(ctx, 0x696969);
#endif
    assert(gc_n_threads);
    if (gc_n_threads > 1)
        jl_wake_libuv();
    for (int i = 0; i < gc_n_threads; i++) {
        jl_ptls_t ptls2 = gc_all_tls_states[i];
        if (ptls2 != NULL) {
            // This acquire load pairs with the release stores
            // in the signal handler of safepoint so we are sure that
            // all the stores on those threads are visible.
            // We're currently also using atomic store release in mutator threads
            // (in jl_gc_state_set), but we may want to use signals to flush the
            // memory operations on those threads lazily instead.
            while (!jl_atomic_load_relaxed(&ptls2->gc_state) || !jl_atomic_load_acquire(&ptls2->gc_state)) {
                // Use system mutexes rather than spin locking to minimize wasted CPU time
                // while we wait for other threads reach a safepoint.
                // This is particularly important when run under rr.
                uv_mutex_lock(&safepoint_lock);
                if (!jl_atomic_load_relaxed(&ptls2->gc_state))
                    uv_cond_wait(&safepoint_cond_begin, &safepoint_lock);
                uv_mutex_unlock(&safepoint_lock);
            }
        }
    }
}

int jl_safepoint_start_gc(void)
{
    // The thread should have just set this before entry
    assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) == JL_GC_STATE_WAITING);
    uv_mutex_lock(&safepoint_lock);
    uv_cond_broadcast(&safepoint_cond_begin);
    // make sure we are permitted to run GC now (we might be required to stop instead)
    jl_task_t *ct = jl_current_task;
    while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) {
        uv_mutex_unlock(&safepoint_lock);
        jl_safepoint_wait_thread_resume();
        uv_mutex_lock(&safepoint_lock);
    }
    // In case multiple threads enter the GC at the same time, only allow
    // one of them to actually run the collection. We can't just let the
    // master thread do the GC since it might be running unmanaged code
    // and can take arbitrarily long time before hitting a safe point.
    uint32_t running = 0;
    if (!jl_atomic_cmpswap(&jl_gc_running, &running, 1)) {
        uv_mutex_unlock(&safepoint_lock);
        jl_safepoint_wait_gc();
        return 0;
    }
    // Foreign thread adoption disables the GC and waits for it to finish, however, that may
    // introduce a race between it and this thread checking if the GC is enabled and only
    // then setting jl_gc_running. To avoid that, check again now that we won that race.
    if (jl_atomic_load_acquire(&jl_gc_disable_counter)) {
        jl_atomic_store_release(&jl_gc_running, 0);
        uv_mutex_unlock(&safepoint_lock);
        return 0;
    }
    jl_safepoint_enable(1);
    jl_safepoint_enable(2);
    uv_mutex_unlock(&safepoint_lock);
    return 1;
}

void jl_safepoint_end_gc(void)
{
    assert(jl_atomic_load_relaxed(&jl_gc_running));
    uv_mutex_lock(&safepoint_lock);
    // Need to reset the page protection before resetting the flag since
    // the thread will trigger a segfault immediately after returning from
    // the signal handler.
    jl_safepoint_disable(2);
    jl_safepoint_disable(1);
    jl_atomic_store_release(&jl_gc_running, 0);
#  ifdef _OS_DARWIN_
    // This wakes up other threads on mac.
    jl_mach_gc_end();
#  endif
    uv_mutex_unlock(&safepoint_lock);
    uv_cond_broadcast(&safepoint_cond_end);
}

void jl_set_gc_and_wait(void) // n.b. not used on _OS_DARWIN_
{
    jl_task_t *ct = jl_current_task;
    // reading own gc state doesn't need atomic ops since no one else
    // should store to it.
    int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state);
    jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
    uv_mutex_lock(&safepoint_lock);
    uv_cond_broadcast(&safepoint_cond_begin);
    uv_mutex_unlock(&safepoint_lock);
    jl_safepoint_wait_gc();
    jl_atomic_store_release(&ct->ptls->gc_state, state);
    jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
}

// this is the core of jl_set_gc_and_wait
void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT
{
    jl_task_t *ct = jl_current_task; (void)ct;
    JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct);
    // The thread should have set this is already
    assert(jl_atomic_load_relaxed(&ct->ptls->gc_state) != JL_GC_STATE_UNSAFE);
    // Use normal volatile load in the loop for speed until GC finishes.
    // Then use an acquire load to make sure the GC result is visible on this thread.
    while (jl_atomic_load_relaxed(&jl_gc_running) || jl_atomic_load_acquire(&jl_gc_running)) {
        // Use system mutexes rather than spin locking to minimize wasted CPU
        // time on the idle cores while we wait for the GC to finish.
        // This is particularly important when run under rr.
        uv_mutex_lock(&safepoint_lock);
        if (jl_atomic_load_relaxed(&jl_gc_running))
            uv_cond_wait(&safepoint_cond_end, &safepoint_lock);
        uv_mutex_unlock(&safepoint_lock);
    }
}

// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
void jl_safepoint_wait_thread_resume(void)
{
    jl_task_t *ct = jl_current_task;
    // n.b. we do not permit a fast-path here that skips the lock acquire since
    // we otherwise have no synchronization point to ensure that this thread
    // will observe the change to the safepoint, even though the other thread
    // might have already observed our gc_state.
    // if (!jl_atomic_load_relaxed(&ct->ptls->suspend_count)) return;
    int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state);
    jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
    uv_mutex_lock(&ct->ptls->sleep_lock);
    if (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) {
        // defer this broadcast until we determine whether uv_cond_wait is really going to be needed
        uv_mutex_unlock(&ct->ptls->sleep_lock);
        uv_mutex_lock(&safepoint_lock);
        uv_cond_broadcast(&safepoint_cond_begin);
        uv_mutex_unlock(&safepoint_lock);
        uv_mutex_lock(&ct->ptls->sleep_lock);
    }
    while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
        uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
    // must while still holding the mutex_unlock, so we know other threads in
    // jl_safepoint_suspend_thread will observe this thread in the correct GC
    // state, and not still stuck in JL_GC_STATE_WAITING
    jl_atomic_store_release(&ct->ptls->gc_state, state);
    uv_mutex_unlock(&ct->ptls->sleep_lock);
}

// n.b. suspended threads may still run in the GC or GC safe regions
// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here)
// waitstate = 0 : do not wait for suspend to finish
// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE)
// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread
// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently
// if another thread comes along and calls jl_safepoint_resume, we also return early
// return new suspend count on success, 0 on failure
int jl_safepoint_suspend_thread(int tid, int waitstate)
{
    if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
        return 0;
    jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
    uv_mutex_lock(&ptls2->sleep_lock);
    int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1;
    jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count);
    if (suspend_count == 1) { // first to suspend
        jl_safepoint_enable(3);
        jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*)));
    }
    uv_mutex_unlock(&ptls2->sleep_lock);
    if (waitstate) {
        // wait for suspend (or another thread to call resume)
        if (waitstate >= 2) {
            // We currently cannot distinguish if a thread is helping run GC or
            // not, so assume it is running GC and wait for GC to finish first.
            // It will be unable to reenter helping with GC because we have
            // changed its safepoint page.
            jl_set_gc_and_wait();
        }
        while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) {
            int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state);
            if (waitstate <= 2 && state2 != JL_GC_STATE_UNSAFE)
                break;
            if (waitstate == 3 && state2 == JL_GC_STATE_WAITING)
                break;
            jl_cpu_pause(); // yield (wait for safepoint_cond_begin, for example)?
        }
    }
    return suspend_count;
}

// return old suspend count on success, 0 on failure
// n.b. threads often do not resume until after all suspended threads have been resumed!
int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT
{
    if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
        return 0;
    jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
    uv_mutex_lock(&safepoint_lock);
    uv_mutex_lock(&ptls2->sleep_lock);
    int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count);
    if (suspend_count == 1) { // last to unsuspend
        if (tid == 0)
            jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size));
        else
            jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*)));
        uv_cond_signal(&ptls2->wake_signal);
#ifdef _OS_DARWIN_
        jl_safepoint_resume_thread_mach(ptls2, tid);
#endif
    }
    if (suspend_count != 0) {
        jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1);
        if (suspend_count == 1)
            jl_safepoint_disable(3);
    }
    uv_mutex_unlock(&ptls2->sleep_lock);
    uv_mutex_unlock(&safepoint_lock);
    return suspend_count;
}

void jl_safepoint_enable_sigint(void)
{
    uv_mutex_lock(&safepoint_lock);
    // Make sure both safepoints are enabled exactly once for SIGINT.
    switch (jl_signal_pending) {
    default:
        assert(0 && "Shouldn't happen.");
    case 0:
        // Enable SIGINT page
        jl_safepoint_enable(0);
        // fall through
    case 1:
        // SIGINT page is enabled, enable GC page
        jl_safepoint_enable(1);
        // fall through
    case 2:
        jl_signal_pending = 2;
    }
    uv_mutex_unlock(&safepoint_lock);
}

void jl_safepoint_defer_sigint(void)
{
    uv_mutex_lock(&safepoint_lock);
    // Make sure the GC safepoint is disabled for SIGINT.
    if (jl_signal_pending == 2) {
        jl_safepoint_disable(1);
        jl_signal_pending = 1;
    }
    uv_mutex_unlock(&safepoint_lock);
}

int jl_safepoint_consume_sigint(void)
{
    int has_signal = 0;
    uv_mutex_lock(&safepoint_lock);
    // Make sure both safepoints are disabled for SIGINT.
    switch (jl_signal_pending) {
    default:
        assert(0 && "Shouldn't happen.");
    case 2:
        // Disable gc page
        jl_safepoint_disable(1);
        // fall through
    case 1:
        // GC page is disabled, disable SIGINT page
        jl_safepoint_disable(0);
        has_signal = 1;
        // fall through
    case 0:
        jl_signal_pending = 0;
    }
    uv_mutex_unlock(&safepoint_lock);
    return has_signal;
}

#ifdef __cplusplus
}
#endif
back to top