// vim:set ft=cpp: -*- Mode: C++ -*-
/*
 * Copyright (C) 2026 Kernkonzept GmbH.
 * Author(s): Martin Decky <martin.decky@kernkonzept.com>
 *
 * License: see LICENSE.spdx (in this directory or the directories above)
 */

/**
 * \file
 * Shared-memory multiple-producer multiple-consumer head-drop ring buffer
 * implementation.
 *
 * This implements a generic ring buffer for transferring fixed-sized items
 * in shared memory between producers and consumers without coupling. This
 * means that the pace of producers is never affected by the pace of consumers
 * (producers never block and never wait for the consumers).
 *
 * When consumers are unable to dequeue items from the ring buffer quickly
 * enough, they lose access to the oldest items (head-drop policy), but this
 * condition is always detected.
 *
 * When the number of producers that are actively enqueuing items into the
 * ring buffer at the same time exceeds the total number of items, the state
 * of the ring buffer is undefined. When the sequence/generation counter of
 * the items in the ring buffer overflows, the state of the ring buffer is
 * undefined.
 */

#pragma once

#include <cstddef>
#include <stdexcept>
#include <atomic>
#include <bit>
#include <mutex>
#include <functional>

namespace utrace {

/// Stable cache alignment to prevent false sharing.
static constexpr size_t stable_cache_alignment =
#if defined(ARCH_arm64)
  256U;
#elif defined(ARCH_ppc32)
  128U;
#else
  64U;
#endif

// If the compiler reports its own destructive interference size, make sure
// that the value selected above is not smaller than that.
#if defined(__GCC_DESTRUCTIVE_SIZE)
static_assert(__GCC_DESTRUCTIVE_SIZE <= stable_cache_alignment,
              "Stable cache alignment not sufficient");
#endif // __GCC_DESTRUCTIVE_SIZE

/// Size of static string buffers.
static constexpr size_t string_buffer_size = 1024U;

/**
 * Ring buffer.
 *
 * Base class of all ring buffer objects. Not directly usable by the user.
 *
 * \tparam SEQUENCE_TYPE  Numerical type for storing sequence and generation
 *                        counters. Must be suitable for atomic access.
 */
template<typename SEQUENCE_TYPE>
class Ring_buffer
{
public:
  using Sequence = SEQUENCE_TYPE;
  using Generation = std::atomic<Sequence>;

  /// Invalid (non-committed) items set their sequence counter to 0.
  static constexpr Sequence const nil = 0U;

  /**
   * Check that the number of items is a power of two.
   *
   * This ring buffer implementation supports a non-zero power of two number
   * of items.
   *
   * \param items  Number of items.
   *
   * \throw std::invalid_argument  If the number of items is zero or not
   *                               a power of two.
   */
  static void check_items(size_t const items)
  {
    if (items == 0)
      throw std::invalid_argument("Zero items not supported.");

    if (!is_power_of_two(items))
      throw std::invalid_argument("Number of items must be power of two.");
  }

  /**
   * Check that the item mask is a power of two minus one.
   *
   * This ring buffer implementation uses the bitwise AND for implementing
   * a modulo operation with the number of items. Thus the item mask must
   * be a power of two minus one.
   *
   * \param mask  Item mask.
   *
   * \throw std::length_error  If the item mask is not a power of two minus
   *                           one. This includes the mask with all bits set,
   *                           because the corresponding number of items is
   *                           not representable.
   */
  static void check_mask(size_t const mask)
  {
    // For the all-ones mask the addition wraps around to zero, which is
    // rejected as well (zero is not a power of two).
    if (!is_power_of_two(mask + 1))
      throw std::length_error("Invalid mask.");
  }

private:
  /**
   * Test whether a value is a non-zero power of two.
   *
   * Comparing the value with std::bit_ceil() of itself is deliberately
   * avoided: std::bit_ceil() has undefined behavior when the next power of
   * two is not representable, i.e. precisely for the largest invalid values
   * that need to be rejected here. The single-bit test below is well-defined
   * for every value.
   *
   * \param value  Value to test.
   *
   * \retval true   The value has exactly one bit set.
   * \retval false  The value is zero or has more than one bit set.
   */
  static constexpr bool is_power_of_two(size_t const value) noexcept
  { return value != 0 && (value & (value - 1)) == 0; }
};

/**
 * Ring buffer status area.
 *
 * The ring buffer status area contains run-time configuration information
 * of the ring buffer, most notably the current atomic generation counter
 * (i.e. the last sequence number allocated to an item).
 *
 * This structure is only written by the producers, it is read-only for the
 * consumers.
 *
 * \tparam SEQUENCE_TYPE  Numerical type for storing sequence and generation
 *                        counters. Must be suitable for atomic access.
 */
template<typename SEQUENCE_TYPE>
class Ring_status
{
  // The producer fills in the private members and the raw consumer reads
  // them directly, hence both are friends.
  template<typename S, typename T, auto G> friend class Ring_buffer_producer;
  template<typename S, typename T, auto G>
  friend class Ring_buffer_consumer_raw;

public:
  using Sequence = SEQUENCE_TYPE;
  using Generation = typename Ring_buffer<Sequence>::Generation;

