https://github.com/JuliaLang/julia
Raw File
Tip revision: de5cbe1eae46332429d4ea1948f7cdb6ed9088d0 authored by Jameson Nash on 10 November 2016, 06:46:22 UTC
simplify scoping rules
Tip revision: de5cbe1
task.jl
# This file is a part of Julia. License is MIT: http://julialang.org/license

## basic task functions and TLS

# Container for a captured exception and its backtrace. Can be serialized.
type CapturedException <: Exception
    ex::Any
    processed_bt::Vector{Any}

    function CapturedException(ex, bt_raw)
        # bt_raw MUST be an Array of code pointers than can be processed by jl_lookup_code_address
        # Typically the result of a catch_backtrace()

        # Process bt_raw so that it can be safely serialized
        bt_lines = Any[]
        process_func(args...) = push!(bt_lines, args)
        process_backtrace(process_func, bt_raw, 100) # Limiting this to 100 lines.

        new(ex, bt_lines)
    end
end

showerror(io::IO, ce::CapturedException) = showerror(io, ce.ex, ce.processed_bt, backtrace=true)

type CompositeException <: Exception
    exceptions::Vector{Any}
    CompositeException() = new(Any[])
    CompositeException(exceptions) = new(exceptions)
end
length(c::CompositeException) = length(c.exceptions)
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
isempty(c::CompositeException) = isempty(c.exceptions)
start(c::CompositeException) = start(c.exceptions)
next(c::CompositeException, state) = next(c.exceptions, state)
done(c::CompositeException, state) = done(c.exceptions, state)

function showerror(io::IO, ex::CompositeException)
    if !isempty(ex)
        showerror(io, ex.exceptions[1])
        remaining = length(ex) - 1
        if remaining > 0
            print(io, "\n\n...and $remaining other exceptions.\n")
        end
    else
        print(io, "CompositeException()\n")
    end
end

function show(io::IO, t::Task)
    print(io, "Task ($(t.state)) @0x$(hex(convert(UInt, pointer_from_objref(t)), Sys.WORD_SIZE>>2))")
end

"""
    @task

Wrap an expression in a [`Task`](:class:`Task`) without executing it, and return the [`Task`](:class:`Task`). This only
creates a task, and does not run it.
"""
macro task(ex)
    :(Task(()->$(esc(ex))))
end

"""
    current_task()

Get the currently running [`Task`](:class:`Task`).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

"""
    istaskdone(task) -> Bool

Determine whether a task has exited.
"""
istaskdone(t::Task) = ((t.state == :done) | (t.state == :failed))

"""
    istaskstarted(task) -> Bool

Determine whether a task has started executing.
"""
istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0

task_local_storage() = get_task_tls(current_task())
function get_task_tls(t::Task)
    if t.storage === nothing
        t.storage = ObjectIdDict()
    end
    (t.storage)::ObjectIdDict
end

"""
    task_local_storage(key)

Look up the value of a key in the current task's task-local storage.
"""
task_local_storage(key) = task_local_storage()[key]

"""
    task_local_storage(key, value)

Assign a value to a key in the current task's task-local storage.
"""
task_local_storage(key, val) = (task_local_storage()[key] = val)

"""
    task_local_storage(body, key, value)

