diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..e8e7f956472681f221569837f4ec993d0de70dc1
--- /dev/null
+++ b/emper/lib/adt/WsClV2Queue.hpp
@@ -0,0 +1,146 @@
+#pragma once
+
+#include <cstdint>
+#include <atomic>
+
+#include "Common.hpp"
+
+namespace adt {
+
+/**
+ * @file
+ *
+ * A CL queue used for work stealing.
+ *
+ * Implements the partially concurrent queue for the work-stealing
+ * scheduling algorithm as defined in "Dynamic Circular Work-Stealing
+ * Deque" by Chase and Lev. The worker uses pushBottom to push new
+ * items onto its queue and retrieves items via popBottom. Only when
+ * its own queue is empty does it use popTop to steal items from a
+ * different worker's queue.
+ *
+ * Only popTop may be used concurrently with the other operations;
+ * pushBottom and popBottom are reserved for the owning worker and
+ * must not be used concurrently with each other.
+ *
+ * Unlike the ABP queue, the CL queue uses a circular array and
+ * monotonically increasing uint64_t bottom and top fields. According
+ * to the authors, "a 64-bit integer is large enough to accommodate
+ * 64 years of pushes, pops and steals executing at a rate of 4
+ * billion operations per second", so overflows should be no problem.
+ */
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+class WsClV2Queue {
+
+	// Bottom of the queue is only modified by the pushBottom() and
+	// popBottom() methods, which are only used by the owning
+	// thread. Thus we don't need to make it atomic.
+	ALIGN_TO_CACHE_LINE uint64_t bottom;
+
+	ALIGN_TO_CACHE_LINE std::atomic<uint64_t> top;
+
+	ITEM_TYPE queue[CAPACITY];
+
+public:
+	// 'bottom' and 'top' are initialized to '1', instead of '0' as
+	// done in the "Dynamic Circular Work-Stealing Deque" paper,
+	// because popBottom decrements bottom, which would underflow if
+	// bottom were '0'. The paper's queue is safe because it uses
+	// Java's 'long', which is signed, for bottom and top.
+	WsClV2Queue() : bottom(1), top(1) { }
+
+	bool pushBottom(const ITEM_TYPE item);
+	bool popBottom(ITEM_TYPE *item);
+	bool popTop(ITEM_TYPE *item);
+	bool isFull() const;
+	bool isEmpty() const;
+	void print() const;
+
+	inline uintptr_t capacity() const {
+		return CAPACITY;
+	}
+
+	inline uint64_t usedSlots() const {
+		return bottom - top;
+	}
+
+	inline uintptr_t freeSlots() const {
+		return capacity() - usedSlots();
+	}
+};
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) {
+	// Check if the queue is full.
+	if (isFull())
+		return false;
+
+	queue[bottom % CAPACITY] = item;
+
+	// TODO: Possible to move after fence?
+	bottom += 1;
+
+	// Write fence / memory barrier
+	atomic_thread_fence(std::memory_order_release);
+
+	return true;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popBottom(ITEM_TYPE *item) {
+	const uint64_t localBottom = --bottom;
+	uint64_t localTop = top.load(std::memory_order_acquire);
+
+	if (localTop > localBottom) {
+		// The queue is empty.
+		bottom = localTop;
+		return false;
+	}
+
+	*item = queue[localBottom % CAPACITY];
+
+	// More than one item was left, so there was no possibility of a
+	// race. No atomic CAS required.
+	if (localBottom > localTop) return true;
+
+	// Exactly one item was left: race against concurrent popTop()
+	// calls for it.
+	bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed);
+
+	// Whether or not we won the race, the last item is now gone
+	// (taken either by us or by a thief), so reset bottom to the
+	// new top to restore the canonical empty state.
+	bottom = localTop + 1;
+	return res;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popTop(ITEM_TYPE *item) {
+start:
+	uint64_t localTop = top.load(std::memory_order_acquire);
+
+	// Initially this fence was placed after the empty-queue check
+	// below. But since that check reads the shared variable
+	// 'bottom', I believe the fence belongs before the check.
+	// Read fence / memory barrier
+	atomic_thread_fence(std::memory_order_acquire);
+
+	// Check if the queue is empty.
+	if (bottom <= localTop) return false;
+
+	*item = queue[localTop % CAPACITY];
+
+	// If the CAS fails, then this popTop() lost the race against
+	// another popTop() or a rare-case popBottom().
+	if (!top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed)) {
+		// TODO: Return an 'ABORT' enum value here.
+#ifdef WAIT_FREE_WS_QUEUE
+		return false;
+#else
+		goto start;
+#endif
+	}
+
+	return true;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isFull() const {
+	return usedSlots() >= CAPACITY;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isEmpty() const {
+	return top >= bottom;
+}
+
+} // namespace adt
diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp
index 9f9c2ab957b00bb0fcb509ec0f0d672cfc0e1b59..b0ea85cb67af999123923391d682f593bd2249b0 100644
--- a/emper/strategies/ws/WsScheduler.hpp
+++ b/emper/strategies/ws/WsScheduler.hpp
@@ -2,11 +2,16 @@
 
 #include "Scheduler.hpp"
 #include "WsClQueue.hpp"
+#include "WsClV2Queue.hpp"
 #include "LockedQueue.hpp"
 
 class WsScheduler: public Scheduler {
 	template <size_t SIZE>
+#ifdef LOCKED_WS_QUEUE
 	using WsQueue = adt::LockedQueue<Fiber*, SIZE>;
+#else
+	using WsQueue = adt::WsClV2Queue<Fiber*, SIZE>;
+#endif
 
 public:
 	static const int QUEUE_SIZE = 128;
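
Notes for reviewers (illustrative, not part of the patch): the intended call pattern looks roughly like the sketch below. Task, ownerLoop and thiefLoop are made-up names, and the include path assumes emper/ is on the include path; only the WsClV2Queue API itself comes from this patch. The owning worker treats the bottom end as a LIFO stack, while all other workers steal the oldest item from the top end.

    #include "lib/adt/WsClV2Queue.hpp"

    struct Task { int id; };             // hypothetical work item

    adt::WsClV2Queue<Task*, 128> queue;  // owned by exactly one worker

    void ownerLoop(Task* newTask) {
        // Only the owning worker may call pushBottom()/popBottom().
        if (!queue.pushBottom(newTask)) {
            // Queue is full; the owner must handle the task another way.
        }

        Task* task;
        while (queue.popBottom(&task)) {
            // ... execute 'task' (LIFO: newest first) ...
        }
    }

    void thiefLoop() {
        // Other workers may only call popTop(). With WAIT_FREE_WS_QUEUE
        // defined, a lost race returns false instead of retrying.
        Task* stolen;
        if (queue.popTop(&stolen)) {
            // ... execute 'stolen' (taken from the top: oldest first) ...
        }
    }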
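
A single-threaded sanity check of the index arithmetic (again hypothetical, not part of the patch): bottom and top start at 1, usedSlots() is bottom - top, and slots are addressed modulo CAPACITY, so the backing array is used circularly.

    #include <cassert>
    #include "lib/adt/WsClV2Queue.hpp"

    int main() {
        adt::WsClV2Queue<int, 4> q;
        assert(q.isEmpty() && q.usedSlots() == 0 && q.freeSlots() == 4);

        // Fill the queue; a fifth push must fail (capacity is 4).
        for (int i = 0; i < 4; i++) {
            bool pushed = q.pushBottom(i);
            assert(pushed);
        }
        assert(q.isFull());
        bool pushed = q.pushBottom(4);
        assert(!pushed);

        // The owner pops LIFO: 3, 2, 1 come back without needing a CAS.
        int item;
        for (int want = 3; want >= 1; want--) {
            bool popped = q.popBottom(&item);
            assert(popped && item == want);
        }

        // The last item is claimed via a CAS against potential thieves;
        // with no concurrent popTop() this succeeds, barring the spurious
        // failures compare_exchange_weak is allowed to exhibit.
        if (q.popBottom(&item)) {
            assert(item == 0);
            assert(q.isEmpty());
        }
        return 0;
    }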