https://github.com/JuliaLang/julia
Tip revision: 63b33a2ef51b66d06cc075f9a36efb480515f887 authored by Tony Kelman on 04 November 2016, 23:38:30 UTC
Homebrew changed tap layouts
Homebrew changed tap layouts
Tip revision: 63b33a2
task.jl
# This file is a part of Julia. License is MIT: http://julialang.org/license
## basic task functions and TLS
# allow tasks to be constructed with arbitrary function objects
Task(f) = Task(()->f())
function show(io::IO, t::Task)
print(io, "Task ($(t.state)) @0x$(hex(convert(UInt, pointer_from_objref(t)), WORD_SIZE>>2))")
if t.state == :failed
println(io)
show(io, CapturedException(t.result, t.backtrace))
end
end
# Container for a captured exception and its backtrace. Can be serialized.
type CapturedException
ex::Any
processed_bt::Vector{Any}
function CapturedException(ex, bt_raw)
# bt_raw MUST be an Array of code pointers than can be processed by jl_lookup_code_address
# Typically the result of a catch_backtrace()
# Process bt_raw so that it can be safely serialized
bt_lines = Any[]
process_func(name, file, line, inlined_file, inlined_line, n) =
push!(bt_lines, (name, file, line, inlined_file, inlined_line, n))
process_backtrace(process_func, :(:), bt_raw, 1:100) # Limiting this to 100 lines.
new(ex, bt_lines)
end
end
showerror(io::IO, ce::CapturedException) = showerror(io, ce.ex, ce.processed_bt, backtrace=true)
type CompositeException <: Exception
exceptions::Array
CompositeException() = new(Any[])
end
length(c::CompositeException) = length(c.exceptions)
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
function showerror(io::IO, ex::CompositeException)
if length(ex) > 0
showerror(io, ex.exceptions[1])
remaining = length(ex) - 1
if remaining > 0
print(io, "\n\n...and $remaining other exceptions.\n")
end
else
print(io, "CompositeException()\n")
end
end
macro task(ex)
:(Task(()->$(esc(ex))))
end
# schedule an expression to run asynchronously, with minimal ceremony
macro schedule(expr)
expr = :(()->($expr))
:(enq_work(Task($(esc(expr)))))
end
current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)
yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)
# yield to a task, throwing an exception in it
function throwto(t::Task, exc)
t.exception = exc
yieldto(t)
end
task_local_storage() = get_task_tls(current_task())
function get_task_tls(t::Task)
if is(t.storage, nothing)
t.storage = ObjectIdDict()
end
(t.storage)::ObjectIdDict
end
task_local_storage(key) = task_local_storage()[key]
task_local_storage(key, val) = (task_local_storage()[key] = val)
function task_local_storage(body::Function, key, val)
tls = task_local_storage()
hadkey = haskey(tls,key)
old = get(tls,key,nothing)
tls[key] = val
try body()
finally
hadkey ? (tls[key] = old) : delete!(tls,key)
end
end
# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
if !istaskdone(t)
if is(t.donenotify, nothing)
t.donenotify = Condition()
end
end
while !istaskdone(t)
wait(t.donenotify)
end
if t.state == :failed
throw(t.exception)
end
return t.result
end
suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false
# runtime system hook called when a task finishes
function task_done_hook(t::Task)
err = (t.state == :failed)
result = t.result
nexttask = t.last
handled = true
if err
t.backtrace = catch_backtrace()
end
q = t.consumers
#### un-optimized version
#isa(q,Condition) && notify(q, result, error=err)
if isa(q,Task)
nexttask = q
nexttask.state = :runnable
elseif isa(q,Condition) && !isempty(q.waitq)
notify(q, result, error=err)
else
handled = false
end
t.consumers = nothing
if isa(t.donenotify,Condition)
handled |= !isempty(t.donenotify.waitq)
notify(t.donenotify, result, error=err)
end
if nexttask.state == :runnable
if err
nexttask.exception = result
end
yieldto(nexttask, result)
else
if err && !handled
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :waiting && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
end
if !suppress_excp_printing(t)
let bt = t.backtrace
# run a new task to print the error for us
@schedule with_output_color(:red, STDERR) do io
print(io, "ERROR (unhandled task failure): ")
showerror(io, result, bt)
println(io)
end
end
end
end
# if a finished task accidentally gets into the queue, wait()
# could return. in that case just take the next task off the queue.
while true
wait()
end
end
end
## produce, consume, and task iteration
function produce(v)
#### un-optimized version
#q = current_task().consumers
#t = shift!(q.waitq)
#empty = isempty(q.waitq)
ct = current_task()
local empty, t, q
while true
q = ct.consumers
if isa(q,Task)
t = q
ct.consumers = nothing
empty = true
break
elseif isa(q,Condition) && !isempty(q.waitq)
t = shift!(q.waitq)
empty = isempty(q.waitq)
break
end
wait()
end
t.state = :runnable
if empty
if isempty(Workqueue)
yieldto(t, v)
else
schedule_and_wait(t, v)
end
while true
# wait until there are more consumers
q = ct.consumers
if isa(q,Task)
return q.result
elseif isa(q,Condition) && !isempty(q.waitq)
return q.waitq[1].result
end
wait()
end
else
schedule(t, v)
# make sure `t` runs before us. otherwise, the producer might
# finish before `t` runs again, causing it to see the producer
# as done, causing done(::Task, _) to miss the value `v`.
# see issue #7727
yield()
return q.waitq[1].result
end
end
produce(v...) = produce(v)
function consume(P::Task, values...)
if istaskdone(P)
return wait(P)
end
ct = current_task()
ct.result = length(values)==1 ? values[1] : values
#### un-optimized version
#if P.consumers === nothing
# P.consumers = Condition()
#end
#push!(P.consumers.waitq, ct)
# optimized version that avoids the queue for 1 consumer
if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq))
P.consumers = ct
else
if isa(P.consumers, Task)
t = P.consumers
P.consumers = Condition()
push!(P.consumers.waitq, t)
end
push!(P.consumers.waitq, ct)
end
ct.state = :waiting
schedule_and_wait(P)
end
start(t::Task) = nothing
function done(t::Task, val)
t.result = consume(t)
istaskdone(t)
end
next(t::Task, val) = (t.result, nothing)
isempty(::Task) = error("isempty not defined for Tasks")
## condition variables
type Condition
waitq::Vector{Any}
Condition() = new([])
end
function wait(c::Condition)
ct = current_task()
ct.state = :waiting
push!(c.waitq, ct)
try
return wait()
catch
filter!(x->x!==ct, c.waitq)
if ct.state == :waiting
ct.state = :runnable
end
rethrow()
end
end
notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
if all
for t in c.waitq
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
t = shift!(c.waitq)
schedule(t, arg, error=error)
end
nothing
end
notify1(c::Condition, arg=nothing) = notify(c, arg, all=false)
notify_error(c::Condition, err) = notify(c, err, error=true)
notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)
## scheduler and work queue
global const Workqueue = Any[]
function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
push!(Workqueue, t)
t.state = :queued
t
end
schedule(t::Task) = enq_work(t)
function schedule(t::Task, arg; error=false)
# schedule a task to be (re)started with the given value or exception
if error
t.exception = arg
else
t.result = arg
end
enq_work(t)
end
# fast version of schedule(t,v);wait()
function schedule_and_wait(t, v=nothing)
if isempty(Workqueue)
if t.state == :runnable
return yieldto(t, v)
end
else
if t.state == :runnable
t.result = v
push!(Workqueue, t)
t.state = :queued
end
end
wait()
end
yield() = (enq_work(current_task()); wait())
function wait()
while true
if isempty(Workqueue)
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
t = shift!(Workqueue)
arg = t.result
t.result = nothing
t.state = :runnable
result = yieldto(t, arg)
process_events(false)
# return when we come out of the queue
return result
end
end
assert(false)
end
function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (UInt32,), 0xffffffff)
end
## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))
function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if is(spawns,())
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])
c_ex = CompositeException()
for r in refs
try
wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !(r.state == :failed))
rethrow(ex)
end
finally
if isa(r, Task) && (r.state == :failed)
push!(c_ex, CapturedException(r.result, r.backtrace))
end
end
end
if length(c_ex) > 0
throw(c_ex)
end
nothing
end
macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end
function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if !is(spawns,())
push!(spawns[1], r)
if isa(r, Task)
tls_r = get_task_tls(r)
tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
end
end
r
end
function async_run_thunk(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end
macro async(expr)
expr = localize_vars(:(()->($expr)), false)
:(async_run_thunk($(esc(expr))))
end