L4Re Operating System Framework
Interface and Usage Documentation
Loading...
Searching...
No Matches
ring_buffer
Go to the documentation of this file.
1// vim:set ft=cpp: -*- Mode: C++ -*-
2/*
3 * Copyright (C) 2026 Kernkonzept GmbH.
4 * Author(s): Martin Decky <martin.decky@kernkonzept.com>
5 *
6 * License: see LICENSE.spdx (in this directory or the directories above)
7 */
8
28
29#pragma once
30
31#include <cstddef>
32#include <stdexcept>
33#include <atomic>
34#include <bit>
35#include <mutex>
36#include <functional>
37
38namespace utrace {
39
/**
 * Fixed, per-architecture alignment (in bytes) applied to the members of the
 * ring buffer status area.
 *
 * Declared `inline` so that every translation unit including this header
 * refers to the same single entity instead of a private internal-linkage
 * copy.
 */
#if defined(ARCH_arm64)
  inline constexpr size_t stable_cache_alignment = 256U;
#elif defined(ARCH_ppc32)
  inline constexpr size_t stable_cache_alignment = 128U;
#else
  inline constexpr size_t stable_cache_alignment = 64U;
#endif
48
49#ifdef __GCC_DESTRUCTIVE_SIZE
50static_assert(stable_cache_alignment >= __GCC_DESTRUCTIVE_SIZE,
51 "Stable cache alignment not sufficient");
52#endif // __GCC_DESTRUCTIVE_SIZE
53
/**
 * Size of a string buffer.
 *
 * NOTE(review): not used within this header; presumably a size in bytes
 * consumed by other utrace code -- confirm against the users.
 *
 * Declared `inline` so that all translation units share one definition.
 */
inline constexpr size_t string_buffer_size = 1024U;
56
65template<typename SEQUENCE_TYPE>
67{
68public:
69 using Sequence = SEQUENCE_TYPE;
70 using Generation = std::atomic<Sequence>;
71
73 static constexpr Sequence const nil = 0U;
74
/**
 * Check that the number of items is a power of two.
 *
 * \param items  Number of items to validate.
 *
 * \throws std::invalid_argument  The number of items is zero or not a power
 *                                of two.
 */
static void check_items(size_t const items)
{
  if (items == 0)
    throw std::invalid_argument("Zero items not supported.");

  // A power of two has exactly one bit set, so clearing its lowest set bit
  // yields zero. Unlike a comparison against std::bit_ceil(), whose behavior
  // is undefined when the next power of two is not representable, this test
  // is well-defined for every value of size_t.
  if ((items & (items - 1)) != 0)
    throw std::invalid_argument("Number of items must be power of two.");
}
94
/**
 * Check that the item mask is a power of two minus one.
 *
 * \param mask  Item mask to validate.
 *
 * \throws std::length_error  The mask plus one is not a power of two. This
 *                            includes the all-ones mask, for which the item
 *                            count wraps around to zero.
 */
static void check_mask(size_t const mask)
{
  size_t const items = mask + 1;

  // `items` is a power of two iff it is non-zero and shares no bit with
  // `items - 1`, which is the mask itself. Unlike a comparison against
  // std::bit_ceil(), this is well-defined for every value of size_t.
  if (items == 0 || (items & mask) != 0)
    throw std::length_error("Invalid mask.");
}
112};
113
127template<typename SEQUENCE_TYPE>
129{
130public:
131 using Sequence = SEQUENCE_TYPE;
132 using Generation = typename Ring_buffer<Sequence>::Generation;
133
134 template<typename S, typename T, auto G> friend class Ring_buffer_producer;
135 template<typename S, typename T, auto G>
136 friend class Ring_buffer_consumer_raw;
137
/// Get the version of the ring buffer items.
145 unsigned version() const
146 { return _version; }
147
/// Get the stable alignment of the ring buffer status members.
155 size_t alignment() const
156 { return _alignment; }
157
/// Get the item mask.
159 size_t mask() const
160 { return _mask; }
161
/// Get the number of items, i.e. the item mask plus one.
163 size_t items() const
164 { return _mask + 1; }
165
166private:
168 unsigned _version;
169
171 size_t _alignment;
172
174 alignas(stable_cache_alignment) size_t _mask;
175
177 alignas(stable_cache_alignment) Generation _tail;
178};
179
/// Alignment of a ring buffer slot holding ITEM_TYPE: the size of the item
/// rounded up to the next power of two.
189template<typename ITEM_TYPE>
190static constexpr size_t ring_slot_alignment = std::bit_ceil(sizeof(ITEM_TYPE));
191
/**
 * Ring buffer slot.
 *
 * Wraps a single item and aligns it to ring_slot_alignment<ITEM_TYPE>.
 * GENERATION_PTR is applied to the item as a pointer to member and selects
 * the member that serves as the generation of the slot.
 */
206template<typename ITEM_TYPE, auto GENERATION_PTR>
207class alignas(ring_slot_alignment<ITEM_TYPE>) Ring_slot
208{
209public:
210 using This = Ring_slot;
211 using Item = ITEM_TYPE;
212
    // The producer and the raw consumer access the private members directly.
213 template<typename S, typename T, auto G> friend class Ring_buffer_producer;
214 template<typename S, typename T, auto G>
215 friend class Ring_buffer_consumer_raw;
216
    /// Get the size (in bytes) of the given count of items.
224 static size_t size(size_t const items)
225 { return items * sizeof(This); }
226
227private:
    /// Get an atomic reference to the generation member of the stored item.
    /// The std::atomic_ref object is returned by value.
230 auto generation() const noexcept
231 { return std::atomic_ref(_data.*GENERATION_PTR); }
232
    /// The item stored in the slot (including its generation member).
234 Item _data;
235};
236
247template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
248class Ring_buffer_producer : public Ring_buffer<SEQUENCE_TYPE>
249{
250public:
251 using This = Ring_buffer_producer;
252 using Super = Ring_buffer<SEQUENCE_TYPE>;
253
254 using Item = ITEM_TYPE;
255
256 using Status = Ring_status<SEQUENCE_TYPE>;
258
/**
 * Construct ring buffer producer.
 *
 * Initializes the status area (version, alignment, item mask and tail) and
 * sets the generation of every slot to nil.
 *
 * \param status   Status area to initialize and use.
 * \param slots    Slots of the ring buffer.
 * \param version  Version of the ring buffer items stored into the status.
 * \param items    Number of items; the item mask is set to `items - 1`.
 */
274 Ring_buffer_producer(Status &status, Slot &slots, unsigned const version,
275 size_t const items) : _status(status), _slots(slots)
276 {
    // NOTE(review): listing line 277 is not present in this view -- confirm
    // against the original header that no statement is missing here.
278
279 _status._version = version;
280 _status._alignment = stable_cache_alignment;
281 _status._mask = items - 1;
282 _status._tail = Super::nil;
283
    // Mark all slots as non-committed.
284 for (size_t i = 0; i < items; ++i)
285 _slots[i].generation() = Super::nil;
286 }
287
/// Get the number of items (as reported by the status area).
289 size_t items() const
290 { return _status.items(); }
291
300 void enqueue(Item const &item) const
301 {
302 auto next = ++_status._tail;
303 auto index = next & _status._mask;
304
305 auto &generation = _slots[index].generation();
306 auto &slot = _slots[index]._data;
307
308 generation.store(Super::nil, std::memory_order_release);
309 slot = item;
310 generation.store(next, std::memory_order_release);
311 }
312
313private:
314 Ring_buffer_producer() = delete;
315
316 Status &_status;
317 Slot &_slots;
318};
319
333template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
334class Ring_buffer_consumer_raw : public Ring_buffer<SEQUENCE_TYPE>
335{
336public:
337 using This = Ring_buffer_consumer_raw;
338 using Super = Ring_buffer<SEQUENCE_TYPE>;
339
340 using Sequence = SEQUENCE_TYPE;
341 using Item = ITEM_TYPE;
342
343 using Status = Ring_status<Sequence>;
345
353 enum class Drop_policy
354 {
365
375 };
376
/// Status of the dequeue operation.
378 enum class State
379 {
380 Idle = 0, ///< No item dequeued.
381 Ready, ///< An item has been dequeued.
382 Dropped ///< Some items have been dropped.
383 };
384
/**
 * Construct ring buffer consumer.
 *
 * \param status  Status area of the ring buffer (only read by the consumer).
 * \param slots   Slots of the ring buffer.
 *
 * \throws std::length_error  The item mask stored in the status area is not
 *                            a power of two minus one (see check_mask()).
 */
394 Ring_buffer_consumer_raw(Status const &status, Slot const *slots)
395 : _status(status), _slots(slots)
396 { This::check_mask(_status._mask); }
397
/// Get the number of items (as reported by the status area).
399 size_t items() const
400 { return _status.items(); }
401
/**
 * Poll and possibly dequeue an item from the ring buffer.
 *
 * The consumer position is kept by the caller in `current`. The slot
 * selected by `current` is inspected and, if it holds exactly the expected
 * generation, copied out and consumed.
 *
 * \param[out]    item     Receives a copy of the slot content. It might be
 *                         written even if State::Dropped is returned.
 * \param         policy   Where to continue after a drop has been detected.
 * \param[in,out] current  Sequence number of the next expected item. The
 *                         nil value is replaced by 1. Advanced by one on
 *                         State::Ready, repositioned on State::Dropped.
 * \param[out]    drops    Optional. Set to 0 on entry and to the distance
 *                         between the new and the old position on
 *                         State::Dropped.
 *
 * \retval State::Idle     The generation of the slot is lower than
 *                         `current`; nothing has been consumed.
 * \retval State::Ready    `item` holds the item with sequence `current`
 *                         (before the increment).
 * \retval State::Dropped  The slot does not hold the expected generation
 *                         (anymore); `current` has been repositioned.
 */
435 State peek(Item &item, Drop_policy policy, Sequence &current,
436 Sequence *drops = nullptr) const
437 {
438 if (drops)
439 *drops = 0;
440
441 // First access to the ring buffer.
442 if (current == Super::nil)
443 current = 1;
444
445 auto index = current & _status._mask;
446
    // The atomic reference is returned by value; binding it to a const
    // reference extends the lifetime of the temporary.
447 auto const &generation = _slots[index].generation();
448 auto const &slot = _slots[index]._data;
449
450 auto seen = generation.load();
451 if (seen < current)
452 {
453 // Expected item not produced yet.
454 return State::Idle;
455 }
456
457 if (seen > current)
458 goto drop;
459
    // Copy the item first and validate the generation again afterwards.
460 item = slot;
461 seen = generation.load();
462
463 // Make sure the item has not been overwritten while we have been copying
464 // the data.
465 if (seen == current)
466 {
467 // Consume item.
468 ++current;
469 return State::Ready;
470 }
471
    // NOTE(review): besides the jump above, this label is also reached by
    // falling through when the second load differs from current. That
    // includes the nil value stored by the producer while it is rewriting
    // the slot, so "seen > current" does not strictly hold in that case.
472 drop:
473 // Seen > current -> drop detected.
474 Sequence head = _status._tail;
475
476 // Conservative drop policy continues at the tail.
477 // Minimal drop policy continues at the head.
478 if (policy == Drop_policy::Minimal)
479 head -= _status._mask;
480
481 // How many items have been dropped.
482 if (drops)
483 *drops = head - current;
484
485 current = head;
486 return State::Dropped;
487 }
488
489private:
490 Ring_buffer_consumer_raw() = delete;
491
492 Status const &_status;
493 Slot const *_slots;
494};
495
509template<typename SEQUENCE_TYPE, typename ITEM_TYPE, auto GENERATION_PTR>
511 : public Ring_buffer_consumer_raw<SEQUENCE_TYPE, ITEM_TYPE, GENERATION_PTR>
512{
513public:
514 using Sequence = SEQUENCE_TYPE;
515 using Item = ITEM_TYPE;
516
518
519 using Drop_policy = Super::Drop_policy;
520 using State = Super::State;
521 using Status = Ring_status<Sequence>;
523
524 using Yield = std::function<bool (Sequence)>;
525
/**
 * Construct ring buffer consumer.
 *
 * Both arguments are passed on unchanged to the base class constructor.
 *
 * \param status  Status area of the ring buffer.
 * \param slots   Slots of the ring buffer.
 */
535 Ring_buffer_consumer(Status const &status, Slot const *slots)
536 : Super(status, slots)
537 {}
538
560 State peek(Item &item, Drop_policy policy, Sequence *drops = nullptr)
561 {
562 const std::lock_guard guard(_lock);
563 return Super::peek(item, policy, _current, drops);
564 }
565
595 size_t dequeue(Item items[], size_t capacity, size_t burst,
596 Drop_policy policy, Yield const &yield,
597 Sequence *drops = nullptr)
598 {
599 if (drops)
600 *drops = 0;
601
602 size_t count = 0;
603 size_t idle_cycles = 0;
604
605 for (;;)
606 {
607 Sequence current_drops;
608 auto status = peek(items[count], policy, &current_drops);
609
610 if (status == State::Dropped)
611 {
612 // Some items have been dropped. We need to notify the caller
613 // about it. Thus return with the items already dequeued.
614
615 if (drops)
616 *drops = current_drops;
617
618 break;
619 }
620
621 if (status == State::Ready)
622 {
623 // Next item dequeued. Return if there is no space for more items
624 // in the array.
625
626 ++count;
627 if (count == capacity)
628 break;
629
630 continue;
631 }
632
633 if (status == State::Idle)
634 {
635 // No next item has been produced yet. Return if we have already
636 // dequeued enough items. Otherwise just spin, but allow the yield
637 // function to implement passive waiting.
638
639 if (count >= burst)
640 break;
641
642 ++idle_cycles;
643 if (yield(idle_cycles))
644 idle_cycles = 0;
645 }
646 }
647
648 return count;
649 }
650
651private:
652 Ring_buffer_consumer() = delete;
653 State peek(Item &, Drop_policy, Sequence &, Sequence *) const = delete;
654
656 std::mutex _lock;
657
659 Sequence _current = 0;
660};
661
662} // namespace utrace
State
Status of the dequeue operation.
Definition ring_buffer:379
Ring_buffer_consumer_raw(Status const &status, Slot const *slots)
Construct ring buffer consumer.
Definition ring_buffer:394
size_t items() const
Get the number of items.
Definition ring_buffer:399
State peek(Item &item, Drop_policy policy, Sequence &current, Sequence *drops=nullptr) const
Poll and possibly dequeue an item from the ring buffer.
Definition ring_buffer:435
@ Conservative
Conservative item drop policy.
Definition ring_buffer:364
size_t dequeue(Item items[], size_t capacity, size_t burst, Drop_policy policy, Yield const &yield, Sequence *drops=nullptr)
Dequeue items from the ring buffer.
Definition ring_buffer:595
State peek(Item &item, Drop_policy policy, Sequence *drops=nullptr)
Poll and possibly dequeue an item from the ring buffer.
Definition ring_buffer:560
Ring_buffer_consumer(Status const &status, Slot const *slots)
Construct ring buffer consumer.
Definition ring_buffer:535
Ring_buffer_producer(Status &status, Slot &slots, unsigned const version, size_t const items)
Construct ring buffer producer.
Definition ring_buffer:274
void enqueue(Item const &item) const
Enqueue an item in the ring buffer.
Definition ring_buffer:300
size_t items() const
Get the number of items.
Definition ring_buffer:289
Ring buffer.
Definition ring_buffer:67
static void check_mask(size_t const mask)
Check that the item mask is a power of two minus one.
Definition ring_buffer:106
static constexpr Sequence const nil
Invalid (non-committed) items set their sequence counter to 0.
Definition ring_buffer:73
static void check_items(size_t const items)
Check that the number of items is a power of two.
Definition ring_buffer:86
Ring buffer slot.
Definition ring_buffer:208
static size_t size(size_t const items)
Get the size (in bytes) of the given count of items.
Definition ring_buffer:224
Ring buffer status area.
Definition ring_buffer:129
size_t mask() const
Get the item mask.
Definition ring_buffer:159
size_t items() const
Get the number of items.
Definition ring_buffer:163
size_t alignment() const
Get the stable alignment of the ring buffer status members.
Definition ring_buffer:155
unsigned version() const
Get the version of the ring buffer items.
Definition ring_buffer:145