https://github.com/ska-sa/spead2
Raw File
Tip revision: 953d63ff013cb1cf7beab747fd1fab9ce112788c authored by Bruce Merry on 08 September 2023, 12:52:18 UTC
Fix dependency on numpy and spelling of test-numba
Tip revision: 953d63f
spead2_recv.cpp
/* Copyright 2015, 2018-2020, 2023 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <iostream>
#include <utility>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <future>
#include <vector>
#include <string>
#include <memory>
#include <new>
#include <random>
#include <boost/program_options.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <spead2/common_thread_pool.h>
#include <spead2/common_endian.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_tcp.h>
#if SPEAD2_USE_IBV
# include <spead2/recv_udp_ibv.h>
#endif
#if SPEAD2_USE_PCAP
# include <spead2/recv_udp_pcap.h>
#endif
#include <spead2/recv_heap.h>
#include <spead2/recv_live_heap.h>
#include <spead2/recv_ring_stream.h>
#include "spead2_cmdline.h"

namespace po = boost::program_options;
namespace asio = boost::asio;

// Settings collected from the command line (see parse_args).
struct options
{
    bool quiet{false};                        // --quiet: only show total of heaps received
    bool descriptors{false};                  // --descriptors: show descriptors
    bool joint{false};                        // --joint: treat all sources as a single stream
    int threads{1};                           // --threads: number of worker threads
    bool verify{false};                       // --verify: verify payload
    std::vector<std::string> sources;         // positional arguments naming the sources
    spead2::protocol_options protocol;        // options registered through protocol.enumerate()
    spead2::recv::receiver_options receiver;  // options registered through receiver.enumerate()
};

// Time stamps taken from the high-resolution clock.
using time_point = std::chrono::time_point<std::chrono::high_resolution_clock>;

// Reference instant, captured during static initialisation; show_heap
// reports heap arrival times relative to it.
static time_point start = std::chrono::high_resolution_clock::now();

// Write the one-line synopsis followed by the option descriptions in
// `desc` to the stream `o`.
static void usage(std::ostream &o, const po::options_description &desc)
{
    o << "Usage: spead2_recv [options] [host]:<port>|file\n" << desc;
}

/* Parse the command line into an options structure.
 *
 * Besides the options declared here, the protocol and receiver option
 * groups register their own options through enumerate(). Positional
 * arguments are collected as sources.
 *
 * This function does not return on failure: --help prints the usage text to
 * stdout and exits with status 0, while any po::error (including a missing
 * source or an invalid thread count) prints the message and the usage text
 * to stderr and exits with status 2.
 */
static options parse_args(int argc, const char **argv)
{
    options opts;
    po::options_description desc, hidden, all;
    spead2::option_adder adder(desc);
    opts.protocol.enumerate(adder);
    opts.receiver.enumerate(adder);
    desc.add_options()
        ("quiet", spead2::make_value_semantic(&opts.quiet), "Only show total of heaps received")
        ("descriptors", spead2::make_value_semantic(&opts.descriptors), "Show descriptors")
        ("joint", spead2::make_value_semantic(&opts.joint), "Treat all sources as a single stream")
        ("threads", spead2::make_value_semantic(&opts.threads), "Number of worker threads")
        ("verify", spead2::make_value_semantic(&opts.verify), "Verify payload (use spead2_send with same option)")
    ;

    // "source" is kept out of desc so that it does not appear in the help
    // text; it only exists to receive the positional arguments.
    hidden.add_options()
        ("source", spead2::make_value_semantic(&opts.sources), "sources");
    desc.add_options()
        ("help,h", "Show help text");
    all.add(desc);
    all.add(hidden);

    po::positional_options_description positional;
    positional.add("source", -1);
    try
    {
        po::variables_map vm;
        // Option-name guessing (unambiguous prefixes) is disabled.
        po::store(po::command_line_parser(argc, argv)
            .style(po::command_line_style::default_style & ~po::command_line_style::allow_guessing)
            .options(all)
            .positional(positional)
            .run(), vm);
        po::notify(vm);
        if (vm.count("help"))
        {
            usage(std::cout, desc);
            std::exit(0);
        }
        if (opts.sources.empty())
            throw po::error("At least one source is required");
        // A receiver cannot run without a worker thread; report it as a
        // usage error here rather than letting it surface later.
        if (opts.threads < 1)
            throw po::error("--threads must be at least 1");
        opts.protocol.notify();
        opts.receiver.notify(opts.protocol);
        return opts;
    }
    catch (const po::error &e)
    {
        std::cerr << e.what() << '\n';
        usage(std::cerr, desc);
        std::exit(2);
    }
}

