diff --git a/CMakeLists.txt b/CMakeLists.txt index a688fb399a84c3be55a8e7fccaef5791b25af5ac..30a09d611cfd4e14be5a3b107fd31c5413e7df1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,8 @@ macro(emper_option option_name option_description) endmacro() emper_option(WORKER_SLEEP "Enable sleeping worker support") +emper_option(LOCKED_WS_QUEUE "Use a fully locked queue for work-stealing") +emper_option(OVERFLOW_QUEUE "Use a overflow queue in case the primary queue is full") # Macro to add files to a var. Can even be used in subdirectories. # Source: http://stackoverflow.com/a/7049380/194894 diff --git a/emper/strategies/laws/CMakeLists.txt b/emper/strategies/laws/CMakeLists.txt index 078c930df152842007a5acd2bc4fd8217efb7d48..4a3ad5811d74ae378f03d5f8b44f09f9d45073e3 100644 --- a/emper/strategies/laws/CMakeLists.txt +++ b/emper/strategies/laws/CMakeLists.txt @@ -1,2 +1,4 @@ +add_files(EMPER_SOURCE LawsStrategy.cpp) add_files(EMPER_SOURCE LawsScheduler.cpp) add_files(EMPER_SOURCE LawsDispatcher.cpp) + diff --git a/emper/strategies/laws/LawsDispatcher.hpp b/emper/strategies/laws/LawsDispatcher.hpp index 2fc1f5332cfa3d901f9a8796a41e58e5c7c02590..2a52966f6405c8af9f7a128880dbe9ffb5ae934b 100644 --- a/emper/strategies/laws/LawsDispatcher.hpp +++ b/emper/strategies/laws/LawsDispatcher.hpp @@ -2,6 +2,11 @@ #include "Dispatcher.hpp" -class LawsDispatcher : Dispatcher { +class LawsDispatcher : public Dispatcher { + +public: + LawsDispatcher(Runtime& runtime) : Dispatcher(runtime) { + } + void dispatchLoop() override; }; diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 52f914f9c292961da8657ad7066dc271ce99048a..ac3150fbbefaa8927375b1ed39eef15c3e5460ef 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -5,7 +5,7 @@ #define EMPER_OVERFLOW_QUEUE -thread_local adt::MpscQueue<Fiber> priorityQueue; +thread_local adt::MpscQueue<Fiber> LawsScheduler::priorityQueue; thread_local LawsScheduler::WsQueue<LawsScheduler::QUEUE_SIZE> LawsScheduler::queue; diff --git a/emper/strategies/laws/LawsStrategy.cpp b/emper/strategies/laws/LawsStrategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b2e2ffce3f80e79baafcaf8b97ce5f9b0f61cd17 --- /dev/null +++ b/emper/strategies/laws/LawsStrategy.cpp @@ -0,0 +1,13 @@ +#include "LawsStrategy.hpp" + +LawsStrategy LawsStrategy::INSTANCE; + +Scheduler& LawsStrategy::getScheduler(Runtime& runtime) { + Scheduler* scheduler = new LawsScheduler(runtime); + return *scheduler; +} + +Dispatcher& LawsStrategy::getDispatcher(Runtime& runtime) { + Dispatcher* dispatcher = new LawsDispatcher(runtime); + return *dispatcher; +} diff --git a/emper/strategies/laws/LawsStrategy.hpp b/emper/strategies/laws/LawsStrategy.hpp index 80ed93dc608df8206e274be629f3394bf4caae48..b520c9cf12f21c1e3da4aec3ed9772882d984197 100644 --- a/emper/strategies/laws/LawsStrategy.hpp +++ b/emper/strategies/laws/LawsStrategy.hpp @@ -4,12 +4,20 @@ #include "LawsScheduler.hpp" #include "LawsDispatcher.hpp" -class LawsStrategy : RuntimeStrategy { - Scheduler* getScheduler(Runtime& runtime) { - return new LawsScheduler(runtime); - } - Dispatcher* getDispatcher(Runtime& runtime) { - return new LawsDispatcher(runtime); +class LawsStrategy : public RuntimeStrategy { + +private: + + LawsStrategy() { } + + Scheduler& getScheduler(Runtime& runtime); + + Dispatcher& getDispatcher(Runtime& runtime); + +public: + + static LawsStrategy INSTANCE; + }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2688e2d106f1f62f6c20178167d69f2570cb72e9..7cdd4e8258224f8e61629a30481461ed6bbb7f48 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -17,3 +17,7 @@ add_test(CppApiTest cpp_api_test) add_executable(simple_actor_test SimpleActorTest.cpp) target_link_libraries(simple_actor_test emper) add_test(SimpleActorTest simple_actor_test) + +add_executable(simple_laws_test SimpleLawsTest.cpp) +target_link_libraries(simple_laws_test emper) +add_test(SimpleLawsTest simple_laws_test) diff --git a/tests/SimpleLawsTest.cpp b/tests/SimpleLawsTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f31b9929f27b15008b6ea0b354187c2f6735ad32 --- /dev/null +++ b/tests/SimpleLawsTest.cpp @@ -0,0 +1,105 @@ +#include "emper.hpp" + +#include "LawsStrategy.hpp" +#include "Fiber.hpp" + +#include <random> + +static const unsigned int ROUND_COUNT = 10; +static const unsigned int FIBER_LOOPS = 10; + +static const unsigned int PAYLOAD_COUNT = 4096; +typedef struct ALIGN_TO_CACHE_LINE { + // 4096 * 8 byte (64 bit) = 32 KiB = L1 cache size of most systems + uint64_t payload[PAYLOAD_COUNT]; + CPS* cps; + unsigned int fiberNum; +} FiberData; + +typedef struct ALIGN_TO_CACHE_LINE { + workeraffinity_t affinity; +} AlignedWorkerAffinity; + + +static void fiberFun(void* voidFiberData) { + FiberData* fiberData = static_cast<FiberData*>(voidFiberData); + + std::random_device randomDevice; + std::mt19937_64 randomGenerator(randomDevice()); + std::uniform_int_distribution<unsigned long long> randomDistribution(0, UINT64_MAX); + + for (unsigned int i = 0; i < FIBER_LOOPS; ++i) { + for (unsigned int j = 0; j < PAYLOAD_COUNT; ++j) { + unsigned long long r = randomDistribution(randomGenerator); + fiberData->payload[j] += r; + } + } + + fiberData->cps->signalAndExit(); +} + +static void alphaFun() { + Runtime* runtime = Runtime::getRuntime(); + const unsigned int FIBER_COUNT = runtime->getWorkerCount() + 3; + + AlignedWorkerAffinity *affinities = new AlignedWorkerAffinity[FIBER_COUNT]; + FiberData* fiberData = new FiberData[FIBER_COUNT]; + + for (unsigned int i = 0; i < FIBER_COUNT; ++i) { + FiberData& currentFiberData = fiberData[i]; + memset(currentFiberData.payload, 0, sizeof(uint64_t) * PAYLOAD_COUNT); + currentFiberData.fiberNum = i; + currentFiberData.cps = nullptr; + } + + for (unsigned int round = 0; round < ROUND_COUNT; ++round) { + CPS cps(FIBER_COUNT); + for (unsigned int i = 0; i < FIBER_COUNT; ++i) { + FiberData* myFiberData = &fiberData[i]; + myFiberData->cps = &cps; + Fiber* fiber = Fiber::from(&fiberFun, + myFiberData, + &affinities[i].affinity); + runtime->schedule(*fiber); + } + cps.wait(); + } + + std::atomic<uint64_t> finalResult(0); + CPS cps(FIBER_COUNT); + for (unsigned int i = 0; i < FIBER_COUNT; ++i) { + FiberData* myFiberData = &fiberData[i]; + myFiberData->cps = &cps; + Fiber* fiber = Fiber::from([myFiberData, &finalResult]() { + uint64_t mySum = 0; + for (unsigned int i = 0; i < PAYLOAD_COUNT; ++i) { + mySum += myFiberData->payload[i]; + } + finalResult += mySum; + + myFiberData->cps->signalAndExit(); + }, + &affinities[i].affinity); + runtime->schedule(*fiber); + } + + free(fiberData); + free(affinities); + + std::cerr << "Result: " << finalResult << std::endl; + + exit(EXIT_SUCCESS); +} + +int main(UNUSED_ARG int args, UNUSED_ARG char *argv[]) { + RuntimeStrategy& lawsStrategy = LawsStrategy::INSTANCE; + Runtime runtime(lawsStrategy); + + Fiber* alphaFiber = Fiber::from(&alphaFun); + + runtime.schedule(*alphaFiber); + + runtime.waitUntilFinished(); + + return 0; +}