https://github.com/ska-sa/spead2
Raw File
Tip revision: 999de8f4f36a02007cdb284459ab72720ebaf92f authored by Bruce Merry on 08 March 2016, 09:59:33 UTC
Label 0.6.2
Tip revision: 999de8f
py_recv.cpp
/* Copyright 2015 SKA South Africa
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define PY_ARRAY_UNIQUE_SYMBOL spead2_ARRAY_API
#define NO_IMPORT_ARRAY
#include <boost/python.hpp>
#include <boost/make_shared.hpp>
#include <numpy/arrayobject.h>
#include <stdexcept>
#include <cstdint>
#include <unistd.h>
#include "recv_udp.h"
#include "recv_mem.h"
#include "recv_stream.h"
#include "recv_ring_stream.h"
#include "recv_live_heap.h"
#include "recv_heap.h"
#include "common_ringbuffer.h"
#include "py_common.h"

namespace py = boost::python;

namespace spead2
{
namespace recv
{

/**
 * Wraps @ref item to provide safe memory management. The item references
 * memory inside the heap, so it needs to hold a reference to that
 * heap, as do any memoryviews created on the value.
 */
class item_wrapper : public item
{
private:
    /// Python object containing a @ref heap
    py::handle<> owning_heap;

public:
    item_wrapper() = default;
    item_wrapper(const item &it, PyObject *owning_heap)
        : item(it), owning_heap(py::borrowed(owning_heap)) {}

    /**
     * Obtain the raw value, as a Python object memoryview.  Since
     * boost::python doesn't support creating classes with the buffer protocol,
     * we need a proxy object to wrap in the memoryview, and it needs a way to
     * hold a reference on the owning heap. PyBuffer_* doesn't satisfy the
     * second requirement, but numpy arrays do. It's rather overkill, but the
     * package uses numpy anyway.
     */
    py::object get_value() const
    {
        npy_intp dims[1];
        // Construct the numpy array to wrap our data
        dims[0] = length;
        PyObject *array = PyArray_SimpleNewFromData(
            1, dims, NPY_UINT8,
            reinterpret_cast<void *>(ptr));
        if (array == NULL)
            py::throw_error_already_set();
        // Make the numpy array hold a ref to the owning heap
        if (PyArray_SetBaseObject(
                (PyArrayObject *) array,
                py::incref(owning_heap.get())) == -1)
        {
            py::decref(owning_heap.get());
            py::throw_error_already_set();
        }

        // Create a memoryview wrapper around the numpy array
        PyObject *view = PyMemoryView_FromObject(array);
        if (view == NULL)
        {
            py::decref(array); // discard the array
            py::throw_error_already_set();
        }
        py::object obj{py::handle<>(view)};
        py::decref(array); // the view holds its own ref
        return obj;
    }
};

/**
 * Wrapper for heaps. This is used as a held type, so that it gets a back
 * reference to the Python object (needed so that item_wrapper can hold a
 * reference to the heap).
 *
 * The constructor steals data from the provided heap, but Boost.Python
 * doesn't currently understand C++11 rvalue semantics. Thus, it is vital
 * that any wrapped function returning a heap returns a temporary that can be
 * safely clobbered.
 */
class heap_wrapper : public heap
{
private:
    PyObject *self;

public:
    heap_wrapper(PyObject *self, const heap &h)
        : heap(std::move(const_cast<heap &>(h))), self(self) {}

    /// Wrap @ref heap::get_items, and convert vector to a Python list
    py::list get_items() const
    {
        std::vector<item> base = heap::get_items();
        py::list out;
        for (const item &it : base)
        {
            // Filter out descriptors here. The base class can't do so, because
            // the descriptors are retrieved from the items.
            if (it.id != DESCRIPTOR_ID)
                out.append(item_wrapper(it, self));
        }
        return out;
    }

    /// Wrap @ref heap::get_descriptors, and convert vector to a Python list
    py::list get_descriptors() const
    {
        std::vector<descriptor> descriptors = heap::get_descriptors();
        py::list out;
        for (const descriptor& d : descriptors)
            out.append(d);
        return out;
    }
};

