https://github.com/JuliaLang/julia
Raw File
Tip revision: c5e041241f2b175d4870a9b87f6c34b7f2591cb3 authored by Stefan Karpinski on 18 September 2015, 20:12:08 UTC
remove deprecated definition of String
Tip revision: c5e0412
task.jl
# This file is a part of Julia. License is MIT: http://julialang.org/license

## basic task functions and TLS

# allow tasks to be constructed with arbitrary function objects
Task(f) = Task(()->f())

function show(io::IO, t::Task)
    print(io, "Task ($(t.state)) @0x$(hex(convert(UInt, pointer_from_objref(t)), WORD_SIZE>>2))")
end

# Container for a captured exception and its backtrace. Can be serialized.
type CapturedException
    ex::Any
    processed_bt::Vector{Any}

    function CapturedException(ex, bt_raw)
        # bt_raw MUST be an Array of code pointers than can be processed by jl_lookup_code_address
        # Typically the result of a catch_backtrace()

        # Process bt_raw so that it can be safely serialized
        bt_lines = Any[]
        process_func(name, file, line, inlined_file, inlined_line, n) =
            push!(bt_lines, (name, file, line, inlined_file, inlined_line, n))
        process_backtrace(process_func, :(:), bt_raw, 1:100) # Limiting this to 100 lines.

        new(ex, bt_lines)
    end
end

showerror(io::IO, ce::CapturedException) = showerror(io, ce.ex, ce.processed_bt, backtrace=true)

type CompositeException <: Exception
    exceptions::Array
    CompositeException() = new(Any[])
end
length(c::CompositeException) = length(c.exceptions)
push!(c::CompositeException, ex) = push!(c.exceptions, ex)

function showerror(io::IO, ex::CompositeException)
    if length(ex) > 0
        showerror(io, ex.exceptions[1])
        remaining = length(ex) - 1
        if remaining > 0
            print(io, "\n\n...and $remaining other exceptions.\n")
        end
    else
        print(io, "CompositeException()\n")
    end
end


macro task(ex)
    :(Task(()->$(esc(ex))))
end

# schedule an expression to run asynchronously, with minimal ceremony
macro schedule(expr)
    expr = :(()->($expr))
    :(enq_work(Task($(esc(expr)))))
end

current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))
istaskstarted(t::Task) = isdefined(t, :last)

yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)

# yield to a task, throwing an exception in it
function throwto(t::Task, exc)
    t.exception = exc
    yieldto(t)
end

task_local_storage() = get_task_tls(current_task())
function get_task_tls(t::Task)
    if is(t.storage, nothing)
        t.storage = ObjectIdDict()
    end
    (t.storage)::ObjectIdDict
end
task_local_storage(key) = task_local_storage()[key]
task_local_storage(key, val) = (task_local_storage()[key] = val)

function task_local_storage(body::Function, key, val)
    tls = task_local_storage()
    hadkey = haskey(tls,key)
    old = get(tls,key,nothing)
    tls[key] = val
    try body()
    finally
        hadkey ? (tls[key] = old) : delete!(tls,key)
    end
end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
    if !istaskdone(t)
        if is(t.donenotify, nothing)
            t.donenotify = Condition()
        end
    end
    while !istaskdone(t)
        wait(t.donenotify)
    end
    if t.state == :failed
        throw(t.exception)
    end
    return t.result
end

suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false

# runtime system hook called when a task finishes
function task_done_hook(t::Task)
    err = (t.state == :failed)
    result = t.result
    nexttask = t.last
    handled = true
    if err
        t.backtrace = catch_backtrace()
    end

    q = t.consumers

    #### un-optimized version
    #isa(q,Condition) && notify(q, result, error=err)
    if isa(q,Task)
        nexttask = q
        nexttask.state = :runnable
    elseif isa(q,Condition) && !isempty(q.waitq)
        notify(q, result, error=err)
    else
        handled = false
    end

    t.consumers = nothing

    if isa(t.donenotify,Condition)
        handled |= !isempty(t.donenotify.waitq)
        notify(t.donenotify, result, error=err)
    end

    if nexttask.state == :runnable
        if err
            nexttask.exception = result
        end
        yieldto(nexttask, result)
    else
        if err && !handled
            if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
                active_repl_backend.backend_task.state == :waiting && isempty(Workqueue) &&
                active_repl_backend.in_eval
                throwto(active_repl_backend.backend_task, result)
            end
            if !suppress_excp_printing(t)
                let bt = t.backtrace
                    # run a new task to print the error for us
                    @schedule with_output_color(:red, STDERR) do io
                        print(io, "ERROR (unhandled task failure): ")
                        showerror(io, result, bt)
                        println(io)
                    end
                end
            end
        end
        # if a finished task accidentally gets into the queue, wait()
        # could return. in that case just take the next task off the queue.
        while true
            wait()
        end
    end
end


## produce, consume, and task iteration