  /**
   * Version of the ring buffer items.
   *
   * The meaning of the version number is up to the user.
   *
   * \return Version of the ring buffer items.
   */
  unsigned version() const
  {
    return _version;
  }

  /**
   * Stable alignment of the ring buffer status members.
   *
   * The value applies to the _mask and _tail members.
   *
   * \return Stable alignment of the ring buffer status members.
   */
  size_t alignment() const
  {
    return _alignment;
  }

  /**
   * Item mask.
   *
   * \return Item mask (i.e. the number of items minus one).
   */
  size_t mask() const
  {
    return _mask;
  }

  /**
   * Number of items.
   *
   * \return Number of items (i.e. the item mask plus one).
   */
  size_t items() const
  {
    return mask() + 1;
  }

private:
  /// Version of the items.
  unsigned _version;

  /// Stable cache alignment as recorded in the status area.
  size_t _alignment;

  /// Item mask. Placed on its own stable cache alignment boundary.
  alignas(stable_cache_alignment) size_t _mask;

  /// Tail item, i.e. the atomic generation counter. Placed on its own stable
  /// cache alignment boundary.
  alignas(stable_cache_alignment) Generation _tail;
};

/**
 * Ring buffer slot alignment.
 *
 * Evaluates to the size of the item type rounded up to a power of two (sizes
 * that already are a power of two are kept unchanged).
 *
 * This is a separate declaration in order to workaround an erratum in
 * GCC 11 where the direct use of the same expression in alignas() is
 * incorrectly treated as non-manifestly constant-evaluated. Keep the
 * expression in this exact form unless the workaround is no longer needed.
 *
 * \tparam ITEM_TYPE  Actual ring buffer item type.
 */
template<typename ITEM_TYPE>
static constexpr size_t ring_slot_alignment = std::bit_ceil(sizeof(ITEM_TYPE));

/**
 * Ring buffer slot.
 *
 * The ring buffer slot encapsulates one ring buffer item and makes sure
 * that the item is aligned to the nearest larger power-of-two of its size.
 *
 * Another purpose of this class is to abstract away the access to the sequence
 * number of the item.
 *
 * \tparam ITEM_TYPE       Actual ring buffer item type.
 * \tparam GENERATION_PTR  Class member pointer to the member that contains
 *                         the sequence number within the item. The member
 *                         needs to be suitable for an atomic reference.
 */
template<typename ITEM_TYPE, auto GENERATION_PTR>
class alignas(ring_slot_alignment<ITEM_TYPE>) Ring_slot
{
  // The producer and the raw consumer access the item and its generation
  // counter directly.
  template<typename S, typename T, auto G> friend class Ring_buffer_producer;
  template<typename S, typename T, auto G>
  friend class Ring_buffer_consumer_raw;

public:
  using This = Ring_slot;
  using Item = ITEM_TYPE;