/* Print a summary of a heap to stdout, unless --quiet was given.
 *
 * The summary contains the heap cnt and the time in seconds since `start`,
 * optionally (--descriptors) the descriptors carried by the heap, and one
 * line per item giving its ID, its length and, for immediate items, the
 * immediate value.
 */
static void show_heap(const spead2::recv::heap &fheap, const options &opts)
{
    if (opts.quiet)
        return;

    const std::chrono::duration<double> elapsed =
        std::chrono::high_resolution_clock::now() - start;
    // showbase gives the hexadecimal values below their 0x prefix.
    std::cout << std::showbase
        << "Received heap " << fheap.get_cnt() << " at " << elapsed.count() << '\n';

    if (opts.descriptors)
    {
        for (const auto &descriptor : fheap.get_descriptors())
        {
            std::cout << "Descriptor for " << descriptor.name
                << " (" << std::hex << descriptor.id << std::dec << ")\n";
            std::cout << "  description: " << descriptor.description << '\n';

            // Comma-separated list of (type, size) pairs.
            std::cout << "  format:      [";
            const char *separator = "";
            for (const auto &[ftype, fsize] : descriptor.format)
            {
                std::cout << separator << '(' << ftype << ", " << fsize << ')';
                separator = ", ";
            }
            std::cout << "]\n";

            std::cout << "  dtype:       " << descriptor.numpy_header << '\n';

            // Comma-separated dimensions; a dimension of -1 is shown as "?".
            std::cout << "  shape:       (";
            separator = "";
            for (const auto &size : descriptor.shape)
            {
                std::cout << separator;
                separator = ", ";
                if (size == -1)
                    std::cout << "?";
                else
                    std::cout << size;
            }
            std::cout << ")\n";
        }
    }

    for (const auto &item : fheap.get_items())
    {
        std::cout << std::hex << item.id << std::dec;
        std::cout << " = [" << item.length << " bytes]";
        if (item.is_immediate)
            std::cout << " = " << std::hex << item.immediate_value << std::dec;
        std::cout << '\n';
    }
    std::cout << std::noshowbase;
}

/* O(log(z)) version of engine.discard(z), which in GCC seems to be implemented in O(z).
 *
 * Only purely multiplicative generators (increment c == 0) are handled, for
 * which advancing by z steps is a multiplication of the state by a^z mod m.
 * The power is computed by repeated squaring.
 *
 * engine: generator to advance in place.
 * z:      number of values to skip; 0 leaves the engine untouched.
 *
 * Note: the new state is installed with engine.seed(), which substitutes 1
 * for a state of 0 when c == 0. This is harmless when the state can never
 * become 0 (e.g. std::minstd_rand, whose modulus is prime), but would be
 * wrong for a modulus under which a state of 0 is reachable.
 */
template<std::uint_fast32_t a, std::uint_fast32_t m>
static void fast_discard(std::linear_congruential_engine<std::uint_fast32_t, a, 0, m> &engine,
                         unsigned long long z)
{
    // m == 0 selects an implicit power-of-two modulus in the standard
    // engine; the explicit "% m" below would then divide by zero.
    static_assert(m != 0, "fast_discard requires an explicit (non-zero) modulus");
    // Both factors of each product are below m, so the product fits in 64
    // bits only while m does not exceed 2^32 (uint_fast32_t may be wider).
    static_assert(static_cast<std::uint64_t>(m) <= (std::uint64_t(1) << 32),
                  "fast_discard requires a modulus of at most 2^32");

    if (z == 0)
        return;
    // There is no way to directly observe the current state. We can only
    // advance to the following state.
    std::uint64_t x = engine();
    z--;
    // Multiply by a^z mod m
    std::uint64_t apow = a;
    while (z > 0)
    {
        if (z & 1)
            x = x * apow % m;
        apow = apow * apow % m;
        z >>= 1;
    }
    engine.seed(x);
}