function produce(v)
    #### un-optimized version
    #q = current_task().consumers
    #t = shift!(q.waitq)
    #empty = isempty(q.waitq)
    ct = current_task()
    local empty, t, q
    while true
        q = ct.consumers
        if isa(q,Task)
            t = q
            ct.consumers = nothing
            empty = true
            break
        elseif isa(q,Condition) && !isempty(q.waitq)
            t = shift!(q.waitq)
            empty = isempty(q.waitq)
            break
        end
        wait()
    end

    t.state = :runnable
    if empty
        if isempty(Workqueue)
            yieldto(t, v)
        else
            schedule_and_wait(t, v)
        end
        while true
            # wait until there are more consumers
            q = ct.consumers
            if isa(q,Task)
                return q.result
            elseif isa(q,Condition) && !isempty(q.waitq)
                return q.waitq[1].result
            end
            wait()
        end
    else
        schedule(t, v)
        # make sure `t` runs before us. otherwise, the producer might
        # finish before `t` runs again, causing it to see the producer
        # as done, causing done(::Task, _) to miss the value `v`.
        # see issue #7727
        yield()
        return q.waitq[1].result
    end
end
produce(v...) = produce(v)

function consume(P::Task, values...)
    if istaskdone(P)
        return wait(P)
    end

    ct = current_task()
    ct.result = length(values)==1 ? values[1] : values

    #### un-optimized version
    #if P.consumers === nothing
    #    P.consumers = Condition()
    #end
    #push!(P.consumers.waitq, ct)
    # optimized version that avoids the queue for 1 consumer
    if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq))
        P.consumers = ct
    else
        if isa(P.consumers, Task)
            t = P.consumers
            P.consumers = Condition()
            push!(P.consumers.waitq, t)
        end
        push!(P.consumers.waitq, ct)
    end
    ct.state = :waiting

    schedule_and_wait(P)
end

start(t::Task) = nothing
function done(t::Task, val)
    t.result = consume(t)
    istaskdone(t)
end
next(t::Task, val) = (t.result, nothing)

isempty(::Task) = error("isempty not defined for Tasks")

## condition variables

type Condition
    waitq::Vector{Any}

    Condition() = new([])
end

function wait(c::Condition)
    ct = current_task()

    ct.state = :waiting
    push!(c.waitq, ct)

    try
        return wait()
    catch
        filter!(x->x!==ct, c.waitq)
        if ct.state == :waiting
            ct.state = :runnable
        end
        rethrow()
    end
end

notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error)
function notify(c::Condition, arg, all, error)
    if all
        for t in c.waitq
            schedule(t, arg, error=error)
        end
        empty!(c.waitq)
    elseif !isempty(c.waitq)
        t = shift!(c.waitq)
        schedule(t, arg, error=error)
    end
    nothing
end

notify1(c::Condition, arg=nothing) = notify(c, arg, all=false)

notify_error(c::Condition, err) = notify(c, err, error=true)
notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)


## scheduler and work queue

global const Workqueue = Any[]

function enq_work(t::Task)
    ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
    push!(Workqueue, t)
    t.state = :queued
    t
end

schedule(t::Task) = enq_work(t)

function schedule(t::Task, arg; error=false)
    # schedule a task to be (re)started with the given value or exception
    if error
        t.exception = arg
    else
        t.result = arg
    end
    enq_work(t)
end

# fast version of schedule(t,v);wait()
function schedule_and_wait(t, v=nothing)
    if isempty(Workqueue)
        if t.state == :runnable
            return yieldto(t, v)
        end
    else
        if t.state == :runnable
            t.result = v
            push!(Workqueue, t)
            t.state = :queued
        end
    end
    wait()
end

yield() = (enq_work(current_task()); wait())

function wait()
    while true
        if isempty(Workqueue)
            c = process_events(true)
            if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
                # if there are no active handles and no runnable tasks, just
                # wait for signals.
                pause()
            end
        else
            t = shift!(Workqueue)
            arg = t.result
            t.result = nothing
            t.state = :runnable
            result = yieldto(t, arg)
            process_events(false)
            # return when we come out of the queue
            return result
        end
    end
    assert(false)
end

function pause()
    @unix_only    ccall(:pause, Void, ())
    @windows_only ccall(:Sleep,stdcall, Void, (UInt32,), 0xffffffff)
end


## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))

function sync_end()
    spawns = get(task_local_storage(), :SPAWNS, ())
    if is(spawns,())
        error("sync_end() without sync_begin()")
    end
    refs = spawns[1]
    task_local_storage(:SPAWNS, spawns[2])

    c_ex = CompositeException()
    for r in refs
        try
            wait(r)
        catch ex
            if !isa(r, Task) || (isa(r, Task) && !(r.state == :failed))
                rethrow(ex)
            end
        finally
            if isa(r, Task) && (r.state == :failed)
                push!(c_ex, CapturedException(r.result, r.backtrace))
            end
        end
    end

    if length(c_ex) > 0
        throw(c_ex)
    end
    nothing
end

macro sync(block)
    quote
        sync_begin()
        v = $(esc(block))
        sync_end()
        v
    end
end

function sync_add(r)
    spawns = get(task_local_storage(), :SPAWNS, ())
    if !is(spawns,())
        push!(spawns[1], r)
        if isa(r, Task)
            tls_r = get_task_tls(r)
            tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
        end
    end
    r
end

function async_run_thunk(thunk)
    t = Task(thunk)
    sync_add(t)
    enq_work(t)
    t
end

macro async(expr)
    expr = localize_vars(:(()->($expr)), false)
    :(async_run_thunk($(esc(expr))))
end
back to top