  /**
   * Compute the storage size of a number of slots.
   *
   * \param items  Count of items.
   *
   * \return Size (in bytes) occupied by \a items slots.
   */
  static size_t size(size_t const items)
  {
    return sizeof(This) * items;
  }

private:
  /**
   * Get the generation counter of the item.
   *
   * \return Atomic reference to the sequence number member of the item.
   *
   * NOTE(review): this accessor is const, thus the atomic reference is
   * deduced over the const-qualified member. Confirm that this is supported
   * by the targeted standard library, especially for stores.
   */
  auto generation() const noexcept
  {
    auto &sequence = _data.*GENERATION_PTR;
    return std::atomic_ref(sequence);
  }

  /// Item stored in this slot.
  Item _data;
};

/**
 * Ring buffer producer.
 *
 * \tparam SEQUENCE_TYPE   Numerical type for storing sequence and generation
 *                         counters. Must be suitable for atomic access.
 * \tparam ITEM_TYPE       Actual ring buffer item type.
 * \tparam GENERATION_PTR  Class member pointer to the member that contains
 *                         the sequence number within the item. The member
 *                         needs to be suitable for an atomic reference.
 */
template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
class Ring_buffer_producer : public Ring_buffer<SEQUENCE_TYPE>
{
public:
  using This = Ring_buffer_producer;
  using Super = Ring_buffer<SEQUENCE_TYPE>;

  using Item = ITEM_TYPE;

  using Status = Ring_status<SEQUENCE_TYPE>;
  using Slot = Ring_slot<Item, GENERATION_PTR>;

  /**
   * Construct ring buffer producer.
   *
   * This initializes the ring buffer status area and slots. The number of
   * items is validated before anything is written.
   *
   * \param status   Ring buffer status area.
   * \param slots    Ring buffer slots. Reference to the first slot of a
   *                 contiguous array of \a items slots.
   * \param version  Version of the ring buffer items. The semantics of the
   *                 version number is defined by the user.
   * \param items    Number of items in the ring buffer slots. Must be a
   *                 non-zero power of two.
   *
   * \throw std::invalid_argument  If the number of items is zero or not
   *                               a power of two.
   */
  Ring_buffer_producer(Status &status, Slot &slots, unsigned const version,
                       size_t const items)
    : _status(status), _slots(&slots)
  {
    This::check_items(items);

    _status._version = version;
    _status._alignment = stable_cache_alignment;
    _status._mask = items - 1;
    _status._tail = Super::nil;

    // Mark every slot as non-committed.
    for (size_t i = 0; i < items; ++i)
      generation_of(_slots[i]).store(Super::nil);
  }

  /// Get the number of items.
  size_t items() const
  { return _status.items(); }

  /**
   * Enqueue an item in the ring buffer.
   *
   * This operation never blocks and implements the head-drop policy (i.e. the
   * oldest item is overwritten).
   *
   * The slot is first marked as non-committed, then the item is copied and
   * finally the newly allocated sequence number is published in the slot.
   *
   * NOTE(review): The whole item is copied, including its own sequence number
   * member. That value is therefore present in the slot until the final
   * sequence number is published. Callers presumably should leave that member
   * set to Ring_buffer::nil -- confirm.
   *
   * \param item  Item to enqueue (as a copy).
   */
  void enqueue(Item const &item) const
  {
    // Allocate the next sequence number and locate the matching slot.
    auto const next = ++_status._tail;
    auto const index = next & _status._mask;

    Slot &slot = _slots[index];
    auto const sequence = generation_of(slot);

    // Invalidate the slot. A release store orders only the preceding
    // accesses, therefore an explicit release fence is needed to keep the
    // invalidation ordered before the following writes of the item.
    sequence.store(Super::nil, std::memory_order_release);
    std::atomic_thread_fence(std::memory_order_release);

    slot._data = item;

    // Commit the item. The release store orders the item copy before the
    // publication of the sequence number.
    sequence.store(next, std::memory_order_release);
  }

private:
  Ring_buffer_producer() = delete;

  /**
   * Get a writable generation counter of a slot.
   *
   * The atomic reference is created from the non-const item, so that it can
   * be used for stores.
   *
   * \param slot  Slot to access.
   *
   * \return Atomic reference to the sequence number member of the item.
   */
  static auto generation_of(Slot &slot) noexcept
  { return std::atomic_ref(slot._data.*GENERATION_PTR); }

  /// Ring buffer status area.
  Status &_status;

  /// First slot of the ring buffer slots array.
  Slot *_slots;
};

/**
 * Polling ring buffer consumer.
 *
 * Both the ring buffer status area and ring buffer slots are accessed in a
 * read-only fashion.
 *
 * \tparam SEQUENCE_TYPE   Numerical type for storing sequence and generation
 *                         counters. Must be suitable for atomic access.
 * \tparam ITEM_TYPE       Actual ring buffer item type.
 * \tparam GENERATION_PTR  Class member pointer to the member that contains
 *                         the sequence number within the item. The member
 *                         needs to be suitable for an atomic reference.
 */
template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
class Ring_buffer_consumer_raw : public Ring_buffer<SEQUENCE_TYPE>
{
public:
  using This = Ring_buffer_consumer_raw;
  using Super = Ring_buffer<SEQUENCE_TYPE>;

  using Sequence = SEQUENCE_TYPE;
  using Item = ITEM_TYPE;

  using Status = Ring_status<Sequence>;
  using Slot = Ring_slot<Item, GENERATION_PTR>;

  /**
   * Item drop policy.
   *
   * This affects the behavior of the consumer when an item drop is detected
   * (i.e. the consumer loses access to an item or multiple items in the ring
   * buffer that have been overwritten by a producer in the meantime).
   */
  enum class Drop_policy
  {
    /**
     * Conservative item drop policy.
     *
     * Continue with the newest item in the ring buffer (i.e. proactively
     * drop all the items except the newest one). This policy is suitable
     * when the production of items is always faster than the consumption
     * and it gives the consumer a chance to dequeue at least some items from
     * the ring buffer before it falls behind again.
     */
    Conservative = 0,

    /**
     * Minimal item drop policy.
     *
     * Continue with the oldest item in the ring buffer (i.e. do not
     * proactively drop more items than those that have been lost already).
     * This policy is suitable when the production of items is mostly slower
     * than the consumption and the consumer is able to catch up after an
     * occasional drop of items.
     */
    Minimal
  };

  /// Status of the dequeue operation.
  enum class State
  {
    Idle = 0,  ///< No item dequeued.
    Ready,     ///< An item has been dequeued.
    Dropped    ///< Some items have been dropped.
  };

  /**
   * Construct ring buffer consumer.
   *
   * \param status   Ring buffer status area.
   * \param slots    Ring buffer slots.
   *
   * \throw std::length_error  If the item mask (in the ring buffer status
   *                           area) is not a power of two minus one.
   */
  Ring_buffer_consumer_raw(Status const &status, Slot const *slots)
    : _status(status), _slots(slots)
  { This::check_mask(_status._mask); }

  /// Get the number of items.
  size_t items() const
  { return _status.items(); }

