https://github.com/JuliaLang/julia
Tip revision: bd4464b61b1fdd8688dfc53f646970d27c3bb25c authored by Jameson Nash on 29 February 2024, 22:18:08 UTC
codegen: add a pass for late conversion of known modify ops to call atomicrmw
codegen: add a pass for late conversion of known modify ops to call atomicrmw
Tip revision: bd4464b
channels.jl
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Random
using Base: Experimental
using Base: n_avail
@testset "single-threaded Condition usage" begin
a = Condition()
t = @async begin
Base.notify(a, "success")
"finished"
end
@test wait(a) == "success"
@test fetch(t) == "finished"
end
@testset "wait first behavior of wait on Condition" begin
a = Condition()
waiter1 = @async begin
wait(a)
end
waiter2 = @async begin
wait(a)
end
waiter3 = @async begin
wait(a; first=true)
end
waiter4 = @async begin
wait(a)
end
t = @async begin
Base.notify(a, "success"; all=false)
"finished"
end
@test fetch(waiter3) == "success"
@test fetch(t) == "finished"
end
@testset "various constructors" begin
c = Channel()
@test eltype(c) == Any
@test c.sz_max == 0
@test isempty(c) == true # Nothing in it
@test isfull(c) == true # But no more room
c = Channel(1)
@test eltype(c) == Any
@test put!(c, 1) == 1
@test isready(c) == true
@test take!(c) == 1
@test isready(c) == false
@test eltype(Channel(1.0)) == Any
c = Channel(1)
@test isfull(c) == false
put!(c, 1)
@test isfull(c) == true
c = Channel{Int}(1)
@test eltype(c) == Int
@test_throws MethodError put!(c, "Hello")
c = Channel{Int}()
@test eltype(c) == Int
@test c.sz_max == 0
c = Channel{Int}(Inf)
@test eltype(c) == Int
pvals = map(i->put!(c,i), 1:10^6)
tvals = Int[take!(c) for i in 1:10^6]
@test pvals == tvals
@test_throws ArgumentError Channel(-1)
@test_throws InexactError Channel(1.5)
end
@testset "Task constructors" begin
c = Channel{Int}() do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Int
@test c.sz_max == 0
@test collect(c) == 1:100
c = Channel() do c; put!(c, 1); put!(c, "hi") end
@test c.sz_max == 0
@test collect(c) == [1, "hi"]
c = Channel(Inf) do c; put!(c,1); end
@test eltype(c) == Any
@test c.sz_max == typemax(Int)
c = Channel{Int}(Inf) do c; put!(c,1); end
@test eltype(c) == Int
@test c.sz_max == typemax(Int)
taskref = Ref{Task}()
c = Channel{Int}(0, taskref=taskref) do c; put!(c, 0); end
@test eltype(c) == Int
@test c.sz_max == 0
@test istaskstarted(taskref[])
@test !istaskdone(taskref[])
take!(c); wait(taskref[])
@test istaskdone(taskref[])
# Legacy constructor
c = Channel(ctype=Float32, csize=2) do c; map(i->put!(c,i), 1:100); end
@test eltype(c) == Float32
@test c.sz_max == 2
@test isopen(c)
@test collect(c) == 1:100
end
@testset "Multithreaded task constructors" begin
taskref = Ref{Task}()
c = Channel(spawn=true, taskref=taskref) do c; put!(c, 0); end
# Test that the task is using the multithreaded scheduler
@test taskref[].sticky == false
@test collect(c) == [0]
end
let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no channel_threadpool.jl`
new_env = copy(ENV)
new_env["JULIA_NUM_THREADS"] = "1,1"
run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))
end
@testset "multiple concurrent put!/take! on a channel for different sizes" begin
function testcpt(sz)
c = Channel{Int}(sz)
size = 0
inc() = size += 1
dec() = size -= 1
@sync for i = 1:10^4
@async (sleep(rand()); put!(c, i); inc())
@async (sleep(rand()); take!(c); dec())
end
@test size == 0
end
testcpt(0)
testcpt(1)
testcpt(32)
testcpt(Inf)
end
@testset "type conversion in put!" begin
c = Channel{Int64}(0)
@async put!(c, Int32(1))
wait(c)
@test isa(take!(c), Int64)
@test_throws MethodError put!(c, "")
@assert !islocked(c.cond_take)
end
@testset "multiple for loops waiting on the same channel" begin
# Test multiple "for" loops waiting on the same channel which
# is closed after adding a few elements.
c = Channel(32)
results = []
@sync begin
for i in 1:20
@async for ii in c
push!(results, ii)
end
end
sleep(1.0)
for i in 1:5
put!(c,i)
end
close(c)
end
@test sum(results) == 15
end
# Tests for channels bound to tasks.
using Distributed
@testset "channels bound to tasks" for N in [0, 10]
# Normal exit of task
c = Channel(N)
bind(c, @async (GC.gc(); yield(); nothing))
@test_throws InvalidStateException take!(c)
@test !isopen(c)
# Error exception in task
c = Channel(N)
task = @async (GC.gc(); yield(); error("foo"))
bind(c, task)
@test_throws TaskFailedException(task) take!(c)
@test !isopen(c)
# Multiple channels closed by the same bound task
cs = [Channel(N) for i in 1:5]
tf2() = begin
GC.gc()
if N > 0
foreach(c -> (@assert take!(c) === 2), cs)
end
yield()
error("foo")
end
task = Task(tf2)
foreach(c -> bind(c, task), cs)
schedule(task)
if N > 0
for i in 1:5
@test put!(cs[i], 2) === 2
end
end
for i in 1:5
while isopen(cs[i])
yield()
end
@test_throws TaskFailedException(task) wait(cs[i])
@test_throws TaskFailedException(task) take!(cs[i])
@test_throws TaskFailedException(task) put!(cs[i], 1)
N == 0 || @test_throws TaskFailedException(task) fetch(cs[i])
N == 0 && @test_throws ErrorException fetch(cs[i])
end
# Multiple tasks, first one to terminate closes the channel
nth = rand(1:5)
ref = Ref(0)
tf3(i) = begin
GC.gc()
if i == nth
ref[] = i
else
sleep(2.0)
end
end
tasks = [Task(() -> tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t -> bind(c, t), tasks)
foreach(schedule, tasks)
@test_throws InvalidStateException wait(c)
@test !isopen(c)
@test ref[] == nth
@assert !islocked(c.cond_take)
# channeled_tasks
for T in [Any, Int]
tf_chnls1 = (c1, c2) -> (@assert take!(c1) == 1; put!(c2, 2))
chnls, tasks = Base.channeled_tasks(2, tf_chnls1; ctypes=[T,T], csizes=[N,N])
put!(chnls[1], 1)
@test take!(chnls[2]) === 2
@test_throws InvalidStateException wait(chnls[1])
@test_throws InvalidStateException wait(chnls[2])
@test istaskdone(tasks[1])
@test !isopen(chnls[1])
@test !isopen(chnls[2])
f = Future()
tf4 = (c1, c2) -> begin
@assert take!(c1) === 1
wait(f)
end
tf5 = (c1, c2) -> begin
put!(c2, 2)
wait(f)
end
chnls, tasks = Base.channeled_tasks(2, tf4, tf5; ctypes=[T,T], csizes=[N,N])
put!(chnls[1], 1)
@test take!(chnls[2]) === 2
yield()
put!(f, 1) # allow tf4 and tf5 to exit after now, eventually closing the channel
@test_throws InvalidStateException wait(chnls[1])
@test_throws InvalidStateException wait(chnls[2])
@test istaskdone(tasks[1])
@test istaskdone(tasks[2])
@test !isopen(chnls[1])
@test !isopen(chnls[2])
end
# channel
tf6 = c -> begin
@assert take!(c) === 2
error("foo")
end
for T in [Any, Int]
taskref = Ref{Task}()
chnl = Channel{T}(tf6, N, taskref=taskref)
put!(chnl, 2)
yield()
@test_throws TaskFailedException(taskref[]) wait(chnl)
@test istaskdone(taskref[])
@test !isopen(chnl)
@test_throws TaskFailedException(taskref[]) take!(chnl)
end
end
@testset "timedwait" begin
alwaystrue() = true
alwaysfalse() = false
@test timedwait(alwaystrue, 0) === :ok
@test timedwait(alwaysfalse, 0) === :timed_out
@test_throws ArgumentError timedwait(alwaystrue, 0; pollint=0)
# Allowing a smaller positive `pollint` results in `timewait` hanging
@test_throws ArgumentError timedwait(alwaystrue, 0, pollint=1e-4)
# Callback passed in raises an exception
failure_cb = function (fail_on_call=1)
i = 0
function ()
i += 1
i >= fail_on_call && error("callback failed")
return false
end
end
@test_throws ErrorException("callback failed") timedwait(failure_cb(1), 0)
@test_throws ErrorException("callback failed") timedwait(failure_cb(2), 0)
# Validate that `timedwait` actually waits. Ideally we should also test that `timedwait`
# doesn't exceed a maximum duration but that would require guarantees from the OS.
duration = @elapsed timedwait(alwaysfalse, 1) # Using default pollint of 0.1
@test duration >= 1
duration = @elapsed timedwait(alwaysfalse, 0; pollint=1)
@test duration >= 1
end
@testset "timedwait on multiple channels" begin
Experimental.@sync begin
sync = Channel(1)
rr1 = Channel(1)
rr2 = Channel(1)
rr3 = Channel(1)
callback() = all(map(isready, [rr1, rr2, rr3]))
# precompile functions which will be tested for execution time
@test !callback()
@test timedwait(callback, 0) === :timed_out
@async begin put!(sync, :ready); sleep(0.5); put!(rr1, :ok) end
@async begin sleep(1.0); put!(rr2, :ok) end
@async begin @test take!(rr3) == :done end
@test take!(sync) == :ready
et = @elapsed timedwait(callback, 1)
@test et >= 1.0
@test isready(rr1)
put!(rr3, :done)
end
end
@testset "yield/wait/event failures" begin
# garbage_finalizer returns `nothing` rather than the garbage object so
# that the interpreter doesn't accidentally root the garbage when
# interpreting the calling function.
@noinline garbage_finalizer(f) = (finalizer(f, "gar" * "bage"); nothing)
run = Ref(0)
garbage_finalizer(Returns(nothing)) # warmup
@test GC.enable(false)
# test for finalizers trying to yield leading to failed attempts to context switch
garbage_finalizer((x) -> (run[] += 1; sleep(1)))
garbage_finalizer((x) -> (run[] += 1; yield()))
garbage_finalizer((x) -> (run[] += 1; yieldto(@task () -> ())))
t = @task begin
@test !GC.enable(true)
GC.gc()
true
end
oldstderr = stderr
newstderr = redirect_stderr()
local errstream
try
errstream = @async read(newstderr[1], String)
yield(t)
finally
redirect_stderr(oldstderr)
close(newstderr[2])
end
@test istaskdone(t)
@test fetch(t)
@test run[] == 3
output = fetch(errstream)
@test 3 == length(findall(
"""error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")""", output))
# test for invalid state in Workqueue during yield
t = @async nothing
t._state = 66
newstderr = redirect_stderr()
try
errstream = @async read(newstderr[1], String)
yield()
finally
redirect_stderr(oldstderr)
close(newstderr[2])
end
@test fetch(errstream) == "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n"
end
@testset "throwto" begin
t = @task(nothing)
ct = current_task()
testerr = ErrorException("expected")
@async Base.throwto(t, testerr)
@test (try
Base.wait(t)
false
catch ex
ex
end).task.exception === testerr
end
@testset "Timer / AsyncCondition triggering and race #12719" begin
let tc = Ref(0)
t = Timer(0) do t
tc[] += 1
end
cb = first(t.cond.waitq)
Libc.systemsleep(0.005)
@test isopen(t)
Base.process_events()
@test !isopen(t)
@test tc[] == 0
yield()
@test tc[] == 1
@test istaskdone(cb)
end
let tc = Ref(0)
t = Timer(0) do t
tc[] += 1
end
cb = first(t.cond.waitq)
Libc.systemsleep(0.005)
@test isopen(t)
close(t)
@test !isopen(t)
wait(cb)
@test tc[] == 0
@test t.handle === C_NULL
end
let tc = Ref(0)
async = Base.AsyncCondition() do async
tc[] += 1
end
cb = first(async.cond.waitq)
@test isopen(async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test isempty(Base.Workqueue)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows?)
@test length(Base.Workqueue) == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test tc[] == 0
yield() # consume event
@test tc[] == 1
Sys.iswindows() && Base.process_events() # schedule event (windows?)
yield() # consume event
@test tc[] == 2
sleep(0.1) # no further events
@test tc[] == 2
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows?)
close(async) # and close
@test !isopen(async)
@test tc[] == 3
@test tc[] == 3
yield() # consume event & then close
@test tc[] == 3
sleep(0.1) # no further events
wait(cb)
@test tc[] == 3
@test async.handle === C_NULL
end
let tc = Ref(0)
async = Base.AsyncCondition() do async
tc[] += 1
end
cb = first(async.cond.waitq)
@test isopen(async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows)
close(async)
@test !isopen(async)
Base.process_events() # and close
@test tc[] == 1
yield() # consume event & then close
@test tc[] == 1
sleep(0.1) # no further events
wait(cb)
@test tc[] == 1
@test async.handle === C_NULL
end
end
@testset "check_channel_state" begin
c = Channel(1)
close(c)
@test !isopen(c)
c.excp == nothing # to trigger the branch
@test_throws InvalidStateException Base.check_channel_state(c)
end
# PR #36641
# Ensure that `isempty()` does not mutate a Channel's state:
@testset "isempty(::Channel) mutation" begin
function isempty_timeout(c::Channel)
inner_c = Channel{Union{Bool,Nothing}}()
@async put!(inner_c, isempty(c))
@async begin
sleep(0.01)
if isopen(inner_c)
put!(inner_c, nothing)
end
end
result = take!(inner_c)
if result === nothing
error("isempty() timed out!")
end
return result
end
# First, with a non-buffered channel
c = Channel()
@test isempty_timeout(c)
t_put = @async put!(c, 1)
@test !isempty_timeout(c)
# check a second time to ensure `isempty(c)` didn't just consume the element.
@test !isempty_timeout(c)
@test take!(c) == 1
@test isempty_timeout(c)
wait(t_put)
# Next, with a buffered channel:
c = Channel(2)
@test isempty_timeout(c)
t_put = put!(c, 1)
@test !isempty_timeout(c)
@test !isempty_timeout(c)
@test take!(c) == 1
@test isempty_timeout(c)
end
# issue #12473
# make sure 1-shot timers work
let a = []
Timer(t -> push!(a, 1), 0.01, interval = 0)
sleep(0.2)
@test a == [1]
end
# make sure that we don't accidentally create a one-shot timer
let
t = Timer(Returns(nothing), 10, interval=0.00001)
@test ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 1
close(t)
end
# make sure repeating timers work
@noinline function make_unrooted_timer(a)
t = Timer(0.0, interval = 0.1)
finalizer(t -> a[] += 1, t)
wait(t)
e = @elapsed for i = 1:5
wait(t)
end
@test e >= 0.4
@test a[] == 0
nothing
end
let a = Ref(0)
make_unrooted_timer(a)
GC.gc()
@test a[] == 1
end
# trying to `schedule` a finished task
let t = @async nothing
wait(t)
@test_throws ErrorException("schedule: Task not runnable") schedule(t, nothing)
end
@testset "push!(c, v) -> c" begin
c = Channel(Inf)
@test push!(c, nothing) === c
end
# Channel `show`
let c = Channel(3)
@test repr(c) == "Channel{Any}(3)"
@test repr(MIME("text/plain"), c) == "Channel{Any}(3) (empty)"
put!(c, 0)
@test repr(MIME("text/plain"), c) == "Channel{Any}(3) (1 item available)"
put!(c, 1)
@test repr(MIME("text/plain"), c) == "Channel{Any}(3) (2 items available)"
close(c)
@test repr(MIME("text/plain"), c) == "Channel{Any}(3) (closed)"
end
# PR #41833: data races in Channel
@testset "n_avail(::Channel)" begin
# Buffered: n_avail() = buffer length + number of waiting tasks
let c = Channel(2)
@test n_avail(c) == 0; put!(c, 0)
@test n_avail(c) == 1; put!(c, 0)
@test n_avail(c) == 2; t1 = @task put!(c, 0); yield(t1)
@test n_avail(c) == 3; t2 = @task put!(c, 0); yield(t2)
@test n_avail(c) == 4
# Test n_avail(c) after interrupting a task waiting on the channel
t3 = @task put!(c, 0)
yield(t3)
@test n_avail(c) == 5
@async Base.throwto(t3, ErrorException("Exit put!"))
try wait(t3) catch end
@test n_avail(c) == 4
close(c)
try wait(t1) catch end
try wait(t2) catch end
@test n_avail(c) == 2 # Already-buffered items remain
end
# Unbuffered: n_avail() = number of waiting tasks
let c = Channel()
@test n_avail(c) == 0; t1 = @task put!(c, 0); yield(t1)
@test n_avail(c) == 1; t2 = @task put!(c, 0); yield(t2)
@test n_avail(c) == 2
# Test n_avail(c) after interrupting a task waiting on the channel
t3 = @task put!(c, 0)
yield(t3)
@test n_avail(c) == 3
@async Base.throwto(t3, ErrorException("Exit put!"))
try wait(t3) catch end
@test n_avail(c) == 2
close(c)
try wait(t1) catch end
try wait(t2) catch end
@test n_avail(c) == 0
end
end
@testset "Task properties" begin
f() = rand(2,2)
t = Task(f)
message = "Querying a Task's `scope` field is disallowed.\nThe private `Core.current_scope()` function is better, though still an implementation detail."
@test_throws ErrorException(message) t.scope
@test t.state == :runnable
end