6#ifndef XENIUM_RAMALHETE_QUEUE_HPP
7#define XENIUM_RAMALHETE_QUEUE_HPP
9#include <xenium/acquire_guard.hpp>
10#include <xenium/backoff.hpp>
11#include <xenium/marked_ptr.hpp>
12#include <xenium/parameter.hpp>
13#include <xenium/policy.hpp>
14#include <xenium/detail/pointer_queue_traits.hpp>
22#pragma warning(disable: 4324)
 // --- pop_retries policy (fragment) ---------------------------------------
 // NOTE(review): this chunk is a garbled extraction -- lines are missing
 // between fragments, and leading numbers (e.g. "33") are source-line
 // anchors fused into the text, not code.
 // Non-type template parameter carrying the spin-iteration count used by
 // try_pop while a claimed entry's value has not been stored yet (see the
 // bounded while-loop in the try_pop fragment below).
 33 template <
          unsigned Value>
 // --- ramalhete_queue member aliases, checks and constants (fragment) ------
 // traits: maps T/Policies... onto the pointer representation the queue
 // actually stores in its slots.
 67 using traits = detail::pointer_queue_traits_t<
          T,
          Policies...>;
 // raw_value_type: the unmanaged pointer type kept inside each entry.
 68 using raw_value_type =
          typename traits::raw_type;
 // Policy validation at compile time: a node must hold at least one entry...
 76 static_assert(entries_per_node > 0,
          "entries_per_node must be greater than zero");
 // ...and a reclamation scheme is mandatory (nodes are freed concurrently).
 77 static_assert(parameter::is_set<reclaimer>::value,
          "reclaimer policy must be specified");
 // Enqueue; takes ownership of `value` (handed over via traits::release on
 // the successful path in the push fragment).
 92 void push(value_type value);
 // Pointer flavours provided by the reclaimer's concurrent_ptr: marked_ptr
 // carries a mark bit (used by try_pop's exchange), guard_ptr protects a
 // node from reclamation while a thread works on it.
 107 using marked_ptr =
          typename concurrent_ptr::marked_ptr;
 108 using guard_ptr =
          typename concurrent_ptr::guard_ptr;
 // Per-slot storage: an atomically updated (pointer, mark) pair.
 114 std::atomic<marked_value> value;
 // Both cursors advance by this stride and slots are addressed with
 // idx % entries_per_node -- presumably a prime stride to scatter adjacent
 // operations across the array and reduce false sharing (TODO confirm
 // against upstream commentary).
 118 static constexpr unsigned step_size = 11;
 // A node is exhausted after entries_per_node increments of step_size.
 119 static constexpr unsigned max_idx = step_size * entries_per_node;
 // --- node: one fixed-size segment in the linked list of arrays (fragment) -
 121 struct node : reclaimer::template enable_concurrent_ptr<node> {
 // Consumer cursor, advanced by step_size in try_pop.
 124 std::atomic<unsigned> pop_idx;
 125 entry entries[entries_per_node];
 // Producer cursor, advanced by step_size in push.
 126 std::atomic<unsigned> push_idx;
 // Construct with the first item pre-stored in slot 0. Relaxed stores are
 // sufficient: the node has not been published to other threads yet.
 130 node(raw_value_type
          item) :
 135 entries[0].value.store(
          item, std::memory_order_relaxed);
 // Every remaining slot starts empty (nullptr marks "no value").
 136 for (
          unsigned i = 1;
          i < entries_per_node;
          i++)
 137 entries[
          i].value.store(
          nullptr, std::memory_order_relaxed);
 // Destructor fragment: dispose of items that were pushed but never popped,
 // walking the same strided index sequence the cursors use.
 141 for (
          unsigned i = pop_idx;
          i < push_idx;
          i += step_size) {
 142 traits::delete_value(entries[
          i % entries_per_node].value.load(std::memory_order_relaxed).get());
 // --- queue anchors and constructor (fragment) -----------------------------
 // head and tail are 64-byte aligned (presumably one cache line on the
 // targeted platforms -- see the pragma-4324 suppression above) so producer
 // and consumer traffic do not false-share. TODO confirm target line size.
 147 alignas(64) concurrent_ptr head;
 148 alignas(64) concurrent_ptr tail;
 // Constructor: start with a single node whose slot 0 holds a nullptr dummy.
 154 auto n =
          new node(
          nullptr);
 // Reset the producer cursor so slot 0 (currently the dummy nullptr) can be
 // claimed by the first real push.
 155 n->push_idx.store(0, std::memory_order_relaxed);
 // Relaxed stores suffice: the queue object is not yet visible to others.
 156 head.store(
          n, std::memory_order_relaxed);
 157 tail.store(
          n, std::memory_order_relaxed);
 // --- destructor (fragment): tear down the remaining node chain ------------
 160template <
          class T,
          class... Policies>
 161ramalhete_queue<T, Policies...>::~ramalhete_queue()
 // No concurrent access can happen during destruction; the acquire loads
 // make the last published node contents visible to this thread.
 164 auto n = head.load(std::memory_order_acquire);
 // Follow next pointers node by node -- presumably deleting each node in
 // turn; the loop body is elided by this extraction (confirm upstream).
 168 auto next = n->next.load(std::memory_order_acquire);
 // --- push (fragment): fetch-and-add based enqueue -------------------------
 174template <
          class T,
          class... Policies>
 // Convert to the raw stored pointer. nullptr is rejected because it is the
 // in-band marker for an empty slot.
 177 raw_value_type
          raw_val = traits::get_raw(value);
 179 throw std::invalid_argument(
          "value can not be nullptr");
 // Guard the tail node against reclamation while we operate on it.
 185 t.acquire(tail, std::memory_order_acquire);
 // Claim a slot by fetch-and-add on the producer cursor (relaxed: the
 // publishing release happens on the per-slot CAS below).
 187 unsigned idx =
          t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
 // Node full: help append / advance to a fresh node, then retry.
 188 if (
          idx >= max_idx) {
 // Tail moved underneath us -- someone else already made progress; retry.
 190 if (
          t != tail.load(std::memory_order_relaxed))
 193 auto next =
          t->next.load(std::memory_order_relaxed);
 // Ownership of `value` is handed over once it is safely stored in a node.
 197 traits::release(value);
 // CAS linking the new node: release on success so the node's initialized
 // contents are visible to whoever follows the link; relaxed on failure.
 202 std::memory_order_release,
 203 std::memory_order_relaxed))
 // Swing tail forward to the node we just linked; losing this CAS is
 // benign -- some other thread advanced tail for us.
 207 tail.compare_exchange_strong(
          expected,
          new_node, std::memory_order_release, std::memory_order_relaxed);
 // Link CAS failed: reset new_node's producer cursor -- presumably so the
 // node destructor's [pop_idx, push_idx) cleanup loop does not delete the
 // still-pending item before the push is retried (lines elided; confirm).
 211 new_node->push_idx.store(0, std::memory_order_relaxed);
 // A successor already exists -- help advance tail to it.
 216 next =
          t->next.load(std::memory_order_acquire);
 219 tail.compare_exchange_strong(
          expected, next, std::memory_order_release, std::memory_order_relaxed);
 // Map the strided index onto the slot array.
 223 idx %= entries_per_node;
 // Publish the item: CAS the slot from empty to raw_val with release so the
 // payload is visible to the consumer that later observes the pointer.
 227 if (
          t->entries[
          idx].value.compare_exchange_strong(
          expected,
          raw_val, std::memory_order_release, std::memory_order_relaxed)) {
 228 traits::release(value);
 // --- try_pop (fragment): fetch-and-add based dequeue ----------------------
 // Guard the head node against reclamation while we operate on it.
 244 h.acquire(head, std::memory_order_acquire);
 // Emptiness pre-check before claiming a slot: nothing unconsumed in this
 // node and no successor node exists.
 247 const auto pop_idx =
          h->pop_idx.load(std::memory_order_acquire);
 // NOTE(review): push_idx is read relaxed here while pop_idx uses acquire;
 // the exact ordering rationale is in the elided upstream comments -- confirm.
 250 const auto push_idx =
          h->push_idx.load(std::memory_order_relaxed);
 251 if (pop_idx >= push_idx &&
 252 h->next.load(std::memory_order_relaxed) ==
          nullptr)
 // Claim a slot from the consumer cursor.
 256 unsigned idx =
          h->pop_idx.fetch_add(step_size, std::memory_order_release);
 // This node is exhausted: advance head to the successor and retry.
 257 if (
          idx >= max_idx) {
 260 auto next =
          h->next.load(std::memory_order_acquire);
 266 if (head.compare_exchange_strong(
          expected, next, std::memory_order_release, std::memory_order_relaxed))
 // Map the strided index onto the slot array.
 271 idx %= entries_per_node;
 // Cheap relaxed peek at the slot first; the common case is a value that is
 // already present.
 273 auto value =
          h->entries[
          idx].value.load(std::memory_order_relaxed);
 // Optionally spin a bounded number of iterations waiting for a producer
 // that claimed this slot but has not stored its value yet (pop_retries
 // policy, compile-time gated).
 274 if constexpr(pop_retries > 0) {
 277 while (value ==
          nullptr && ++
          cnt <= pop_retries) {
 278 value =
          h->entries[
          idx].value.load(std::memory_order_relaxed);
 // Value observed: reload with acquire to synchronize with the producer's
 // release store before handing the payload out.
 283 if (value !=
          nullptr) {
 285 h->entries[
          idx].value.load(std::memory_order_acquire);
 286 traits::store(
          result, value.get());
 // Slow path: atomically invalidate the slot (nullptr with mark bit 1) so a
 // late producer's CAS on it fails; whatever value the exchange returns is
 // exclusively ours. Acquire pairs with the producer's release.
 290 auto value =
          h->entries[
          idx].value.exchange(
          marked_value(
          nullptr, 1), std::memory_order_acquire);
 291 if (value !=
          nullptr) {
 292 traits::store(
          result, value.get());
A fast unbounded lock-free multi-producer/multi-consumer FIFO queue.
Definition ramalhete_queue.hpp:65
void push(value_type value)
Pushes the given value to the queue.
Definition ramalhete_queue.hpp:175
bool try_pop(value_type &result)
Tries to pop an object from the queue.
Definition ramalhete_queue.hpp:237
Slim wrapper around std::hash with specialization for pointer types.
Definition hash.hpp:25
Dummy backoff strategy that does nothing.
Definition backoff.hpp:17
Policy to configure the backoff strategy.
Definition policy.hpp:39
Policy to configure the number of entries per allocated node in ramalhete_queue.
Definition policy.hpp:104
Policy to configure the hash function.
Definition policy.hpp:86
Policy to configure the number of iterations to spin on a queue entry while waiting for a pending push operation to finish.
Definition ramalhete_queue.hpp:34
Policy to configure the reclamation scheme to be used.
Definition policy.hpp:25