  /**
   * Poll and possibly dequeue an item from the ring buffer.
   *
   * This operation never blocks.
   *
   * \param[out]    item     Reference to dequeue the next item from the ring
   *                         buffer into (as a copy). The content is
   *                         unspecified unless State::Ready is returned.
   * \param         policy   Item drop policy.
   * \param[in,out] current  Sequence counter of the next item to be dequeued
   *                         from the ring buffer. Initialize to
   *                         Ring_buffer::nil to start with the first sequence
   *                         counter; if that item is no longer available, a
   *                         drop is reported and the sequence counter of the
   *                         oldest or the newest item currently available in
   *                         the ring buffer is stored (depending on the item
   *                         drop policy).
   * \param[out]    drops    Pointer to store the number of items that have
   *                         been dropped. Can be nullptr.
   *
   * \retval State::Idle     The next item with the expected sequence counter
   *                         has not been produced yet. No item has been
   *                         dequeued, no item has been dropped.
   * \retval State::Ready    The next item (with the expected sequence counter)
   *                         has been dequeued from the ring buffer and stored
   *                         in \a item. The sequence counter \a current has
   *                         been advanced to the next expected item. No item
   *                         has been dropped.
   * \retval State::Dropped  Some items have been dropped (missed). No item has
   *                         been dequeued. The number of dropped items has
   *                         been stored to \a drops (if non-nullptr). The
   *                         sequence counter \a current has been advanced to
   *                         the oldest (if the drop policy is Minimal) or to
   *                         the newest (if the drop policy is Conservative),
   *                         respectively.
   */
  State peek(Item &item, Drop_policy policy, Sequence &current,
             Sequence *drops = nullptr) const
  {
    if (drops)
      *drops = 0;

    // First access to the ring buffer.
    if (current == Super::nil)
      current = 1;

    Slot const &slot = _slots[current & _status._mask];
    auto const generation = slot.generation();

    auto const before = generation.load();
    if (before < current)
      {
        // Expected item not produced yet (or the slot is marked as
        // non-committed at the moment).
        return State::Idle;
      }

    if (before == current)
      {
        item = slot._data;

        // An acquire load orders only the following accesses, therefore an
        // explicit acquire fence is needed to keep the copy of the item
        // ordered before the validation of the sequence number below.
        std::atomic_thread_fence(std::memory_order_acquire);

        // Make sure the item has not been overwritten while we have been
        // copying the data.
        if (generation.load() == current)
          {
            // Consume item.
            ++current;
            return State::Ready;
          }
      }

    // The slot holds (or is about to hold) a newer item than expected.
    return resync(policy, current, drops);
  }

private:
  Ring_buffer_consumer_raw() = delete;

  /**
   * Handle a detected drop of items.
   *
   * \param         policy   Item drop policy.
   * \param[in,out] current  Sequence counter of the item that has been
   *                         expected. Set to the sequence counter to continue
   *                         with.
   * \param[out]    drops    Pointer to store the number of items that have
   *                         been dropped. Can be nullptr.
   *
   * \return Always State::Dropped.
   */
  State resync(Drop_policy policy, Sequence &current, Sequence *drops) const
  {
    Sequence head = _status._tail;

    // Conservative drop policy continues at the tail.
    // Minimal drop policy continues at the head (the oldest item that still
    // fits into the ring buffer).
    if (policy == Drop_policy::Minimal)
      head -= _status._mask;

    // How many items have been dropped.
    if (drops)
      *drops = head - current;

    current = head;
    return State::Dropped;
  }

  /// Ring buffer status area.
  Status const &_status;

