From 4bb722d1a07036f9752aaa79be39fb0515f1335d Mon Sep 17 00:00:00 2001
From: Florian Schmaus <flow@cs.fau.de>
Date: Tue, 20 Jun 2017 10:30:14 +0200
Subject: [PATCH] Add support for sleeping workers

build your sleezy EMPER version by using for example

cmake -DCMAKE_BUILD_TYPE=release -DEMPER_WORKER_SLEEP=ON ..
---
 CMakeLists.txt                       | 11 +++++
 apps/CMakeLists.txt                  | 11 ++++-
 apps/WorkerSleepExample.cpp          | 67 ++++++++++++++++++++++++++++
 emper/Dispatcher.cpp                 |  4 ++
 emper/Dispatcher.hpp                 |  2 +
 emper/Runtime.cpp                    |  3 +-
 emper/Runtime.hpp                    | 19 ++++++++
 emper/RuntimeStrategy.hpp            |  5 +--
 emper/Scheduler.cpp                  |  4 ++
 emper/Scheduler.hpp                  |  3 +-
 emper/strategies/ws/WsDispatcher.cpp |  4 ++
 emper/strategies/ws/WsScheduler.cpp  |  3 ++
 emper/strategies/ws/WsStrategy.cpp   | 13 ++++++
 emper/strategies/ws/WsStrategy.hpp   | 17 +++----
 14 files changed, 148 insertions(+), 18 deletions(-)
 create mode 100644 apps/WorkerSleepExample.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0d8ee8b6..a688fb39 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,6 +40,17 @@ find_package(Threads REQUIRED)
 
 enable_testing()
 
+# Custom options, enable with "cmake -DEMPER_WORKER_SLEEP=ON"
+# Source: https://stackoverflow.com/a/10364240/194894
+macro(emper_option option_name option_description)
+    option(EMPER_${option_name} ${option_description})
+    if(EMPER_${option_name})
+        add_definitions(-DEMPER_${option_name})
+    endif(EMPER_${option_name})
+endmacro()
+
+emper_option(WORKER_SLEEP "Enable sleeping worker support")
+
 # Macro to add files to a var. Can even be used in subdirectories.
 # Source: http://stackoverflow.com/a/7049380/194894
 macro (add_files var)
diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt
index 2dcc6106..0c432b9b 100644
--- a/apps/CMakeLists.txt
+++ b/apps/CMakeLists.txt
@@ -1,2 +1,9 @@
-add_executable(Main Main.cpp)
-target_link_libraries(Main Threads::Threads emper)
+function(emper_app_single_file emper_app_name)
+  set(emper_app_filename "${emper_app_name}.cpp")
+  message("EMPER single file app: ${emper_app_name} (${emper_app_filename})")
+  add_executable(${emper_app_name} ${emper_app_filename})
+  target_link_libraries(${emper_app_name} Threads::Threads emper)
+endfunction()
+
+emper_app_single_file(Main)
+emper_app_single_file(WorkerSleepExample)
diff --git a/apps/WorkerSleepExample.cpp b/apps/WorkerSleepExample.cpp
new file mode 100644
index 00000000..9e0b848c
--- /dev/null
+++ b/apps/WorkerSleepExample.cpp
@@ -0,0 +1,67 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+#include <list>
+#include <string>
+
+#include "Runtime.hpp"
+#include "Common.hpp"
+#include "PrivateSemaphore.hpp"
+#include "BinaryPrivateSemaphore.hpp"
+#include "CountingPrivateSemaphore.hpp"
+#include "Debug.hpp"
+
+#include "emper.hpp"
+#include "emper-version.h"
+
+static unsigned int ITERATIONS = 10;
+
+static std::chrono::milliseconds SINGLE_FIBER_DURATION = std::chrono::milliseconds(3000);
+
+static std::chrono::milliseconds MULTI_FIBER_DURATION = std::chrono::milliseconds(2000);
+
+template<typename Rep, typename Period>
+static void letsGetBusy(std::chrono::duration<Rep, Period> duration) {
+	const std::chrono::time_point<std::chrono::high_resolution_clock> now = std::chrono::high_resolution_clock::now();
+	const std::chrono::time_point<std::chrono::high_resolution_clock> deadline = now + duration;
+
+	while (std::chrono::high_resolution_clock::now() < deadline);
+}
+
+static void alphaFiber() {
+	const Runtime* runtime = Runtime::getRuntime();
+	const workerid_t workerCount = runtime->getWorkerCount();
+
+	std::cout << "Starting WorkerSleepExample with " << workerCount << " workers using " << ITERATIONS << " iterations." << std::endl
+			  << "Single fiber duration: " << SINGLE_FIBER_DURATION.count() << ", Multi fiber duration: " << MULTI_FIBER_DURATION.count() << std::endl
+			  << "EMPER version: " << EMPER_FULL_VERSION << std::endl;
+	
+	for (unsigned int i = 0; i < ITERATIONS; ++i) {
+		letsGetBusy(SINGLE_FIBER_DURATION);
+
+		CPS cps;
+		for (workerid_t j = 0; j < workerCount; ++j) {
+			spawn([] {
+					letsGetBusy(MULTI_FIBER_DURATION);
+				}, cps);
+		}
+		cps.wait();
+	}
+
+	std::cout << "Finished WorkerSleepExample" << std::endl;
+	exit(EXIT_SUCCESS);
+}
+
+int main(UNUSED_ARG int argc, UNUSED_ARG char *argv[]) {
+	Runtime runtime;
+
+	Fiber* fibFiber = Fiber::from(&alphaFiber);
+
+	std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl;
+
+	runtime.schedule(*fibFiber);
+
+	runtime.waitUntilFinished();
+	
+	return 0;
+}
diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp
index 6bd99972..dfe4e280 100644
--- a/emper/Dispatcher.cpp
+++ b/emper/Dispatcher.cpp
@@ -11,3 +11,7 @@ thread_local const Fiber* Dispatcher::currentFiber;
 func_t Dispatcher::getDispatchLoop() {
 	return std::bind(&Dispatcher::dispatchLoop, this);
 }
