Revision 4954af9c5ee5bb1b5b9172ddbcbac03ca6e151ea authored by Keno Fischer on 01 September 2023, 19:52:14 UTC, committed by GitHub on 01 September 2023, 19:52:14 UTC
The change in #50429 moves around some dispatch boundaries and pushes the allocations in the OffsetArrays `maximum!` test over the limit. The implementation of that code is massively type unstable. Somewhat ironically, the whole original point of that test was to test that the implementation was not type-unstable (#28941), so actually opt our OffsetArrays implementation into the interface that's supposed to guarantee that. If this PR is fine here, I'll submit the same upstream to avoid diverging the implementations too much. Co-authored-by: Jameson Nash <vtjnash@gmail.com>
1 parent a173010
threads.jl
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Test
using Base.Threads
include("print_process_affinity.jl") # import `uv_thread_getaffinity`
# simple sanity tests for locks under cooperative concurrent access
# Scenario: the main task takes the reentrant lock twice, then two `@async`
# tasks contend for it. t1 uses a blocking `lock`, t2 uses a non-blocking
# `trylock`. The assertions check where each task ends up while the lock is
# still held and after each of the two `unlock` calls.
let lk = ReentrantLock()
c1 = Event()
c2 = Event()
# the owning task can acquire the same ReentrantLock more than once
@test trylock(lk)
@test trylock(lk)
# t1 signals that it started, then blocks in `lock(lk)`; its task result is
# the value of the final `trylock(lk)`
t1 = @async (notify(c1); lock(lk); unlock(lk); trylock(lk))
# t2 signals that it started and finishes with the result of `trylock(lk)`
t2 = @async (notify(c2); trylock(lk))
wait(c1)
wait(c2)
# t1 is expected to be parked on the lock's wait queue; t2 is not
@test t1.queue === lk.cond_wait.waitq
@test t2.queue !== lk.cond_wait.waitq
# t2 has already finished, and its `trylock` returned false
@test istaskdone(t2)
@test !fetch(t2)
# first unlock: the main task still holds one acquisition, so t1 stays queued
unlock(lk)
@test t1.queue === lk.cond_wait.waitq
# second unlock: t1 leaves the lock's wait queue
unlock(lk)
@test t1.queue !== lk.cond_wait.waitq
# t1's final `trylock(lk)` succeeded
@test fetch(t1)
end
# Events created with `Event()` / `Event(false)`: one `notify` releases both
# waiting tasks, a later `wait` on the already-notified event returns without
# blocking, and `reset` re-arms the event for the next round.
let gate = Event(), started1 = Event(false), started2 = Event(false)
    for _ in 1:3
        done1 = false
        done2 = false
        t1 = @async begin
            notify(started1)
            wait(gate)
            done1 = true
        end
        t2 = @async begin
            notify(started2)
            wait(gate)
            done2 = true
        end
        wait(started1)
        wait(started2)
        # give both tasks time to (incorrectly) run past `wait(gate)`
        sleep(0.1)
        @test !(done1 || done2)
        # a single notify must let both tasks finish
        notify(gate)
        wait(t1)
        @test done1
        wait(t2)
        @test done2
        # waiting on, and re-notifying, an already-set event must not block
        wait(gate)
        notify(gate)
        reset(gate)
    end
