lfQueue.hpp
/*
  ==============================================================================

    lfQueue.h
    Created: 8 Mar 2014 12:44:41pm
    Author: yvan

  ==============================================================================
*/

#ifndef LFQUEUE_H_INCLUDED
#define LFQUEUE_H_INCLUDED

#include "atomicOps.hpp"
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <cstdint>
#include <cstdlib>
#include "../headers/enums.hpp"
#include "../headers/types.hpp"

namespace YSE {

  // ©2013 Cameron Desrochers.
  // Distributed under the simplified BSD license (see the license file that
  // should have come with this header).

  // A lock-free queue for a single-consumer, single-producer architecture.
  // The queue is also wait-free in the common path (except if more memory
  // needs to be allocated, in which case malloc is called).
  // Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
  // the original maximum size estimate is never exceeded.
  // Tested on x86/x64 processors, but the semantics should be correct for all
  // architectures (given the right implementations in atomicOps.hpp), provided
  // that aligned integer and pointer accesses are naturally atomic.
  // Note that there should only be one consumer thread and one producer thread;
  // switching the roles of the threads, or using multiple consecutive threads for
  // one role, is not safe unless properly synchronized.
  // Using the queue exclusively from one thread is fine, though a bit silly.

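  // Illustrative usage (an added sketch, not part of the original header):
  // one producer thread pushes, one consumer thread pops. The element type
  // and variable names below are examples only.
  //
  //   YSE::lfQueue<int> queue(128);   // room for 128 elements without reallocation
  //
  //   // producer thread:
  //   queue.push(42);                 // may allocate a new block if the queue is full
  //   bool ok = queue.try_push(43);   // never allocates; returns false if full
  //
  //   // consumer thread:
  //   int value;
  //   if (queue.try_pop(value)) { /* use value */ }
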
#define CACHE_LINE_SIZE 64

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4316)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#pragma warning(disable: 4820) // padding was added
#pragma warning(disable: 4127) // conditional expression is constant
#endif

  template<typename T>
  class lfQueue
  {
    // Design: Based on a queue-of-queues. The low-level queues are just
    // circular buffers with front and tail indices indicating where the
    // next element to dequeue is and where the next element can be enqueued,
    // respectively. Each low-level queue is called a "block". Each block
    // wastes exactly one element's worth of space to keep the design simple
    // (if front == tail then the queue is empty, and can't be full).
    // The high-level queue is a circular linked list of blocks; again there
    // is a front and tail, but this time they are pointers to the blocks.
    // The front block is where the next element to be dequeued is, provided
    // the block is not empty. The back block is where elements are to be
    // enqueued, provided the block is not full.
    // The producer thread owns all the tail indices/pointers. The consumer
    // thread owns all the front indices/pointers. Both threads read each
    // other's variables, but only the owning thread updates them. E.g. after
    // the consumer reads the producer's tail, the tail may change before the
    // consumer is done dequeuing an object, but the consumer knows the tail
    // will never go backwards, only forwards.
    // If there is no room to enqueue an object, an additional block (of
    // greater size than the last block) is added. Blocks are never removed.

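    // Worked example (added for clarity, not in the original comment): in a
    // block of size 8, front == 2 and tail == 5 means slots 2, 3 and 4 hold
    // queued elements and slot 5 is written next. Because one slot is always
    // left unused, the block counts as full once
    // (tail + 1) & sizeMask() == front, i.e. it holds at most 7 elements.
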
  public:
    // Constructs a queue that can hold maxSize elements without further
    // allocations. Allocates maxSize + 1, rounded up to the nearest power
    // of 2, elements.
    explicit lfQueue(size_t maxSize = 15)
      : largestBlockSize(ceilToPow2(maxSize + 1)) // We need a spare slot to fit maxSize elements in the block
#ifndef NDEBUG
      , enqueuing(false)
      , dequeuing(false)
#endif
    {
      assert(maxSize > 0);

      auto firstBlock = new Block(largestBlockSize);
      firstBlock->next = firstBlock;

      frontBlock = firstBlock;
      tailBlock = firstBlock;

      // Make sure the reader/writer threads will have the initialized memory setup above:
      fence(memory_order_sync);
    }

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    ~lfQueue()
    {
      // Make sure we get the latest version of all variables from other CPUs:
      fence(memory_order_sync);

      // Destroy any remaining objects in the queue and free memory.
      // The blocks form a circular linked list, so walk it exactly once,
      // starting at the front block and stopping when we come back around
      // to it; this visits (and deletes) every block, including the tail
      // block and any empty blocks between the tail and the front.
      Block* frontBlock_ = frontBlock;
      Block* block = frontBlock_;
      do {
        Block* nextBlock = block->next;
        size_t blockFront = block->front;
        size_t blockTail = block->tail;

        for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask()) {
          auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
          element->~T();
          (void)element;
        }

        delete block;
        block = nextBlock;

      } while (block != frontBlock_);
    }

    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_push(T const& element)
    {
      return inner_enqueue<CannotAlloc>(element);
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_push(T&& element)
    {
      return inner_enqueue<CannotAlloc>(std::forward<T>(element));
    }


    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    AE_FORCEINLINE void push(T const& element)
    {
      inner_enqueue<CanAlloc>(element);
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    AE_FORCEINLINE void push(T&& element)
    {
      inner_enqueue<CanAlloc>(std::forward<T>(element));
    }


    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves the front element into result using operator=, then returns true.
    bool try_pop(T& result)
    {
#ifndef NDEBUG
      ReentrantGuard guard(this->dequeuing);
#endif

      // High-level pseudocode:
      // Remember where the tail block is
      // If the front block has an element in it, dequeue it
      // Else
      //   If front block was the tail block when we entered the function, return false
      //   Else advance to next block and dequeue the item there

      // Note that we have to use the value of the tail block from before we check if the front
      // block is full or not, in case the front block is empty and then, before we check if the
      // tail block is at the front block or not, the producer fills up the front block *and
      // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
      // reproducible in practice.
      Block* tailBlockAtStart = tailBlock;
      fence(memory_order_acquire);

      Block* frontBlock_ = frontBlock.load();
      size_t blockTail = frontBlock_->tail.load();
      size_t blockFront = frontBlock_->front.load();
      fence(memory_order_acquire);

      if (blockFront != blockTail) {
        // Front block not empty, dequeue from here
        auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
        result = std::move(*element);
        element->~T();

        blockFront = (blockFront + 1) & frontBlock_->sizeMask();

        fence(memory_order_release);
        frontBlock_->front = blockFront;
      }
      else if (frontBlock_ != tailBlockAtStart) {
        // Front block is empty but there's another block ahead, advance to it
        Block* nextBlock = frontBlock_->next;
        // Don't need an acquire fence here since next can only ever be set on the tailBlock,
        // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
        // ensures next is up-to-date on this CPU in case we recently were at tailBlock.

        size_t nextBlockFront = nextBlock->front.load();
        size_t nextBlockTail = nextBlock->tail;
        fence(memory_order_acquire);

        // Since the tailBlock is only ever advanced after being written to,
        // we know there's for sure an element to dequeue on it
        assert(nextBlockFront != nextBlockTail);
        AE_UNUSED(nextBlockTail);

        // We're done with this block, let the producer use it if it needs it
        fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
        frontBlock = frontBlock_ = nextBlock;

        compiler_fence(memory_order_release); // Not strictly needed

        auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));

        result = std::move(*element);
        element->~T();

        nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask();

        fence(memory_order_release);
        frontBlock_->front = nextBlockFront;
      }
      else {
        // No elements in current block and no other block to advance to
        return false;
      }

      return true;
    }
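
    // Illustrative consumer-side drain loop (an added sketch, not part of the
    // original header): since try_pop returns false only when the queue looked
    // empty, the consumer thread can simply loop until it does.
    //
    //   T item;
    //   while (queue.try_pop(item)) {
    //     process(item);   // 'process' is a hypothetical consumer function
    //   }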

    // Returns a pointer to the first element in the queue (the one that
    // would be removed next by a call to `try_pop`). If the queue appears
    // empty at the time the method is called, nullptr is returned instead.
    // Must be called only from the consumer thread.
    T* peek()
    {
#ifndef NDEBUG
      ReentrantGuard guard(this->dequeuing);
#endif
      // See try_pop() for reasoning

      Block* tailBlockAtStart = tailBlock;
      fence(memory_order_acquire);

      Block* frontBlock_ = frontBlock.load();
      size_t blockTail = frontBlock_->tail.load();
      size_t blockFront = frontBlock_->front.load();
      fence(memory_order_acquire);

      if (blockFront != blockTail) {
        return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
      }
      else if (frontBlock_ != tailBlockAtStart) {
        Block* nextBlock = frontBlock_->next;

        size_t nextBlockFront = nextBlock->front.load();
        fence(memory_order_acquire);

        assert(nextBlockFront != nextBlock->tail);
        return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
      }
      return nullptr;
    }
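
    // Illustrative peek-then-pop pattern (an added sketch, not in the original
    // header): the returned pointer is only valid until the consumer thread
    // dequeues again, and must not be used from the producer thread.
    //
    //   if (T* front = queue.peek()) {
    //     if (shouldConsume(*front)) {   // 'shouldConsume' is hypothetical
    //       T item;
    //       queue.try_pop(item);
    //     }
    //   }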


  private:
    enum AllocationMode { CanAlloc, CannotAlloc };

    template<AllocationMode canAlloc, typename U>
    bool inner_enqueue(U&& element)
    {
#ifndef NDEBUG
      ReentrantGuard guard(this->enqueuing);
#endif

      // High-level pseudocode (assuming we're allowed to alloc a new block):
      // If room in tail block, add to tail
      // Else check next block
      //   If next block is not the head block, enqueue on next block
      //   Else create a new block and enqueue there
      // Advance tail to the block we just enqueued to

      Block* tailBlock_ = tailBlock.load();
      size_t blockFront = tailBlock_->front.load();
      size_t blockTail = tailBlock_->tail.load();
      fence(memory_order_acquire);

      size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask();
      if (nextBlockTail != blockFront) {
        // This block has room for at least one more element
        char* location = tailBlock_->data + blockTail * sizeof(T);
        new (location) T(std::forward<U>(element));

        fence(memory_order_release);
        tailBlock_->tail = nextBlockTail;
      }
      else if (tailBlock_->next.load() != frontBlock) {
        // Note that the reason we can't advance to the frontBlock and start adding new entries there
        // is because if we did, then dequeue would stay in that block, eventually reading the new values,
        // instead of advancing to the next full block (whose values were enqueued first and so should be
        // consumed first).

        fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock

        // tailBlock is full, but there's a free block ahead, use it
        Block* tailBlockNext = tailBlock_->next.load();
        size_t nextBlockFront = tailBlockNext->front.load();
        nextBlockTail = tailBlockNext->tail.load();
        fence(memory_order_acquire);

        // This block must be empty since it's not the head block and we
        // go through the blocks in a circle
        assert(nextBlockFront == nextBlockTail);
        AE_UNUSED(nextBlockFront);

        char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
        new (location) T(std::forward<U>(element));

        tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask();

        fence(memory_order_release);
        tailBlock = tailBlockNext;
      }
      else if (canAlloc == CanAlloc) {
        // tailBlock is full and there's no free block ahead; create a new block
        largestBlockSize *= 2;
        Block* newBlock = new Block(largestBlockSize);

        new (newBlock->data) T(std::forward<U>(element));

        assert(newBlock->front == 0);
        newBlock->tail = 1;

        newBlock->next = tailBlock_->next.load();
        tailBlock_->next = newBlock;

        // Might be possible for the dequeue thread to see the new tailBlock->next
        // *without* seeing the new tailBlock value, but this is OK since it can't
        // advance to the next block until tailBlock is set anyway (because the only
        // case where it could try to read the next is if it's already at the tailBlock,
        // and it won't advance past tailBlock in any circumstance).

        fence(memory_order_release);
        tailBlock = newBlock;
      }
      else if (canAlloc == CannotAlloc) {
        // Would have had to allocate a new block to enqueue, but not allowed
        return false;
      }
      else {
        assert(false && "Should be unreachable code");
        return false;
      }

      return true;
    }


    // Disable copying
    lfQueue(lfQueue const&) = delete;

    // Disable assignment
    lfQueue& operator=(lfQueue const&) = delete;


    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
      // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
      --x;
      x |= x >> 1;
      x |= x >> 2;
      x |= x >> 4;
      for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
        x |= x >> (i << 3);
      }
      ++x;
      return x;
    }
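
    // Worked examples (added for clarity, not in the original header):
    //   ceilToPow2(15) == 16, ceilToPow2(16) == 16, ceilToPow2(17) == 32.
    // Decrementing first makes exact powers of two map to themselves; the
    // shift-or cascade then smears the highest set bit into every lower bit,
    // and the final increment rounds up to the next power of two. The
    // constructor relies on this: lfQueue(15) allocates a block of
    // ceilToPow2(16) == 16 slots, one of which is always left unused.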
  private:
#ifndef NDEBUG
    struct ReentrantGuard
    {
      ReentrantGuard(bool& _inSection)
        : inSection(_inSection)
      {
        assert(!inSection);
        if (inSection) {
          throw std::runtime_error("lfQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
        }

        inSection = true;
      }

      ~ReentrantGuard() { inSection = false; }

    private:
      ReentrantGuard& operator=(ReentrantGuard const&);

    private:
      bool& inSection;
    };
#endif

    struct Block
    {
      // Avoid false-sharing by putting highly contended variables on their own cache lines
      AE_ALIGN(CACHE_LINE_SIZE)
      weak_atomic<size_t> front; // (Atomic) Elements are read from here

      AE_ALIGN(CACHE_LINE_SIZE)
      weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here

      AE_ALIGN(CACHE_LINE_SIZE) // next isn't very contended, but we don't want it on the same cache line as tail (which is)
      weak_atomic<Block*> next; // (Atomic)

      char* data; // Contents (on heap) are aligned to T's alignment

      const size_t size;

      AE_FORCEINLINE size_t sizeMask() const { return size - 1; }


      // size must be a power of two (and greater than 0)
      Block(size_t const& _size)
        : front(0), tail(0), next(nullptr), size(_size)
      {
        // Allocate enough memory for an array of Ts, aligned
        size_t alignment = std::alignment_of<T>::value;
        data = rawData = static_cast<char*>(std::malloc(sizeof(T) * size + alignment - 1));
        assert(rawData);
        auto alignmentOffset = (uintptr_t)rawData % alignment;
        if (alignmentOffset != 0) {
          data += alignment - alignmentOffset;
        }
      }
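
      // Worked example of the alignment fix-up above (added for clarity):
      // with alignof(T) == 8 and malloc returning an address ending in 0x0D,
      // alignmentOffset == 5, so data is advanced by 3 bytes to the next
      // 8-byte boundary. The extra 'alignment - 1' bytes requested from
      // malloc guarantee the shifted buffer still holds 'size' elements.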

      ~Block()
      {
        std::free(rawData);
      }

    private:
      // C4512 - Assignment operator could not be generated
      Block& operator=(Block const&);

    private:
      char* rawData;
    };

  private:
    AE_ALIGN(CACHE_LINE_SIZE)
    weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block

    AE_ALIGN(CACHE_LINE_SIZE)
    weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block

    AE_ALIGN(CACHE_LINE_SIZE) // Ensure tailBlock gets its own cache line
    size_t largestBlockSize;

#ifndef NDEBUG
    bool enqueuing;
    bool dequeuing;
#endif
  };

#ifdef AE_VCPP
#pragma warning(pop)
#endif

}

#endif // LFQUEUE_H_INCLUDED