+
+void Dispatcher::putRuntimeWorkerToSleep() {
+	runtime.dispatcherLoopSleep();
+}
diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp
index 28e208c5..ab33b353 100644
--- a/emper/Dispatcher.hpp
+++ b/emper/Dispatcher.hpp
@@ -39,6 +39,8 @@ protected:
 		delete fiber;
 	}
 
+	void putRuntimeWorkerToSleep();
+
 public:
 	Dispatcher(Runtime& runtime) : runtime(runtime) {
 	}
diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index 13ddab85..7692c928 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -24,7 +24,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCoun
 										 , workerLatch(workerCount)
 										 , scheduler(strategy.getScheduler(*this))
 										 , dispatcher(strategy.getDispatcher(*this))
-										 , contextManager(*(new ContextManager(*this))) {
+										 , contextManager(*(new ContextManager(*this)))
+										 , atLeastOneWorkerIsSleeping(false) {
 	threads = new pthread_t[workerCount];
 	workerIds = new workerid_t[workerCount];
 
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 9a8caeba..636f5d3f 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -35,6 +35,10 @@ private:
 
 	void* workerLoop(void* workerId);
 
+	std::mutex workerSleepMutex;
+	std::condition_variable workerSleepConditionVariable;
+	ALIGN_TO_CACHE_LINE std::atomic<bool> atLeastOneWorkerIsSleeping;
+
 	static RuntimeStrategy& DEFAULT_STRATEGY;
 
 protected:
@@ -42,6 +46,20 @@ protected:
 		newWorkerHooks.push_back(hook);
 	};
 
+	inline void notifyAboutNewWork() {
+		if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) return;
+
+		std::lock_guard<std::mutex> lk(workerSleepMutex);
+		workerSleepConditionVariable.notify_all();
+	}
+
+	void dispatcherLoopSleep() {
+		std::unique_lock<std::mutex> lk(workerSleepMutex);
+		atLeastOneWorkerIsSleeping.store(true, std::memory_order_relaxed);
+		workerSleepConditionVariable.wait(lk);
+		atLeastOneWorkerIsSleeping.store(false, std::memory_order_relaxed);
+	}
+
 public:
 
 	Runtime() : Runtime(std::thread::hardware_concurrency()) {
@@ -88,6 +106,7 @@ public:
 
 	friend ContextManager;
 	friend Scheduler;
+	friend Dispatcher;
 	template<typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE>
 	friend class MemoryManager;
 };
diff --git a/emper/RuntimeStrategy.hpp b/emper/RuntimeStrategy.hpp
index 32fc65b6..3703e9fa 100644
--- a/emper/RuntimeStrategy.hpp
+++ b/emper/RuntimeStrategy.hpp
@@ -1,9 +1,8 @@
 #pragma once
 
