From 5ac424cc898495a329617d8076e43f8a7b316bf9 Mon Sep 17 00:00:00 2001
From: Florian Schmaus <flow@cs.fau.de>
Date: Tue, 4 Apr 2017 16:10:58 +0200
Subject: [PATCH] Add Actor API and improve debug log output

---
 emper/Actor.hpp                               | 79 ++++++++++++++++
 emper/Blockable.hpp                           | 35 +++++++
 emper/Debug.cpp                               |  8 +-
 emper/Debug.hpp                               | 43 +++++++--
 emper/Dispatcher.hpp                          |  9 +-
 emper/Runtime.cpp                             |  2 +-
 emper/Runtime.hpp                             |  3 +-
 emper/Scheduler.hpp                           |  5 +-
 emper/UnboundedBlockingMpscQueue.hpp          | 91 +++++++++++++++++++
 .../MultiList_UnboundedWaitFreeMpScQueue.hpp  | 26 ++++++
 emper/strategies/laws/LawsScheduler.cpp       |  2 +-
 emper/strategies/ws/WsScheduler.cpp           |  2 +-
 tests/CMakeLists.txt                          |  4 +
 tests/SimpleActorTest.cpp                     | 77 ++++++++++++++++
 14 files changed, 369 insertions(+), 17 deletions(-)
 create mode 100644 emper/Actor.hpp
 create mode 100644 emper/Blockable.hpp
 create mode 100644 emper/UnboundedBlockingMpscQueue.hpp
 create mode 100644 emper/lib/adt/MultiList_UnboundedWaitFreeMpScQueue.hpp
 create mode 100644 tests/SimpleActorTest.cpp

