diff --git a/Makefile b/Makefile index 9b9ecb705d98d87f78fdbacea20ab4bf3ef7f5a5..2cc276fd22bc7add06360829c34c002ba9063562 100644 --- a/Makefile +++ b/Makefile @@ -67,6 +67,14 @@ fibril-unmap: EMPER_IO=false \ BUILDDIR="build-$@" +.PHONY: stats +stats: + rm -f build + $(MAKE) build \ + EMPER_LOG_LEVEL="Info" \ + EMPER_STATS=true \ + BUILDDIR="build-$@" + .PHONY: fast-static-analysis fast-static-analysis: all check-format check-license doc diff --git a/apps/FibChildStealing.cpp b/apps/FibChildStealing.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d4aad54beaf06f655c4a5c04efd6806c07365e4c --- /dev/null +++ b/apps/FibChildStealing.cpp @@ -0,0 +1,122 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2022 Florian Schmaus +#include <boost/program_options.hpp> +#include <cstdint> +#include <iostream> // for basic_ostream::operator<< +#include <memory> +#include <thread> + +#include "BinaryPrivateSemaphore.hpp" // for BPS +#include "CountingPrivateSemaphore.hpp" // for CPS +#include "Debug.hpp" // for DBG +#include "Fiber.hpp" // for Fiber +#include "PrivateSemaphore.hpp" // for PS +#include "Runtime.hpp" // for Runtime +#include "lib/sync/Semaphore.hpp" + +namespace po = boost::program_options; + +using fibParams = struct { + uint64_t n; + uint64_t* result; + PS* sem; +}; + +static void fib(void* voidParams) { + auto* params = static_cast<fibParams*>(voidParams); + uint64_t n = params->n; + auto* result = params->result; + PS* sem = params->sem; + + if (n < 2) { + *result = n; + } else { + CPS newSem(2); + + uint64_t a, b; + + fibParams newParams1; + newParams1.n = n - 1; + newParams1.result = &a; + newParams1.sem = &newSem; + + // Note that this is the inefficient spawn/sync variant, we + // usually would compute one previous fib number without spawning. + fibParams newParams2; + newParams2.n = n - 2; + newParams2.result = &b; + newParams2.sem = &newSem; + + Fiber* f1 = Fiber::from(&fib, &newParams1); + Fiber* f2 = Fiber::from(&fib, &newParams2); + + Runtime* runtime = Runtime::getRuntime(); + runtime->schedule(*f1); + runtime->schedule(*f2); + + DBG("fib: Calling wait for n=" << n); + newSem.wait(); + + *result = a + b; + } + + DBG("fib: Calling signalAndExit for n=" << n); + sem->signalAndExit(); +} + +// NOLINTNEXTLINE(bugprone-exception-escape) +auto main(int argc, char* argv[]) -> int { + uint64_t fibNum = -1; + po::options_description desc("Allowed options"); + // clang-format off + desc.add_options() + ("help", "Show help") + ("nthreads", po::value<unsigned int>()->default_value(std::thread::hardware_concurrency()), "Number of worker threads used by EMPER's runtime system") + ("fibnum", po::value<uint64_t>(&fibNum)->default_value(12), "The Fibonacci number to compute") + ; + // clang-format on + + // Make 'fibnum' a positional option. + po::positional_options_description pos_desc; + pos_desc.add("fibnum", -1); + + // clang-format off + auto parse_result = po::command_line_parser(argc, argv) + .options(desc) + .positional(pos_desc) + .run() + ; + // clang-format on + + po::variables_map vm; + po::store(parse_result, vm); + po::notify(vm); + + const unsigned nthreads = vm["nthreads"].as<unsigned int>(); + + std::cout << "Number of threads: " << nthreads << std::endl; + + Runtime runtime(nthreads); + + emper::lib::sync::Semaphore semaphore; + + Fiber* fibFiber = Fiber::from([&] { + uint64_t result; + BPS sem; + fibParams params = {fibNum, &result, &sem}; + + fib(¶ms); + + sem.wait(); + + std::cout << "fib(" << fibNum << ") = " << result << std::endl; + + semaphore.notify(); + }); + + runtime.scheduleFromAnywhere(*fibFiber); + + semaphore.wait(); + + return 0; +} diff --git a/apps/Main.cpp b/apps/Main.cpp deleted file mode 100644 index b2df11bdca19aa97430dbbe07699fd0428d010b1..0000000000000000000000000000000000000000 --- a/apps/Main.cpp +++ /dev/null @@ -1,90 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus -#include <cstdlib> // for exit, EXIT_SUCCESS -#include <iostream> // for basic_ostream::operator<< - -#include "BinaryPrivateSemaphore.hpp" // for BPS -#include "CountingPrivateSemaphore.hpp" // for CPS -#include "Debug.hpp" // for DBG -#include "Fiber.hpp" // for Fiber -#include "PrivateSemaphore.hpp" // for PS -#include "Runtime.hpp" // for Runtime -#include "emper-common.h" // for UNUSED_ARG - -using fibParams = struct { - int n; - int* result; - PS* sem; -}; - -static void fib(void* voidParams) { - auto* params = static_cast<fibParams*>(voidParams); - int n = params->n; - int* result = params->result; - PS* sem = params->sem; - - if (n < 2) { - *result = n; - } else { - CPS newSem(2); - - int a, b; - - fibParams newParams1; - newParams1.n = n - 1; - newParams1.result = &a; - newParams1.sem = &newSem; - fibParams newParams2; - newParams2.n = n - 2; - newParams2.result = &b; - newParams2.sem = &newSem; - - Fiber* f1 = Fiber::from(&fib, &newParams1); - Fiber* f2 = Fiber::from(&fib, &newParams2); - - Runtime* runtime = Runtime::getRuntime(); - runtime->schedule(*f1); - runtime->schedule(*f2); - - DBG("fib: Calling wait for n=" << n); - newSem.wait(); - - *result = a + b; - } - - DBG("fib: Calling signalAndExit for n=" << n); - sem->signalAndExit(); -} - -static void fibKickoff() { - const int fibNum = 4; - int result; - BPS sem; - fibParams params = {fibNum, &result, &sem}; - - fib(¶ms); - - sem.wait(); - - std::cout << "fib(" << fibNum << ") = " << result << std::endl; - exit(EXIT_SUCCESS); -} - -auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { - // const unsigned nthreads = std::thread::hardware_concurrency(); - const unsigned nthreads = 2; - - std::cout << "Number of threads: " << nthreads << std::endl; - - Runtime runtime(nthreads); - - Fiber* fibFiber = Fiber::from(&fibKickoff); - - std::cout << "Just alloacted alpha fiber at " << fibFiber << std::endl; - - runtime.scheduleFromAnywhere(*fibFiber); - - runtime.waitUntilFinished(); - - return 0; -} diff --git a/apps/fsearch/meson.build b/apps/fsearch/meson.build index 0a55f0351838a573bc674519055ea6a65032719f..80b1147db3321f1568add6a9c8ca3468d5b27f74 100644 --- a/apps/fsearch/meson.build +++ b/apps/fsearch/meson.build @@ -1,17 +1,3 @@ -boost_program_options_dep = dependency('boost', modules: ['program_options']) - -boost_program_options_code = ''' -#include <boost/program_options.hpp> -int main(int argc, char* argv[]) { - boost::program_options::options_description desc("Allowed options"); -} -''' -cpp_can_link_with_boost_program_options = cpp_compiler.links( - boost_program_options_code, - name: 'boost_progam_options', - dependencies: boost_program_options_dep, -) - if cpp_has_fs_recursive_directory_iterator and cpp_can_link_with_boost_program_options fsearch_exe = executable( 'fsearch', diff --git a/apps/meson.build b/apps/meson.build index 6ee5a2d128dc6170326fd80af9597e80edcc8156..edc239dcb805a28b557477453e973fe1c8603520 100644 --- a/apps/meson.build +++ b/apps/meson.build @@ -1,9 +1,3 @@ -fib_exe = executable( - 'fib', - 'Main.cpp', - dependencies: emper_dep, -) - worker_sleep_example_exe = executable( 'worker_sleep_example', 'WorkerSleepExample.cpp', @@ -40,4 +34,26 @@ qsort = executable( dependencies: emper_dep, ) +boost_program_options_dep = dependency('boost', modules: ['program_options']) + +boost_program_options_code = ''' +#include <boost/program_options.hpp> +int main(int argc, char* argv[]) { + boost::program_options::options_description desc("Allowed options"); +} +''' +cpp_can_link_with_boost_program_options = cpp_compiler.links( + boost_program_options_code, + name: 'boost_progam_options', + dependencies: boost_program_options_dep, +) + +if cpp_can_link_with_boost_program_options + fib_child_stealing_exe = executable( + 'fib-child-stealing', + 'FibChildStealing.cpp', + dependencies: [emper_dep, boost_program_options_dep], + ) +endif + subdir('fsearch') diff --git a/emper/Fibril.hpp b/emper/Fibril.hpp index 606ebd5484a056a8e955a5d5aef8c9b6494323c2..2be6c341da98978db544e6bfb31600aa30fa96b7 100644 --- a/emper/Fibril.hpp +++ b/emper/Fibril.hpp @@ -300,8 +300,10 @@ class Fibril : public AbstractFiber, public Logger<LogSubsystem::FIBRIL> { auto fork_func = [](ARGs... args, Fibril * fr, RET * ret, RET(*fun)(PARs...)) __attribute__((noinline, hot, optimize(3))) { fr->cont.ip = __builtin_return_address(0); - fr->runtime.scheduler.pushBottom(*fr); + bool pushed = fr->runtime.scheduler.pushBottom(*fr); *ret = fun(args...); + if (!pushed) [[unlikely]] + return; fr->checkContinuationState(); }; membar(fork_func(args..., this, ret, fun)); @@ -312,8 +314,10 @@ class Fibril : public AbstractFiber, public Logger<LogSubsystem::FIBRIL> { auto fork_func = [](ARGs... args, Fibril * fr, void (*fun)(PARs...)) __attribute__((noinline, hot, optimize(3))) { fr->cont.ip = __builtin_return_address(0); - fr->runtime.scheduler.pushBottom(*fr); + bool pushed = fr->runtime.scheduler.pushBottom(*fr); fun(args...); + if (!pushed) [[unlikely]] + return; fr->checkContinuationState(); }; membar(fork_func(args..., this, fun)); @@ -324,8 +328,10 @@ class Fibril : public AbstractFiber, public Logger<LogSubsystem::FIBRIL> { auto fork_func = [](Fibril * fr, std::function<T> fun, ARGs... args) __attribute__((noinline, hot, optimize(3))) { fr->cont.ip = __builtin_return_address(0); - fr->runtime.scheduler.pushBottom(*fr); + bool pushed = fr->runtime.scheduler.pushBottom(*fr); fun(args...); + if (!pushed) [[unlikely]] + return; fr->checkContinuationState(); }; membar(fork_func(this, fun, args...)); diff --git a/emper/lib/adt/LockedQueue.hpp b/emper/lib/adt/LockedQueue.hpp index 1fcd6bd1c4ab1fae72d6cc0fec42b1db5f405dc5..544db34a3039694a5196b818016da677fa6664bb 100644 --- a/emper/lib/adt/LockedQueue.hpp +++ b/emper/lib/adt/LockedQueue.hpp @@ -8,6 +8,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" // Forward declaration, declared and defined in Fibril.hpp. namespace emper { @@ -41,14 +42,19 @@ class LockedQueue { return deque.size(); } - auto pushBottom(const I item) -> bool { + auto pushBottom(const I item) -> PushBottomResult { + bool pushed = true; std::lock_guard<std::mutex> lock(queue_mutex); - if (deque.size() == SIZE) return false; + if (deque.size() == SIZE) { + pushed = false; + goto out; + } deque.push_back(item); - return true; + out: + return PushBottomResult{pushed, deque.size()}; } template <const int maxRetries> diff --git a/emper/lib/adt/PushBottomResult.hpp b/emper/lib/adt/PushBottomResult.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b65236ce624f3b41d807c774787d81a0bcdc27bb --- /dev/null +++ b/emper/lib/adt/PushBottomResult.hpp @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#pragma once + +namespace adt { + +struct PushBottomResult { + const bool pushed; + const uint64_t size; + + operator bool() const { return pushed; } +}; + +} // namespace adt diff --git a/emper/lib/adt/WsClQueue.hpp b/emper/lib/adt/WsClQueue.hpp index 34112f0e10d7f6f02158c313692539a6804e8f50..c79dd42e6cf859813513c48daf6ee9f4375cdfdd 100644 --- a/emper/lib/adt/WsClQueue.hpp +++ b/emper/lib/adt/WsClQueue.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once #include <atomic> @@ -7,6 +7,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" namespace adt { @@ -47,7 +48,7 @@ class WsClQueue { WsClQueue() : bottom(1), top(1) {} // TODO: Decide what to do regarding the following suppressed lint. // NOLINTNEXTLINE(readability-avoid-const-params-in-decls) - auto pushBottom(const PAYLOAD item) -> bool; + auto pushBottom(const PAYLOAD item) -> PushBottomResult; template <const int maxRetries> auto popTop(PAYLOAD *item) -> emper::StealingResult; auto popBottom(PAYLOAD *item) -> bool; @@ -60,8 +61,14 @@ class WsClQueue { }; template <typename PAYLOAD, const uintptr_t CAPACITY> -auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { - if (isFull()) return false; +auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> PushBottomResult { + bool pushed = true; + uint64_t newSize = + bottom.load(std::memory_order_relaxed) - top.load(std::memory_order_relaxed) + 1; + if (newSize > CAPACITY) { + pushed = false; + goto out; + } queue[bottom % CAPACITY] = item; @@ -69,7 +76,9 @@ auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { atomic_thread_fence(std::memory_order_release); ++bottom; - return true; + +out: + return PushBottomResult{pushed, newSize}; } template <typename PAYLOAD, const uintptr_t CAPACITY> diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp index 943f41c8bbc6c434c01ce25cb13e60fb76224053..6d9c43ac0d261ae58b4923bd27b829e34340c79e 100644 --- a/emper/lib/adt/WsClV2Queue.hpp +++ b/emper/lib/adt/WsClV2Queue.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once #include <atomic> @@ -7,6 +7,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" namespace adt { @@ -51,7 +52,7 @@ class WsClV2Queue { // an underflow if bottom is '0'. The paper's queue uses Java // 'long' for bottom and top and is thus safe since it's signed. WsClV2Queue() : top(1) {} - auto pushBottom(ITEM_TYPE item) -> bool; + auto pushBottom(ITEM_TYPE item) -> PushBottomResult; auto popBottom(ITEM_TYPE *item) -> bool; template <const int maxRetries> auto popTop(ITEM_TYPE *item) -> emper::StealingResult; @@ -64,9 +65,13 @@ class WsClV2Queue { }; template <typename ITEM_TYPE, const uintptr_t CAPACITY> -auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool { +auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> PushBottomResult { + bool pushed = true; // Check if queue is full. - if (isFull()) return false; + if (isFull()) { + pushed = false; + goto out; + } queue[bottom % CAPACITY] = item; @@ -76,7 +81,8 @@ auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool // Write fence / memory barrier atomic_thread_fence(std::memory_order_release); - return true; +out: + return PushBottomResult{pushed, bottom - top}; } template <typename ITEM_TYPE, const uintptr_t CAPACITY> diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 47a89a7a78b6973250895fc550986dfdde044e68..310131debd45f7d5acccb555e9f284681f6a39ad 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -23,6 +23,7 @@ #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "io/IoContext.hpp" +#include "lib/adt/PushBottomResult.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingWorkerStats.hpp" @@ -51,7 +52,7 @@ AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, } void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { - bool pushed = queue.pushBottom(&fiber); + auto pushed = queue.pushBottom(&fiber); if (unlikely(!pushed)) { if constexpr (emper::OVERFLOW_QUEUE) { enqueueInAnywhereQueue(fiber); @@ -61,7 +62,7 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { ABORT("Could not push fiber " << &fiber << " into queue"); } } else if constexpr (emper::STATS) { - uint64_t queueLength = queue.usedSlots(); + uint64_t queueLength = pushed.size; awss::stats.recordScheduledToLocalAndQueueLength(queueLength); } @@ -284,3 +285,18 @@ auto AbstractWorkStealingScheduler::nextFiberResultFromMpscQueueOrWorkStealing() return nextFiberResultViaWorkStealing(); } + +auto AbstractWorkStealingScheduler::pushBottom(AbstractFiber& fiber) -> bool { + auto pushed = queue.pushBottom(&fiber); + if (pushed) [[likely]] { + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); + if constexpr (emper::STATS) { + uint64_t queueLength = pushed.size; + awss::stats.recordScheduledToLocalAndQueueLength(queueLength); + } + } else if (!queueFullWarningEmitted) { + queueFullWarningEmitted = true; + LOGW("Pushing to queue failed: Queue full"); + } + return pushed; +} diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index f6006cb241c7959c30b9b8ce653f15e54fca6ab7..9afead16bce5c3356b5b20bcad64c5363b6b6a99 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -5,10 +5,7 @@ #include <cstddef> // for size_t #include <optional> -#include "CallerEnvironment.hpp" #include "Fiber.hpp" -#include "FiberHint.hpp" -#include "FiberSource.hpp" #include "Scheduler.hpp" #include "emper-common.h" #include "lib/adt/MpscQueue.hpp" @@ -31,6 +28,8 @@ class AbstractWorkStealingScheduler : public Scheduler { auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>; auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>; + bool queueFullWarningEmitted = false; + protected: WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; @@ -48,11 +47,7 @@ class AbstractWorkStealingScheduler : public Scheduler { auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; auto nextFiberResultFromMpscQueueOrWorkStealing() -> std::optional<NextFiberResult>; - auto pushBottom(AbstractFiber& fiber) -> bool override { - bool res = queue.pushBottom(&fiber); - onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); - return res; - } + auto pushBottom(AbstractFiber& fiber) -> bool override; auto popBottom() -> AbstractFiber* override { AbstractFiber* fiber; diff --git a/iwyu-mappings.imp b/iwyu-mappings.imp index 25cb6087e5f05936a3ee8855b18b1e883b879338..bbe2308cdadfde1136ebfff1d41c3c8c6dd07c53 100644 --- a/iwyu-mappings.imp +++ b/iwyu-mappings.imp @@ -20,6 +20,7 @@ { include: ["<boost/program_options/parsers.hpp>", "private", "<boost/program_options.hpp>", "public"], }, { include: ["<boost/program_options/positional_options.hpp>", "private", "<boost/program_options.hpp>", "public"], }, { include: ["<boost/type_index/type_index_facade.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/cstdint.hpp>", "private", "<cstdint>", "public"], }, { symbol: ["__kernel_timespec", "private", "<liburing.h>", "public" ] }, { symbol: ["std::filesystem", "private", "<filesystem>", "public" ] },