-#include "Scheduler.hpp"
-#include "Dispatcher.hpp"
-
 class Runtime;
+class Scheduler;
+class Dispatcher;
 
 class RuntimeStrategy {
 
diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp
index f51d199d..222a1a37 100644
--- a/emper/Scheduler.cpp
+++ b/emper/Scheduler.cpp
@@ -8,3 +8,7 @@ Scheduler::Scheduler(Runtime& runtime) : runtime(runtime) {
 void Scheduler::addNewWorkerHook(std::function<void(void)> hook) {
 	runtime.addNewWorkerHook(hook);
 }
+
+void Scheduler::notifyRuntimeAboutNewWork() {
+	runtime.notifyAboutNewWork();
+}
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index d9f900d6..043165b1 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -7,7 +7,6 @@
 #include "Fiber.hpp"
 #include "Common.hpp"
 
-
 class Runtime;
 
 class Scheduler : public Logger<LogSubsystem::SCHED>{
@@ -25,6 +24,8 @@ protected:
 		fiber.doAtomicIncrRefCount();
 	}
 
+	void notifyRuntimeAboutNewWork();
+
 public:
 	virtual void schedule(Fiber& fiber) = 0;
 
diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp
index bf9d8983..20338201 100644
--- a/emper/strategies/ws/WsDispatcher.cpp
+++ b/emper/strategies/ws/WsDispatcher.cpp
@@ -7,7 +7,11 @@ void WsDispatcher::dispatchLoop() {
 	while (true) {
 		const Fiber* fiber = runtime.nextFiber();
 		if (!fiber) {
+#ifdef EMPER_WORKER_SLEEP
+			putRuntimeWorkerToSleep();
+#else
 			pthread_yield();
+#endif
 			continue;
 		}
 
diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp
index 1edd5a18..0b4bdfcb 100644
--- a/emper/strategies/ws/WsScheduler.cpp
+++ b/emper/strategies/ws/WsScheduler.cpp
@@ -21,6 +21,9 @@ void WsScheduler::schedule(Fiber& fiber) {
 	if (unlikely(!pushed)) {
 		ABORT("Could not push fiber " << &fiber << " into queue");
 	}
+#ifdef EMPER_WORKER_SLEEP
+	notifyRuntimeAboutNewWork();
+#endif
 }
 
 Fiber* WsScheduler::nextFiber() {
diff --git a/emper/strategies/ws/WsStrategy.cpp b/emper/strategies/ws/WsStrategy.cpp
index 482e0f31..d1848c08 100644
--- a/emper/strategies/ws/WsStrategy.cpp
+++ b/emper/strategies/ws/WsStrategy.cpp
@@ -1,3 +1,16 @@
 #include "WsStrategy.hpp"
 
+#include "WsScheduler.hpp"
+#include "WsDispatcher.hpp"
+
 WsStrategy WsStrategy::INSTANCE;
+
+Scheduler& WsStrategy::getScheduler(Runtime& runtime) {
+	Scheduler* scheduler =  new WsScheduler(runtime);
+	return *scheduler;
+}
+
+Dispatcher& WsStrategy::getDispatcher(Runtime& runtime) {
+	Dispatcher* dispatcher = new WsDispatcher(runtime);
+	return *dispatcher;
+}
diff --git a/emper/strategies/ws/WsStrategy.hpp b/emper/strategies/ws/WsStrategy.hpp
index fc8209ee..75e6f3c0 100644
--- a/emper/strategies/ws/WsStrategy.hpp
+++ b/emper/strategies/ws/WsStrategy.hpp
@@ -1,8 +1,9 @@
 #pragma once
 
 #include "RuntimeStrategy.hpp"
-#include "WsScheduler.hpp"
-#include "WsDispatcher.hpp"
+
+class Scheduler;
+class Dispatcher;
 
 class WsStrategy : public RuntimeStrategy {
 
@@ -11,16 +12,10 @@ private:
 	WsStrategy() {
 	}
 
-	Scheduler& getScheduler(Runtime& runtime) {
-		Scheduler* scheduler =  new WsScheduler(runtime);
-		return *scheduler;
-	}
+	Scheduler& getScheduler(Runtime& runtime);
+
+	Dispatcher& getDispatcher(Runtime& runtime);
 
-	Dispatcher& getDispatcher(Runtime& runtime) {
-		Dispatcher* dispatcher = new WsDispatcher(runtime);
-		return *dispatcher;
-	}
-	
 public:
 
 	static WsStrategy INSTANCE;
-- 
GitLab