/**
 * Extends mem_reader to obtain data using the Python buffer protocol.
 * It steals the provided buffer view; it is not passed by rvalue reference
 * because it cannot be perfectly forwarded.
 */
class buffer_reader : public mem_reader
{
private:
    buffer_view view;
public:
    explicit buffer_reader(stream &s, buffer_view &view)
        : mem_reader(s, reinterpret_cast<const std::uint8_t *>(view.view.buf), view.view.len),
        view(std::move(view))
    {
    }
};

/**
 * Stream that handles the magic necessary to reflect heaps into
 * Python space and capture the reference to it.
 *
 * The GIL needs to be handled carefully. Any operation run by the thread pool
 * might need to take the GIL to do logging. Thus, any operation that blocks
 * on completion of code scheduled through the thread pool must drop the GIL
 * first.
 */
class ring_stream_wrapper : public thread_pool_handle_wrapper, public ring_stream<ringbuffer<live_heap, semaphore_gil<semaphore_fd>, semaphore> >
{
private:
    boost::asio::ip::address make_address(const std::string &hostname)
    {
        if (hostname.empty())
            return boost::asio::ip::address_v4::any();
        else
        {
            using boost::asio::ip::udp;
            udp::resolver resolver(get_strand().get_io_service());
            udp::resolver::query query(hostname, "", udp::resolver::query::passive);
            return resolver.resolve(query)->endpoint().address();
        }
    }

    boost::asio::ip::udp::endpoint make_endpoint(const std::string &hostname, std::uint16_t port)
    {
        return boost::asio::ip::udp::endpoint(make_address(hostname), port);
    }

public:
    using ring_stream::ring_stream;

    heap next()
    {
        try
        {
            return get();
        }
        catch (ringbuffer_stopped &e)
        {
            throw stop_iteration();
        }
    }

    heap get()
    {
        return ring_stream::pop();
    }

    heap get_nowait()
    {
        return try_pop();
    }

    int get_fd() const
    {
        return get_ringbuffer().get_data_sem().get_fd();
    }

    void set_memory_pool(std::shared_ptr<memory_pool> pool)
    {
        release_gil gil;
        ring_stream::set_memory_pool(std::move(pool));
    }

    void add_buffer_reader(py::object buffer)
    {
        buffer_view view(buffer);
        release_gil gil;
        emplace_reader<buffer_reader>(std::ref(view));
    }

    void add_udp_reader(
        std::uint16_t port,
        std::size_t max_size = udp_reader::default_max_size,
        std::size_t buffer_size = udp_reader::default_buffer_size,
        const std::string &bind_hostname = "",
        const py::object &socket = py::object())
    {
        int fd2 = -1;
        if (!socket.is_none())
        {
            int fd = py::extract<int>(socket.attr("fileno")());
            /* Python still owns this FD and will close it, so we have to duplicate
             * it for ourselves.
             */
            fd2 = ::dup(fd);
            if (fd2 == -1)
            {
                PyErr_SetFromErrno(PyExc_OSError);
                throw py::error_already_set();
            }
        }

        release_gil gil;
        auto endpoint = make_endpoint(bind_hostname, port);
        if (fd2 == -1)
        {
            emplace_reader<udp_reader>(endpoint, max_size, buffer_size);
        }
        else
        {
            boost::asio::ip::udp::socket asio_socket(
                get_strand().get_io_service(), endpoint.protocol(), fd2);
            emplace_reader<udp_reader>(std::move(asio_socket), endpoint, max_size, buffer_size);
        }
    }

    void add_udp_reader_multicast_v4(
        const std::string &multicast_group,
        std::uint16_t port,
        std::size_t max_size,
        std::size_t buffer_size,
        const std::string &interface_address)
    {
        release_gil gil;
        auto endpoint = make_endpoint(multicast_group, port);
        emplace_reader<udp_reader>(endpoint, max_size, buffer_size, make_address(interface_address));
    }

    void add_udp_reader_multicast_v6(
        const std::string &multicast_group,
        std::uint16_t port,
        std::size_t max_size,
        std::size_t buffer_size,
        unsigned int interface_index)
    {
        release_gil gil;
        auto endpoint = make_endpoint(multicast_group, port);
        emplace_reader<udp_reader>(endpoint, max_size, buffer_size, interface_index);
    }

