#ifndef LFQUEUE_H_INCLUDED
#define LFQUEUE_H_INCLUDED

#include "atomicOps.hpp"
#include <type_traits>

#include <cassert>   // assert
#include <cstddef>   // size_t
#include <cstdint>   // uintptr_t
#include <cstdlib>   // std::malloc
#include <new>       // placement new
#include <stdexcept> // std::runtime_error
#include <utility>   // std::move, std::forward

#include "../headers/enums.hpp"
#include "../headers/types.hpp"
#define CACHE_LINE_SIZE 64
#pragma warning(disable: 4316) // object allocated on the heap may not be aligned
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#pragma warning(disable: 4820) // padding was added
#pragma warning(disable: 4127) // conditional expression is constant
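
// lfQueue: a single-producer, single-consumer queue backed by a circular
// linked list of power-of-two-sized blocks; synchronization uses fences
// rather than locks.
//
// Minimal usage sketch (assuming lfQueue<T> is the enclosing class template,
// as the member functions below suggest):
//
//     lfQueue<int> q(64);            // initial capacity hint
//     q.push(1);                     // enqueue; allocates a new block if full
//     if (!q.try_push(2)) { /* queue full, nothing enqueued */ }
//     int v;
//     if (q.try_pop(v)) { /* v now holds the front element */ }
//
// One thread may call the push functions while another calls the pop
// functions; neither side may re-enter the queue from element ctors/dtors.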
    explicit lfQueue(size_t maxSize = 15)
        : largestBlockSize(ceilToPow2(maxSize + 1))
    {
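        // Allocate the initial block and link it to itself, forming the
        // circular block list that backs the queue.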
        auto firstBlock = new Block(largestBlockSize);
        firstBlock->next = firstBlock;

        frontBlock = firstBlock;
        tailBlock = firstBlock;

        // Make sure the fully constructed queue is visible to the other
        // thread before it is used.
        fence(memory_order_sync);
    }

    ~lfQueue()
    {
        // Pull in the latest state written by the other thread before
        // tearing the queue down.
        fence(memory_order_sync);

        // Destroy any elements still in the queue, walking every block in
        // the circular list.
        Block* tailBlock_ = tailBlock;
        Block* block = frontBlock;
        do {
            Block* nextBlock = block->next;
            size_t blockFront = block->front;
            size_t blockTail = block->tail;
            for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask()) {
                auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
                element->~T();
            }

            block = nextBlock;
        } while (block != tailBlock_);
    }
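
    // Enqueue without allocating: returns false (and leaves the queue
    // unchanged) if there is no room left in the existing blocks.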
    AE_FORCEINLINE bool try_push(T const& element)
    {
        return inner_enqueue<CannotAlloc>(element);
    }
    AE_FORCEINLINE bool try_push(T&& element)
    {
        return inner_enqueue<CannotAlloc>(std::forward<T>(element));
    }
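
    // Enqueue unconditionally: if the current blocks are full, a new block is
    // allocated, so this never fails (but may allocate).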
    AE_FORCEINLINE void push(T const& element)
    {
        inner_enqueue<CanAlloc>(element);
    }
    AE_FORCEINLINE void push(T&& element)
    {
        inner_enqueue<CanAlloc>(std::forward<T>(element));
    }
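
    // Move the front element into result and remove it from the queue;
    // returns false if nothing could be dequeued.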
    bool try_pop(T& result)
    {
        ReentrantGuard guard(this->dequeuing);
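
        // Consumer side: if the front block has unread elements, read from
        // it; otherwise, if the producer has already moved on to another
        // block, advance frontBlock and read from that one instead.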
        Block* tailBlockAtStart = tailBlock;
        fence(memory_order_acquire);

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->tail.load();
        size_t blockFront = frontBlock_->front.load();
        fence(memory_order_acquire);
        if (blockFront != blockTail) {
            // The front block is non-empty: move the element out, destroy it
            // in place, then publish the new front index.
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            result = std::move(*element);
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask();

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlockAtStart) {
            // The front block is empty but the producer has moved past it, so
            // the next block must contain at least one element.
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->tail;
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            // Advance frontBlock before reading from the new block.
            fence(memory_order_release);
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release);

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
            result = std::move(*element);
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask();

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
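
        // Peek-style access: the code below returns a pointer to the front
        // element without removing it from the queue.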
        ReentrantGuard guard(this->dequeuing);

        Block* tailBlockAtStart = tailBlock;
        fence(memory_order_acquire);

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->tail.load();
        size_t blockFront = frontBlock_->front.load();
        fence(memory_order_acquire);
        if (blockFront != blockTail) {
            return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
        }
        else if (frontBlock_ != tailBlockAtStart) {
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlock->tail);
            return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
        }
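
    // Controls whether inner_enqueue may allocate a new block when both the
    // tail block and the next block in the list are full.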
    enum AllocationMode { CanAlloc, CannotAlloc };
    template<AllocationMode canAlloc, typename U>
    bool inner_enqueue(U&& element)
    {
        ReentrantGuard guard(this->enqueuing);
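
        // Producer side, in order of preference: write into the tail block if
        // it has room; otherwise reuse the next block in the circular list if
        // the consumer is not using it; otherwise (CanAlloc only) splice a
        // freshly allocated, larger block in after the tail.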
        Block* tailBlock_ = tailBlock.load();
        size_t blockFront = tailBlock_->front.load();
        size_t blockTail = tailBlock_->tail.load();
        fence(memory_order_acquire);

        size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask();
        if (nextBlockTail != blockFront) {
            // Room left in the tail block: construct the element in place,
            // then publish the new tail index.
            char* location = tailBlock_->data + blockTail * sizeof(T);
            new (location) T(std::forward<U>(element));

            fence(memory_order_release);
            tailBlock_->tail = nextBlockTail;
        }
        else if (tailBlock_->next.load() != frontBlock) {
            // The tail block is full, but the next block is not in use by the
            // consumer, so it must be empty: reuse it.
            fence(memory_order_acquire);

            Block* tailBlockNext = tailBlock_->next.load();
            size_t nextBlockFront = tailBlockNext->front.load();
            nextBlockTail = tailBlockNext->tail.load();
            fence(memory_order_acquire);

            assert(nextBlockFront == nextBlockTail);
            AE_UNUSED(nextBlockFront);

            char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
            new (location) T(std::forward<U>(element));

            tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask();

            fence(memory_order_release);
            tailBlock = tailBlockNext;
        }
        else if (canAlloc == CanAlloc) {
            // Both the tail block and the next block are full: allocate a new,
            // larger block and splice it into the circular list after the tail.
            largestBlockSize *= 2;
            Block* newBlock = new Block(largestBlockSize);

            new (newBlock->data) T(std::forward<U>(element));
            assert(newBlock->front == 0);
            newBlock->tail = 1; // the element just constructed sits at index 0

            newBlock->next = tailBlock_->next.load();
            tailBlock_->next = newBlock;

            fence(memory_order_release);
            tailBlock = newBlock;
        }
        else if (canAlloc == CannotAlloc) {
            // The queue is full and this call may not allocate.
            return false;
        }
        else {
            assert(false && "Should be unreachable code");
        }

        return true;
    }
    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
        // Classic bit-twiddling: smear the highest set bit downward, then add
        // one to land on the next power of two.
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }
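
    // RAII guard that detects re-entrant use of the queue (for example,
    // pushing from inside an element's constructor while a push is already
    // in progress on the same side).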
    struct ReentrantGuard
    {
        ReentrantGuard(bool& _inSection)
            : inSection(_inSection)
        {
            if (inSection) {
                throw std::runtime_error("ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
            }
            inSection = true;
        }

        ~ReentrantGuard() { inSection = false; }

    private:
        ReentrantGuard& operator=(ReentrantGuard const&);

        bool& inSection;
    };
        AE_ALIGN(CACHE_LINE_SIZE)

        AE_ALIGN(CACHE_LINE_SIZE)

        AE_ALIGN(CACHE_LINE_SIZE)
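
        // size is always a power of two (see ceilToPow2), so indices can wrap
        // with a single mask operation.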
        AE_FORCEINLINE size_t sizeMask() const { return size - 1; }

        Block(size_t const& _size)
            : front(0), tail(0), next(nullptr), size(_size)
        {
            // Over-allocate so the element array can be aligned for T, then
            // bump data up to the first suitably aligned byte.
            size_t alignment = std::alignment_of<T>::value;
            data = rawData = static_cast<char*>(std::malloc(sizeof(T) * size + alignment - 1));
            auto alignmentOffset = (uintptr_t)rawData % alignment;
            if (alignmentOffset != 0) {
                data += alignment - alignmentOffset;
            }
        }
455 Block& operator=(Block
const&);
    // Queue-level shared state; each field is cache-line aligned so the
    // producer and the consumer do not contend on the same line.
    AE_ALIGN(CACHE_LINE_SIZE)

    AE_ALIGN(CACHE_LINE_SIZE)

    AE_ALIGN(CACHE_LINE_SIZE)
    size_t largestBlockSize;

#endif // LFQUEUE_H_INCLUDED