https://github.com/JuliaLang/julia
Raw File
Tip revision: bad925efccd6a782b08e717d515d9e9cf2a80216 authored by Curtis Vogt on 09 October 2020, 20:24:47 UTC
Support all iterators again
Tip revision: bad925e
threads_overloads.jl
# This file is a part of Julia. License is MIT: https://julialang.org/license

"""
    Threads.foreach(f, channel::Channel;
                    schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                    ntasks=Threads.nthreads())

Similar to `foreach(f, channel)`, but iteration over `channel` and calls to
`f` are split across `ntasks` tasks spawned by `Threads.@spawn`. This function
will wait for all internally spawned tasks to complete before returning.

If `schedule isa FairSchedule`, `Threads.foreach` will attempt to spawn tasks in a
manner that enables Julia's scheduler to more freely load-balance work items across
threads. This approach generally has higher per-item overhead, but may perform
better than `StaticSchedule` when running concurrently with other multithreaded
workloads.

If `schedule isa StaticSchedule`, `Threads.foreach` will spawn tasks in a manner
that incurs lower per-item overhead than `FairSchedule`, but is less amenable
to load-balancing. This approach thus may be more suitable for fine-grained,
uniform workloads, but may perform worse than `FairSchedule` when running
concurrently with other multithreaded workloads.

!!! compat "Julia 1.6"
    This function requires Julia 1.6 or later.
"""
function Threads.foreach(f, channel::Channel;
                         schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                         ntasks=Threads.nthreads())
    # Pick the per-item calling strategy once, up front, based on `schedule`.
    run_item = _apply_for_schedule(schedule)
    # Shared flag: set by any worker whose body throws, read by all workers.
    failed = Threads.Atomic{Bool}(false)
    # `@sync` blocks until every worker spawned in the loop has finished.
    @sync for _ in 1:ntasks
        Threads.@spawn try
            for item in channel
                $run_item(f, item)
                # The flag is consulted only *after* the item in hand has been
                # passed to `f`, so an item that was already taken from the
                # channel is not lost. This is not fully comprehensive: a worker
                # can still be stuck on `take!` in `for item in channel`. A more
                # robust mechanism to avoid dropping items is still needed; see
                # https://github.com/JuliaLang/julia/pull/34543#discussion_r422695217
                if failed[]
                    break
                end
            end
        catch
            # Tell the other workers to wind down, then let the error propagate.
            failed[] = true
            rethrow()
        end
    end
    return nothing
end

# Return the two-argument closure `(fn, item) -> ...` that `Threads.foreach`
# uses to process a single item under the given schedule.

# `StaticSchedule`: call `fn(item)` directly in the calling task.
function _apply_for_schedule(::Threads.StaticSchedule)
    return (fn, item) -> fn(item)
end

# `FairSchedule`: run `fn(item)` in its own task created with `Threads.@spawn`
# and wait for that task to finish before returning.
function _apply_for_schedule(::Threads.FairSchedule)
    return (fn, item) -> wait(Threads.@spawn fn(item))
end
back to top