xenium
Loading...
Searching...
No Matches
michael_scott_queue.hpp
1//
2// Copyright (c) 2018-2020 Manuel Pöter.
3// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4//
5
6#ifndef XENIUM_MICHAEL_SCOTT_QUEUE_HPP
7#define XENIUM_MICHAEL_SCOTT_QUEUE_HPP
8
9#include <xenium/acquire_guard.hpp>
10#include <xenium/backoff.hpp>
11#include <xenium/parameter.hpp>
12#include <xenium/policy.hpp>
13
14#ifdef _MSC_VER
15#pragma warning(push)
16#pragma warning(disable: 4324) // structure was padded due to alignment specifier
17#endif
18
19namespace xenium {
36template <class T, class... Policies>
38public:
39 using value_type = T;
40 using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
41 using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
42
43 template <class... NewPolicies>
45
46 static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
47
50
57 void push(T value);
58
66 [[nodiscard]] bool try_pop(T &result);
67
68private:
69 struct node;
70
71 using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
72 using marked_ptr = typename concurrent_ptr::marked_ptr;
73 using guard_ptr = typename concurrent_ptr::guard_ptr;
74
75 struct node : reclaimer::template enable_concurrent_ptr<node>
76 {
77 node() : value() {};
78 node(T&& v) : value(std::move(v)) {}
79
80 T value;
81 concurrent_ptr next;
82 };
83
84 alignas(64) concurrent_ptr head;
85 alignas(64) concurrent_ptr tail;
86};
87
88template <class T, class... Policies>
90{
91 auto n = new node();
92 head.store(n, std::memory_order_relaxed);
93 tail.store(n, std::memory_order_relaxed);
94}
95
96template <class T, class... Policies>
97michael_scott_queue<T, Policies...>::~michael_scott_queue()
98{
99 // (1) - this acquire-load synchronizes-with the release-CAS (11)
100 auto n = head.load(std::memory_order_acquire);
101 while (n)
102 {
103 // (2) - this acquire-load synchronizes-with the release-CAS (6)
104 auto next = n->next.load(std::memory_order_acquire);
105 delete n.get();
106 n = next;
107 }
108}
109
110template <class T, class... Policies>
112{
113 node* n = new node(std::move(value));
114
115 backoff backoff;
116
117 guard_ptr t;
118 for (;;)
119 {
120 // Get the old tail pointer.
121 // (3) - this acquire-load synchronizes-with the release-CAS (5, 7, 10)
122 t.acquire(tail, std::memory_order_acquire);
123
124 // Help update the tail pointer if needed.
125 // (4) - this acquire-load synchronizes-with the release-CAS (6)
126 auto next = t->next.load(std::memory_order_acquire);
127 if (next.get() != nullptr)
128 {
129 marked_ptr expected(t.get());
130 // (5) - this release-CAS synchronizes-with the acquire-load (3)
131 tail.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed);
132 continue;
133 }
134
135 // Attempt to link in the new element.
136 marked_ptr null{};
137 // (6) - this release-CAS synchronizes-with the acquire-load (2, 4, 9).
138 if (t->next.compare_exchange_weak(null, n, std::memory_order_release, std::memory_order_relaxed))
139 break;
140
141 backoff();
142 }
143
144 // Swing the tail to the new element.
145 marked_ptr expected = t.get();
146 // (7) - this release-CAS synchronizes-with the acquire-load (3)
147 tail.compare_exchange_strong(expected, n, std::memory_order_release, std::memory_order_relaxed);
148}
149
150template <class T, class... Policies>
152{
153 backoff backoff;
154
155 guard_ptr h;
156 for (;;)
157 {
158 // Get the old head and tail elements.
159 // (8) - this acquire-load synchronizes-with the release-CAS (11)
160 h.acquire(head, std::memory_order_acquire);
161
162 // Get the head element's successor.
163 // (9) - this acquire-load synchronizes-with the release-CAS (6).
164 auto next = acquire_guard(h->next, std::memory_order_acquire);
165 if (head.load(std::memory_order_relaxed).get() != h.get())
166 continue;
167
168 // If the head (dummy) element is the only one, return false to signal that
169 // the operation has failed (no element has been returned).
170 if (next.get() == nullptr)
171 return false;
172
173 marked_ptr t = tail.load(std::memory_order_relaxed);
174
175 // There are multiple elements. Help update tail if needed.
176 if (h.get() == t.get())
177 {
178 // (10) - this release-CAS synchronizes-with the acquire-load (3)
179 tail.compare_exchange_weak(t, next, std::memory_order_release, std::memory_order_relaxed);
180 continue;
181 }
182
183 // Attempt to update the head pointer so that it points to the new dummy node.
184 marked_ptr expected(h.get());
185 // (11) - this release-CAS synchronizes-with the acquire-load (1, 8)
186 if (head.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed))
187 {
188 // return the data of head's successor; it is the new dummy node.
189 result = std::move(next->value);
190 break;
191 }
192
193 backoff();
194 }
195
196 // The old dummy node has been unlinked, so reclaim it.
197 h.reclaim();
198
199 return true;
200}
201}
202
203#ifdef _MSC_VER
204#pragma warning(pop)
205#endif
206
207#endif
An unbounded generic lock-free multi-producer/multi-consumer FIFO queue.
Definition michael_scott_queue.hpp:37
void push(T value)
Definition michael_scott_queue.hpp:111
bool try_pop(T &result)
Definition michael_scott_queue.hpp:151
Slim wrapper around std::hash with specialization for pointer types.
Definition hash.hpp:25
Dummy backoff strategy that does nothing.
Definition backoff.hpp:17
Policy to configure the backoff strategy.
Definition policy.hpp:39
Policy to configure the reclamation scheme to be used.
Definition policy.hpp:25