https://github.com/ska-sa/spead2
Tip revision: 953d63ff013cb1cf7beab747fd1fab9ce112788c authored by Bruce Merry on 08 September 2023, 12:52:18 UTC
Fix dependency on numpy and spelling of test-numba
Fix dependency on numpy and spelling of test-numba
Tip revision: 953d63f
py_recv.cpp
/* Copyright 2015, 2017, 2020-2023 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @file
*/
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/operators.h>
#include <algorithm>
#include <stdexcept>
#include <type_traits>
#include <cstdint>
#include <cctype>
#include <unistd.h>
#include <sys/socket.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_udp_ibv.h>
#include <spead2/recv_udp_pcap.h>
#include <spead2/recv_tcp.h>
#include <spead2/recv_mem.h>
#include <spead2/recv_inproc.h>
#include <spead2/recv_stream.h>
#include <spead2/recv_ring_stream.h>
#include <spead2/recv_chunk_stream.h>
#include <spead2/recv_chunk_stream_group.h>
#include <spead2/recv_live_heap.h>
#include <spead2/recv_heap.h>
#include <spead2/common_ringbuffer.h>
#include <spead2/py_common.h>
namespace py = pybind11;
namespace spead2
{
namespace recv
{
/* True if both arguments are plain function pointers and point to the same function */
template<typename R, typename... Args>
static bool equal_functions(const std::function<R(Args...)> &a,
const std::function<R(Args...)> &b)
{
using ptr = R (*)(Args...);
const ptr *x = a.template target<ptr>();
const ptr *y = b.template target<ptr>();
return x && y && *x == *y;
}
/**
* Wraps @ref item to provide safe memory management. The item references
* memory inside the heap, so it needs to hold a reference to that
* heap, as do any memoryviews created on the value.
*/
class item_wrapper : public item
{
private:
/// Python object containing a @ref heap
py::object owning_heap;
public:
item_wrapper() = default;
item_wrapper(const item &it, py::object owning_heap)
: item(it), owning_heap(std::move(owning_heap)) {}
/**
* Obtain the raw value using Python buffer protocol.
*/
py::buffer_info get_value() const
{
return py::buffer_info(
reinterpret_cast<void *>(ptr),
1, // size of individual elements
py::format_descriptor<std::uint8_t>::format(),
length);
}
};
/**
* Extends mem_reader to obtain data using the Python buffer protocol.
* It steals the provided buffer view; it is not passed by rvalue reference
* because it cannot be perfectly forwarded.
*/
class buffer_reader : public mem_reader
{
private:
py::buffer_info view;
public:
explicit buffer_reader(stream &s, py::buffer_info &view)
: mem_reader(s, reinterpret_cast<const std::uint8_t *>(view.ptr), view.itemsize * view.size),
view(std::move(view))
{
}
};
#if SPEAD2_USE_IBV
/* Managing the endpoints and interface address requires some sleight of
* hand. We store a separate copy in the wrapper in a Python-centric format.
* When constructing the reader, we make a copy with the C++ view.
*/
class udp_ibv_config_wrapper : public udp_ibv_config
{
public:
std::vector<std::pair<std::string, std::uint16_t>> py_endpoints;
std::string py_interface_address;
};
#endif // SPEAD2_USE_IBV
static boost::asio::ip::address make_address(stream &s, const std::string &hostname)
{
return make_address_no_release(s.get_io_service(), hostname,
boost::asio::ip::udp::resolver::query::passive);
}
template<typename Protocol>
static typename Protocol::endpoint make_endpoint(
stream &s, const std::string &hostname, std::uint16_t port)
{
return typename Protocol::endpoint(make_address(s, hostname), port);
}
static void add_buffer_reader(stream &s, py::buffer buffer)
{
py::buffer_info info = request_buffer_info(buffer, PyBUF_C_CONTIGUOUS);
py::gil_scoped_release gil;
s.emplace_reader<buffer_reader>(std::ref(info));
}
static void add_udp_reader(
stream &s,
std::uint16_t port,
std::size_t max_size,
std::size_t buffer_size,
const std::string &bind_hostname)
{
py::gil_scoped_release gil;
auto endpoint = make_endpoint<boost::asio::ip::udp>(s, bind_hostname, port);
s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size);
}
static void add_udp_reader_socket(
stream &s,
const socket_wrapper<boost::asio::ip::udp::socket> &socket,
std::size_t max_size = udp_reader::default_max_size)
{
auto asio_socket = socket.copy(s.get_io_service());
py::gil_scoped_release gil;
s.emplace_reader<udp_reader>(std::move(asio_socket), max_size);
}
static void add_udp_reader_bind_v4(
stream &s,
const std::string &address,
std::uint16_t port,
std::size_t max_size,
std::size_t buffer_size,
const std::string &interface_address)
{
py::gil_scoped_release gil;
auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, make_address(s, interface_address));
}
static void add_udp_reader_bind_v6(
stream &s,
const std::string &address,
std::uint16_t port,
std::size_t max_size,
std::size_t buffer_size,
unsigned int interface_index)
{
py::gil_scoped_release gil;
auto endpoint = make_endpoint<boost::asio::ip::udp>(s, address, port);
s.emplace_reader<udp_reader>(endpoint, max_size, buffer_size, interface_index);
}
static void add_tcp_reader(
stream &s,
std::uint16_t port,
std::size_t max_size,
std::size_t buffer_size,
const std::string &bind_hostname)
{
py::gil_scoped_release gil;
auto endpoint = make_endpoint<boost::asio::ip::tcp>(s, bind_hostname, port);
s.emplace_reader<tcp_reader>(endpoint, max_size, buffer_size);
}
static void add_tcp_reader_socket(
stream &s,
const socket_wrapper<boost::asio::ip::tcp::acceptor> &acceptor,
std::size_t max_size)
{
auto asio_socket = acceptor.copy(s.get_io_service());
py::gil_scoped_release gil;
s.emplace_reader<tcp_reader>(std::move(asio_socket), max_size);
}
#if SPEAD2_USE_IBV
static void add_udp_ibv_reader(stream &s, const udp_ibv_config_wrapper &config_wrapper)
{
py::gil_scoped_release gil;
udp_ibv_config config = config_wrapper;
for (const auto &[host, port] : config_wrapper.py_endpoints)
config.add_endpoint(make_endpoint<boost::asio::ip::udp>(s, host, port));
config.set_interface_address(
make_address(s, config_wrapper.py_interface_address));
s.emplace_reader<udp_ibv_reader>(config);
}
#endif // SPEAD2_USE_IBV
#if SPEAD2_USE_PCAP
static void add_udp_pcap_file_reader(stream &s, const std::string &filename, const std::string &filter)
{
py::gil_scoped_release gil;
s.emplace_reader<udp_pcap_file_reader>(filename, filter);
}
#endif
static void add_inproc_reader(stream &s, std::shared_ptr<inproc_queue> queue)
{
py::gil_scoped_release gil;
s.emplace_reader<inproc_reader>(queue);
}
class ring_stream_config_wrapper : public ring_stream_config
{
private:
bool incomplete_keep_payload_ranges = false;
public:
ring_stream_config_wrapper() = default;
ring_stream_config_wrapper(const ring_stream_config &base) :
ring_stream_config(base)
{
}
ring_stream_config_wrapper &set_incomplete_keep_payload_ranges(bool keep)
{
incomplete_keep_payload_ranges = keep;
return *this;
}
bool get_incomplete_keep_payload_ranges() const
{
return incomplete_keep_payload_ranges;
}
};
/**
* Stream that handles the magic necessary to reflect heaps into
* Python space and capture the reference to it.
*
* The GIL needs to be handled carefully. Any operation run by the thread pool
* might need to take the GIL to do logging. Thus, any operation that blocks
* on completion of code scheduled through the thread pool must drop the GIL
* first.
*/
class ring_stream_wrapper : public ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >
{
private:
bool incomplete_keep_payload_ranges;
exit_stopper stopper{[this] { stop(); }};
py::object to_object(live_heap &&h)
{
if (h.is_contiguous())
return py::cast(heap(std::move(h)), py::return_value_policy::move);
else
return py::cast(incomplete_heap(std::move(h), false, incomplete_keep_payload_ranges),
py::return_value_policy::move);
}
public:
ring_stream_wrapper(
io_service_ref io_service,
const stream_config &config = stream_config(),
const ring_stream_config_wrapper &ring_config = ring_stream_config_wrapper())
: ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore>>(
std::move(io_service), config, ring_config),
incomplete_keep_payload_ranges(ring_config.get_incomplete_keep_payload_ranges())
{}
py::object next()
{
try
{
return get();
}
catch (ringbuffer_stopped &e)
{
throw py::stop_iteration();
}
}
py::object get()
{
return to_object(ring_stream::pop_live(gil_release_tag()));
}
py::object get_nowait()
{
return to_object(try_pop_live());
}
int get_fd() const
{
return get_ringbuffer().get_data_sem().get_fd();
}
ring_stream_config_wrapper get_ring_config() const
{
ring_stream_config_wrapper ring_config(
ring_stream<ringbuffer<live_heap, semaphore_fd, semaphore> >::get_ring_config());
ring_config.set_incomplete_keep_payload_ranges(incomplete_keep_payload_ranges);
return ring_config;
}
virtual void stop() override
{
stopper.reset();
py::gil_scoped_release gil;
ring_stream::stop();
}
~ring_stream_wrapper()
{
stop();
}
};
/**
* Package a chunk with a reference to the original Python object.
* This is used
* only for chunks in the ringbuffer, not those owned by Python.
*/
class chunk_wrapper : public chunk
{
public:
py::object obj;
};
/**
* Get the original Python object from a wrapped chunk, and
* restore its pointers.
*/
static py::object unwrap_chunk(std::unique_ptr<chunk> &&c)
{
chunk_wrapper &cw = dynamic_cast<chunk_wrapper &>(*c);
chunk &orig = cw.obj.cast<chunk &>();
py::object ret = std::move(cw.obj);
orig = std::move(*c);
return ret;
}
/**
* Wrap up a Python chunk into an object that can traverse the ringbuffer.
* Python doesn't allow ownership to be given away, so we have to create a
* new C++ object which refers back to the original Python object to keep
* it alive.
*/
static std::unique_ptr<chunk_wrapper> wrap_chunk(chunk &c)
{
if (!c.data)
throw std::invalid_argument("data buffer is not set");
if (!c.present)
throw std::invalid_argument("present buffer is not set");
auto cw = std::make_unique<chunk_wrapper>();
static_cast<chunk &>(*cw) = std::move(c);
cw->obj = py::cast(c);
return cw;
}
/**
* Push a chunk onto a ringbuffer. The specific operation is described by
* @a func; this function takes care of wrapping in @ref chunk_wrapper.
*/
template<typename T>
static void push_chunk(T func, chunk &c)
{
/* Note: the type of 'wrapper' must exactly match what the ringbuffer
* expects, otherwise it constructs a new, temporary unique_ptr by
* moving from 'wrapper', and we lose ownership in the failure path.
*/
std::unique_ptr<chunk> wrapper = wrap_chunk(c);
try
{
func(std::move(wrapper));
}
catch (std::exception &)
{
// Undo the move that happened as part of wrapping
if (wrapper)
c = std::move(*wrapper);
throw;
}
}
typedef ringbuffer<std::unique_ptr<chunk>, semaphore_fd, semaphore_fd> chunk_ringbuffer;
/* Note: ring_stream_wrapper drops the GIL while stopping. We
* can't do that here because stop() can free chunks that were
* in flight, which involves interaction with the Python API.
* I think the only reason ring_stream_wrapper drops the GIL is
* that logging used to directly acquire the GIL, and so if stop()
* did any logging it would deadlock. Now that logging is pushed
* off to a separate thread that should no longer be an issue.
*/
#define EXIT_STOPPER_WRAPPER(cls, base) \
class cls : public base \
{ \
private: \
exit_stopper stopper{[this] { stop(); }}; \
public: \
using base::base; \
virtual void stop() override \
{ \
stopper.reset(); \
base::stop(); \
} \
}
// These aliases are needed because a type passed to a macro cannot contain a comma
using chunk_ring_stream_orig = chunk_ring_stream<chunk_ringbuffer, chunk_ringbuffer>;
using chunk_stream_ring_group_orig = chunk_stream_ring_group<chunk_ringbuffer, chunk_ringbuffer>;
EXIT_STOPPER_WRAPPER(chunk_ring_stream_wrapper, chunk_ring_stream_orig);
EXIT_STOPPER_WRAPPER(chunk_stream_ring_group_wrapper, chunk_stream_ring_group_orig);
EXIT_STOPPER_WRAPPER(chunk_stream_group_member_wrapper, chunk_stream_group_member);
#undef EXIT_STOPPER_WRAPPER
/// Register the receiver module with Python
py::module register_module(py::module &parent)
{
using namespace pybind11::literals;
// Create the module
py::module m = parent.def_submodule("recv");
py::class_<heap_base>(m, "HeapBase")
.def_property_readonly("cnt", SPEAD2_PTMF(heap_base, get_cnt))
.def_property_readonly("flavour", SPEAD2_PTMF(heap_base, get_flavour))
.def("get_items", [](py::object &self) -> py::list
{
const heap_base &h = self.cast<const heap_base &>();
std::vector<item> base = h.get_items();
py::list out;
for (const item &it : base)
{
// Filter out descriptors here. The base class can't do so, because
// the descriptors are retrieved from the items.
if (it.id != DESCRIPTOR_ID)
out.append(item_wrapper(it, self));
}
return out;
})
.def("is_start_of_stream", SPEAD2_PTMF(heap_base, is_start_of_stream))
.def("is_end_of_stream", SPEAD2_PTMF(heap_base, is_end_of_stream));
py::class_<heap, heap_base>(m, "Heap")
.def("get_descriptors", SPEAD2_PTMF(heap, get_descriptors));
py::class_<incomplete_heap, heap_base>(m, "IncompleteHeap")
.def_property_readonly("heap_length", SPEAD2_PTMF(incomplete_heap, get_heap_length))
.def_property_readonly("received_length", SPEAD2_PTMF(incomplete_heap, get_received_length))
.def_property_readonly("payload_ranges", SPEAD2_PTMF(incomplete_heap, get_payload_ranges));
py::class_<item_wrapper>(m, "RawItem", py::buffer_protocol())
.def_readonly("id", &item_wrapper::id)
.def_readonly("is_immediate", &item_wrapper::is_immediate)
.def_readonly("immediate_value", &item_wrapper::immediate_value)
.def_buffer([](item_wrapper &item) { return item.get_value(); });
py::class_<stream_stat_config> stream_stat_config_cls(m, "StreamStatConfig");
/* We have to register the embedded enum type before we can use it as a
* default value for the stream_stat constructor/
*/
py::enum_<stream_stat_config::mode>(stream_stat_config_cls, "Mode")
.value("COUNTER", stream_stat_config::mode::COUNTER)
.value("MAXIMUM", stream_stat_config::mode::MAXIMUM);
stream_stat_config_cls
.def(
py::init<std::string, stream_stat_config::mode>(),
"name"_a, "mode"_a = stream_stat_config::mode::COUNTER)
.def_property_readonly("name", SPEAD2_PTMF(stream_stat_config, get_name))
.def_property_readonly("mode", SPEAD2_PTMF(stream_stat_config, get_mode))
.def("combine", SPEAD2_PTMF(stream_stat_config, combine))
.def(py::self == py::self)
.def(py::self != py::self);
py::class_<stream_stats> stream_stats_cls(m, "StreamStats");
stream_stats_cls
.def("__getitem__", [](const stream_stats &self, std::size_t index)
{
if (index < self.size())
return self[index];
else
throw py::index_error();
})
.def("__getitem__", [](const stream_stats &self, const std::string &name)
{
auto pos = self.find(name);
if (pos == self.end())
throw py::key_error(name);
return pos->second;
})
.def("__setitem__", [](stream_stats &self, std::size_t index, std::uint64_t value)
{
if (index < self.size())
self[index] = value;
else
throw py::index_error();
})
.def("__setitem__", [](stream_stats &self, const std::string &name, std::uint64_t value)
{
auto pos = self.find(name);
if (pos == self.end())
throw py::key_error(name);
pos->second = value;
})
.def("__contains__", [](const stream_stats &self, const std::string &name)
{
return self.find(name) != self.end();
})
.def("get", [](const stream_stats &self, const std::string &name, py::object &default_)
{
auto pos = self.find(name);
return pos != self.end() ? py::int_(pos->second) : default_;
}, py::arg(), py::arg() = py::none())
/* TODO: keys, values and items should ideally return view that
* simulate Python's dictionary views (py::bind_map does this, but it
* can't be used because it expects the map to implement erase).
*/
.def(
"items",
[](const stream_stats &self) { return py::make_iterator(self.begin(), self.end()); },
py::keep_alive<0, 1>() // keep the stats alive while it is iterated
)
.def(
"__iter__",
[](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
py::keep_alive<0, 1>() // keep the stats alive while it is iterated
)
.def(
"keys",
[](const stream_stats &self) { return py::make_key_iterator(self.begin(), self.end()); },
py::keep_alive<0, 1>() // keep the stats alive while it is iterated
)
.def(
"values",
[](const stream_stats &self) { return py::make_value_iterator(self.begin(), self.end()); },
py::keep_alive<0, 1>() // keep the stats alive while it is iterated
)
.def("__len__", SPEAD2_PTMF(stream_stats, size))
.def_property_readonly("config", SPEAD2_PTMF(stream_stats, get_config))
.def(py::self + py::self)
.def(py::self += py::self);
py::module stream_stat_indices_module = m.def_submodule("stream_stat_indices");
/* The macro registers a property on stream_stats to access the built-in stats
* by name, and at the same time populates the index constant in submodule
* stream_stat_indices (upper-casing it).
*/
#define STREAM_STATS_PROPERTY(field) \
do { \
stream_stats_cls.def_property( \
#field, \
[](const stream_stats &self) { return self[stream_stat_indices::field]; }, \
[](stream_stats &self, std::uint64_t value) { self[stream_stat_indices::field] = value; }); \
std::string upper = #field; \
std::transform(upper.begin(), upper.end(), upper.begin(), ::toupper); \
stream_stat_indices_module.attr(upper.c_str()) = stream_stat_indices::field; \
} while (false)
STREAM_STATS_PROPERTY(heaps);
STREAM_STATS_PROPERTY(incomplete_heaps_evicted);
STREAM_STATS_PROPERTY(incomplete_heaps_flushed);
STREAM_STATS_PROPERTY(packets);
STREAM_STATS_PROPERTY(batches);
STREAM_STATS_PROPERTY(worker_blocked);
STREAM_STATS_PROPERTY(max_batch);
STREAM_STATS_PROPERTY(single_packet_heaps);
STREAM_STATS_PROPERTY(search_dist);
#undef STREAM_STATS_PROPERTY
py::class_<stream_config>(m, "StreamConfig")
.def(py::init(&data_class_constructor<stream_config>))
.def_property("max_heaps",
SPEAD2_PTMF(stream_config, get_max_heaps),
SPEAD2_PTMF(stream_config, set_max_heaps))
.def_property("substreams",
SPEAD2_PTMF(stream_config, get_substreams),
SPEAD2_PTMF(stream_config, set_substreams))
.def_property("bug_compat",
SPEAD2_PTMF(stream_config, get_bug_compat),
SPEAD2_PTMF(stream_config, set_bug_compat))
.def_property("memcpy",
[](const stream_config &self) {
stream_config cmp;
memcpy_function_id ids[] = {MEMCPY_STD, MEMCPY_NONTEMPORAL};
for (memcpy_function_id id : ids)
{
cmp.set_memcpy(id);
if (equal_functions(self.get_memcpy(), cmp.get_memcpy()))
return int(id);
}
throw std::invalid_argument("memcpy function is not one of the standard ones");
},
[](stream_config &self, int id) { self.set_memcpy(memcpy_function_id(id)); })
.def_property("memory_allocator",
SPEAD2_PTMF(stream_config, get_memory_allocator),
SPEAD2_PTMF_VOID(stream_config, set_memory_allocator))
.def_property("stop_on_stop_item",
SPEAD2_PTMF(stream_config, get_stop_on_stop_item),
SPEAD2_PTMF_VOID(stream_config, set_stop_on_stop_item))
.def_property("allow_unsized_heaps",
SPEAD2_PTMF(stream_config, get_allow_unsized_heaps),
SPEAD2_PTMF_VOID(stream_config, set_allow_unsized_heaps))
.def_property("allow_out_of_order",
SPEAD2_PTMF(stream_config, get_allow_out_of_order),
SPEAD2_PTMF_VOID(stream_config, set_allow_out_of_order))
.def_property("stream_id",
SPEAD2_PTMF(stream_config, get_stream_id),
SPEAD2_PTMF(stream_config, set_stream_id))
.def("add_stat", SPEAD2_PTMF(stream_config, add_stat),
"name"_a,
"mode"_a = stream_stat_config::mode::COUNTER)
.def_property_readonly("stats", SPEAD2_PTMF(stream_config, get_stats))
.def("get_stat_index", SPEAD2_PTMF(stream_config, get_stat_index),
"name"_a)
.def("next_stat_index", SPEAD2_PTMF(stream_config, next_stat_index))
.def_readonly_static("DEFAULT_MAX_HEAPS", &stream_config::default_max_heaps);
py::class_<ring_stream_config_wrapper>(m, "RingStreamConfig")
.def(py::init(&data_class_constructor<ring_stream_config_wrapper>))
.def_property("heaps",
SPEAD2_PTMF(ring_stream_config_wrapper, get_heaps),
SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_heaps))
.def_property("contiguous_only",
SPEAD2_PTMF(ring_stream_config_wrapper, get_contiguous_only),
SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_contiguous_only))
.def_property("incomplete_keep_payload_ranges",
SPEAD2_PTMF(ring_stream_config_wrapper, get_incomplete_keep_payload_ranges),
SPEAD2_PTMF_VOID(ring_stream_config_wrapper, set_incomplete_keep_payload_ranges))
.def_readonly_static("DEFAULT_HEAPS", &ring_stream_config_wrapper::default_heaps);
#if SPEAD2_USE_IBV
py::class_<udp_ibv_config_wrapper>(m, "UdpIbvConfig")
.def(py::init(&data_class_constructor<udp_ibv_config_wrapper>))
.def_readwrite("endpoints", &udp_ibv_config_wrapper::py_endpoints)
.def_readwrite("interface_address", &udp_ibv_config_wrapper::py_interface_address)
.def_property("buffer_size",
SPEAD2_PTMF(udp_ibv_config_wrapper, get_buffer_size),
SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_buffer_size))
.def_property("max_size",
SPEAD2_PTMF(udp_ibv_config_wrapper, get_max_size),
SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_size))
.def_property("comp_vector",
SPEAD2_PTMF(udp_ibv_config_wrapper, get_comp_vector),
SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_comp_vector))
.def_property("max_poll",
SPEAD2_PTMF(udp_ibv_config_wrapper, get_max_poll),
SPEAD2_PTMF_VOID(udp_ibv_config_wrapper, set_max_poll))
.def_readonly_static("DEFAULT_BUFFER_SIZE", &udp_ibv_config_wrapper::default_buffer_size)
.def_readonly_static("DEFAULT_MAX_SIZE", &udp_ibv_config_wrapper::default_max_size)
.def_readonly_static("DEFAULT_MAX_POLL", &udp_ibv_config_wrapper::default_max_poll);
#endif // SPEAD2_USE_IBV
py::class_<stream>(m, "_Stream")
// SPEAD2_PTMF doesn't work for get_stats because it's defined in stream_base, which is a protected ancestor
.def_property_readonly("stats", [](const stream &self) { return self.get_stats(); })
.def_property_readonly("config",
[](const stream &self) { return self.get_config(); })
.def("add_buffer_reader", add_buffer_reader, "buffer"_a)
.def("add_udp_reader", add_udp_reader,
"port"_a,
"max_size"_a = udp_reader::default_max_size,
"buffer_size"_a = udp_reader::default_buffer_size,
"bind_hostname"_a = std::string())
.def("add_udp_reader", add_udp_reader_socket,
"socket"_a,
"max_size"_a = udp_reader::default_max_size)
.def("add_udp_reader", add_udp_reader_bind_v4,
"multicast_group"_a,
"port"_a,
"max_size"_a = udp_reader::default_max_size,
"buffer_size"_a = udp_reader::default_buffer_size,
"interface_address"_a = "0.0.0.0")
.def("add_udp_reader", add_udp_reader_bind_v6,
"multicast_group"_a,
"port"_a,
"max_size"_a = udp_reader::default_max_size,
"buffer_size"_a = udp_reader::default_buffer_size,
"interface_index"_a = (unsigned int) 0)
.def("add_tcp_reader", add_tcp_reader,
"port"_a,
"max_size"_a = tcp_reader::default_max_size,
"buffer_size"_a = tcp_reader::default_buffer_size,
"bind_hostname"_a = std::string())
.def("add_tcp_reader", add_tcp_reader_socket,
"acceptor"_a,
"max_size"_a = tcp_reader::default_max_size)
#if SPEAD2_USE_IBV
.def("add_udp_ibv_reader", add_udp_ibv_reader,
"config"_a)
#endif
#if SPEAD2_USE_PCAP
.def("add_udp_pcap_file_reader", add_udp_pcap_file_reader,
"filename"_a, "filter"_a = "")
#endif
.def("add_inproc_reader", add_inproc_reader,
"queue"_a)
.def("stop", SPEAD2_PTMF(stream, stop))
.def_readonly_static("DEFAULT_UDP_MAX_SIZE", &udp_reader::default_max_size)
.def_readonly_static("DEFAULT_UDP_BUFFER_SIZE", &udp_reader::default_buffer_size)
.def_readonly_static("DEFAULT_TCP_MAX_SIZE", &tcp_reader::default_max_size)
.def_readonly_static("DEFAULT_TCP_BUFFER_SIZE", &tcp_reader::default_buffer_size);
py::class_<ring_stream_wrapper, stream> stream_class(m, "Stream");
stream_class
.def(py::init<std::shared_ptr<thread_pool_wrapper>,
const stream_config &,
const ring_stream_config_wrapper &>(),
"thread_pool"_a.none(false), "config"_a = stream_config(),
"ring_config"_a = ring_stream_config_wrapper())
.def("__iter__", [](py::object self) { return self; })
.def("__next__", SPEAD2_PTMF(ring_stream_wrapper, next))
.def("get", SPEAD2_PTMF(ring_stream_wrapper, get))
.def("get_nowait", SPEAD2_PTMF(ring_stream_wrapper, get_nowait))
.def_property_readonly("fd", SPEAD2_PTMF(ring_stream_wrapper, get_fd))
.def_property_readonly("ringbuffer", SPEAD2_PTMF(ring_stream_wrapper, get_ringbuffer))
.def_property_readonly("ring_config", SPEAD2_PTMF(ring_stream_wrapper, get_ring_config));
using Ringbuffer = ringbuffer<live_heap, semaphore_fd, semaphore>;
py::class_<Ringbuffer>(stream_class, "Ringbuffer")
.def("size", SPEAD2_PTMF(Ringbuffer, size))
.def("capacity", SPEAD2_PTMF(Ringbuffer, capacity));
py::class_<chunk_stream_config>(m, "ChunkStreamConfig")
.def(py::init(&data_class_constructor<chunk_stream_config>))
.def_property("items",
SPEAD2_PTMF(chunk_stream_config, get_items),
SPEAD2_PTMF(chunk_stream_config, set_items))
.def_property("max_chunks",
SPEAD2_PTMF(chunk_stream_config, get_max_chunks),
SPEAD2_PTMF(chunk_stream_config, set_max_chunks))
.def_property(
"place",
[](const chunk_stream_config &config) {
return callback_to_python(config.get_place());
},
[](chunk_stream_config &config, py::object obj) {
config.set_place(callback_from_python<chunk_place_function>(
obj,
"void (void *, size_t)",
"void (void *, size_t, void *)"
));
})
.def(
"enable_packet_presence", SPEAD2_PTMF(chunk_stream_config, enable_packet_presence),
"payload_size"_a)
.def("disable_packet_presence", SPEAD2_PTMF(chunk_stream_config, disable_packet_presence))
.def_property_readonly("packet_presence_payload_size",
SPEAD2_PTMF(chunk_stream_config, get_packet_presence_payload_size))
.def_property("max_heap_extra",
SPEAD2_PTMF(chunk_stream_config, get_max_heap_extra),
SPEAD2_PTMF(chunk_stream_config, set_max_heap_extra))
.def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_config::default_max_chunks);
py::class_<chunk>(m, "Chunk")
.def(py::init(&data_class_constructor<chunk>))
.def_readwrite("chunk_id", &chunk::chunk_id)
.def_readwrite("stream_id", &chunk::stream_id)
// Can't use def_readwrite for present, data, extra because they're
// non-copyable types
.def_property(
"present",
[](const chunk &c) -> const memory_allocator::pointer & { return c.present; },
[](chunk &c, memory_allocator::pointer &&value)
{
if (value)
{
auto *alloc = get_buffer_allocation(value);
assert(alloc != nullptr);
c.present_size = alloc->buffer_info.size * alloc->buffer_info.itemsize;
}
else
c.present_size = 0;
c.present = std::move(value);
})
.def_property(
"data",
[](const chunk &c) -> const memory_allocator::pointer & { return c.data; },
[](chunk &c, memory_allocator::pointer &&value) { c.data = std::move(value); })
.def_property(
"extra",
[](const chunk &c) -> const memory_allocator::pointer & { return c.extra; },
[](chunk &c, memory_allocator::pointer &&value) { c.extra = std::move(value); });
// Don't allow ChunkRingPair to be constructed from Python. It exists
// purely to be a base class.
using chunk_ring_pair = detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>;
py::class_<chunk_ring_pair>(m, "ChunkRingPair")
.def(
"add_free_chunk",
[](chunk_ring_pair &self, chunk &c)
{
push_chunk(
[&self](std::unique_ptr<chunk> &&wrapper)
{
self.add_free_chunk(std::move(wrapper));
},
c
);
},
"chunk"_a)
.def_property_readonly("data_ringbuffer", SPEAD2_PTMF(chunk_ring_pair, get_data_ringbuffer))
.def_property_readonly("free_ringbuffer", SPEAD2_PTMF(chunk_ring_pair, get_free_ringbuffer));
py::class_<chunk_ring_stream_wrapper,
detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>,
stream>(m, "ChunkRingStream")
.def(py::init<std::shared_ptr<thread_pool_wrapper>,
const stream_config &,
const chunk_stream_config &,
std::shared_ptr<chunk_ringbuffer>,
std::shared_ptr<chunk_ringbuffer>>(),
"thread_pool"_a.none(false),
"config"_a = stream_config(),
"chunk_stream_config"_a,
"data_ringbuffer"_a.none(false),
"free_ringbuffer"_a.none(false),
// Keep the Python ringbuffer objects alive, not just the C++ side.
// This allows Python subclasses to be passed then later retrieved
// from properties.
py::keep_alive<1, 5>(),
py::keep_alive<1, 6>());
py::class_<chunk_ringbuffer, std::shared_ptr<chunk_ringbuffer>>(m, "ChunkRingbuffer")
.def(py::init<std::size_t>(), "maxsize"_a)
.def("qsize", SPEAD2_PTMF(chunk_ringbuffer, size))
.def_property_readonly("maxsize", SPEAD2_PTMF(chunk_ringbuffer, capacity))
.def_property_readonly(
"data_fd",
[](const chunk_ringbuffer &ring) { return ring.get_data_sem().get_fd(); })
.def_property_readonly(
"space_fd",
[](const chunk_ringbuffer &ring) { return ring.get_space_sem().get_fd(); })
.def("get", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.pop(gil_release_tag())); })
.def("get_nowait", [](chunk_ringbuffer &ring) { return unwrap_chunk(ring.try_pop()); })
.def(
"put",
[](chunk_ringbuffer &ring, chunk &c)
{
push_chunk(
[&ring](std::unique_ptr<chunk> &&wrapper)
{
ring.push(std::move(wrapper), gil_release_tag());
},
c
);
},
"chunk"_a)
.def(
"put_nowait",
[](chunk_ringbuffer &ring, chunk &c)
{
push_chunk(
[&ring](std::unique_ptr<chunk> &&wrapper) { ring.try_push(std::move(wrapper)); },
c
);
},
"chunk"_a)
.def("empty", [](const chunk_ringbuffer &ring) { return ring.size() == 0; })
.def("full", [](const chunk_ringbuffer &ring) { return ring.size() == ring.capacity(); })
.def("stop", SPEAD2_PTMF(chunk_ringbuffer, stop))
.def("add_producer", SPEAD2_PTMF(chunk_ringbuffer, add_producer))
.def("remove_producer", SPEAD2_PTMF(chunk_ringbuffer, remove_producer))
.def("__iter__", [](py::object self) { return self; })
.def(
"__next__", [](chunk_ringbuffer &ring)
{
try
{
return unwrap_chunk(ring.pop(gil_release_tag()));
}
catch (ringbuffer_stopped &)
{
throw py::stop_iteration();
}
});
py::class_<chunk_stream_group_config> chunk_stream_group_config_cls(m, "ChunkStreamGroupConfig");
chunk_stream_group_config_cls
.def(py::init(&data_class_constructor<chunk_stream_group_config>))
.def_property("max_chunks",
SPEAD2_PTMF(chunk_stream_group_config, get_max_chunks),
SPEAD2_PTMF(chunk_stream_group_config, set_max_chunks))
.def_property("eviction_mode",
SPEAD2_PTMF(chunk_stream_group_config, get_eviction_mode),
SPEAD2_PTMF(chunk_stream_group_config, set_eviction_mode))
.def_readonly_static("DEFAULT_MAX_CHUNKS", &chunk_stream_group_config::default_max_chunks);
py::enum_<chunk_stream_group_config::eviction_mode>(chunk_stream_group_config_cls, "EvictionMode")
.value("LOSSY", chunk_stream_group_config::eviction_mode::LOSSY)
.value("LOSSLESS", chunk_stream_group_config::eviction_mode::LOSSLESS);
py::class_<chunk_stream_group_member, stream>(m, "ChunkStreamGroupMember");
py::class_<chunk_stream_ring_group_wrapper,
detail::chunk_ring_pair<chunk_ringbuffer, chunk_ringbuffer>>(m, "ChunkStreamRingGroup")
.def(py::init<const chunk_stream_group_config &,
std::shared_ptr<chunk_ringbuffer>,
std::shared_ptr<chunk_ringbuffer>>(),
"config"_a,
"data_ringbuffer"_a.none(false),
"free_ringbuffer"_a.none(false),
// Keep the Python ringbuffer objects alive, not just the C++ side.
// This allows Python subclasses to be passed then later retrieved
// from properties.
py::keep_alive<1, 3>(),
py::keep_alive<1, 4>())
.def_property_readonly(
"config", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, get_config))
.def(
"emplace_back",
[](chunk_stream_ring_group_wrapper &group,
std::shared_ptr<thread_pool_wrapper> thread_pool,
const stream_config &config,
const chunk_stream_config &chunk_stream_config) -> chunk_stream_group_member & {
return group.emplace_back<chunk_stream_group_member_wrapper>(std::move(thread_pool), config, chunk_stream_config);
},
"thread_pool"_a, "config"_a, "chunk_stream_config"_a,
py::return_value_policy::reference_internal
)
.def("__len__", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, size))
.def(
"__getitem__",
[](chunk_stream_ring_group_wrapper &group, std::ptrdiff_t index) -> chunk_stream_group_member & {
if (index < 0)
index += group.size();
if (index >= 0 && std::size_t(index) < group.size())
return group[index];
else
throw py::index_error();
},
py::return_value_policy::reference_internal
)
.def(
"__getitem__",
[](chunk_stream_ring_group_wrapper &group, const py::slice &slice) {
py::list out;
std::size_t start, stop, step, length;
if (!slice.compute(group.size(), &start, &stop, &step, &length))
throw py::error_already_set();
py::object self = py::cast(group);
for (std::size_t i = 0; i < length; i++) {
out.append(py::cast(group[start], py::return_value_policy::reference_internal, self));
start += step;
}
return out;
}
)
.def("stop", SPEAD2_PTMF(chunk_stream_ring_group_wrapper, stop));
return m;
}
} // namespace recv
} // namespace spead2