Skip to content
Snippets Groups Projects
Commit 11a9c99f authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Add WsCl2Queue and use it in WsScheduler

parent 25c24170
No related branches found
No related tags found
No related merge requests found
#pragma once
#include <cstdint>
#include <atomic>
#include "Common.hpp"
namespace adt {
/**
* @file
*
* A CL queue used for work stealing.
*
* Implements the partially concurrent queue for the work stealing
* scheduling algorithm as defined in "Dynamic Circular Work-Stealing Deque" by Chace and Lev. The
* worker uses pushBottom to push new items on the stack, and
* retrieves items via popBottom. Eventually, when its own queue is
* empty, it will use popTop to retrieve items from a different
* Worker's queue.
*
* Only pushBottom/popBottom and popTop may be used concurrently,
* i.e. concurrent use of pushBottom with popBottom is not allowed.
*
* Unlike the ABP queue, the CL queue uses a circular array and
* only-incrementing uint64_t bottom and top fields. According to the
* authors "A 64-bit integer is large enough to accommodate 64 years
* of pushes, pops and steals executing at a rate of 4 billion
* operations per second", so overflows should be no problem.
*/
/**
 * Fixed-capacity circular work-stealing queue.
 *
 * Items are stored by value in an array of CAPACITY slots and addressed
 * by the ever-increasing counters 'bottom' and 'top' modulo CAPACITY.
 *
 * pushBottom() and popBottom() are meant to be called only by the worker
 * that owns the queue; popTop() is the entry point for other workers.
 *
 * @tparam ITEM_TYPE Type of the stored items (copied in and out).
 * @tparam CAPACITY  Number of slots in the circular array; must be > 0.
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
class WsClV2Queue {
    static_assert(CAPACITY > 0, "WsClV2Queue needs at least one slot");

    // 'bottom' is only modified in pushBottom() and popBottom(), which
    // are used by the owning thread. It is, however, also read by other
    // threads in popTop(); a plain integer would turn that concurrent
    // read/write into a data race, so the field is atomic.
    ALIGN_TO_CACHE_LINE std::atomic<uint64_t> bottom;

    // 'top' is advanced via compare-and-swap by popTop() and, for the
    // last remaining item, by popBottom().
    ALIGN_TO_CACHE_LINE std::atomic<uint64_t> top;

    // Circular storage; slot index is (counter % CAPACITY).
    ITEM_TYPE queue[CAPACITY];

public:
    // 'bottom' and 'top' are initialized to '1', instead of '0' as
    // it's done in the "Dynamic Circular Work-Stealing Deque" paper
    // because popBottom will decrement bottom, which will result in
    // an underflow if bottom is '0'. The paper's queue uses Java
    // 'long' for bottom and top and is thus safe since it's signed.
    WsClV2Queue() : bottom(1), top(1) { }

    bool pushBottom(const ITEM_TYPE item);

    bool popBottom(ITEM_TYPE *item);

    bool popTop(ITEM_TYPE *item);

    bool isFull() const;

    bool isEmpty() const;

    void print() const;

    /// Total number of slots in the queue.
    inline uintptr_t capacity() const {
        return CAPACITY;
    }

    /// Number of occupied slots, computed as 'bottom - top'. The two
    /// counters are read separately, so the result is only a snapshot
    /// when other threads are operating on the queue.
    inline uint64_t usedSlots() const {
        return bottom.load() - top.load();
    }

    /// Number of slots that are currently unoccupied.
    inline uintptr_t freeSlots() const {
        return capacity() - usedSlots();
    }
};
/**
 * Appends 'item' at the bottom end of the queue.
 *
 * Intended to be called only by the worker that owns the queue.
 *
 * @param item The item to store; it is copied into the circular array.
 * @return true if the item was stored, false if the queue was full.
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
bool WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) {
    // Check if queue is full.
    if (isFull())
        return false;

    // Fill the slot first ...
    queue[bottom % CAPACITY] = item;

    // ... then issue the write fence / memory barrier, and only
    // afterwards publish the new 'bottom'. popTop() reads 'bottom' and
    // then the slot, so the slot write has to be ordered before the
    // increment. With the increment in front of the fence, nothing kept
    // the two stores in this order.
    std::atomic_thread_fence(std::memory_order_release);

    bottom += 1;
    return true;
}
/**
 * Removes the most recently pushed item from the bottom end of the queue.
 *
 * Intended to be called only by the worker that owns the queue.
 *
 * @param item Output location. It is written whenever the queue was not
 *             found empty, even if the final race for the last item is
 *             lost and false is returned.
 * @return true if an item was removed and stored in '*item', false if
 *         the queue was empty or the last item was taken by popTop().
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popBottom(ITEM_TYPE *item) {
    // Claim the bottom-most slot by moving 'bottom' down first.
    const uint64_t localBottom = --bottom;

    // Full barrier between the write of 'bottom' and the read of 'top':
    // the decrement has to be visible to concurrent popTop() callers
    // before we decide, based on 'top', whether a CAS is needed.
    std::atomic_thread_fence(std::memory_order_seq_cst);

    uint64_t localTop = top.load(std::memory_order_acquire);

    if (localTop > localBottom) {
        // The queue is empty. Undo the decrement.
        bottom = localTop;
        return false;
    }

    *item = queue[localBottom % CAPACITY];

    // More than one item was left, so no popTop() can reach this slot.
    // No atomic CAS required.
    if (localBottom > localTop) return true;

    // Exactly one item was left: compete for it with popTop() by
    // advancing 'top'. The target value is computed up front because a
    // failed compare_exchange overwrites 'localTop' with the current
    // value of 'top'. A strong CAS is used since there is no retry loop
    // that would absorb a spurious failure.
    const uint64_t nextTop = localTop + 1;
    const bool won = top.compare_exchange_strong(localTop, nextTop, std::memory_order_release, std::memory_order_relaxed);

    // Whether we or a popTop() caller removed the last item, 'top' is
    // now 'nextTop'. 'bottom' was decremented above, so it has to be
    // moved back up to 'top' to represent the empty queue.
    bottom = nextTop;
    return won;
}
/**
 * Removes the oldest item from the top end of the queue.
 *
 * This is the operation other workers use to take items from this queue.
 *
 * If WAIT_FREE_WS_QUEUE is defined, a lost race returns false
 * immediately; otherwise the operation is retried until it either
 * succeeds or finds the queue empty.
 *
 * @param item Output location. It is written before the race for the
 *             slot is decided, so its content is only meaningful when
 *             true is returned.
 * @return true if an item was removed and stored in '*item', false
 *         otherwise.
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popTop(ITEM_TYPE *item) {
    for (;;) {
        uint64_t localTop = top.load(std::memory_order_acquire);

        // Memory barrier between reading 'top' and reading the shared
        // variable 'bottom'. It sits before the emptiness check because
        // that check is what reads 'bottom'. A sequentially consistent
        // fence is used so the two reads are ordered as a full barrier.
        std::atomic_thread_fence(std::memory_order_seq_cst);

        // Check if the queue is empty.
        if (bottom <= localTop) return false;

        *item = queue[localTop % CAPACITY];

        // A strong CAS only fails if 'top' really changed, i.e. this
        // popTop() lost the race against another popTop() or a
        // rare-case popBottom(). A weak CAS could also fail spuriously,
        // which the wait-free variant below would misreport.
        if (top.compare_exchange_strong(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed))
            return true;

#ifdef WAIT_FREE_WS_QUEUE
        // TODO: Return 'ABORT' enum value here.
        return false;
#endif
        // Otherwise: start over with a fresh view of 'top'.
    }
}
/**
 * Tells whether every slot of the queue is occupied.
 *
 * @return true if the number of used slots has reached CAPACITY.
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isFull() const {
    const uint64_t occupied = usedSlots();
    return !(occupied < CAPACITY);
}
/**
 * Tells whether the queue currently holds no items.
 *
 * @return true if 'top' has caught up with (or passed) 'bottom'.
 */
template<typename ITEM_TYPE, const uintptr_t CAPACITY>
bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isEmpty() const {
    const uint64_t currentTop = top.load();
    return !(currentTop < bottom);
}
} // namespace adt
...@@ -2,11 +2,16 @@ ...@@ -2,11 +2,16 @@
#include "Scheduler.hpp" #include "Scheduler.hpp"
#include "WsClQueue.hpp" #include "WsClQueue.hpp"
#include "WsClV2Queue.hpp"
#include "LockedQueue.hpp" #include "LockedQueue.hpp"
class WsScheduler: public Scheduler { class WsScheduler: public Scheduler {
template <size_t SIZE> template <size_t SIZE>
#ifdef LOCKED_WS_QUEUE
using WsQueue = adt::LockedQueue<Fiber*, SIZE>; using WsQueue = adt::LockedQueue<Fiber*, SIZE>;
#else
using WsQueue = adt::WsClV2Queue<Fiber*, SIZE>;
#endif
public: public:
static const int QUEUE_SIZE = 128; static const int QUEUE_SIZE = 128;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment