diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d8ee8b67716bceae87dd2b3284796c30edc8bc6..a688fb399a84c3be55a8e7fccaef5791b25af5ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,17 @@ find_package(Threads REQUIRED) enable_testing() +# Custom options, enable with "cmake -DEMPER_WORKER_SLEEP=ON" +# Source: https://stackoverflow.com/a/10364240/194894 +macro(emper_option option_name option_description) + option(EMPER_${option_name} ${option_description}) + if(EMPER_${option_name}) + add_definitions(-DEMPER_${option_name}) + endif(EMPER_${option_name}) +endmacro() + +emper_option(WORKER_SLEEP "Enable sleeping worker support") + # Macro to add files to a var. Can even be used in subdirectories. # Source: http://stackoverflow.com/a/7049380/194894 macro (add_files var) diff --git a/apps/CMakeLists.txt b/apps/CMakeLists.txt index 2dcc61064325725b7a6d5b892cdafc688a9e2eeb..0c432b9b42e37de3a2970b165cffdea88ec7f33b 100644 --- a/apps/CMakeLists.txt +++ b/apps/CMakeLists.txt @@ -1,2 +1,9 @@ -add_executable(Main Main.cpp) -target_link_libraries(Main Threads::Threads emper) +function(emper_app_single_file emper_app_name) + set(emper_app_filename "${emper_app_name}.cpp") + message("EMPER single file app: ${emper_app_name} (${emper_app_filename})") + add_executable(${emper_app_name} ${emper_app_filename}) + target_link_libraries(${emper_app_name} Threads::Threads emper) +endfunction() + +emper_app_single_file(Main) +emper_app_single_file(WorkerSleepExample) diff --git a/apps/WorkerSleepExample.cpp b/apps/WorkerSleepExample.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9e0b848c8821ebb1e3ef68f150a58aeee972c043 --- /dev/null +++ b/apps/WorkerSleepExample.cpp @@ -0,0 +1,67 @@ +#include <stdio.h> +#include <stdlib.h> +#include <iostream> +#include <list> +#include <string> + +#include "Runtime.hpp" +#include "Common.hpp" +#include "PrivateSemaphore.hpp" +#include "BinaryPrivateSemaphore.hpp" +#include "CountingPrivateSemaphore.hpp" +#include "Debug.hpp" + +#include "emper.hpp" +#include "emper-version.h" + +static unsigned int ITERATIONS = 10; + +static std::chrono::milliseconds SINGLE_FIBER_DURATION = std::chrono::milliseconds(3000); + +static std::chrono::milliseconds MULTI_FIBER_DURATION = std::chrono::milliseconds(2000); + +template<typename Rep, typename Period> +static void letsGetBusy(std::chrono::duration<Rep, Period> duration) { + const std::chrono::time_point<std::chrono::high_resolution_clock> now = std::chrono::high_resolution_clock::now(); + const std::chrono::time_point<std::chrono::high_resolution_clock> deadline = now + duration; + + while (std::chrono::high_resolution_clock::now() < deadline); +} + +static void alphaFiber() { + const Runtime* runtime = Runtime::getRuntime(); + const workerid_t workerCount = runtime->getWorkerCount(); + + std::cout << "Starting WorkerSleepExample with " << workerCount << " workers using " << ITERATIONS << " iterations." << std::endl + << "Single fiber duration: " << SINGLE_FIBER_DURATION.count() << ", Multi fiber duration: " << MULTI_FIBER_DURATION.count() << std::endl + << "EMPER version: " << EMPER_FULL_VERSION << std::endl; + + for (unsigned int i = 0; i < ITERATIONS; ++i) { + letsGetBusy(SINGLE_FIBER_DURATION); + + CPS cps; + for (workerid_t j = 0; j < workerCount; ++j) { + spawn([] { + letsGetBusy(MULTI_FIBER_DURATION); + }, cps); + } + cps.wait(); + } + + std::cout << "Finished WorkerSleepExample" << std::endl; + exit(EXIT_SUCCESS); +} + +int main(UNUSED_ARG int argc, UNUSED_ARG char *argv[]) { + Runtime runtime; + + Fiber* fibFiber = Fiber::from(&alphaFiber); + + std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; + + runtime.schedule(*fibFiber); + + runtime.waitUntilFinished(); + + return 0; +} diff --git a/emper/Dispatcher.cpp b/emper/Dispatcher.cpp index 6bd999727fce94b1fd03f4bdef87e3b874d130b4..dfe4e2801f6256895da83c369ef2951b700d480a 100644 --- a/emper/Dispatcher.cpp +++ b/emper/Dispatcher.cpp @@ -11,3 +11,7 @@ thread_local const Fiber* Dispatcher::currentFiber; func_t Dispatcher::getDispatchLoop() { return std::bind(&Dispatcher::dispatchLoop, this); } + +void Dispatcher::putRuntimeWorkerToSleep() { + runtime.dispatcherLoopSleep(); +} diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index 28e208c58bfd7391a4695390bf4a36fe17002d6a..ab33b353033d1775249c90b9a2cc81f25b78460c 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -39,6 +39,8 @@ protected: delete fiber; } + void putRuntimeWorkerToSleep(); + public: Dispatcher(Runtime& runtime) : runtime(runtime) { } diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 13ddab8525b3d9884238bf1d75e21c0a3e9e99a8..7692c9282d9060d19100cba957d0fa40be33a52a 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -24,7 +24,8 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCoun , workerLatch(workerCount) , scheduler(strategy.getScheduler(*this)) , dispatcher(strategy.getDispatcher(*this)) - , contextManager(*(new ContextManager(*this))) { + , contextManager(*(new ContextManager(*this))) + , atLeastOneWorkerIsSleeping(false) { threads = new pthread_t[workerCount]; workerIds = new workerid_t[workerCount]; diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 9a8caeba4cff57c2a153ac51e70a834886fbeba9..636f5d3f1cce0a8d1591b8f4fa683f1710ec5bf8 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -35,6 +35,10 @@ private: void* workerLoop(void* workerId); + std::mutex workerSleepMutex; + std::condition_variable workerSleepConditionVariable; + ALIGN_TO_CACHE_LINE std::atomic<bool> atLeastOneWorkerIsSleeping; + static RuntimeStrategy& DEFAULT_STRATEGY; protected: @@ -42,6 +46,20 @@ protected: newWorkerHooks.push_back(hook); }; + inline void notifyAboutNewWork() { + if (!atLeastOneWorkerIsSleeping.load(std::memory_order_relaxed)) return; + + std::lock_guard<std::mutex> lk(workerSleepMutex); + workerSleepConditionVariable.notify_all(); + } + + void dispatcherLoopSleep() { + std::unique_lock<std::mutex> lk(workerSleepMutex); + atLeastOneWorkerIsSleeping.store(true, std::memory_order_relaxed); + workerSleepConditionVariable.wait(lk); + atLeastOneWorkerIsSleeping.store(false, std::memory_order_relaxed); + } + public: Runtime() : Runtime(std::thread::hardware_concurrency()) { @@ -88,6 +106,7 @@ public: friend ContextManager; friend Scheduler; + friend Dispatcher; template<typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE> friend class MemoryManager; }; diff --git a/emper/RuntimeStrategy.hpp b/emper/RuntimeStrategy.hpp index 32fc65b6e4a8027cc2a3ff1f698715908ca570c3..3703e9faf014b3ced3fac3908d8041c62d5a14d8 100644 --- a/emper/RuntimeStrategy.hpp +++ b/emper/RuntimeStrategy.hpp @@ -1,9 +1,8 @@ #pragma once -#include "Scheduler.hpp" -#include "Dispatcher.hpp" - class Runtime; +class Scheduler; +class Dispatcher; class RuntimeStrategy { diff --git a/emper/Scheduler.cpp b/emper/Scheduler.cpp index f51d199d2c69f4b606ba5f83a2571ba785e3248b..222a1a3703a01dea93ac2a58f0c4509d36ec1a29 100644 --- a/emper/Scheduler.cpp +++ b/emper/Scheduler.cpp @@ -8,3 +8,7 @@ Scheduler::Scheduler(Runtime& runtime) : runtime(runtime) { void Scheduler::addNewWorkerHook(std::function<void(void)> hook) { runtime.addNewWorkerHook(hook); } + +void Scheduler::notifyRuntimeAboutNewWork() { + runtime.notifyAboutNewWork(); +} diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index d9f900d62c61156a1cf5b52fde6bacbe3df695c5..043165b1248dea34316b2d453f8c485989e03221 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -7,7 +7,6 @@ #include "Fiber.hpp" #include "Common.hpp" - class Runtime; class Scheduler : public Logger<LogSubsystem::SCHED>{ @@ -25,6 +24,8 @@ protected: fiber.doAtomicIncrRefCount(); } + void notifyRuntimeAboutNewWork(); + public: virtual void schedule(Fiber& fiber) = 0; diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index bf9d8983dc3689b30cb8331d81d733254c24234c..203382014f57b748b54c11378a7948f7ecb57386 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ b/emper/strategies/ws/WsDispatcher.cpp @@ -7,7 +7,11 @@ void WsDispatcher::dispatchLoop() { while (true) { const Fiber* fiber = runtime.nextFiber(); if (!fiber) { +#ifdef EMPER_WORKER_SLEEP + putRuntimeWorkerToSleep(); +#else pthread_yield(); +#endif continue; } diff --git a/emper/strategies/ws/WsScheduler.cpp b/emper/strategies/ws/WsScheduler.cpp index 1edd5a18215f43c71db242851d36afe1c86b60e7..0b4bdfcb5ad2eeeed2cc0ec1d5510a9519cdbf19 100644 --- a/emper/strategies/ws/WsScheduler.cpp +++ b/emper/strategies/ws/WsScheduler.cpp @@ -21,6 +21,9 @@ void WsScheduler::schedule(Fiber& fiber) { if (unlikely(!pushed)) { ABORT("Could not push fiber " << &fiber << " into queue"); } +#ifdef EMPER_WORKER_SLEEP + notifyRuntimeAboutNewWork(); +#endif } Fiber* WsScheduler::nextFiber() { diff --git a/emper/strategies/ws/WsStrategy.cpp b/emper/strategies/ws/WsStrategy.cpp index 482e0f317fb23463dd191e5c222fcda1aac32e2b..d1848c08b0ad2d9dcd3928ee985e237a8c671b40 100644 --- a/emper/strategies/ws/WsStrategy.cpp +++ b/emper/strategies/ws/WsStrategy.cpp @@ -1,3 +1,16 @@ #include "WsStrategy.hpp" +#include "WsScheduler.hpp" +#include "WsDispatcher.hpp" + WsStrategy WsStrategy::INSTANCE; + +Scheduler& WsStrategy::getScheduler(Runtime& runtime) { + Scheduler* scheduler = new WsScheduler(runtime); + return *scheduler; +} + +Dispatcher& WsStrategy::getDispatcher(Runtime& runtime) { + Dispatcher* dispatcher = new WsDispatcher(runtime); + return *dispatcher; +} diff --git a/emper/strategies/ws/WsStrategy.hpp b/emper/strategies/ws/WsStrategy.hpp index fc8209ee5ca63c925e9fd79b78e0495be6fd9c5c..75e6f3c048e078f0fd67c349974698205d963108 100644 --- a/emper/strategies/ws/WsStrategy.hpp +++ b/emper/strategies/ws/WsStrategy.hpp @@ -1,8 +1,9 @@ #pragma once #include "RuntimeStrategy.hpp" -#include "WsScheduler.hpp" -#include "WsDispatcher.hpp" + +class Scheduler; +class Dispatcher; class WsStrategy : public RuntimeStrategy { @@ -11,16 +12,10 @@ private: WsStrategy() { } - Scheduler& getScheduler(Runtime& runtime) { - Scheduler* scheduler = new WsScheduler(runtime); - return *scheduler; - } + Scheduler& getScheduler(Runtime& runtime); + + Dispatcher& getDispatcher(Runtime& runtime); - Dispatcher& getDispatcher(Runtime& runtime) { - Dispatcher* dispatcher = new WsDispatcher(runtime); - return *dispatcher; - } - public: static WsStrategy INSTANCE;