diff --git a/emper/Actor.hpp b/emper/Actor.hpp
new file mode 100644
index 00000000..6931d764
--- /dev/null
+++ b/emper/Actor.hpp
@@ -0,0 +1,79 @@
+#pragma once
+
+#include "UnboundedBlockingMpscQueue.hpp"
+#include "Fiber.hpp"
+
+#include <atomic>
+#include <chrono>
+
+template<typename T>
+class Actor {
+
+private:
+
+	enum State {
+		Stopped,
+		Retrieving,
+		Running,
+	};
+
+	Runtime& runtime;
+
+	// TODO Make atomic.
+	State state = Stopped;
+
+	UnboundedBlockingMpscQueue<T> queue;
+
+	void actorLoop() {
+		state = Running;
+
+		while (state == Running) {
+			state = Retrieving;
+			T t = queue.get();
+			state = Running;
+
+			receive(t);
+		}
+
+		state = Stopped;
+	}
+
+protected:
+
+	Actor(Runtime& runtime) : runtime(runtime), queue(runtime) {
+	}
+
+	virtual void receive(T t) = 0;
+
+	void stop() {
+		state = Stopped;
+	}
+
+public:
+	void start() {
+		if (state != Stopped) return;
+
+		Fiber* actorFiber = Fiber::from(std::bind(&Actor::actorLoop, this));
+		runtime.schedule(*actorFiber);
+	}
+
+	void tell(T t) {
+		queue.put(t);
+	}
+
+	size_t pendingMailboxItems() {
+		return queue.size();
+	}
+
+	bool waitUntilIdle(long timeout) {
+		const auto start = std::chrono::steady_clock::now();
+		const auto deadline = start + std::chrono::milliseconds(timeout);
+		while (!(queue.size() == 0 && state == Retrieving)) {
+			if (std::chrono::steady_clock::now() > deadline) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+};
diff --git a/emper/Blockable.hpp b/emper/Blockable.hpp
new file mode 100644
index 00000000..43aaa13d
--- /dev/null
+++ b/emper/Blockable.hpp
@@ -0,0 +1,35 @@
+#pragma once
+
+#include "Common.hpp"
+#include "Debug.hpp"
+#include "Runtime.hpp"
+#include "ContextManager.hpp"
+#include "Context.hpp"
+
+class Blockable {
+protected:
+
+	Runtime& runtime;
+
+	ContextManager& contextManager;
+
+	Blockable(Runtime& runtime) : runtime(runtime),
+								  contextManager(runtime.getContextManager()) {
+	}
+
+	void block(func_t freshContextHook) {
+		LOGD("block() blockedContext is " << Context::getCurrentContext());
+		contextManager.saveAndStartNew(freshContextHook);
+	}
+
+	void unblock(Context* context) {
+		assert(context != nullptr);
+		// cppcheck-suppress unsafeClassCanLeak
+		Fiber* unblockFiber = Fiber::from([this, context]() {
+				contextManager.discardAndResume(context);
+			});
+		runtime.schedule(*unblockFiber);
+	}
+
+	virtual void logD(const std::string& string) const = 0;
+};
diff --git a/emper/Debug.cpp b/emper/Debug.cpp
index 4a61d912..9fd357d1 100644
--- a/emper/Debug.cpp
+++ b/emper/Debug.cpp
@@ -12,5 +12,11 @@ void worker_log(const std::string& prefix, const std::string& message) {
 	const workerid_t workerId = Runtime::getWorkerId();
 
 	std::unique_lock<std::mutex> lock(worker_log_mutex);
-	std::cerr << (unsigned int) workerId << ": " << prefix << " " << message << std::endl;
+	std::cerr << (unsigned int) workerId;
+	if (!prefix.empty()) {
+		std::cerr << " " << prefix << " ";
+	} else {
+		std::cerr << "       ";
+	}
+	std::cerr << message << std::endl;
 }
diff --git a/emper/Debug.hpp b/emper/Debug.hpp
index 789f0ac6..4b624017 100644
--- a/emper/Debug.hpp
+++ b/emper/Debug.hpp
@@ -29,7 +29,10 @@ enum class LogSubsystem {
 	F,
 	C,
 	CM,
-	D,
+	DISP,
+	SCHED,
+	RUNTI,
+	U_B_MPSC_Q,
 };
 
 enum LogLevel {
@@ -50,7 +53,10 @@ static const std::map<LogSubsystem, LogLevel> LOG_CONFIG = {
 	{ LogSubsystem::F, ALL },
 	{ LogSubsystem::C, ALL },
 	{ LogSubsystem::CM, ALL },
-	{ LogSubsystem::D, ALL },
+	{ LogSubsystem::DISP, ALL },
+	{ LogSubsystem::SCHED, ALL },
+	{ LogSubsystem::RUNTI, ALL },
+	{ LogSubsystem::U_B_MPSC_Q, ALL },
 };
 
 template <LogSubsystem logSubsystem>
@@ -60,20 +66,36 @@ private:
 	static constexpr char const * getTagFor(LogSubsystem system) {
 		switch (system) {
 		case LogSubsystem::PS:
-			return "PS";
+			return "PS   ";
 		case LogSubsystem::F:
-			return "F";
+			return "F    ";
 		case LogSubsystem::C:
-			return "C";
+			return "C    ";
 		case LogSubsystem::CM:
-			return "CM";
-		case LogSubsystem::D:
-			return "D";
+			return "CM   ";
+		case LogSubsystem::DISP:
+			return "DISP ";
+		case LogSubsystem::SCHED:
+			return "SCHED";
+		case LogSubsystem::RUNTI:
+			return "RUNTI";
+		case LogSubsystem::U_B_MPSC_Q:
+			return "UBSCQ";
 		default:
 			return "UNKNOWN SUBSYSTEM (Add it *now*)";
 		}
 	}
 
+	static constexpr bool shouldPrefixThis(LogSubsystem system) {
+		switch (system) {
+		case LogSubsystem::RUNTI:
+		case LogSubsystem::SCHED:
+			return false;
+		default:
+			return true;
+		}
+	}
+
 protected:
 
 	inline void log(LogLevel level, const std::string& string) const {
@@ -86,7 +108,10 @@ protected:
 		std::string subSystemTag = getTagFor(logSubsystem);;
 
 		std::ostringstream sst;
-		sst << subSystemTag << ": " << this;
+		sst << subSystemTag;
+		if (shouldPrefixThis(logSubsystem)) {
+			sst << " " << this;
+		}
 
 		worker_log(sst.str(), string);
 	}
diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp
index 80c61c1c..28e208c5 100644
--- a/emper/Dispatcher.hpp
+++ b/emper/Dispatcher.hpp
@@ -7,7 +7,7 @@
 class Runtime;
 class ContextManager;
 
-class Dispatcher : public Logger<LogSubsystem::D> {
+class Dispatcher : public Logger<LogSubsystem::DISP> {
 protected:
 	static thread_local const Fiber* currentFiber;
 
@@ -44,7 +44,12 @@ public:
 	}
    
 
-	static const Fiber* getCurrentFiber() {
+	static const Fiber& getCurrentFiber() {
+		const Fiber* fiber = getCurrentFiberPtr();
+		return *fiber;
+	}
+
+	static const Fiber* getCurrentFiberPtr() {
 		return currentFiber;
 	}
 
diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index 79cd1935..13ddab85 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -80,7 +80,7 @@ Runtime::~Runtime() {
 
 void* Runtime::workerLoop(void* voidWorkerId) {
 	workerId = *(workerid_t*) voidWorkerId;
-	WDBG("workerLoop started by thread " << syscall(SYS_gettid));
+	LOGD("Worker loop started by thread " << syscall(SYS_gettid));
 
 	int oldType;
 	int res = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldType);
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 3ed9796a..9a8caeba 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -3,6 +3,7 @@
 #include <thread>
 #include <vector>
 
+#include "Debug.hpp"
 #include "Dispatcher.hpp"
 #include "Common.hpp"
 #include "Latch.hpp"
@@ -12,7 +13,7 @@
 
 class ContextManager;
 
-class Runtime {
+class Runtime : public Logger<LogSubsystem::RUNTI> {
 private:
 	static std::mutex currentRuntimeMutex;
 	static Runtime* currentRuntime;
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index dd013144..d9f900d6 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -1,13 +1,16 @@
 #pragma once
 
+#include "Debug.hpp"
+
 #include <functional>
 
 #include "Fiber.hpp"
 #include "Common.hpp"
 
+
 class Runtime;
 
-class Scheduler {
+class Scheduler : public Logger<LogSubsystem::SCHED>{
 protected:
 	Runtime& runtime;
 	Scheduler(Runtime& runtime);
diff --git a/emper/UnboundedBlockingMpscQueue.hpp b/emper/UnboundedBlockingMpscQueue.hpp
new file mode 100644
index 00000000..293c78ab
--- /dev/null
+++ b/emper/UnboundedBlockingMpscQueue.hpp
@@ -0,0 +1,91 @@
+#pragma once
+
+#include "Blockable.hpp"
+#include "Context.hpp"
+
+#include <queue>
+#include <mutex>
+
+template<typename T>
+class UnboundedBlockingMpscQueue : protected Logger<LogSubsystem::U_B_MPSC_Q>
+								 , protected Blockable {
+
+private:
+	std::atomic<Context*> blockedContext;
+
+	bool tPopped;
+	T t;
+
+	std::queue<T> mpscQueue;
+	std::mutex queueMutex;
+
+	void tryToWakeupBlockedContext() {
+		Context* context = blockedContext.exchange(nullptr);
+		if (context) {
+			unblock(context);
+		}
+	}
+
+	void tryToGetElement() {
+		std::lock_guard<std::mutex> lock(queueMutex);
+		if (!mpscQueue.empty()) {
+			t = mpscQueue.front();
+			mpscQueue.pop();
+			tPopped = true;
+		}
+	}
+
+	virtual void logD(const std::string& string) const {
+		Logger::logD(string);
+	}
+
+public:
+
+	UnboundedBlockingMpscQueue(Runtime& runtime) : Blockable(runtime) {
+	}
+
+	void put(T t) {
+		{
+			std::lock_guard<std::mutex> lock(queueMutex);
+			mpscQueue.push(t);
+		}
+
+		// Micro optimization, see if there is a blocked context
+		// before performing the atomic exchange operation.
+		if (blockedContext.load() != nullptr) {
+			tryToWakeupBlockedContext();
+		}
+	}
+
+	T get() {
+		tPopped = false;
+		tryToGetElement();
+
+		if (!tPopped) {
+			Context* context = Context::getCurrentContext();
+			block([this, context] {
+					blockedContext = context;
+
+					tryToGetElement();
+					if (tPopped) {
+						tryToWakeupBlockedContext();
+					}
+				});
+			if (!tPopped) {
+				std::lock_guard<std::mutex> lock(queueMutex);
+				// If 't' isn't already set, then mspcQueue.get() MUST
+				// return an element. Note that a non-lineralizabe
+				// queue may break this invariant.
+				assert(!mpscQueue.empty());
+				t = mpscQueue.front();
+				mpscQueue.pop();
+			}
+		}
+		return t;
+	}
+
+	size_t size() {
+		std::lock_guard<std::mutex> lock(queueMutex);
+		return mpscQueue.size();
+	}
+};
diff --git a/emper/lib/adt/MultiList_UnboundedWaitFreeMpScQueue.hpp b/emper/lib/adt/MultiList_UnboundedWaitFreeMpScQueue.hpp
new file mode 100644
index 00000000..7e4c49a2
--- /dev/null
+++ b/emper/lib/adt/MultiList_UnboundedWaitFreeMpScQueue.hpp
@@ -0,0 +1,26 @@
+#pragma once
+
+template<typename T>
+class MultiList_UnboundedWaitFreeMpscQueue {
+private:
+	std::atomic<uint64_t> currentTimestamp;
+	Node* heads[];
+	Node* tails[];
+
+	struct Node {
+		T item;
+		Node next;
+		std::atomic<uint64_t> ts;
+		Node() : ts(UINT64_MAX) {};
+	};
+
+public:
+
+	void enqueue(Node* node) {
+		const workerid_t workerId = Runtime::getWorkerId();
+
+		uint64_t ts = currentTimestamp.load();
+		Node* ltail = tails[workerId];
+		tails[workerId] = node;
+		ltial
+};
diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp
index 0de1c62e..b4f92750 100644
--- a/emper/strategies/laws/LawsScheduler.cpp
+++ b/emper/strategies/laws/LawsScheduler.cpp
@@ -22,7 +22,7 @@ LawsScheduler::LawsScheduler(Runtime& runtime) : Scheduler(runtime) {
 }
 
 void LawsScheduler::schedule(Fiber& fiber) {
-	WDBG("Scheduling fiber " << &fiber);
+	LOGD("Scheduling fiber " << &fiber);
 	workeraffinity_t* const affinity_buffer = getAffinityBuffer(fiber);
 	if (affinity_buffer) {
 		workeraffinity_t affinity = *affinity_buffer;
diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp
index 38c0c61f..14171a68 100644
--- a/emper/strategies/ws/WsScheduler.cpp
+++ b/emper/strategies/ws/WsScheduler.cpp
@@ -16,7 +16,7 @@ WsScheduler::WsScheduler(Runtime& runtime) : Scheduler(runtime) {
 }
 
 void WsScheduler::schedule(Fiber& fiber) {
-	WDBG("Scheduling fiber " << &fiber);
+	LOGD("Scheduling fiber " << &fiber);
 	queue.pushBottom(&fiber);
 }
 
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eb67b900..2688e2d1 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -13,3 +13,7 @@ add_test(CApiTest c_api_test)
 add_executable(cpp_api_test CppApiTest.cpp)
 target_link_libraries(cpp_api_test emper)
 add_test(CppApiTest cpp_api_test)
+
+add_executable(simple_actor_test SimpleActorTest.cpp)
+target_link_libraries(simple_actor_test emper)
+add_test(SimpleActorTest simple_actor_test)
diff --git a/tests/SimpleActorTest.cpp b/tests/SimpleActorTest.cpp
new file mode 100644
index 00000000..5215fc63
--- /dev/null
+++ b/tests/SimpleActorTest.cpp
@@ -0,0 +1,77 @@
+#include "Actor.hpp"
+#include "Runtime.hpp"
+#include "CountingPrivateSemaphore.hpp"
+#include "Dispatcher.hpp"
+#include "Debug.hpp"
+#include "emper.hpp"
+
+#include <mutex>
+
+class SumActor : public Actor<uint64_t> {
+private:
+	uint64_t sum = 0;
+		
+protected:
+
+	virtual void receive(uint64_t t) override {
+		sum+=t;
+	}
+
+public:
+
+	SumActor(Runtime& runtime) : Actor(runtime) {
+	}
+
+	uint64_t getSum() {
+		return sum;
+	}
+};
+
+static void mainFiber(void* runtime_ptr) {
+	Runtime& runtime = * (Runtime*) runtime_ptr;
+	const unsigned int FIBER_COUNT = 10;
+	const uint64_t FIBERS_COUNT_TO = 100;
+	const uint64_t PER_FIBER_SUM = (FIBERS_COUNT_TO * (FIBERS_COUNT_TO + 1)) / 2;
+	const uint64_t EXPECTED_SUM = FIBER_COUNT * PER_FIBER_SUM;
+
+	SumActor sumActor(runtime);
+	sumActor.start();
+
+	CPS cps;
+	for (unsigned int i = 0; i < FIBER_COUNT; ++i) {
+		spawn([&sumActor, i] {
+				WDBG(Dispatcher::getCurrentFiber() << " starts to count to " << FIBERS_COUNT_TO);
+				for (uint64_t i = 1; i <= FIBERS_COUNT_TO; ++i) {
+					sumActor.tell(i);
+				}
+			}, cps);
+	}
+
+	// Wait for the producer fibers to finish.
+	cps.wait();
+
+	// Wait for the actor to become idle.
+	bool actorIdle = sumActor.waitUntilIdle(60 * 1000);
+	if (!actorIdle) {
+		std::cerr << "FAILURE: Actor did not went idle";
+		exit(EXIT_FAILURE);
+	}
+
+	if (sumActor.getSum() != EXPECTED_SUM) {
+		std::cerr << "FAILURE: Actor sum " << sumActor.getSum() << " is not equal to excpted sum " << EXPECTED_SUM << std::endl;
+		exit(EXIT_FAILURE);
+	}
+
+	exit(EXIT_SUCCESS);
+}
+
+int main(UNUSED_ARG int arg, UNUSED_ARG char *argv[]) {
+	Runtime runtime;
+
+	Fiber* fiber = Fiber::from(mainFiber, (void*) &runtime);
+	runtime.schedule(*fiber);
+
+	runtime.waitUntilFinished();
+
+	return EXIT_FAILURE;
+}
-- 
GitLab