// https://github.com/ska-sa/spead2
// Raw File
// Tip revision: 953d63ff013cb1cf7beab747fd1fab9ce112788c authored by Bruce Merry on 08 September 2023, 12:52:18 UTC
// Fix dependency on numpy and spelling of test-numba
// Tip revision: 953d63f
// recv_live_heap.cpp
/* Copyright 2015, 2020, 2023 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * @file
 */

#include <cassert>
#include <cstring>
#include <algorithm>
#include <utility>
#include <stdexcept>
#include <spead2/recv_live_heap.h>
#include <spead2/recv_utils.h>
#include <spead2/common_defines.h>
#include <spead2/common_endian.h>
#include <spead2/common_logging.h>

namespace spead2::recv
{

/* Start a live heap from the first packet seen for it.
 *
 * The heap takes its counter from the packet's heap_cnt, configures the
 * item pointer decoder from the packet's heap_address_bits, and stores the
 * bug compatibility mask as given.
 */
live_heap::live_heap(const packet_header &initial_packet,
                     bug_compat_mask bug_compat)
    : cnt(initial_packet.heap_cnt)
    , decoder(initial_packet.heap_address_bits)
    , bug_compat(bug_compat)
{
    // A negative heap counter is treated as a programming error.
    assert(this->cnt >= 0);
}

void live_heap::payload_reserve(std::size_t size, bool exact, const packet_header &packet,
                                memory_allocator &allocator)
{
    if (size > payload_reserved)
    {
        if (!exact && size < payload_reserved * 2)
        {
            size = payload_reserved * 2;
        }
        memory_allocator::pointer new_payload;
        new_payload = allocator.allocate(size, (void *) &packet);
        if (payload && new_payload)
            std::memcpy(new_payload.get(), payload.get(), payload_reserved);
        payload = std::move(new_payload);
        payload_reserved = size;
    }
}

/* Record that payload bytes [first, last) have arrived.
 *
 * payload_ranges maps the start of each received range to its end. The new
 * range is inserted, or merged with a range that ends exactly at @a first
 * and/or one that starts exactly at @a last.
 *
 * Returns false, leaving payload_ranges untouched, if the new range runs
 * into the following range or starts inside the preceding one.
 */
bool live_heap::add_payload_range(s_item_pointer_t first, s_item_pointer_t last)
{
    using range_iterator = decltype(payload_ranges)::iterator;

    // First existing range that starts strictly after `first`.
    const range_iterator next = payload_ranges.upper_bound(first);
    const bool have_next = (next != payload_ranges.end());
    if (have_next && next->first < last)
    {
        log_warning("packet rejected because it partially overlaps existing payload");
        return false;
    }

    // Find the range starting at or before `first`, if there is one, and
    // check whether it reaches as far as `first`.
    range_iterator prev;
    bool reaches_first = false;
    if (next != payload_ranges.begin())
    {
        prev = std::prev(next);
        reaches_first = !(prev->second < first);
    }

    range_iterator ptr;
    if (!reaches_first)
    {
        // The prior range, if any, does not intersect this one
        ptr = payload_ranges.emplace_hint(next, first, last);
    }
    else if (prev->second == first)
    {
        // The new range starts exactly where the prior one stops: extend it.
        prev->second = last;
        ptr = prev;
    }
    else
    {
        /* Only debug level for this, because it can legitimately happen in the
         * network. There are also ways this can happen with a partial overlap
         * instead of a duplicate, but it would cost more cycles to test for
         * it.
         */
        log_debug("packet rejected because it is a duplicate");
        return false;
    }

    // If the new range ends exactly where the next one starts, fuse them.
    if (have_next && next->first == last)
    {
        ptr->second = next->second;
        payload_ranges.erase(next);
    }
    return true;
}

void live_heap::add_pointers(std::size_t n, const std::uint8_t *pointers)
{
    for (std::size_t i = 0; i < n; i++)
    {
        item_pointer_t pointer = load_be<item_pointer_t>(pointers + i * sizeof(item_pointer_t));
        if (!decoder.is_immediate(pointer))
            min_length = std::max(min_length, s_item_pointer_t(decoder.get_address(pointer)));
        s_item_pointer_t item_id = decoder.get_id(pointer);
        if (item_id == 0 || item_id > PAYLOAD_LENGTH_ID)
        {
            /* NULL items are included because they can be direct-addressed, and this
             * pointer may determine the length of the previous direct-addressed item.
             */
            bool seen;
            if (n_inline_pointers >= 0)
                seen = std::count(inline_pointers.begin(), inline_pointers.begin() + n_inline_pointers,
                                  pointer);
            else
                seen = seen_pointers.count(pointer);
            if (!seen)
            {
                if (n_inline_pointers == max_inline_pointers)
                {
                    external_pointers.reserve(n_inline_pointers + (n - i));
                    external_pointers.insert(external_pointers.end(),
                                             inline_pointers.begin(),
                                             inline_pointers.begin() + n_inline_pointers);
                    seen_pointers.insert(inline_pointers.begin(),
                                         inline_pointers.begin() + n_inline_pointers);
                    n_inline_pointers = -1;
                }

                if (n_inline_pointers >= 0)
                {
                    inline_pointers[n_inline_pointers++] = pointer;
                }
                else
                {
                    external_pointers.push_back(pointer);
                    seen_pointers.insert(pointer);
                }

                if (item_id == STREAM_CTRL_ID && decoder.is_immediate(pointer)
                    && decoder.get_immediate(pointer) == CTRL_STREAM_STOP)
                    end_of_stream = true;
            }
        }
    }
}

/* Try to add one packet to this heap.
 *
 * The packet is rejected, and false is returned without modifying the heap,
 * if
 * - its HEAP_LEN disagrees with the length already recorded for the heap;
 * - its HEAP_LEN is smaller than min_length;
 * - its heap_address_bits differs from the decoder's;
 * - @a allow_out_of_order is true and add_payload_range refuses its payload
 *   range; or
 * - @a allow_out_of_order is false and its payload does not start exactly at
 *   received_length.
 *
 * Otherwise the heap length and minimum length are updated, payload storage
 * is reserved through @a allocator, the item pointers are absorbed, the
 * payload is copied with @a packet_memcpy (skipped for empty payloads), and
 * true is returned.
 */
bool live_heap::add_packet(const packet_header &packet,
                           const packet_memcpy_function &packet_memcpy,
                           memory_allocator &allocator,
                           bool allow_out_of_order)
{
    /* It's important that these initial checks can't fail for a
     * just-constructed live heap, because otherwise an initial_packet could
     * create a heap with a specific flavour but then get rejected.
     */
    assert(cnt == packet.heap_cnt);
    if (heap_length >= 0
        && packet.heap_length >= 0
        && packet.heap_length != heap_length)
    {
        // this could cause overflows later if not caught
        log_info("packet rejected because its HEAP_LEN is inconsistent with the heap");
        return false;
    }
    if (packet.heap_length >= 0 && packet.heap_length < min_length)
    {
        log_info("packet rejected because its HEAP_LEN is too small for the heap");
        return false;
    }
    if (packet.heap_address_bits != decoder.address_bits())
    {
        log_info("packet rejected because its flavour is inconsistent with the heap");
        return false;
    }

    // Packet seems sane, check if we've already seen it, and if not, insert it
    bool new_packet;
    if (allow_out_of_order)
    {
        new_packet = add_payload_range(packet.payload_offset,
                                       packet.payload_offset + packet.payload_length);
    }
    else if (packet.payload_offset == received_length)
        new_packet = true;
    else
    {
        /* This branch is only reached when allow_out_of_order is false, so
         * the remedy to suggest to the user is to turn it on.
         */
        if (packet.payload_offset < received_length)
            log_warning("packet rejected because it is out-of-order in the heap "
                        "(you might need to set allow_out_of_order to true in the stream config)");
        else
            log_debug("packet rejected because there is a gap in the heap and "
                      "allow_out_of_order is false");
        new_packet = false;
    }

    if (!new_packet)
        return false;

    ///////////////////////////////////////////////
    // Packet is now accepted, and we modify state
    ///////////////////////////////////////////////

    if (packet.heap_length >= 0)
    {
        // If this is the first time we know the length, record it
        if (heap_length < 0)
        {
            heap_length = packet.heap_length;
            min_length = std::max(min_length, heap_length);
            // The final size is now known, so reserve exactly that much.
            payload_reserve(min_length, true, packet, allocator);
        }
    }
    else
    {
        // Length unknown: make room for what has been seen so far and let
        // payload_reserve over-allocate to limit the number of reallocations.
        min_length = std::max(min_length, packet.payload_offset + packet.payload_length);
        payload_reserve(min_length, false, packet, allocator);
    }

    add_pointers(packet.n_items, packet.pointers);

    if (packet.payload_length > 0)
    {
        packet_memcpy(payload, packet);
        received_length += packet.payload_length;
    }
    log_debug("packet with %d bytes of payload at offset %d added to heap %d",
              packet.payload_length, packet.payload_offset, cnt);
    return true;
}

/* True once every byte of a heap of known length has been received. */
bool live_heap::is_complete() const
{
    const bool all_received = (heap_length == received_length);
    /* The check against min_length is purely a sanity check on the packet:
     * if it contains item pointer offsets that are bigger than the explicitly
     * provided heap length, it cannot be decoded correctly.
     */
    const bool pointers_fit = (min_length == received_length);
    return all_received && pointers_fit;
}

/* True when the number of payload bytes received equals min_length. */
bool live_heap::is_contiguous() const
{
    return min_length == received_length;
}

bool live_heap::is_end_of_stream() const
{
    return end_of_stream;
}

/* Number of payload bytes accumulated so far from accepted packets. */
s_item_pointer_t live_heap::get_received_length() const
{
    return this->received_length;
}

/* Heap length as recorded from a packet's HEAP_LEN; negative while unknown. */
s_item_pointer_t live_heap::get_heap_length() const
{
    return this->heap_length;
}

/* Start of the stored item pointers.
 *
 * A non-negative n_inline_pointers means the pointers are held in
 * inline_pointers; otherwise they are in external_pointers.
 */
item_pointer_t *live_heap::pointers_begin()
{
    return (n_inline_pointers >= 0) ? inline_pointers.data()
                                    : external_pointers.data();
}

/* One past the last stored item pointer.
 *
 * A non-negative n_inline_pointers is the number of pointers held in
 * inline_pointers; otherwise all pointers are in external_pointers.
 */
item_pointer_t *live_heap::pointers_end()
{
    return (n_inline_pointers >= 0)
        ? inline_pointers.data() + n_inline_pointers
        : external_pointers.data() + external_pointers.size();
}

/* Discard everything accumulated from packets: payload, length
 * bookkeeping, stored item pointers and the end-of-stream flag.
 */
void live_heap::reset()
{
    // Payload storage
    payload.reset();
    payload_reserved = 0;

    // Length bookkeeping; a negative heap_length means "not yet known"
    heap_length = -1;
    received_length = 0;
    min_length = 0;
    payload_ranges.clear();

    // Item pointers: go back to (empty) inline storage and ask the vector
    // to give back its capacity.
    n_inline_pointers = 0;
    external_pointers.clear();
    external_pointers.shrink_to_fit();
    seen_pointers.clear();

    end_of_stream = false;
}

} // namespace spead2::recv
// back to top