  /// Ring buffer slots.
  Slot const *_slots;
};

/**
 * Blocking ring buffer consumer.
 *
 * Both the ring buffer status area and ring buffer slots are accessed in a
 * read-only fashion.
 *
 * \tparam SEQUENCE_TYPE   Numerical type for storing sequence and generation
 *                         counters. Must be suitable for atomic access.
 * \tparam ITEM_TYPE       Actual ring buffer item type.
 * \tparam GENERATION_PTR  Class member pointer to the member that contains
 *                         the sequence number within the item. The member
 *                         needs to be suitable for an atomic reference.
 */
template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
class Ring_buffer_consumer
  : public Ring_buffer_consumer_raw<SEQUENCE_TYPE, ITEM_TYPE, GENERATION_PTR>
{
public:
  using Sequence = SEQUENCE_TYPE;
  using Item = ITEM_TYPE;

  using Super = Ring_buffer_consumer_raw<Sequence, Item, GENERATION_PTR>;

  // Dependent names: spelled with typename for compilers that do not make it
  // optional in alias declarations.
  using Drop_policy = typename Super::Drop_policy;
  using State = typename Super::State;
  using Status = Ring_status<Sequence>;
  using Slot = Ring_slot<Item, GENERATION_PTR>;

  using Yield = std::function<bool (Sequence)>;

  /**
   * Construct ring buffer consumer.
   *
   * \param status   Ring buffer status area.
   * \param slots    Ring buffer slots.
   *
   * \throw std::length_error  If the item mask (in the ring buffer status
   *                           area) is not a power of two minus one.
   */
  Ring_buffer_consumer(Status const &status, Slot const *slots)
    : Super(status, slots)
  {}

  /**
   * Poll and possibly dequeue an item from the ring buffer.
   *
   * This operation never blocks. However, since the expected sequence counter
   * of the next item is managed internally with mutual exclusion, multiple
   * concurrent calls to this method might affect the latency.
   *
   * \param[out] item      Reference to dequeue the next item from the ring
   *                       buffer into (as a copy).
   * \param      policy    Item drop policy.
   * \param[out] drops     Pointer to store the number of items that have been
   *                       dropped. Can be nullptr.
   *
   * \retval State::Idle     No next item has been produced yet. No item has
   *                         been dequeued, no item has been dropped.
   * \retval State::Ready    A next item has been dequeued from the ring buffer
   *                         and stored in \a item. No item has been dropped.
   * \retval State::Dropped  Some items have been dropped (missed). No item has
   *                         been dequeued. The number of dropped items has
   *                         been stored to \a drops (if non-nullptr).
   */
  State peek(Item &item, Drop_policy policy, Sequence *drops = nullptr)
  {
    const std::lock_guard guard(_lock);
    return Super::peek(item, policy, _current, drops);
  }

  /**
   * Dequeue items from the ring buffer.
   *
   * Dequeue multiple items from the ring buffer. The goal is to dequeue as
   * many items as available from the ring buffer in a single call to avoid
   * overhead. This operation blocks until at least \a burst items have been
   * dequeued (but returns immediately if an item is dropped or if \a capacity
   * items have been dequeued).
   *
   * The implementation implicitly spins actively while waiting for items to be
   * dequeued from the ring buffer. The \a yield function can implement passive
   * waiting. Its argument is the number of idle spinning cycles that already
   * happened and if the function returns true, this number is reset.
   *
   * \param[out] items     Array to dequeue the next items from the ring buffer
   *                       into (as copies).
   * \param      capacity  Capacity of \a items (in items). If zero, \a items
   *                       is never accessed and the method returns
   *                       immediately.
   * \param      burst     Minimal number of items that should be collected
   *                       before returning in case of waiting for items.
   * \param      policy    Item drop policy.
   * \param      yield     Yield function to implement passive waiting.
   *                       Executed in every spinning cycle when no next item
   *                       has been produced yet. Its argument is the current
   *                       counter of the idle spinning cycles and that counter
   *                       is reset when the function returns true. An empty
   *                       function results in pure active spinning.
   * \param[out] drops     Pointer to store the number of items that have been
   *                       dropped. Can be nullptr.
   *
   * \return Number of items dequeued and stored into \a items.
   */
  size_t dequeue(Item items[], size_t capacity, size_t burst,
                 Drop_policy policy, Yield const &yield,
                 Sequence *drops = nullptr)
  {
    if (drops)
      *drops = 0;

    size_t count = 0;
    size_t idle_cycles = 0;

    // The capacity is checked before every access to the array, thus a zero
    // capacity never touches items[0].
    while (count < capacity)
      {
        Sequence current_drops = 0;

        switch (peek(items[count], policy, &current_drops))
          {
          case State::Dropped:
            // Some items have been dropped. We need to notify the caller
            // about it. Thus return with the items already dequeued.
            if (drops)
              *drops = current_drops;

            return count;

          case State::Ready:
            // Next item dequeued. The loop condition takes care of returning
            // if there is no space for more items in the array.
            ++count;
            break;

          case State::Idle:
            // No next item has been produced yet. Return if we have already
            // dequeued enough items. Otherwise just spin, but allow the yield
            // function to implement passive waiting.
            if (count >= burst)
              return count;

            ++idle_cycles;
            if (yield && yield(idle_cycles))
              idle_cycles = 0;

            break;
          }
      }

    return count;
  }

private:
  Ring_buffer_consumer() = delete;

  // The variant with the caller-managed sequence counter is not available,
  // the sequence counter is managed internally.
  State peek(Item &, Drop_policy, Sequence &, Sequence *) const = delete;

  /// Mutual exclusion lock for accessing the current sequence counter.
  std::mutex _lock;

  /// Current sequence counter.
  Sequence _current = Super::nil;
};

} // namespace utrace