/* Check the payload of a heap against the expected pseudo-random sequence.
 *
 * Does nothing unless --verify was given. Items with an ID below 0x1000 are
 * ignored. Every remaining item must have the same length and must contain
 * big-endian 32-bit values equal to consecutive outputs of a
 * std::minstd_rand generator, positioned according to the heap cnt.
 *
 * On a length inconsistency or a value mismatch a message is written to
 * stderr and the process exits with status 1. A heap with no verifiable
 * items that is not an end-of-stream heap only produces a warning.
 */
static void verify_heap(const spead2::recv::heap &fheap, const options &opts)
{
    if (!opts.verify)
        return;

    using element_t = uint32_t;
    const auto &items = fheap.get_items();
    std::minstd_rand generator;
    std::size_t elements = 0;
    bool positioned = false;   // set once the generator has been advanced for this heap

    for (const auto &item : items)
    {
        if (item.id < 0x1000)
            continue;

        if (!positioned)
        {
            positioned = true;
            // The first verifiable item fixes the element count for the heap.
            elements = item.length / sizeof(element_t);
            // The first heap gets numbered 1 rather than 0
            std::uint64_t start_pos = elements * items.size() * (fheap.get_cnt() - 1);
            fast_discard(generator, start_pos);
        }

        if (item.length != elements * sizeof(element_t))
        {
            std::cerr << "Heap " << fheap.get_cnt()
                << ", item 0x" << std::hex << item.id << std::dec
                << " has an inconsistent length\n";
            std::exit(1);
        }

        const element_t *data = std::launder(reinterpret_cast<const element_t *>(item.ptr));
        for (std::size_t i = 0; i < elements; i++)
        {
            const element_t expected = generator();
            const element_t actual = spead2::betoh(data[i]);
            if (expected == actual)
                continue;
            std::cerr << "Verification mismatch in heap " << fheap.get_cnt()
                << ", item 0x" << std::hex << item.id << std::dec
                << " offset " << i
                << "\nexpected 0x" << std::hex << expected << ", actual 0x" << actual << std::dec
                << std::endl;
            std::exit(1);
        }
    }

    if (!positioned && !fheap.is_end_of_stream())
    {
        spead2::log_warning("Heap %d has no verifiable items but is not an end-of-stream heap",
                            fheap.get_cnt());
    }
}

/* Stream that processes heaps directly in the heap_ready callback.
 *
 * Contiguous heaps are shown and verified and counted as complete; other
 * heaps are reported (unless --quiet) and dropped. join() blocks until
 * stop_received() has been called and then returns the number of complete
 * heaps.
 */
class callback_stream : public spead2::recv::stream
{
private:
    std::int64_t n_complete = 0;      // number of contiguous heaps processed
    const options opts;               // private copy of the command-line options
    std::promise<void> stop_promise;  // fulfilled by stop_received()

    void heap_ready(spead2::recv::live_heap &&heap) override
    {
        if (!heap.is_contiguous())
        {
            if (!opts.quiet)
                std::cout << "Discarding incomplete heap " << heap.get_cnt() << '\n';
            return;
        }
        spead2::recv::heap complete(std::move(heap));
        show_heap(complete, opts);
        verify_heap(complete, opts);
        n_complete++;
    }

public:
    // The arguments after opts are forwarded to the base class constructor.
    template<typename... Args>
    callback_stream(const options &opts, Args&&... args)
        : spead2::recv::stream(std::forward<Args>(args)...),
        opts(opts) {}

    // NOTE(review): presumably stops the stream so that no callback runs
    // while this object is being destroyed - confirm against spead2 docs.
    ~callback_stream()
    {
        stop();
    }

    void stop_received() override
    {
        spead2::recv::stream::stop_received();
        stop_promise.set_value();
    }

    // Wait for the stream to stop and return the count of complete heaps.
    // The future is obtained from the promise on each call, so this can
    // only be called once per stream.
    std::int64_t join()
    {
        stop_promise.get_future().get();
        return n_complete;
    }
};

/* Create a stream and attach readers for the sources in
 * [first_source, last_source).
 *
 * A ring_stream is created when the receiver options request a ring buffer
 * and a callback_stream otherwise. Both are configured from the receiver
 * and protocol options and use the given thread pool.
 */
template<typename It>
static std::unique_ptr<spead2::recv::stream> make_stream(
    spead2::thread_pool &thread_pool, const options &opts,
    It first_source, It last_source)
{
    spead2::recv::stream_config config = opts.receiver.make_stream_config(opts.protocol);

    std::unique_ptr<spead2::recv::stream> stream;
    if (opts.receiver.ring)
    {
        spead2::recv::ring_stream_config ring_config = opts.receiver.make_ring_stream_config();
        stream = std::make_unique<spead2::recv::ring_stream<>>(thread_pool, config, ring_config);
    }
    else
        stream = std::make_unique<callback_stream>(opts, thread_pool, config);

    // add_readers takes the endpoints as a vector of strings, so copy the
    // iterator range into one.
    std::vector<std::string> endpoints(first_source, last_source);
    opts.receiver.add_readers(*stream, endpoints, opts.protocol, true);
    return stream;
}

int main(int argc, const char **argv)
{
    options opts = parse_args(argc, argv);

    spead2::thread_pool thread_pool(opts.threads);
    std::vector<std::unique_ptr<spead2::recv::stream> > streams;
    if (opts.joint)
    {
        streams.push_back(make_stream(thread_pool, opts, opts.sources.begin(), opts.sources.end()));
    }
    else
    {
        if (opts.sources.size() > 1 && opts.receiver.ring)
        {
            std::cerr << "Multiple streams cannot be used with --ring\n";
            std::exit(2);
        }
        for (auto it = opts.sources.begin(); it != opts.sources.end(); ++it)
            streams.push_back(make_stream(thread_pool, opts, it, it + 1));
    }

    spead2::thread_pool stopper_thread_pool;
    boost::asio::signal_set signals(stopper_thread_pool.get_io_service());
    signals.add(SIGINT);
    signals.async_wait([&streams] (const boost::system::error_code &error,
                                   [[maybe_unused]] int signal_number) {
        if (!error)
            for (const std::unique_ptr<spead2::recv::stream> &stream : streams)
            {
                stream->stop();
            }
    });

    std::int64_t n_complete = 0;
    if (opts.receiver.ring)
    {
        auto &stream = dynamic_cast<spead2::recv::ring_stream<> &>(*streams[0]);
        while (true)
        {
            try
            {
                spead2::recv::heap fh = stream.pop();
                n_complete++;
                show_heap(fh, opts);
            }
            catch (spead2::ringbuffer_stopped &e)
            {
                break;
            }
        }
    }
    else
    {
        for (const auto &ptr : streams)
        {
            auto &stream = dynamic_cast<callback_stream &>(*ptr);
            n_complete += stream.join();
        }
    }
    signals.cancel();
    spead2::recv::stream_stats stats;
    for (auto &ptr : streams)
    {
        /* Even though we've seen the stop condition, if we don't explicitly
         * stop the stream then a race condition means we might not see the
         * last batch of statistics updates.
         */
        ptr->stop();
        stats += ptr->get_stats();
    }

    std::cout << "Received " << n_complete << " heaps\n";
    for (const auto &[name, value] : stats)
        std::cout << name << ": " << value << '\n';
    return 0;
}
back to top