    void stop()
    {
        release_gil gil;
        ring_stream::stop();
    }

    ~ring_stream_wrapper()
    {
        stop();
    }
};

/// Register the receiver module with Boost.Python
void register_module()
{
    using namespace boost::python;
    using namespace spead2::recv;

    // Create the module, and set it as the current boost::python scope so that
    // classes we define are added to this module rather than the root.
    py::object module(py::handle<>(py::borrowed(PyImport_AddModule("spead2._recv"))));
    py::scope scope = module;

    class_<heap, heap_wrapper>("Heap", no_init)
        .add_property("cnt", &heap_wrapper::get_cnt)
        .add_property("flavour",
            make_function(&heap_wrapper::get_flavour, return_value_policy<copy_const_reference>()))
        .def("get_items", &heap_wrapper::get_items)
        .def("get_descriptors", &heap_wrapper::get_descriptors)
        .def("is_start_of_stream", &heap_wrapper::is_start_of_stream);
    class_<item_wrapper>("RawItem", no_init)
        .def_readonly("id", &item_wrapper::id)
        .def_readonly("is_immediate", &item_wrapper::is_immediate)
        .def_readonly("immediate_value", &item_wrapper::immediate_value)
        .add_property("value", &item_wrapper::get_value);
    class_<ring_stream_wrapper, boost::noncopyable>("Stream",
            init<thread_pool_wrapper &, bug_compat_mask, std::size_t, std::size_t>(
                (arg("thread_pool"), arg("bug_compat") = 0,
                 arg("max_heaps") = ring_stream_wrapper::default_max_heaps,
                 arg("ring_heaps") = ring_stream_wrapper::default_ring_heaps))[
                store_handle_postcall<ring_stream_wrapper, thread_pool_handle_wrapper, &thread_pool_handle_wrapper::thread_pool_handle, 1, 2>()])
        .def("__iter__", objects::identity_function())
        .def(
#if PY_MAJOR_VERSION >= 3
              // Python 3 uses __next__ for the iterator protocol
              "__next__"
#else
              "next"
#endif
        , &ring_stream_wrapper::next)
        .def("get", &ring_stream_wrapper::get)
        .def("get_nowait", &ring_stream_wrapper::get_nowait)
        .def("set_memory_pool", &ring_stream_wrapper::set_memory_pool,
             arg("pool"))
        .def("add_buffer_reader", &ring_stream_wrapper::add_buffer_reader,
             arg("buffer"))
        .def("add_udp_reader", &ring_stream_wrapper::add_udp_reader,
             (arg("port"),
              arg("max_size") = udp_reader::default_max_size,
              arg("buffer_size") = udp_reader::default_buffer_size,
              arg("bind_hostname") = std::string(),
              arg("socket") = py::object()))
        .def("add_udp_reader", &ring_stream_wrapper::add_udp_reader_multicast_v4,
             (
              arg("multicast_group"),
              arg("port"),
              arg("max_size") = udp_reader::default_max_size,
              arg("buffer_size") = udp_reader::default_buffer_size,
              arg("interface_address") = "0.0.0.0"))
        .def("add_udp_reader", &ring_stream_wrapper::add_udp_reader_multicast_v6,
             (
              arg("multicast_group"),
              arg("port"),
              arg("max_size") = udp_reader::default_max_size,
              arg("buffer_size") = udp_reader::default_buffer_size,
              arg("interface_index") = (unsigned int) 0))
        .def("stop", &ring_stream_wrapper::stop)
        .add_property("fd", &ring_stream_wrapper::get_fd)
        .def_readonly("DEFAULT_MAX_HEAPS", ring_stream_wrapper::default_max_heaps)
        .def_readonly("DEFAULT_RING_HEAPS", ring_stream_wrapper::default_ring_heaps)
        .def_readonly("DEFAULT_UDP_MAX_SIZE", udp_reader::default_max_size)
        .def_readonly("DEFAULT_UDP_BUFFER_SIZE", udp_reader::default_buffer_size);
}

} // namespace recv
} // namespace spead2
back to top