# Source: https://github.com/JuliaLang/julia (raw file: lock.jl)
# Tip revision: ed93cc69dbd368b620ca92c3094b7f351ed1a6ca (short: ed93cc6),
# authored by Jeff Bezanson on 15 March 2019, 19:42:54 UTC
# Commit message: "wip allow goto from try blocks"
# This file is a part of Julia. License is MIT: https://julialang.org/license

# Advisory reentrant lock
"""
    ReentrantLock()

Creates a re-entrant lock for synchronizing [`Task`](@ref)s.
The same task can acquire the lock as many times as required.
Each [`lock`](@ref) must be matched with an [`unlock`](@ref).
"""
mutable struct ReentrantLock <: AbstractLock
    # Task that currently owns the lock, or `nothing` while it is free.
    locked_by::Union{Task, Nothing}
    # Internal condition (backed by a spin lock) that is locked around every
    # update of the bookkeeping fields and used to park/wake contending tasks.
    cond_wait::GenericCondition{Threads.SpinLock}
    # Number of outstanding acquisitions by the owner; 0 means "not locked".
    reentrancy_cnt::Int

    # A new lock starts out free: no owner and a hold count of zero.
    function ReentrantLock()
        return new(nothing, GenericCondition{Threads.SpinLock}(), 0)
    end
end


"""
    islocked(lock) -> Status (Boolean)

Check whether the `lock` is held by any task/thread.
This should not be used for synchronization (see instead [`trylock`](@ref)).
"""
function islocked(rl::ReentrantLock)
    # A non-zero hold count means some task currently owns the lock. The count
    # is read without taking `cond_wait`, so the answer is only a snapshot.
    return !iszero(rl.reentrancy_cnt)
end

"""
    trylock(lock) -> Success (Boolean)

Acquire the lock if it is available,
and return `true` if successful.
If the lock is already locked by a different task/thread,
return `false`.

Each successful `trylock` must be matched by an [`unlock`](@ref).
"""
function trylock(rl::ReentrantLock)
    me = current_task()
    # The owner and hold count are only inspected/updated under `cond_wait`.
    lock(rl.cond_wait)
    try
        if rl.reentrancy_cnt == 0
            # Lock is free: claim it for the current task.
            rl.locked_by = me
            rl.reentrancy_cnt = 1
            return true
        end
        # Lock is held: succeed only if the holder is the current task, in
        # which case the acquisition is recorded by bumping the hold count.
        me == notnothing(rl.locked_by) || return false
        rl.reentrancy_cnt += 1
        return true
    finally
        unlock(rl.cond_wait)
    end
end

"""
    lock(lock)

Acquire the `lock` when it becomes available.
If the lock is already locked by a different task/thread,
wait for it to become available.

Each `lock` must be matched by an [`unlock`](@ref).
"""
function lock(rl::ReentrantLock)
    me = current_task()
    lock(rl.cond_wait)
    try
        # Keep waiting for as long as the lock is held by somebody else.
        while rl.reentrancy_cnt != 0
            if me == notnothing(rl.locked_by)
                # Re-entrant acquisition by the current owner.
                rl.reentrancy_cnt += 1
                return
            end
            # Held by another task: block on the condition and re-check the
            # state once woken up.
            wait(rl.cond_wait)
        end
        # The lock is free: take ownership.
        rl.locked_by = me
        rl.reentrancy_cnt = 1
        return
    finally
        unlock(rl.cond_wait)
    end
end

"""
    unlock(lock)

Releases ownership of the `lock`.

If this is a recursive lock which has been acquired before, decrement an
internal counter and return immediately.
"""
function unlock(rl::ReentrantLock)
    me = current_task()
    # Validate before touching any state: the lock must be held, and it must
    # be held by the calling task.
    if rl.reentrancy_cnt == 0
        error("unlock count must match lock count")
    end
    if !(rl.locked_by == me)
        error("unlock from wrong thread")
    end
    lock(rl.cond_wait)
    try
        remaining = rl.reentrancy_cnt - 1
        rl.reentrancy_cnt = remaining
        if remaining == 0
            # Outermost unlock: drop ownership and wake the tasks waiting on
            # the condition so they can contend for the lock.
            rl.locked_by = nothing
            notify(rl.cond_wait)
        end
    finally
        unlock(rl.cond_wait)
    end
    return
end

function unlockall(rl::ReentrantLock)
    # Release the lock completely, regardless of how many times the current
    # task acquired it, and return the hold count that was dropped.
    held = rl.reentrancy_cnt
    if !(rl.locked_by == current_task())
        error("unlock from wrong thread")
    elseif held == 0
        error("unlock count must match lock count")
    end
    lock(rl.cond_wait)
    try
        rl.locked_by = nothing
        rl.reentrancy_cnt = 0
        # Wake the tasks waiting on the condition now that the lock is free.
        notify(rl.cond_wait)
    finally
        unlock(rl.cond_wait)
    end
    return held
end

