From 11a9c99fbd65169e42baea926843b046f608227c Mon Sep 17 00:00:00 2001
From: Florian Schmaus <flow@cs.fau.de>
Date: Sat, 17 Jun 2017 16:42:08 +0200
Subject: [PATCH] Add WsCl2Queue and use it in WsScheduler

---
 emper/lib/adt/WsClV2Queue.hpp       | 146 ++++++++++++++++++++++++++++
 emper/strategies/ws/WsScheduler.hpp |   5 +
 2 files changed, 151 insertions(+)
 create mode 100644 emper/lib/adt/WsClV2Queue.hpp

diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp
new file mode 100644
index 00000000..e8e7f956
--- /dev/null
+++ b/emper/lib/adt/WsClV2Queue.hpp
@@ -0,0 +1,146 @@
+#pragma once
+
+#include <cstdint>
+#include <atomic>
+
+#include "Common.hpp"
+
+namespace adt {
+
+/**
+ * @file
+ *
+ * A CL queue used for work stealing.
+ *
+ * Implements the partially concurrent queue for the work stealing
+ * scheduling algorithm as defined in "Dynamic Circular Work-Stealing Deque" by Chace and Lev. The
+ * worker uses pushBottom to push new items on the stack, and
+ * retrieves items via popBottom. Eventually, when its own queue is
+ * empty, it will use popTop to retrieve items from a different
+ * Worker's queue.
+ *
+ * Only pushBottom/popBottom and popTop may be used concurrently,
+ * i.e. concurrent use of pushBottom with popBottom is not allowed.
+ *
+ * Unlike the ABP queue, the CL queue uses a circular array and
+ * only-incrementing uint64_t bottom and top fields. According to the
+ * authors "A 64-bit integer is large enough to accommodate 64 years
+ * of pushes, pops and steals executing at a rate of 4 billtion
+ * operations per second", so overflows should be no problem.
+ */
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+class WsClV2Queue {
+
+	// Bottom of the queue is only modified in pushBottom() and
+	// popBottom() methods, which are only used by the same
+	// thread. Thus we don't need to make it atomic.
+	ALIGN_TO_CACHE_LINE uint64_t bottom;
+
+	ALIGN_TO_CACHE_LINE std::atomic<uint64_t> top;
+
+	ITEM_TYPE queue[CAPACITY];
+
+public:
+	// 'bottom' and 'top' are initialized to '1', instead of '0' as
+	// it's done in the "Dynamic Circular Work-Stealing Deque" paper
+	// because popBottom will decrement bottom, which will result in
+	// an underflow if bottom is '0'. The paper's queue uses Java
+	// 'long' for bottom and top and is thus safe since it's signed.
+	WsClV2Queue() : bottom(1), top(1) { }
+	bool pushBottom(const ITEM_TYPE item);
+	bool popBottom(ITEM_TYPE *item);
+	bool popTop(ITEM_TYPE *item);
+	bool isFull() const;
+	bool isEmpty() const;
+	void print() const;
+	inline uintptr_t capacity() const {
+		return CAPACITY;
+	}
+	inline uint64_t usedSlots() const {
+		return bottom - top;
+	}
+	inline uintptr_t freeSlots() const {
+		return capacity() - usedSlots();
+	}
+};
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) {
+	// Check if queue is full.
+	if (isFull())
+		return false;
+	
+	queue[bottom % CAPACITY] = item;
+
+	// TODO: Possible to move after fence?
+	bottom += 1;
+
+	// Write fence / memory barrier
+	atomic_thread_fence(std::memory_order_release);
+	
+	return true;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popBottom(ITEM_TYPE *item) {
+	const uint64_t localBottom = --bottom;
+	uint64_t localTop = top.load(std::memory_order_acquire);
+
+	if (localTop > localBottom) {
+		// The queue is empty.
+		bottom = localTop;
+		return false;
+	}
+
+	*item = queue[localBottom % CAPACITY];
+
+	// There was no possiblity of a race. No atomic CAS required.
+	if (localBottom > localTop) return true;
+
+	bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed);
+	//	TODO: Why do we reset bottom here?
+	bottom = localTop + 1;
+	return res;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popTop(ITEM_TYPE *item) {
+ start:
+	uint64_t localTop = top.load(std::memory_order_acquire);
+
+	// Initially this fence was after the check if the queue is
+	// empty. But since the shared variable 'bottom' is read by the
+	// check, I believe it belongs before the check.
+	// Read fence / memory barrier
+	atomic_thread_fence(std::memory_order_acquire);
+
+	// Check if the queue is empty.
+	if (bottom <= localTop) return false;
+
+	*item = queue[localTop % CAPACITY];
+
+	// If cas fails, then this popTop() lost the race against another
+	// popTop() or a rare-case popBottom().
+	if (!top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed)) {
+		// TODO: Return 'ABORT' enum value here.
+#ifdef WAIT_FREE_WS_QUEUE
+		return false;
+#else
+		goto start;
+#endif
+	}
+
+	return true;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isFull() const {
+	return usedSlots() >= CAPACITY;
+}
+
+template<typename ITEM_TYPE, const uintptr_t CAPACITY>
+bool WsClV2Queue<ITEM_TYPE, CAPACITY>::isEmpty() const {
+	return top >= bottom;
+}
+
+}	// namespace adt
diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp
index 9f9c2ab9..b0ea85cb 100644
--- a/emper/strategies/ws/WsScheduler.hpp
+++ b/emper/strategies/ws/WsScheduler.hpp
@@ -2,11 +2,16 @@
 
 #include "Scheduler.hpp"
 #include "WsClQueue.hpp"
+#include "WsClV2Queue.hpp"
 #include "LockedQueue.hpp"
 
 class WsScheduler: public Scheduler {
 	template <size_t SIZE>
+#ifdef LOCKED_WS_QUEUE
 	using WsQueue = adt::LockedQueue<Fiber*, SIZE>;
+#else
+	using WsQueue = adt::WsClV2Queue<Fiber*, SIZE>;
+#endif
 
 public:
 	static const int QUEUE_SIZE = 128;
-- 
GitLab