https://github.com/ska-sa/spead2
Raw File
Tip revision: 44f4e378a63803f2a319a4d3cfce5e1de6dd23c4 authored by Bruce Merry on 21 January 2019, 13:55:18 UTC
Flake8 fix
Tip revision: 44f4e37
recv_inproc.cpp
/* Copyright 2018 SKA South Africa
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <cstddef>
#include <memory>
#include <functional>
#include <spead2/common_inproc.h>
#include <spead2/common_logging.h>
#include <spead2/recv_inproc.h>
#include <spead2/recv_reader.h>

namespace spead2
{
namespace recv
{

inproc_reader::inproc_reader(
    stream &owner,
    std::shared_ptr<inproc_queue> queue)
    : reader(owner),
    queue(std::move(queue)),
    data_sem_wrapper(wrap_fd(get_io_service(),
                             this->queue->buffer.get_data_sem().get_fd()))
{
    enqueue();
}

void inproc_reader::process_one_packet(stream_base::add_packet_state &state,
                                       const inproc_queue::packet &packet)
{
    packet_header header;
    std::size_t size = decode_packet(header, packet.data.get(), packet.size);
    if (size == packet.size)
    {
        get_stream_base().add_packet(state, header);
    }
    else if (size != 0)
    {
        log_info("discarding packet due to size mismatch (%1% != %2%)", size, packet.size);
    }
}

void inproc_reader::packet_handler(
    const boost::system::error_code &error,
    std::size_t bytes_transferred)
{
    if (get_stream_base().is_stopped())
    {
        log_info("inproc reader: discarding packet received after stream stopped");
    }
    else
    {
        stream_base::add_packet_state state(get_stream_base());
        try
        {
            inproc_queue::packet packet = queue->buffer.try_pop();
            process_one_packet(state, packet);
            /* TODO: could grab a batch of packets to amortise costs */
        }
        catch (ringbuffer_stopped)
        {
            get_stream_base().stop_received();
        }
        catch (ringbuffer_empty)
        {
            // spurious wakeup - no action needed
        }
    }
    if (!get_stream_base().is_stopped())
        enqueue();
    else
    {
        data_sem_wrapper.close();
        stopped();
    }
}

void inproc_reader::enqueue()
{
    using namespace std::placeholders;
    data_sem_wrapper.async_read_some(
        boost::asio::null_buffers(),
        get_stream().get_strand().wrap(std::bind(&inproc_reader::packet_handler, this, _1, _2)));
}

void inproc_reader::stop()
{
    data_sem_wrapper.close();
}

bool inproc_reader::lossy() const
{
    return false;
}

} // namespace recv
} // namespace spead2
back to top