# This file is a part of Julia. License is MIT: https://julialang.org/license ## basic task functions and TLS Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer()) # Container for a captured exception and its backtrace. Can be serialized. struct CapturedException <: Exception ex::Any processed_bt::Vector{Any} function CapturedException(ex, bt_raw::Vector) # bt_raw MUST be a vector that can be processed by StackTraces.stacktrace # Typically the result of a catch_backtrace() # Process bt_raw so that it can be safely serialized bt_lines = process_backtrace(bt_raw, 100) # Limiting this to 100 lines. CapturedException(ex, bt_lines) end CapturedException(ex, processed_bt::Vector{Any}) = new(ex, processed_bt) end function showerror(io::IO, ce::CapturedException) showerror(io, ce.ex, ce.processed_bt, backtrace=true) end """ capture_exception(ex, bt) -> Exception Returns an exception, possibly incorporating information from a backtrace `bt`. Defaults to returning [`CapturedException(ex, bt)`](@ref). Used in [`asyncmap`](@ref) and [`asyncmap!`](@ref) to capture exceptions thrown during the user-supplied function call. """ capture_exception(ex, bt) = CapturedException(ex, bt) """ CompositeException Wrap a `Vector` of exceptions thrown by a [`Task`](@ref) (e.g. generated from a remote worker over a channel or an asynchronously executing local I/O write or a remote worker under `pmap`) with information about the series of exceptions. For example, if a group of workers are executing several tasks, and multiple workers fail, the resulting `CompositeException` will contain a "bundle" of information from each worker indicating where and why the exception(s) occurred. """ struct CompositeException <: Exception exceptions::Vector{Any} CompositeException() = new(Any[]) CompositeException(exceptions) = new(exceptions) end length(c::CompositeException) = length(c.exceptions) push!(c::CompositeException, ex) = push!(c.exceptions, ex) pushfirst!(c::CompositeException, ex) = pushfirst!(c.exceptions, ex) isempty(c::CompositeException) = isempty(c.exceptions) iterate(c::CompositeException, state...) = iterate(c.exceptions, state...) eltype(::Type{CompositeException}) = Any function showerror(io::IO, ex::CompositeException) if !isempty(ex) showerror(io, ex.exceptions[1]) remaining = length(ex) - 1 if remaining > 0 print(io, "\n\n...and ", remaining, " more exception", remaining > 1 ? "s" : "", ".\n") end else print(io, "CompositeException()\n") end end """ TaskFailedException This exception is thrown by a [`wait(t)`](@ref) call when task `t` fails. `TaskFailedException` wraps the failed task `t`. """ struct TaskFailedException <: Exception task::Task end function showerror(io::IO, ex::TaskFailedException, bt = nothing; backtrace=true) print(io, "TaskFailedException") if bt !== nothing && backtrace show_backtrace(io, bt) end println(io) printstyled(io, "\n nested task error: ", color=error_color()) show_task_exception(io, ex.task) end function show_task_exception(io::IO, t::Task; indent = true) stack = current_exceptions(t) b = IOBuffer() if isempty(stack) # exception stack buffer not available; probably a serialized task showerror(IOContext(b, io), t.result) else show_exception_stack(IOContext(b, io), stack) end str = String(take!(b)) if indent str = replace(str, "\n" => "\n ") end print(io, str) end function show(io::IO, t::Task) state = t.state state_str = "$state" * ((state == :runnable && istaskstarted(t)) ? ", started" : "") print(io, "Task ($state_str) @0x$(string(convert(UInt, pointer_from_objref(t)), base = 16, pad = Sys.WORD_SIZE>>2))") end """ @task Wrap an expression in a [`Task`](@ref) without executing it, and return the [`Task`](@ref). This only creates a task, and does not run it. # Examples ```jldoctest julia> a1() = sum(i for i in 1:1000); julia> b = @task a1(); julia> istaskstarted(b) false julia> schedule(b); julia> yield(); julia> istaskdone(b) true ``` """ macro task(ex) thunk = Base.replace_linenums!(:(()->$(esc(ex))), __source__) :(Task($thunk)) end """ current_task() Get the currently running [`Task`](@ref). """ current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) # task states const task_state_runnable = UInt8(0) const task_state_done = UInt8(1) const task_state_failed = UInt8(2) const _state_index = findfirst(==(:_state), fieldnames(Task)) @eval function load_state_acquire(t) # TODO: Replace this by proper atomic operations when available @GC.preserve t llvmcall($(""" %ptr = inttoptr i$(Sys.WORD_SIZE) %0 to i8* %rv = load atomic i8, i8* %ptr acquire, align 8 ret i8 %rv """), UInt8, Tuple{Ptr{UInt8}}, Ptr{UInt8}(pointer_from_objref(t) + fieldoffset(Task, _state_index))) end @inline function getproperty(t::Task, field::Symbol) if field === :state # TODO: this field name should be deprecated in 2.0 st = load_state_acquire(t) if st === task_state_runnable return :runnable elseif st === task_state_done return :done elseif st === task_state_failed return :failed else @assert false end elseif field === :backtrace # TODO: this field name should be deprecated in 2.0 return current_exceptions(t)[end][2] elseif field === :exception # TODO: this field name should be deprecated in 2.0 return t._isexception ? t.result : nothing elseif field === :scope error("Querying `scope` is disallowed. Use `current_scope` instead.") else return getfield(t, field) end end @inline function setproperty!(t::Task, field::Symbol, @nospecialize(v)) if field === :scope istaskstarted(t) && error("Setting scope on a started task directly is disallowed.") end return @invoke setproperty!(t::Any, field::Symbol, v::Any) end """ istaskdone(t::Task) -> Bool Determine whether a task has exited. # Examples ```jldoctest julia> a2() = sum(i for i in 1:1000); julia> b = Task(a2); julia> istaskdone(b) false julia> schedule(b); julia> yield(); julia> istaskdone(b) true ``` """ istaskdone(t::Task) = load_state_acquire(t) !== task_state_runnable """ istaskstarted(t::Task) -> Bool Determine whether a task has started executing. # Examples ```jldoctest julia> a3() = sum(i for i in 1:1000); julia> b = Task(a3); julia> istaskstarted(b) false ``` """ istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0 """ istaskfailed(t::Task) -> Bool Determine whether a task has exited because an exception was thrown. # Examples ```jldoctest julia> a4() = error("task failed"); julia> b = Task(a4); julia> istaskfailed(b) false julia> schedule(b); julia> yield(); julia> istaskfailed(b) true ``` !!! compat "Julia 1.3" This function requires at least Julia 1.3. """ istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed) Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) function Threads.threadpool(t::Task) tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t) return Threads._tpid_to_sym(tpid) end task_result(t::Task) = t.result task_local_storage() = get_task_tls(current_task()) function get_task_tls(t::Task) if t.storage === nothing t.storage = IdDict() end return (t.storage)::IdDict{Any,Any} end """ task_local_storage(key) Look up the value of a key in the current task's task-local storage. """ task_local_storage(key) = task_local_storage()[key] """ task_local_storage(key, value) Assign a value to a key in the current task's task-local storage. """ task_local_storage(key, val) = (task_local_storage()[key] = val) """ task_local_storage(body, key, value) Call the function `body` with a modified task-local storage, in which `value` is assigned to `key`; the previous value of `key`, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping. """ function task_local_storage(body::Function, key, val) tls = task_local_storage() hadkey = haskey(tls, key) old = get(tls, key, nothing) tls[key] = val try return body() finally hadkey ? (tls[key] = old) : delete!(tls, key) end end # just wait for a task to be done, no error propagation function _wait(t::Task) if !istaskdone(t) donenotify = t.donenotify::ThreadSynchronizer lock(donenotify) try while !istaskdone(t) wait(donenotify) end finally unlock(donenotify) end end nothing end # have `waiter` wait for `t` function _wait2(t::Task, waiter::Task) if !istaskdone(t) # since _wait2 is similar to schedule, we should observe the sticky # bit, even if we don't call `schedule` with early-return below if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer() # Issue #41324 # t.sticky && tid == 0 is a task that needs to be co-scheduled with # the parent task. If the parent (current_task) is not sticky we must # set it to be sticky. # XXX: Ideally we would be able to unset this current_task().sticky = true tid = Threads.threadid() ccall(:jl_set_task_tid, Cint, (Any, Cint), waiter, tid-1) end donenotify = t.donenotify::ThreadSynchronizer lock(donenotify) if !istaskdone(t) push!(donenotify.waitq, waiter) unlock(donenotify) return nothing else unlock(donenotify) end end schedule(waiter) nothing end function wait(t::Task) t === current_task() && error("deadlock detected: cannot wait on current task") _wait(t) if istaskfailed(t) throw(TaskFailedException(t)) end nothing end """ fetch(x::Any) Return `x`. """ fetch(@nospecialize x) = x """ fetch(t::Task) Wait for a [`Task`](@ref) to finish, then return its result value. If the task fails with an exception, a [`TaskFailedException`](@ref) (which wraps the failed task) is thrown. """ function fetch(t::Task) wait(t) return task_result(t) end ## lexically-scoped waiting for multiple items struct ScheduledAfterSyncException <: Exception values::Vector{Any} end function showerror(io::IO, ex::ScheduledAfterSyncException) print(io, "ScheduledAfterSyncException: ") if isempty(ex.values) print(io, "(no values)") return end show(io, ex.values[1]) if length(ex.values) == 1 print(io, " is") elseif length(ex.values) == 2 print(io, " and one more ") print(io, nameof(typeof(ex.values[2]))) print(io, " are") else print(io, " and ", length(ex.values) - 1, " more objects are") end print(io, " registered after the end of a `@sync` block") end function sync_end(c::Channel{Any}) local c_ex while isready(c) r = take!(c) if isa(r, Task) _wait(r) if istaskfailed(r) if !@isdefined(c_ex) c_ex = CompositeException() end push!(c_ex, TaskFailedException(r)) end else try wait(r) catch e if !@isdefined(c_ex) c_ex = CompositeException() end push!(c_ex, e) end end end close(c) # Capture all waitable objects scheduled after the end of `@sync` and # include them in the exception. This way, the user can check what was # scheduled by examining at the exception object. if isready(c) local racy for r in c if !@isdefined(racy) racy = [] end push!(racy, r) end if @isdefined(racy) if !@isdefined(c_ex) c_ex = CompositeException() end # Since this is a clear programming error, show this exception first: pushfirst!(c_ex, ScheduledAfterSyncException(racy)) end end if @isdefined(c_ex) throw(c_ex) end nothing end const sync_varname = gensym(:sync) """ @sync Wait until all lexically-enclosed uses of [`@async`](@ref), [`@spawn`](@ref Threads.@spawn), `Distributed.@spawnat` and `Distributed.@distributed` are complete. All exceptions thrown by enclosed async operations are collected and thrown as a [`CompositeException`](@ref). # Examples ```julia-repl julia> Threads.nthreads() 4 julia> @sync begin Threads.@spawn println("Thread-id \$(Threads.threadid()), task 1") Threads.@spawn println("Thread-id \$(Threads.threadid()), task 2") end; Thread-id 3, task 1 Thread-id 1, task 2 ``` """ macro sync(block) var = esc(sync_varname) quote let $var = Channel(Inf) v = $(esc(block)) sync_end($var) v end end end # schedule an expression to run asynchronously """ @async Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue. Values can be interpolated into `@async` via `\$`, which copies the value directly into the constructed underlying closure. This allows you to insert the _value_ of a variable, isolating the asynchronous code from changes to the variable's value in the current task. !!! warning It is strongly encouraged to favor `Threads.@spawn` over `@async` always **even when no parallelism is required** especially in publicly distributed libraries. This is because a use of `@async` disables the migration of the *parent* task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of `@async` in a library function can have a large impact on the performance of very different parts of user applications. !!! compat "Julia 1.4" Interpolating values via `\$` is available as of Julia 1.4. """ macro async(expr) do_async_macro(expr, __source__) end # generate the code for @async, possibly wrapping the task in something before # pushing it to the wait queue. function do_async_macro(expr, linenums; wrap=identity) letargs = Base._lift_one_interp!(expr) thunk = Base.replace_linenums!(:(()->($(esc(expr)))), linenums) var = esc(sync_varname) quote let $(letargs...) local task = Task($thunk) if $(Expr(:islocal, var)) put!($var, $(wrap(:task))) end schedule(task) task end end end # task wrapper that doesn't create exceptions wrapped in TaskFailedException struct UnwrapTaskFailedException <: Exception task::Task end # common code for wait&fetch for UnwrapTaskFailedException function unwrap_task_failed(f::Function, t::UnwrapTaskFailedException) try f(t.task) catch ex if ex isa TaskFailedException throw(ex.task.exception) else rethrow() end end end # the unwrapping for above task wrapper (gets triggered in sync_end()) wait(t::UnwrapTaskFailedException) = unwrap_task_failed(wait, t) # same for fetching the tasks, for convenience fetch(t::UnwrapTaskFailedException) = unwrap_task_failed(fetch, t) # macro for running async code that doesn't throw wrapped exceptions macro async_unwrap(expr) do_async_macro(expr, __source__, wrap=task->:(Base.UnwrapTaskFailedException($task))) end """ errormonitor(t::Task) Print an error log to `stderr` if task `t` fails. # Examples ```julia-repl julia> Base._wait(errormonitor(Threads.@spawn error("task failed"))) Unhandled Task ERROR: task failed Stacktrace: [...] ``` """ function errormonitor(t::Task) t2 = Task() do if istaskfailed(t) local errs = stderr try # try to display the failure atomically errio = IOContext(PipeBuffer(), errs::IO) emphasize(errio, "Unhandled Task ") display_error(errio, scrub_repl_backtrace(current_exceptions(t))) write(errs, errio) catch try # try to display the secondary error atomically errio = IOContext(PipeBuffer(), errs::IO) print(errio, "\nSYSTEM: caught exception while trying to print a failed Task notice: ") display_error(errio, scrub_repl_backtrace(current_exceptions())) write(errs, errio) flush(errs) # and then the actual error, as best we can Core.print(Core.stderr, "while handling: ") Core.println(Core.stderr, current_exceptions(t)[end][1]) catch e # give up Core.print(Core.stderr, "\nSYSTEM: caught exception of type ", typeof(e).name.name, " while trying to print a failed Task notice; giving up\n") end end end nothing end t2.sticky = false _wait2(t, t2) return t end # Capture interpolated variables in $() and move them to let-block function _lift_one_interp!(e) letargs = Any[] # store the new gensymed arguments _lift_one_interp_helper(e, false, letargs) # Start out _not_ in a quote context (false) letargs end _lift_one_interp_helper(v, _, _) = v function _lift_one_interp_helper(expr::Expr, in_quote_context, letargs) if expr.head === :$ if in_quote_context # This $ is simply interpolating out of the quote # Now, we're out of the quote, so any _further_ $ is ours. in_quote_context = false else newarg = gensym() push!(letargs, :($(esc(newarg)) = $(esc(expr.args[1])))) return newarg # Don't recurse into the lifted $() exprs end elseif expr.head === :quote in_quote_context = true # Don't try to lift $ directly out of quotes elseif expr.head === :macrocall return expr # Don't recur into macro calls, since some other macros use $ end for (i,e) in enumerate(expr.args) expr.args[i] = _lift_one_interp_helper(e, in_quote_context, letargs) end expr end # add a wait-able object to the sync pool macro sync_add(expr) var = esc(sync_varname) quote local ref = $(esc(expr)) put!($var, ref) ref end end # runtime system hook called when a task finishes function task_done_hook(t::Task) # `finish_task` sets `sigatomic` before entering this function err = istaskfailed(t) result = task_result(t) handled = false donenotify = t.donenotify if isa(donenotify, ThreadSynchronizer) lock(donenotify) try if !isempty(donenotify.waitq) handled = true notify(donenotify) end finally unlock(donenotify) end end if err && !handled && Threads.threadid() == 1 if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) && active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) && active_repl_backend.in_eval throwto(active_repl_backend.backend_task, result) # this terminates the task end end # Clear sigatomic before waiting sigatomic_end() try wait() # this will not return catch e # If an InterruptException happens while blocked in the event loop, try handing # the exception to the REPL task since the current task is done. # issue #19467 if Threads.threadid() == 1 && isa(e, InterruptException) && isdefined(Base, :active_repl_backend) && active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) && active_repl_backend.in_eval throwto(active_repl_backend.backend_task, e) else rethrow() end end end ## scheduler and work queue mutable struct IntrusiveLinkedListSynchronized{T} queue::IntrusiveLinkedList{T} lock::Threads.SpinLock IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock()) end isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue) length(W::IntrusiveLinkedListSynchronized) = length(W.queue) function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try push!(W.queue, t) finally unlock(W.lock) end return W end function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try pushfirst!(W.queue, t) finally unlock(W.lock) end return W end function pop!(W::IntrusiveLinkedListSynchronized) lock(W.lock) try return pop!(W.queue) finally unlock(W.lock) end end function popfirst!(W::IntrusiveLinkedListSynchronized) lock(W.lock) try return popfirst!(W.queue) finally unlock(W.lock) end end function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try list_deletefirst!(W.queue, t) finally unlock(W.lock) end return W end const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task} global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()] const Workqueues_lock = Threads.SpinLock() const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable function workqueue_for(tid::Int) qs = Workqueues if length(qs) >= tid && isassigned(qs, tid) return @inbounds qs[tid] end # slow path to allocate it @assert tid > 0 l = Workqueues_lock @lock l begin qs = Workqueues if length(qs) < tid nt = Threads.maxthreadid() @assert tid <= nt global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) end if !isassigned(qs, tid) @inbounds qs[tid] = StickyWorkqueue() end return @inbounds qs[tid] end end function enq_work(t::Task) (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable") # Sticky tasks go into their thread's work queue. if t.sticky tid = Threads.threadid(t) if tid == 0 # The task is not yet stuck to a thread. Stick it to the current # thread and do the same to the parent task (the current task) so # that the tasks are correctly co-scheduled (issue #41324). # XXX: Ideally we would be able to unset this. if GC.in_finalizer() # The task was launched in a finalizer. There is no thread to sticky it # to, so just allow it to run anywhere as if it had been non-sticky. t.sticky = false @goto not_sticky else tid = Threads.threadid() ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) current_task().sticky = true end end push!(workqueue_for(tid), t) else @label not_sticky tp = Threads.threadpool(t) if tp === :foreign || Threads.threadpoolsize(tp) == 1 # There's only one thread in the task's assigned thread pool; # use its work queue. tid = (tp === :interactive) ? 1 : Threads.threadpoolsize(:interactive)+1 ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) push!(workqueue_for(tid), t) else # Otherwise, put the task in the multiqueue. Partr.multiq_insert(t, t.priority) tid = 0 end end ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) return t end schedule(t::Task) = enq_work(t) """ schedule(t::Task, [val]; error=false) Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). If a second argument `val` is provided, it will be passed to the task (via the return value of [`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in the woken task. !!! warning It is incorrect to use `schedule` on an arbitrary `Task` that has already been started. See [the API reference](@ref low-level-schedule-wait) for more information. # Examples ```jldoctest julia> a5() = sum(i for i in 1:1000); julia> b = Task(a5); julia> istaskstarted(b) false julia> schedule(b); julia> yield(); julia> istaskstarted(b) true julia> istaskdone(b) true ``` """ function schedule(t::Task, @nospecialize(arg); error=false) # schedule a task to be (re)started with the given value or exception t._state === task_state_runnable || Base.error("schedule: Task not runnable") if error t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t) setfield!(t, :result, arg) setfield!(t, :_isexception, true) else t.queue === nothing || Base.error("schedule: Task not runnable") setfield!(t, :result, arg) end enq_work(t) return t end """ yield() Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks. """ function yield() ct = current_task() enq_work(ct) try wait() catch ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct) rethrow() end end @inline set_next_task(t::Task) = ccall(:jl_set_next_task, Cvoid, (Any,), t) """ yield(t::Task, arg = nothing) A fast, unfair-scheduling version of `schedule(t, arg); yield()` which immediately yields to `t` before calling the scheduler. """ function yield(t::Task, @nospecialize(x=nothing)) (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable") t.result = x enq_work(current_task()) set_next_task(t) return try_yieldto(ensure_rescheduled) end """ yieldto(t::Task, arg = nothing) Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, `arg` is returned from the task's last call to `yieldto`. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged. """ function yieldto(t::Task, @nospecialize(x=nothing)) # TODO: these are legacy behaviors; these should perhaps be a scheduler # state error instead. if t._state === task_state_done return x elseif t._state === task_state_failed throw(t.result) end t.result = x set_next_task(t) return try_yieldto(identity) end function try_yieldto(undo) try ccall(:jl_switch, Cvoid, ()) catch undo(ccall(:jl_get_next_task, Ref{Task}, ())) rethrow() end ct = current_task() if ct._isexception exc = ct.result ct.result = nothing ct._isexception = false throw(exc) end result = ct.result ct.result = nothing return result end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) t.result = exc t._isexception = true set_next_task(t) return try_yieldto(identity) end function ensure_rescheduled(othertask::Task) ct = current_task() W = workqueue_for(Threads.threadid()) if ct !== othertask && othertask._state === task_state_runnable # we failed to yield to othertask # return it to the head of a queue to be retried later tid = Threads.threadid(othertask) Wother = tid == 0 ? W : workqueue_for(tid) pushfirst!(Wother, othertask) end # if the current task was queued, # also need to return it to the runnable state # before throwing an error list_deletefirst!(W, ct) nothing end function trypoptask(W::StickyWorkqueue) while !isempty(W) t = popfirst!(W) if t._state !== task_state_runnable # assume this somehow got queued twice, # probably broken now, but try discarding this switch and keep going # can't throw here, because it's probably not the fault of the caller to wait # and don't want to use print() here, because that may try to incur a task switch ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state !== :runnable\n") continue end return t end return Partr.multiq_deletemin() end checktaskempty = Partr.multiq_check_empty @noinline function poptask(W::StickyWorkqueue) task = trypoptask(W) if !(task isa Task) task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty) end set_next_task(task) nothing end function wait() GC.safepoint() W = workqueue_for(Threads.threadid()) poptask(W) result = try_yieldto(ensure_rescheduled) process_events() # return when we come out of the queue return result end if Sys.iswindows() pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) else pause() = ccall(:pause, Cvoid, ()) end