#ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>
#include <stdexcept>
#include <type_traits>

namespace xenium {

/// An unbounded lock-free multi-producer/multi-consumer k-FIFO queue.
template <class T, class... Policies>
class kirsch_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  kirsch_kfifo_queue(uint64_t k);
  ~kirsch_kfifo_queue();

  void push(value_type value);
  bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // std::max avoids a zero-sized array when padding_bytes is 0
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /// Provides the effective size of a single queue entry (including padding).
  static constexpr std::size_t entry_size = sizeof(entry);
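
  // Illustrative only: the padding_bytes policy (see policy.hpp) controls entry_size,
  // trading memory for less false sharing between neighbouring slots. The reclaimer
  // below is just an assumed example (see the usage sketch after the class definition
  // for the required include):
  //
  //   using padded_queue = xenium::kirsch_kfifo_queue<int*,
  //     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>,
  //     xenium::policy::padding_bytes<64>>;
  //   static_assert(padded_queue::entry_size >= 64);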

private:
  struct segment;

  struct segment_deleter {
    void operator()(segment* seg) const { release_segment(seg); }
  };

  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
    using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;

    explicit segment(uint64_t k) : k(k) {}
    ~segment() {
      // at this point all items must have been consumed or explicitly deleted
      for (unsigned i = 0; i < k; ++i) {
        assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
      }
    }

    void delete_remaining_items() {
      for (unsigned i = 0; i < k; ++i) {
        traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
        items()[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }

    // the k entries are allocated immediately behind the segment header
    entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }

    std::atomic<bool> deleted{false};
    const uint64_t k;
    concurrent_ptr next{};
  };

  using concurrent_ptr = typename segment::concurrent_ptr;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  segment* alloc_segment() const;
  static void release_segment(segment* seg);

  template <bool Empty>
  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
  void advance_tail(marked_ptr tail_old) noexcept;
  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;

  const std::size_t k_;
  concurrent_ptr head_;
  concurrent_ptr tail_;
};
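
// Minimal usage sketch (illustrative): the reclaimer below is just one possible
// choice - any reclamation scheme accepted by policy::reclaimer works.
//
//   #include <xenium/reclamation/generic_epoch_based.hpp>
//
//   using queue_t = xenium::kirsch_kfifo_queue<int*,
//     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>>;
//
//   queue_t queue(8);            // k = 8 slots per segment
//   queue.push(new int(42));     // throws std::invalid_argument on nullptr
//   int* v = nullptr;
//   if (queue.try_pop(v)) {      // non-blocking; returns false if the queue is empty
//     delete v;
//   }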

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) : k_(k) {
  const auto seg = alloc_segment();
  head_.store(seg, std::memory_order_relaxed);
  tail_.store(seg, std::memory_order_relaxed);
}

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
  // not thread-safe - drain and free all remaining segments
  auto seg = head_.load(std::memory_order_relaxed).get();
  while (seg) {
    auto next = seg->next.load(std::memory_order_relaxed).get();
    seg->delete_remaining_items();
    release_segment(seg);
    seg = next;
  }
}

template <class T, class... Policies>
auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
  // allocate the segment header and its k entries as one contiguous block
  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
  auto result = new (data) segment(k_);
  for (std::size_t i = 0; i < k_; ++i)
    new (&result->items()[i]) entry();
  return result;
}
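
// Resulting in-memory layout of a segment (illustrative):
//
//   | segment header | entry 0 | entry 1 | ... | entry k-1 |
//   ^ this           ^ items()
//
// i.e. one allocation of sizeof(segment) + k * sizeof(entry) bytes.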

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
  // segments are created via placement-new, so destroy and free them manually
  seg->~segment();
  ::operator delete(seg);
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value cannot be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  guard_ptr tail_old;
  for (;;) {
    // acquire a safe reference to the current tail segment
    tail_old.acquire(tail_, std::memory_order_acquire);

    uint64_t idx = 0;
    marked_value old_value;
    const bool found_idx = find_index<true>(tail_old, idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue; // the tail has changed in the meantime -> retry

    if (found_idx) {
      const marked_value new_value(raw_value, old_value.mark() + 1);
      if (tail_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
            std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value); // ownership has been passed to the queue
        return;
      }
    } else {
      advance_tail(tail_old);
    }
  }
}

template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr head_old;
  for (;;) {
    head_old.acquire(head_, std::memory_order_acquire);
    // ... locate an occupied index in the head segment (find_index<false>) ...
    if (head_old != head_.load(std::memory_order_relaxed))
      continue; // the head has changed in the meantime -> retry
    marked_ptr tail_old = tail_.load(std::memory_order_acquire);
    // ... if an occupied index was found, try to CAS the slot back to an empty
    //     value (std::memory_order_acquire on success, relaxed on failure); on
    //     success store the popped item in result and return true. If the head
    //     segment is drained and equals the tail, return false; otherwise
    //     advance the head and retry ...
  }
}

template <class T, class... Policies>
template <bool Empty>
bool kirsch_kfifo_queue<T, Policies...>::find_index(
  marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept {
  const uint64_t k = segment->k;
  const uint64_t start = utils::random() % k; // pseudo-random start offset to spread contention
  for (size_t i = 0; i < k; i++) {
    const uint64_t index = (start + i) % k;
    old = segment->items()[index].value.load(std::memory_order_relaxed);
    // pushing looks for an empty slot (Empty == true), popping for an occupied one
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}
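
// Note: because each segment's slots are probed starting at a random offset,
// items within one segment may be dequeued in any order - this is the "k"
// relaxation of the k-FIFO queue. For example, with k = 4 a try_pop may return
// any of the (up to) 4 oldest items; FIFO order is only maintained between
// items that ended up in different segments.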

template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::committed(
  marked_ptr segment, marked_value value, uint64_t index) noexcept {
  if (value != segment->items()[index].value.load(std::memory_order_relaxed))
    return true; // the item has already been dequeued -> the push committed

  const marked_value empty_value(nullptr, value.mark() + 1);

  if (segment->deleted.load(std::memory_order_relaxed) == true) {
    // the segment has been removed from the queue -> try to take the item back out;
    // if that fails, somebody else dequeued it, so the push committed after all
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  marked_ptr head_current = head_.load(std::memory_order_acquire);
  if (segment.get() == head_current.get()) {
    // the item sits in the head segment -> bump the head version to prevent
    // a concurrent advance_head from removing the segment underneath us
    marked_ptr new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      return true;

    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  if (segment->deleted.load(std::memory_order_relaxed) == false) {
    return true; // the segment is still in the queue -> the push committed
  }

  // the segment has been removed in the meantime -> try to take the item back out
  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
  if (head_current != head_.load(std::memory_order_relaxed))
    return;

  if (head_current.get() == tail_current.get()) {
    // the queue consists of a single segment -> help advancing the tail first
    const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
    if (tail_next_segment.get() == nullptr)
      return; // there is no next segment yet -> the head cannot be advanced

    if (tail_current == tail_.load(std::memory_order_relaxed)) {
      marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    }
  }

  // mark the old head segment as deleted before unlinking it
  head_current->deleted.store(true, std::memory_order_relaxed);

  marked_ptr expected = head_current;
  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
    head_current.reclaim(); // defer reclamation of the unlinked segment to the reclaimer
  }
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
  if (tail_current != tail_.load(std::memory_order_relaxed))
    return;

  if (next_segment.get() != nullptr) {
    // another thread already appended a new segment -> just move the tail
    marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
    tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
  } else {
    // append a freshly allocated segment
    auto seg = alloc_segment();
    const marked_ptr new_segment(seg, next_segment.mark() + 1);
    if (tail_current->next.compare_exchange_strong(next_segment, new_segment,
          std::memory_order_release, std::memory_order_relaxed)) {
      marked_ptr new_tail(seg, tail_current.mark() + 1);
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    } else {
      // we lost the race to append -> free our segment again
      release_segment(seg);
    }
  }
}

} // namespace xenium

#endif