Call the function `body` with a modified task-local storage, in which `value` is assigned to
`key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful
for emulating dynamic scoping.
"""
function task_local_storage(body::Function, key, val)
    tls = task_local_storage()
    hadkey = haskey(tls,key)
    old = get(tls,key,nothing)
    tls[key] = val
    try body()
    finally
        hadkey ? (tls[key] = old) : delete!(tls,key)
    end
end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
    if !istaskdone(t)
        if t.donenotify === nothing
            t.donenotify = Condition()
        end
    end
    while !istaskdone(t)
        wait(t.donenotify)
    end
    if t.state == :failed
        throw(t.exception)
    end
    return t.result
end

suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false

# runtime system hook called when a task finishes
function task_done_hook(t::Task)
    # `finish_task` sets `sigatomic` before entering this function
    err = (t.state == :failed)
    result = t.result
    handled = false
    if err
        t.backtrace = catch_backtrace()
    end

    q = t.consumers
    t.consumers = nothing

    if isa(t.donenotify, Condition) && !isempty(t.donenotify.waitq)
        handled = true
        notify(t.donenotify, result, error=err)
    end

    #### un-optimized version
    #isa(q,Condition) && notify(q, result, error=err)
    if isa(q,Task)
        handled = true
        nexttask = q
        nexttask.state = :runnable
        if err
            nexttask.exception = result
        end
        yieldto(nexttask, result) # this terminates the task
    elseif isa(q,Condition) && !isempty(q.waitq)
        handled = true
        notify(q, result, error=err)
    end

    if err && !handled
        if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
            active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) &&
            active_repl_backend.in_eval
            throwto(active_repl_backend.backend_task, result)
        end
        if !suppress_excp_printing(t)
            let bt = t.backtrace
                # run a new task to print the error for us
                @schedule with_output_color(:red, STDERR) do io
                    print(io, "ERROR (unhandled task failure): ")
                    showerror(io, result, bt)
                    println(io)
                end
            end
        end
    end
    # Clear sigatomic before waiting
    sigatomic_end()
    wait()
end


## produce, consume, and task iteration

function produce(v)
    #### un-optimized version
    #q = current_task().consumers
    #t = shift!(q.waitq)
    #empty = isempty(q.waitq)
    ct = current_task()
    local empty, t, q
    while true
        q = ct.consumers
        if isa(q,Task)
            t = q
            ct.consumers = nothing
            empty = true
            break
        elseif isa(q,Condition) && !isempty(q.waitq)
            t = shift!(q.waitq)
            empty = isempty(q.waitq)
            break
        end
        wait()
    end

    t.state == :runnable || throw(AssertionError("producer.consumer.state == :runnable"))
    if empty
        schedule_and_wait(t, v)
        while true
            # wait until there are more consumers
            q = ct.consumers
            if isa(q,Task)
                return q.result
            elseif isa(q,Condition) && !isempty(q.waitq)
                return q.waitq[1].result
            end
            wait()
        end
    else
        schedule(t, v)
        # make sure `t` runs before us. otherwise, the producer might
        # finish before `t` runs again, causing it to see the producer
        # as done, causing done(::Task, _) to miss the value `v`.
        # see issue #7727
        yield()
        return q.waitq[1].result
    end
end
produce(v...) = produce(v)

function consume(P::Task, values...)
    if istaskdone(P)
        return wait(P)
    end

    ct = current_task()
    ct.result = length(values)==1 ? values[1] : values

    #### un-optimized version
    #if P.consumers === nothing
    #    P.consumers = Condition()
    #end
    #push!(P.consumers.waitq, ct)
    # optimized version that avoids the queue for 1 consumer
    if P.consumers === nothing || (isa(P.consumers,Condition)&&isempty(P.consumers.waitq))
        P.consumers = ct
    else
        if isa(P.consumers, Task)
            t = P.consumers
            P.consumers = Condition()
            push!(P.consumers.waitq, t)
        end
        push!(P.consumers.waitq, ct)
    end

    P.state == :runnable ? schedule_and_wait(P) : wait() # don't attempt to queue it twice
end

start(t::Task) = nothing
function done(t::Task, val)
    t.result = consume(t)
    istaskdone(t)
end
next(t::Task, val) = (t.result, nothing)
iteratorsize(::Type{Task}) = SizeUnknown()
iteratoreltype(::Type{Task}) = EltypeUnknown()

isempty(::Task) = error("isempty not defined for Tasks")


## dynamically-scoped waiting for multiple items
sync_begin() = task_local_storage(:SPAWNS, ([], get(task_local_storage(), :SPAWNS, ())))

function sync_end()
    spawns = get(task_local_storage(), :SPAWNS, ())
    if spawns === ()
        error("sync_end() without sync_begin()")
    end
    refs = spawns[1]
    task_local_storage(:SPAWNS, spawns[2])

    c_ex = CompositeException()
    for r in refs
        try
            wait(r)
        catch ex
            if !isa(r, Task) || (isa(r, Task) && !(r.state == :failed))
                rethrow(ex)
            end
        finally
            if isa(r, Task) && (r.state == :failed)
                push!(c_ex, CapturedException(r.result, r.backtrace))
            end
        end
    end

    if !isempty(c_ex)
        throw(c_ex)
    end
    nothing
end

"""
    @sync

Wait until all dynamically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@parallel`
are complete. All exceptions thrown by enclosed async operations are collected and thrown as
a `CompositeException`.
"""
macro sync(block)
    quote
        sync_begin()
        v = $(esc(block))
        sync_end()
        v
    end
end

function sync_add(r)
    spawns = get(task_local_storage(), :SPAWNS, ())
    if spawns !== ()
        push!(spawns[1], r)
        if isa(r, Task)
            tls_r = get_task_tls(r)
            tls_r[:SUPPRESS_EXCEPTION_PRINTING] = true
        end
    end
    r
end

function async_run_thunk(thunk)
    t = Task(thunk)
    sync_add(t)
    enq_work(t)
    t
end

"""
    @async

Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local
machine's scheduler queue. Additionally it adds the task to the set of items that the
nearest enclosing `@sync` waits for. `@async` also wraps the expression in a `let x=x, y=y, ...`
block to create a new scope with copies of all variables referenced in the expression.
"""
macro async(expr)
    expr = localize_vars(esc(:(()->($expr))), false)
    :(async_run_thunk($expr))
end
back to top