https://github.com/ska-sa/spead2
Raw File
Tip revision: 953d63ff013cb1cf7beab747fd1fab9ce112788c authored by Bruce Merry on 08 September 2023, 12:52:18 UTC
Fix dependency on numpy and spelling of test-numba
Tip revision: 953d63f
common_semaphore.cpp
/* Copyright 2015 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 *
 * Semaphore that uses file descriptors, so that it can be plumbed
 * into an event loop.
 */

#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <spead2/common_features.h>
#include <cerrno>
#include <system_error>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <atomic>
#include <spead2/common_semaphore.h>
#include <spead2/common_logging.h>
#if SPEAD2_USE_EVENTFD
# include <sys/eventfd.h>
#endif

namespace spead2
{

semaphore_spin::semaphore_spin(unsigned int initial)
    : value(initial)
{
}

void semaphore_spin::put()
{
    value.fetch_add(1, std::memory_order_release);
}

int semaphore_spin::get()
{
    unsigned int cur = value.load(std::memory_order_acquire);
    while (true)
    {
        if (cur == 0)
            cur = value.load(std::memory_order_acquire);
        else if (value.compare_exchange_weak(
            cur, cur - 1, std::memory_order_acquire))
            break;
    }
    return 0;
}

int semaphore_spin::try_get()
{
    unsigned int cur = value.load(std::memory_order_acquire);
    if (cur > 0
        && value.compare_exchange_strong(
            cur, cur - 1, std::memory_order_acquire))
        return 0;
    else
        return -1;
}

/////////////////////////////////////////////////////////////////////////////

#if SPEAD2_USE_POSIX_SEMAPHORES

semaphore_posix::semaphore_posix(unsigned int initial)
{
    if (sem_init(&sem, 0, initial) == -1)
        throw_errno("sem_init failed");
}

semaphore_posix::~semaphore_posix()
{
    if (sem_destroy(&sem) == -1)
    {
        // Destructor, so can't throw
        log_errno("failed to destroy semaphore: %1% (%2%)");
    }
}

void semaphore_posix::put()
{
    if (sem_post(&sem) == -1)
        throw_errno("sem_post failed");
}

int semaphore_posix::try_get()
{
    int status = sem_trywait(&sem);
    if (status == -1)
    {
        if (errno == EAGAIN || errno == EINTR)
            return -1;
        else
            throw_errno("sem_trywait failed");
    }
    else
        return 0;
}

int semaphore_posix::get()
{
    int status = sem_wait(&sem);
    if (status == -1)
    {
        if (errno == EINTR)
            return -1;
        else
            throw_errno("sem_wait failed");
    }
    else
        return 0;
}

#endif // SPEAD2_USE_POSIX_SEMAPHORES

/////////////////////////////////////////////////////////////////////////////

semaphore_pipe::semaphore_pipe(semaphore_pipe &&other)
{
    for (int i = 0; i < 2; i++)
    {
        pipe_fds[i] = other.pipe_fds[i];
        other.pipe_fds[i] = -1;
    }
}

semaphore_pipe &semaphore_pipe::operator=(semaphore_pipe &&other)
{
    for (int i = 0; i < 2; i++)
    {
        if (pipe_fds[i] != -1)
        {
            if (close(pipe_fds[i]) == -1)
                throw_errno("close failed");
            pipe_fds[i] = -1;
        }
    }
    for (int i = 0; i < 2; i++)
    {
        pipe_fds[i] = other.pipe_fds[i];
        other.pipe_fds[i] = -1;
    }
    return *this;
}

semaphore_pipe::semaphore_pipe(unsigned int initial)
{
    if (pipe(pipe_fds) == -1)
        throw_errno("pipe failed");
    for (int i = 0; i < 2; i++)
    {
        int flags = fcntl(pipe_fds[i], F_GETFD);
        if (flags == -1)
            throw_errno("fcntl failed");
        flags |= FD_CLOEXEC;
        if (fcntl(pipe_fds[i], F_SETFD, flags) == -1)
            throw_errno("fcntl failed");
    }
    // Make the read end non-blocking, for try_get
    int flags = fcntl(pipe_fds[0], F_GETFL);
    if (flags == -1)
        throw_errno("fcntl failed");
    flags |= O_NONBLOCK;
    if (fcntl(pipe_fds[0], F_SETFL, flags) == -1)
        throw_errno("fcntl failed");
    // TODO: this could probably be optimised
    for (unsigned int i = 0; i < initial; i++)
        put();
}

semaphore_pipe::~semaphore_pipe()
{
    for (int i = 0; i < 2; i++)
        if (pipe_fds[i] != -1)
        {
            if (close(pipe_fds[i]) == -1)
            {
                // Can't throw, because this is a destructor
                log_errno("failed to close pipe: %1% (%2%)");
            }
        }
}

void semaphore_pipe::put()
{
    char byte = 0;
    int status;
    do
    {
        status = write(pipe_fds[1], &byte, 1);
        if (status < 0 && errno != EINTR)
        {
            throw_errno("write failed");
        }
    } while (status < 0);
}

int semaphore_pipe::get()
{
    char byte = 0;
    while (true)
    {
        struct pollfd pfd = {};
        pfd.fd = pipe_fds[0];
        pfd.events = POLLIN;
        int status = poll(&pfd, 1, -1);
        if (status == -1)
        {
            if (errno == EINTR)
                return -1;
            else
                throw_errno("poll failed");
        }
        status = read(pipe_fds[0], &byte, 1);
        if (status < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                continue; // spurious wakeup from poll
            else
                throw_errno("read failed");
        }
        else
        {
            assert(status == 1);
            return 0;
        }
    }
}

int semaphore_pipe::try_get()
{
    char byte = 0;
    int status = read(pipe_fds[0], &byte, 1);
    if (status < 0)
    {
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return -1;
        else
            throw_errno("read failed");
    }
    else
    {
        assert(status == 1);
        return 0;
    }
}

int semaphore_pipe::get_fd() const
{
    return pipe_fds[0];
}

/////////////////////////////////////////////////////////////////////////////

#if SPEAD2_USE_EVENTFD

semaphore_eventfd::semaphore_eventfd(semaphore_eventfd &&other)
{
    fd = other.fd;
    other.fd = -1;
}

semaphore_eventfd::~semaphore_eventfd()
{
    if (fd != -1 && close(fd) == -1)
        log_errno("failed to close eventfd: %1% (%2%)");
}

semaphore_eventfd &semaphore_eventfd::operator=(semaphore_eventfd &&other)
{
    std::swap(fd, other.fd);
    if (other.fd != -1)
    {
        if (close(other.fd) == -1)
            throw_errno("close failed");
        other.fd = -1;
    }
    return *this;
}

semaphore_eventfd::semaphore_eventfd(unsigned int initial)
{
    fd = eventfd(initial, EFD_CLOEXEC | EFD_NONBLOCK | EFD_SEMAPHORE);
    if (fd == -1)
        throw_errno("eventfd failed");
}

void semaphore_eventfd::put()
{
    int status;
    do
    {
        status = eventfd_write(fd, 1);
        if (status == -1)
        {
            if (errno != -1)
                throw_errno("eventfd_write failed");
        }
    } while (status == -1);
}

int semaphore_eventfd::try_get()
{
    eventfd_t value;
    int status = eventfd_read(fd, &value);
    if (status == -1)
    {
        if (errno == EAGAIN || errno == EINTR)
            return -1;
        else
            throw_errno("eventfd_read failed");
    }
    assert(status == 0);
    return 0;
}

int semaphore_eventfd::get()
{
    while (true)
    {
        eventfd_t value;
        struct pollfd pfd = {};
        pfd.fd = fd;
        pfd.events = POLLIN;
        int status = poll(&pfd, 1, -1);
        if (status == -1)
        {
            if (errno == EINTR)
                return -1;
            else
                throw_errno("poll failed");
        }
        status = eventfd_read(fd, &value);
        if (status < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                continue; // spurious wakeup from poll
            else
                throw_errno("eventfd_read failed");
        }
        else
        {
            assert(status == 0);
            return 0;
        }
    }
}

int semaphore_eventfd::get_fd() const
{
    return fd;
}

#endif // SPEAD2_USE_EVENTFD

boost::asio::posix::stream_descriptor wrap_fd(boost::asio::io_service &io_service, int fd)
{
    int fd2 = dup(fd);
    if (fd2 < 0)
        throw_errno("dup failed");
    boost::asio::posix::stream_descriptor wrapper(io_service, fd2);
    wrapper.native_non_blocking(true);
    return wrapper;
}

} // namespace spead2
back to top