end
# Same scenario as a plain Event test, but every event is constructed with
# `Event(true)`. The assertions below require that each `notify(e)` releases
# exactly one of the two waiting tasks.
let e = Event(true), started1 = Event(true), started2 = Event(true), done = Event(true)
for i = 1:3
done1 = false
done2 = false
# each task reports that it started, waits on `e`, records completion and
# then signals `done`
t1 = @async (notify(started1); wait(e); done1 = true; notify(done))
t2 = @async (notify(started2); wait(e); done2 = true; notify(done))
wait(started1)
wait(started2)
# give both tasks time to (incorrectly) run past `wait(e)`
sleep(0.1)
@test !done1 && !done2
# first notify: exactly one task may proceed
notify(e)
wait(done)
@test done1 ⊻ done2
# wait for whichever task finished before releasing the other one
done1 ? wait(t1) : wait(t2)
# second notify releases the remaining task
notify(e)
wait(t1)
@test done1
wait(t2)
@test done2
# NOTE(review): presumably consumes the second `notify(done)` so that `done`
# is unset again at the start of the next iteration — confirm
wait(done)
end
end
# Run threads_exec.jl in child processes with different thread counts.
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl`
    # run once to try single-threaded mode, then try a couple times to trigger bad races
    foreach((1, 2, 4, 4)) do test_nthreads
        child_env = copy(ENV)
        child_env["JULIA_NUM_THREADS"] = string(test_nthreads)
        # forward the child's output to this process's stdout/stderr
        run(pipeline(setenv(cmd, child_env); stdout, stderr))
    end
end
# Timing-sensitive tests can fail on CI due to occasional unexpected delays,
# so this test is disabled.
#=
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threadpool_latency.jl`
for test_nthreads in (1, 2)
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = string(test_nthreads, ",1")
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end
end
=#
# Run threadpool_use.jl in a child process started with JULIA_NUM_THREADS="1,1".
let child_env = copy(ENV)
    child_env["JULIA_NUM_THREADS"] = "1,1"
    launcher = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threadpool_use.jl`
    # forward the child's output to this process's stdout/stderr
    run(pipeline(setenv(launcher, child_env); stdout, stderr))
end
# Run print_process_affinity.jl (located next to this file) in a child Julia
# process whose CPU affinity is set to `cpus`, and return what it printed with
# the trailing newline removed.
function run_with_affinity(cpus)
    affinity_script = joinpath(@__DIR__, "print_process_affinity.jl")
    child = setcpuaffinity(`$(Base.julia_cmd()) $affinity_script`, cpus)
    return chomp(read(child, String))
end
# issue #34415 - make sure external affinity settings work
# `running_under_rr()` returns true when syscall number 1008 returns 0, and
# false otherwise. The probe is only attempted on Linux; on every other
# platform the function is simply `false`.
if Sys.islinux()
    const SYS_rrcall_check_presence = 1008
    global function running_under_rr()
        status = ccall(:syscall, Int,
                       (Int, Int, Int, Int, Int, Int, Int),
                       SYS_rrcall_check_presence, 0, 0, 0, 0, 0, 0)
        return 0 == status
    end
else
    global function running_under_rr()
        return false
    end
end
# Note also that libuv does not support affinity in macOS and it is known to
# hang in FreeBSD. So, it's tested only in Linux and Windows (and never while
# running under rr):
const AFFINITY_SUPPORTED = (Sys.islinux() || Sys.iswindows()) && !running_under_rr()

# A child process pinned to one or two of our allowed CPUs must report exactly
# those CPUs, comma-separated.
if AFFINITY_SUPPORTED
    allowed_cpus = findall(uv_thread_getaffinity())
    if length(allowed_cpus) >= 2
        @test run_with_affinity(allowed_cpus[1:1]) == string(allowed_cpus[1])
        @test run_with_affinity(allowed_cpus[1:2]) == join(allowed_cpus[1:2], ",")
    end
end
# Start a child Julia process with JULIA_EXCLUSIVE=0 and
# JULIA_NUM_THREADS=auto and return the `Threads.threadpoolsize()` it prints.
#
# `options` are extra command-line flags for the child (e.g. `-t1`).
# `cpus`, when not `nothing`, is the CPU affinity applied to the child.
function get_nthreads(options = ``; cpus = nothing)
    launcher = `$(Base.julia_cmd()) --startup-file=no $(options) -e "print(Threads.threadpoolsize())"`
    launcher = addenv(launcher, "JULIA_EXCLUSIVE" => "0", "JULIA_NUM_THREADS" => "auto")
    isnothing(cpus) || (launcher = setcpuaffinity(launcher, cpus))
    printed = read(launcher, String)
    return parse(Int, printed)
end
@testset "nthreads determined based on CPU affinity" begin
    # No CPUs are considered (and so no tests run) when affinity is unsupported.
    allowed_cpus = AFFINITY_SUPPORTED ? findall(uv_thread_getaffinity()) : Int[]
    if length(allowed_cpus) >= 2
        @test get_nthreads() >= 2
        # without `-t`, the thread count follows the number of CPUs in the mask
        for (cpus, expected) in ((allowed_cpus[1:1], 1),
                                 (allowed_cpus[2:2], 1),
                                 (allowed_cpus[1:2], 2))
            @test get_nthreads(cpus = cpus) == expected
        end
        # an explicit `-t1` wins regardless of the mask
        for cpus in (allowed_cpus[1:1], allowed_cpus[2:2], allowed_cpus[1:2])
            @test get_nthreads(`-t1`, cpus = cpus) == 1
        end
        if length(allowed_cpus) >= 3
            @test get_nthreads(cpus = allowed_cpus[1:2:3]) == 2
            @test get_nthreads(cpus = allowed_cpus[2:3]) == 2
        end
    end
end
# issue #34769
# Callback registered for the idle handle of a `UvTestIdle`.
# While the wrapper is marked active it counts invocations: the first one
# sleeps for a short while, and the tenth notifies `idle.cond` with `true`
# and marks the wrapper inactive again. Always returns `nothing`.
function idle_callback(handle)
    idle = Base.@handle_as handle UvTestIdle
    idle.active || return nothing
    ticks = (idle.count += 1)
    if ticks == 1
        # We want to hit the case where we're allowing
        # the thread to go to sleep, which only happens
        # after some default amount of time (DEFAULT_THREAD_SLEEP_THRESHOLD)
        # so spend that amount of time here.
        Libc.systemsleep(0.004)
    elseif ticks >= 10
        Base.@lock idle.cond notify(idle.cond, true)
        idle.active = false
    end
    return nothing
end
# Test-only wrapper around a libuv idle handle whose callback is
# `idle_callback`.
#
# Fields:
# - `handle`: malloc'ed storage of size `Base._sizeof_uv_idle`; set to
#   `C_NULL` once released.
# - `cond`:   synchronizer notified by the callback and by `uvfinalize`.
# - `isopen`: whether the handle still has to be closed.
# - `active`: whether the callback should currently count its invocations.
# - `count`:  number of callback invocations observed while active.
#
# The constructor initializes and starts the idle handle on `Base.eventloop()`
# and registers `Base.uvfinalize` as finalizer. If libuv reports an error, the
# storage is freed and the error from `Base._UVError` is thrown.
mutable struct UvTestIdle
    handle::Ptr{Cvoid}
    cond::Base.ThreadSynchronizer
    isopen::Bool
    active::Bool
    count::Int

    function UvTestIdle()
        this = new(Libc.malloc(Base._sizeof_uv_idle), Base.ThreadSynchronizer(), true, false, 0)
        Base.iolock_begin()
        # `finally` guarantees the IO lock is released on the error paths too
        try
            Base.associate_julia_struct(this.handle, this)
            err = ccall(:uv_idle_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
                        Base.eventloop(), this.handle)
            if err != 0
                Libc.free(this.handle)
                this.handle = C_NULL
                # `_UVError` is not exported from Base, so it must be qualified
                throw(Base._UVError("uv_idle_init", err))
            end
            err = ccall(:uv_idle_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}),
                        this.handle, @cfunction(idle_callback, Cvoid, (Ptr{Cvoid},)))
            if err != 0
                Libc.free(this.handle)
                this.handle = C_NULL
                throw(Base._UVError("uv_idle_start", err))
            end
            finalizer(Base.uvfinalize, this)
        finally
            Base.iolock_end()
        end
        return this
    end
end
# Lets a `UvTestIdle` be passed where a `Ptr{Cvoid}` argument is expected by
# converting it to its raw handle pointer.
function Base.unsafe_convert(::Type{Ptr{Cvoid}}, idle::UvTestIdle)
    return idle.handle
end
# Release the idle handle of `t`, if it still has one: detach the Julia object
# from the handle, close the handle when it is still open, clear the stored
# pointer and notify `t.cond` with `false`. Does nothing when the handle is
# already `C_NULL`. Runs with the IO lock and `t.cond` held; returns `nothing`.
function Base.uvfinalize(t::UvTestIdle)
    Base.iolock_begin()
    Base.@lock t.cond begin
        if t.handle != C_NULL
            Base.disassociate_julia_struct(t.handle) # not going to call the usual close hooks
            if t.isopen
                t.isopen = false
                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
            end
            t.handle = C_NULL
            notify(t.cond, false)
        end
    end
    Base.iolock_end()
    return nothing
end
# Closing a `UvTestIdle` is the same operation as finalizing it.
Base.close(idle::UvTestIdle) = Base.uvfinalize(idle)
# Block the current task on `idle.cond` after marking the wrapper active, so
# that `idle_callback` starts counting its invocations.
# The handle is preserved for the duration of the wait. The IO lock is held
# only while preserving/unpreserving the handle and while setting `active`;
# it is released before blocking on the condition.
# Returns the result of `wait(idle.cond)`.
function Base.wait(idle::UvTestIdle)
Base.iolock_begin()
Base.preserve_handle(idle)
Base.lock(idle.cond)
try
idle.active = true
# release the IO lock before blocking on the condition
Base.iolock_end()
wait(idle.cond)
finally
Base.unlock(idle.cond)
# the IO lock is taken again just to drop the handle preservation
Base.iolock_begin()
Base.unpreserve_handle(idle)
Base.iolock_end()
end
end
# Spawn another process as a watchdog. If this test fails, it'll unrecoverably
# hang in the event loop. Another process needs to kill it
#
# The watchdog script exits as soon as its stdin becomes readable or is closed.
# Otherwise, after sleeping 100 seconds, it prints an error and sends SIGTERM
# to this process (the pid is interpolated into the script below).
cmd = """
@async (Base.wait_readnb(stdin, 1); exit())
sleep(100)
isopen(stdin) || exit()
println(stderr, "ERROR: Killing threads test due to watchdog expiry")
ccall(:uv_kill, Cint, (Cint, Cint), $(getpid()), Base.SIGTERM)
"""
# opened with `write=true` so that `proc.in` can be closed later on
proc = open(pipeline(`$(Base.julia_cmd()) -e $cmd`; stderr=stderr); write=true)
# wait on an idle handle from the current task ...
let idle=UvTestIdle()
wait(idle)
close(idle)
end
# ... and from inside a `@threads` loop
@threads for i = 1:1
let idle=UvTestIdle()
wait(idle)
close(idle)
end
end
# the watchdog must still be running, i.e. it has not expired
@test process_running(proc)
# We don't need the watchdog anymore
close(proc.in)
# https://github.com/JuliaLang/julia/pull/42973
@testset "spawn and wait *a lot* of tasks in @profile" begin
    # Each case runs in its own child process instead of threads_exec.jl, for
    # better isolation, reproducibility, and a tighter timeout.
    script = "profile_spawnmany_exec.jl"
    cmd_base = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no $script`
    @testset for n in [20000, 200000, 2000000]
        child = setenv(cmd_base, "NTASKS" => n; dir = @__DIR__)
        child = pipeline(ignorestatus(child); stdout = stderr, stderr)
        proc = run(child; wait = false)
        done = Threads.Atomic{Bool}(false)
        timeout = false
        # After 100 seconds, escalate through increasingly forceful signals
        # until the main task reports that the child has exited.
        timer = Timer(100) do _
            timeout = true
            for sig in (Base.SIGTERM, Base.SIGHUP, Base.SIGKILL)
                for _ in 1:1000
                    kill(proc, sig)
                    if done[]
                        sig == Base.SIGTERM || @warn "Terminating `$script` required signal $sig"
                        return
                    end
                    sleep(0.001)
                end
            end
        end
        try
            wait(proc)
        finally
            # tell the timer callback to stop signalling, then cancel the timer
            done[] = true
            close(timer)
        end
        if timeout || !success(proc)
            @error "A \"spawn and wait lots of tasks\" test failed" n proc.exitcode proc.termsignal success(proc) timeout
        end
        if !(Sys.iswindows() || Sys.isapple())
            @test success(proc)
            @test !timeout
        else
            # Known failure: https://github.com/JuliaLang/julia/issues/43124
            @test_skip success(proc)
        end
    end
end
@testset "bad arguments to @threads" begin
    # Each quoted call must be rejected with an ArgumentError when expanded.
    malformed_calls = (
        :(@threads 1 2),              # wrong number of args
        :(@threads 1),                # arg isn't an Expr
        :(@threads if true 1 end),    # arg doesn't start with for
    )
    for call in malformed_calls
        @test_throws ArgumentError macroexpand(@__MODULE__, call)
    end
end
@testset "rand_ptls underflow" begin
    # an upper bound of zero must produce zero
    @test iszero(Base.Partr.cong(zero(UInt32)))
end
Computing file changes ...