function relockall(rl::ReentrantLock, n::Int)
    # Acquire `rl` and set its hold count to `n` in one step.
    # Presumably the counterpart of `unlockall`, restoring the count that call
    # returned -- confirm against callers.
    #
    # Arguments:
    #   rl - the lock to (re)acquire; blocks like `lock(rl)` until available.
    #   n  - hold count to install once the lock has been acquired.
    #
    # Throws an error if, after acquiring, the hold count is not exactly 1,
    # i.e. the calling task already held the lock before this call.
    # Note that the count is overwritten with `n` before that check runs.
    lock(rl)
    depth = rl.reentrancy_cnt
    rl.reentrancy_cnt = n
    depth == 1 || error("concurrency violation detected")
    return
end

function lock(f, l::AbstractLock)
    # Run `f()` while holding `l` and hand back its result. The `finally`
    # clause releases the lock even when `f` throws.
    lock(l)
    result = try
        f()
    finally
        unlock(l)
    end
    return result
end

function trylock(f, l::AbstractLock)
    # If the lock cannot be taken right now, `f` is never called and `false`
    # is returned.
    trylock(l) || return false
    # Lock acquired: return whatever `f()` returns, releasing the lock
    # afterwards even if `f` throws.
    try
        return f()
    finally
        unlock(l)
    end
end

@eval Threads begin
    # Everything in this block is evaluated inside the `Threads` module, so the
    # alias below is defined as `Threads.Condition`.
    """
        Threads.Condition([lock])

    A thread-safe version of [`Base.Condition`](@ref).

    !!! compat "Julia 1.2"
        This functionality requires at least Julia 1.2.
    """
    const Condition = Base.GenericCondition{Base.ReentrantLock}

    # The bare call signature that follows the next docstring is there to attach
    # that docstring to the `wait(c::Condition)` signature; no method body is
    # given here.
    """
    Special note for [`Threads.Condition`](@ref):

    The caller must be holding the [`lock`](@ref) that owns `c` before calling this method.
    The calling task will be blocked until some other task wakes it,
    usually by calling [`notify`](@ref) on the same Condition object.
    The lock will be atomically released when blocking (even if it was locked recursively),
    and will be reacquired before returning.
    """
    wait(c::Condition)
end

"""
    Semaphore(sem_size)

Create a counting semaphore that allows at most `sem_size`
acquires to be in use at any time.
Each acquire must be matched with a release.
"""
mutable struct Semaphore
    # Maximum number of permits that may be held at the same time.
    sem_size::Int
    # Number of permits currently handed out.
    curr_cnt::Int
    # Condition (with its own lock) guarding `curr_cnt` and used to block
    # acquirers while no permit is available.
    cond_wait::Threads.Condition

    # Reject non-positive sizes; a fresh semaphore has no permits in use.
    function Semaphore(sem_size)
        if sem_size > 0
            return new(sem_size, 0, Threads.Condition())
        end
        throw(ArgumentError("Semaphore size must be > 0"))
    end
end

"""
    acquire(s::Semaphore)

Wait for one of the `sem_size` permits to be available,
blocking until one can be acquired.
"""
function acquire(s::Semaphore)
    cond = s.cond_wait
    lock(cond)
    try
        # Block for as long as every permit is taken; the count is re-checked
        # after each wakeup before a permit is claimed.
        while !(s.curr_cnt < s.sem_size)
            wait(cond)
        end
        s.curr_cnt += 1
    finally
        unlock(cond)
    end
    return nothing
end

"""
    release(s::Semaphore)

Return one permit to the pool,
possibly allowing another task to acquire it
and resume execution.
"""
function release(s::Semaphore)
    cond = s.cond_wait
    lock(cond)
    try
        # Releasing with no permit outstanding is a usage error.
        if s.curr_cnt <= 0
            error("release count must match acquire count")
        end
        s.curr_cnt = s.curr_cnt - 1
        # One permit became available, so notify with `all=false` rather than
        # waking every waiter.
        notify(cond; all=false)
    finally
        unlock(cond)
    end
    return nothing
end


"""
    Event()

Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
`Event` are suspended and queued until `notify` is called on the `Event`.
After `notify` is called, the `Event` remains in a signaled state and
tasks will no longer block when waiting for it.

!!! compat "Julia 1.1"
    This functionality requires at least Julia 1.1.
"""
mutable struct Event
    # Condition that waiting tasks block on until the event is signaled.
    notify::Threads.Condition
    # `true` once the event has been signaled; nothing in this struct or its
    # `wait`/`notify` methods resets it.
    set::Bool

    # A new event starts in the non-signaled state.
    function Event()
        return new(Threads.Condition(), false)
    end
end

function wait(e::Event)
    # Fast path: an already-signaled event returns without taking the lock.
    if !e.set
        lock(e.notify)
        try
            # Re-check under the lock and keep blocking until the flag is set.
            while !e.set
                wait(e.notify)
            end
        finally
            unlock(e.notify)
        end
    end
    return nothing
end

function notify(e::Event)
    lock(e.notify)
    try
        # Signaling an already-set event is a no-op.
        e.set && return nothing
        # First signal: flip the flag, then wake the tasks blocked in `wait`.
        e.set = true
        notify(e.notify)
    finally
        unlock(e.notify)
    end
    return nothing
end
# back to top