diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d872c32c2e9ac7c50d6f15d4c5e387c2ef129b5e..8b34b7d94053ac15e4da1e40e34d654aef4994b8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -463,3 +463,16 @@ build-each-poller: extends: - .build - .emper-each-poller + +continuation-stealing-locked: + extends: + - .test + variables: + EMPER_CONTINUATION_STEALING_MODE: 'locked' + EMPER_LOCKED_WS_QUEUE: 'true' + +continuation-stealing-madv-free: + extends: + - .test + variables: + EMPER_CONTINUATION_STEALING_MADVISE_STACK: 'free' diff --git a/Makefile b/Makefile index fa4c7e7fe03ebf2b9964a64fdd8d995b2574f383..59a077056850523f9ab8563171208ba7044c4c1a 100644 --- a/Makefile +++ b/Makefile @@ -43,6 +43,30 @@ libc++: EMPER_USE_BUNDLED_DEPS="always" \ BUILDDIR="build-libc++" +.PHONY: clang +clang: + rm -f build + $(MAKE) build \ + CC=clang CXX=clang++ \ + BUILDDIR="build-$@" + +.PHONY: fibril-locked +fibril-locked: + rm -f build + $(MAKE) build \ + EMPER_CONTINUATION_STEALING_MODE=locked \ + EMPER_LOCKED_WS_QUEUE=true \ + EMPER_IO=false \ + BUILDDIR="build-$@" + +.PHONY: fibril-unmap +fibril-unmap: + rm -f build + $(MAKE) build \ + EMPER_CONTINUATION_STEALING_MADVISE_STACK=free \ + EMPER_IO=false \ + BUILDDIR="build-$@" + .PHONY: fast-static-analysis fast-static-analysis: all check-format check-license doc diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..b003b741f37afc6f2bc9d7f4a631792cdb460e9d --- /dev/null +++ b/README.md @@ -0,0 +1,82 @@ +# EMPER + +The Extensible Micro-Parallelism Execution Runtime (EMPER) is a +concurrency platform to execute parallel applications. + +The runtime system of EMPER implements the *wait-free* "Nowa" +continuation-stealing approach as described in the IPDPS 2021 paper by +Schmaus et al. This allows for efficient fork-join parallelism for two +reasons. 
First, continuation-stealing enables dynamic task +parallelism: The concurrency expressed at the programming-language +layer is only lifted into parallelism by the runtime system if there +are available workers. Secondly, the wait-free Nowa approach allows +for scalability on systems with many cores. + +```c++ +emper_fibril auto fib(int n) -> int { + if (n < 2) return n; + + Fibril fibril; + int a, b; + + fibril.fork(&a, fib, n - 1); + + b = fib(n - 2); + + fibril.join(); + + return a + b; +} +``` + +## Scheduling + +EMPER provides a modular architecture allowing for different +scheduling strategies. Currently two scheduling strategies are +implemented: +- Work stealing scheduling (WS) +- Locality aware and work stealing scheduling (LAWS) + + + +# License + +EMPER is licensed under the GNU Lesser General Public License, version +3 (or any later version). See the file 'LGPL-3' for the full text of +this license. + +# Acknowledgements + +Nicolas Pfeiffer wrote the first prototypical implementation of +continuation-stealing for EMPER. Much of his code was re-used for the +current implementation. + +Florian Fischer wrote the EMPER interface for "pseudo-blocking system +calls" based on Linux's io_uring. We would also like to thank Jens +Axboe and Pavel Begunkov for creating and constantly improving +io_uring. + +This work was partially funded by the Deutsche Forschungsgemeinschaft +(DFG, German Research Foundation) – project number 146371743 – TRR 89 +"Invasive Computing". + +# Literature + +[schmaus2021modern] +Schmaus, Florian, Florian Fischer, Timo Hönig, and Wolfgang Schröder-Preikschat. +Modern Concurrency Platforms Require Modern System-Call Techniques. +Tech. rep. CS-2021-02. Friedrich-Alexander Universität Erlangen-Nürnberg, +Technische Fakultät, Nov. 23, 2021. doi: 10.25593/issn.2191-5008/CS-2021-02. 
+url: https://www4.cs.fau.de/~flow/papers/schmaus2021modern.pdf + +[schmaus2021nowa] +Schmaus, Florian, Nicolas Pfeiffer, Timo Hönig, Jörg Nolte, and Wolfgang Schröder- +Preikschat. “Nowa: A Wait-Free Continuation-Stealing Concurrency Platform”. +In: 2021 IEEE International Parallel and Distributed Processing Symposium (IPDPS). +May 2021, pp. 360–371. doi: 10.1109/IPDPS49936.2021.00044. +url: https://www4.cs.fau.de/~flow/papers/schmaus2021nowa.pdf + +[pfeiffer2020cactus] +Pfeiffer, Nicolas. A Wait-Free Cactus Stack Implementation for a Microparalelism +Runtime. Master's thesis. MA-I4-2020-02. Mar. 2, 2020. +url: https://www4.cs.fau.de/~flow/papers/pfeiffer2020cactus.pdf diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp index 1d3ac9c1ef849e8d7e2634cab7ec95e41bf1fd75..df2c84083be6c8b876a17af78e77eb1c5f6ca67b 100644 --- a/apps/fsearch/fsearch.cpp +++ b/apps/fsearch/fsearch.cpp @@ -1,23 +1,28 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer +// Copyright © 2021-2022 Florian Fischer, Florian Schmaus #include <fcntl.h> #include <unistd.h> #include <array> +#include <boost/program_options.hpp> #include <climits> #include <cstdlib> #include <cstring> +#include <exception> #include <filesystem> #include <iostream> +#include <sstream> #include <string> #include "Common.hpp" #include "Fiber.hpp" #include "Runtime.hpp" #include "Semaphore.hpp" +#include "StealingMode.hpp" #include "io.hpp" namespace fs = std::filesystem; +namespace po = boost::program_options; #define EMPER_RIPGREP_BUFSIZE 4096 @@ -26,6 +31,8 @@ size_t needle_len; emper::Semaphore* max_running; +static enum emper::StealingMode stealingMode; + void search(const std::string& path) { if (max_running) { max_running->acquire(); @@ -33,7 +40,7 @@ void search(const std::string& path) { int fd = emper::io::openAndWait(path.c_str(), O_RDONLY); if (fd < 0) { - DIE_MSG_ERRNO("open failed"); + DIE_MSG_ERRNO("open of " << path << " failed"); } std::array<char, 
EMPER_RIPGREP_BUFSIZE> buf; @@ -52,7 +59,7 @@ void search(const std::string& path) { } if (bytes_read < 0) { - DIE_MSG_ERRNO("read failed"); + DIE_MSG_ERRNO("read of " << path << " failed"); } out: @@ -68,29 +75,86 @@ void walk_dir() { auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); }; // Search the file auto fn = [](const fs::directory_entry& dirent) { search(dirent.path()); }; - emper::io::recursive_directory_walk(".", filter, fn); + + switch (stealingMode) { + case emper::StealingMode::child: + emper::io::recursive_directory_walk(".", filter, fn); + break; + case emper::StealingMode::continuation: + emper::io::fibril_recursive_directory_walk(".", filter, fn); + break; + } Runtime::getRuntime()->initiateTermination(); } -auto main(int argc, char* argv[]) -> int { - if (argc < 2) { - std::cerr << "Usage: " << argv[0] << " <needle> [max fibers]" << std::endl; - return EXIT_SUCCESS; +static auto fssearch(const po::variables_map& vm) -> int { + if (!vm.count("needle")) { + std::cerr << "No 'needle' specified" << std::endl; + return EXIT_FAILURE; } - if (argc == 3) { - max_running = new emper::Semaphore(std::stoi(argv[2])); - } + auto needleStr = vm["needle"].as<std::string>(); + needle = needleStr.c_str(); + needle_len = needleStr.length(); - needle = argv[1]; - needle_len = strlen(needle); + if (vm.count("max-fibers")) { + unsigned int maxFibersCount = vm["max-fibers"].as<unsigned int>(); + max_running = new emper::Semaphore(maxFibersCount); + } Runtime runtime; + std::cout << "Starting fsearch with stealingMode=" << stealingMode; + if (max_running) std::cout << " and " << max_running->getCount() << " file fibers"; + std::cout << std::endl; + auto* dirWalker = Fiber::from(walk_dir); runtime.scheduleFromAnywhere(*dirWalker); runtime.waitUntilFinished(); + + delete max_running; + return EXIT_SUCCESS; } + +// Entry point. A function-try-block is used so that exceptions thrown while +// parsing the command line (e.g. unknown options) are reported as well. +auto main(int argc, char* argv[]) -> int try { + po::options_description desc("Allowed options"); + // clang-format off + 
desc.add_options() + ("help", "Show help") + ("needle", po::value<std::string>(), "The String to search for") + ("max-fibers", po::value<unsigned int>(), "Maximum number of file processing fibers") + ("stealing-mode", po::value<enum emper::StealingMode>(&stealingMode)->default_value(emper::StealingMode::child), "Stealing mode to use, either 'child' or 'continuation'") + ; + // clang-format on + + // Make 'needle' a positional option. + po::positional_options_description pos_desc; + pos_desc.add("needle", -1); + + // clang-format off + auto parse_result = po::command_line_parser(argc, argv) + .options(desc) + .positional(pos_desc) + .run() + ; + // clang-format on + + po::variables_map vm; + po::store(parse_result, vm); + po::notify(vm); + + if (vm.count("help")) { + std::cout << desc << "\n"; + return EXIT_SUCCESS; + } + + return fssearch(vm); +} catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + return EXIT_FAILURE; +} diff --git a/apps/fsearch/meson.build b/apps/fsearch/meson.build index 881b91df41de98f796c8d4561c9c3b1c56acf9d1..0a55f0351838a573bc674519055ea6a65032719f 100644 --- a/apps/fsearch/meson.build +++ b/apps/fsearch/meson.build @@ -1,8 +1,22 @@ -if cpp_has_fs_recursive_directory_iterator +boost_program_options_dep = dependency('boost', modules: ['program_options']) + +boost_program_options_code = ''' +#include <boost/program_options.hpp> +int main(int argc, char* argv[]) { + boost::program_options::options_description desc("Allowed options"); +} +''' +cpp_can_link_with_boost_program_options = cpp_compiler.links( + boost_program_options_code, + name: 'boost_program_options', + dependencies: boost_program_options_dep, +) + +if cpp_has_fs_recursive_directory_iterator and cpp_can_link_with_boost_program_options fsearch_exe = executable( 'fsearch', 'fsearch.cpp', - dependencies: emper_dep, + dependencies: [emper_dep, boost_program_options_dep], ) fsearch_callback_exe = executable( diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index 
85538076ec08bfe1ba71b8d5840602b8750cf4ee..b961654647b31c851ef256575cb39eaaabcadd40 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -801,7 +801,7 @@ WARN_NO_PARAMDOC = YES # a warning is encountered. # The default value is: NO. -WARN_AS_ERROR = YES +WARN_AS_ERROR = NO # The WARN_FORMAT tag determines the format of the warning messages that doxygen # can produce. The string should contain the $file, $line, and $text tags, which diff --git a/emper/AbstractFiber.hpp b/emper/AbstractFiber.hpp index ce028f5b2d4b334bb24cd7479cdf40181dd11bdf..68346534b5dfcb91cbe49492172c3215219ed18a 100644 --- a/emper/AbstractFiber.hpp +++ b/emper/AbstractFiber.hpp @@ -6,19 +6,25 @@ #include "emper-common.h" +class Fiber; +class Fibril; + class AbstractFiber { friend class Dispatcher; protected: virtual ~AbstractFiber() = default; - virtual void run() const = 0; + virtual void run() = 0; public: virtual auto isRunnable() const -> bool { return true; } virtual auto getAffinityBuffer() const -> workeraffinity_t* { return nullptr; } + virtual auto asFiberIfPossible() -> Fiber* { return nullptr; }; + virtual auto asFibrilIfPossible() -> Fibril* { return nullptr; }; + virtual void printTo(std::ostream& strm, bool withPtr = true) const = 0; friend auto operator<<(std::ostream& strm, const AbstractFiber& fiber) -> std::ostream&; diff --git a/emper/Context.cpp b/emper/Context.cpp index 9a5e57e3842598bb362793a65bb13777f8d0d6d5..075a2a21ea488bbcca872671edcf34b97290c23c 100644 --- a/emper/Context.cpp +++ b/emper/Context.cpp @@ -2,14 +2,13 @@ // Copyright © 2020-2022 Florian Schmaus #include "Context.hpp" +#include <sys/mman.h> + +#include <cerrno> #include <ostream> thread_local Context* Context::currentContext; -thread_local void* Context::originalStack; - -thread_local Context* Context::lastContextBeforeReturningToOriginalStack; - void Context::printTo(std::ostream& strm, bool withPtr) const { // clang-format off strm << "Context ["; @@ -38,11 +37,25 @@ auto operator<<=(std::ostream& strm, 
const Context& context) -> std::ostream& { return strm; } -[[noreturn]] void Context::switchToOriginalStack() { - // Can't use new context hook to communicate the context to free. - lastContextBeforeReturningToOriginalStack = currentContext; - currentContext = nullptr; - DBG("Switchting to original stack " << &originalStack << " from " - << lastContextBeforeReturningToOriginalStack); - switch_and_load_context(&originalStack); +void Context::unmap(void* from) const { + const size_t PAGE_SIZE_MASK = 4 * 1024 - 1; + const uintptr_t start = ((uintptr_t) context + PAGE_SIZE_MASK) & ~PAGE_SIZE_MASK; + const uintptr_t end = (uintptr_t) from & ~PAGE_SIZE_MASK; + const size_t length = end - start; + const int advice = []{ + if constexpr (emper::CONTINUATION_STEALING_MADVISE_STACK == emper::ContinuationStealingMadviseStack::dontneed) { + return MADV_DONTNEED; + } else { + return MADV_FREE; + } + }(); + + LOGD("madvise() start=" << start << ", length=" << length << ", advice=" << advice); + + // madvise() returns 0 on success; on failure it returns -1 and stores the + // error code in errno, so the return value must not be assigned to errno. + // NOLINTNEXTLINE(performance-no-int-to-ptr) + if (madvise((void*) start, length, advice) != 0) { + DIE_MSG_ERRNO("Unmapping unused stack space failed"); + } } diff --git a/emper/Context.hpp b/emper/Context.hpp index 0ee84b91d41bab29c2eedd96ae36719ebd2861e8..0964ed67d228c417ca07e58ec470e69e5cc6237a 100644 --- a/emper/Context.hpp +++ b/emper/Context.hpp @@ -17,6 +17,7 @@ class AbstractFiber; class ContextManager; class Dispatcher; +class Fibril; extern "C" [[noreturn]] void switch_and_load_context(void** toTos); // *Not* marked as 'noreturn' because save_and_switch_context does @@ -31,10 +32,6 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { static thread_local Context* currentContext; - static thread_local void* originalStack; - - static thread_local Context* lastContextBeforeReturningToOriginalStack; - AbstractFiber* currentFiber = nullptr; void* const tos; @@ -75,12 +72,17 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { friend Dispatcher; 
+ // Allow Fibril to access Context::currentContext. + friend Fibril; + static void setCurrentFiber(AbstractFiber* fiber) { assert(currentContext); currentContext->currentFiber = fiber; } + void unmap(void* from) const; + public: // cppcheck-suppress noExplicitConstructor selfInitialization Context(func_t mainFunction) @@ -139,19 +141,15 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { [[nodiscard]] inline auto getTos() const -> const void* { return tos; } /** - * Start this context. This method is called from an unmanaged stack which is saved. + * Start this context. This method is called from an unmanaged stack. */ - inline auto start() -> Context* { + inline void start() { LOGD("starting " << *this); currentContext = this; - save_and_switch_context(&savedStackpointer, &originalStack); - - return lastContextBeforeReturningToOriginalStack; + switch_context(&savedStackpointer); } - [[noreturn]] static void switchToOriginalStack(); - /** * Save the current context and switch to a new one. 
This method is * usally used when a synchronization primitve decided that it is diff --git a/emper/ContextManager.cpp b/emper/ContextManager.cpp index 367e99de2e31a3a7c0140367c169fca47bdf1d14..c0cf46e79ca9d4671f02b443ca181b89c4d983a1 100644 --- a/emper/ContextManager.cpp +++ b/emper/ContextManager.cpp @@ -5,10 +5,13 @@ #include <cassert> // for assert #include <utility> -#include "Context.hpp" // for Context -#include "Debug.hpp" // for LOGD -#include "Dispatcher.hpp" // for Dispatcher -#include "Runtime.hpp" // for Runtime +#include "Context.hpp" +#include "Continuation.hpp" +#include "Debug.hpp" +#include "Dispatcher.hpp" +#include "Emper.hpp" +#include "Fibril.hpp" +#include "Runtime.hpp" #include "emper-common.h" #include "emper-config.h" // // IWYU pragma: keep @@ -47,10 +50,61 @@ void ContextManager::putFreeContext(Context* context) { #endif } +thread_local static Continuation* continuation; + void ContextManager::start() { + Continuation originalStackContinuation; + + continuation = &originalStackContinuation; + + uintptr_t fibrilResumeValue = originalStackContinuation.setJmp(); + + // We perform a longjmp with fibrilResumeValue == 0 to switch back + // to the original stack and terminate the runtime. + if (unlikely(!fibrilResumeValue && runtime.isTerminating())) { + putFreeContext(Context::currentContext); + Context::currentContext = nullptr; + return; + } + + if constexpr (emper::CONTINUATION_STEALING) { + Fibril::tryResumeFibril(fibrilResumeValue); + + // The code below only needs to be executed if we did a long jmp + // before. But optimizing for it is not worth it. If we longjmp + // and can not resume the Fibril and we were not on the initial + // context of the Fibril, then we need to take care to recycle the + // context. + Context* currentContext = Context::getCurrentContext(); + if (currentContext) { + // If we would simply call currentContext->start() here, then we + // would jump to an invalid address. 
The reason is that as soon as + // the Context::kickoff() is executed, the alpha function address, + // which was just popped off the stack, will be potentially + // overridden by a stack push operation. And such operations are + // very likely. Instead we simply free the current context here, + // so that it is re-initialized later on. + + // TODO: Adjust the assembly to not use 'ret' to reach the + // kickoff function, but instead jmp. This would not destroy the + // kickoff's function pointer on the stack, and we could call + // currentContext->start() here. This would yield a small + // performance gain. + putFreeContext(currentContext); + } + } + Context* freeContext = getFreeContext(); - freeContext = freeContext->start(); - putFreeContext(freeContext); + freeContext->start(); +} + +[[noreturn]] void ContextManager::switchToOriginalStack() { + DBG("Switching to original stack from " << Context::currentContext); + continuation->longJmp(0); +} + +void ContextManager::tryResumeFibril(uintptr_t fibrilResumeValue) { + continuation->longJmp(fibrilResumeValue); } /** diff --git a/emper/ContextManager.hpp b/emper/ContextManager.hpp index 5262f76f68431003c14034112b06febeb8efdcb0..c2844294d461dbacf8b56a74df6b8dab33bb7ed7 100644 --- a/emper/ContextManager.hpp +++ b/emper/ContextManager.hpp @@ -1,7 +1,9 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once +#include <cstdint> + #include "Common.hpp" // for func_t #include "Debug.hpp" // for LogSubsystem, LogSubsystem::CM, Logger #include "MemoryManager.hpp" // for MemoryManager @@ -28,7 +30,13 @@ class ContextManager void start(); + [[noreturn]] static void switchToOriginalStack(); + void saveAndStartNew(func_t freshContextHook); [[noreturn]] void discardAndResume(Context* context); + + // TODO: Adding [[noreturn]] leads to SEGFAULTs in fibril_join, because + // compiler thinks join never returns! Try to fix and add [[noreturn]]. 
+ static void tryResumeFibril(uintptr_t fibrilResumeValue); }; diff --git a/emper/Continuation.cpp b/emper/Continuation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca618a0e9ce2d49b3db28bde797716b11072132d --- /dev/null +++ b/emper/Continuation.cpp @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#include "Continuation.hpp" + +void Continuation::printTo(std::ostream& strm) const { + // clang-format off + strm << "Continuation [" + << "bp=" << bp + << ", sp=" << sp + << ", ip=" << ip + << "]"; + // clang-format on +} + +auto operator<<(std::ostream& strm, const Continuation& continuation) -> std::ostream& { + continuation.printTo(strm); + return strm; +} diff --git a/emper/Continuation.hpp b/emper/Continuation.hpp new file mode 100644 index 0000000000000000000000000000000000000000..23a906c2379a946dcedde29fb6c9425685b514ef --- /dev/null +++ b/emper/Continuation.hpp @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2022 Nicolas Pfeiffer, Florian Schmaus +#pragma once + +#include <cstdint> +#include <ostream> + +#define membar(call) \ + do { \ + call; \ + asm("nop" ::: "rbx", "r12", "r13", "r14", "r15", "memory"); \ + } while (0); + +class Continuation { + public: + void *bp; + void *sp; + void *ip; + + inline __attribute__((always_inline)) Continuation() : ip(nullptr) { + asm volatile( + "mov %%rbp, %0\n\t" + "mov %%rsp, %1\n\t" + : "=rm"(bp), "=rm"(sp)::); + }; + + inline __attribute__((always_inline, noreturn)) void execute(const void *_sp) { + asm("mov %0, %%rsp\n\t" + "mov %1, %%rbp\n\t" + "jmp *%2\n\t" ::"r"(_sp), + "r"(bp), "r"(ip) + : "memory"); + __builtin_unreachable(); + }; + + inline __attribute__((always_inline, returns_twice)) uintptr_t setJmp() { + auto set_rip = [](Continuation * c) __attribute__((noinline, hot, optimize(3))) { + c->ip = __builtin_return_address(0); + return 0; + }; + + uintptr_t res; + membar(res = set_rip(this)); + 
return res; + }; + + inline __attribute__((always_inline, noreturn)) void longJmp(uintptr_t ret) { + asm("mov %0, %%rsp\n\t" + "mov %1, %%rbp\n\t" + "jmp *%2\n\t" ::"r"(sp), + "r"(bp), "r"(ip), "a"(ret) + : "memory"); + __builtin_unreachable(); + }; + + void printTo(std::ostream &strm) const; + + friend auto operator<<(std::ostream &strm, const Continuation &continuation) -> std::ostream &; +}; diff --git a/emper/Debug.hpp b/emper/Debug.hpp index 111ea7d6f62b2892ce2677737cfe151b93daaa95..f7bb4f341b6fea9a4c15f6113913a7b9ad94673c 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -74,6 +74,7 @@ enum class LogSubsystem { PS, F, + FIBRIL, C, CM, DISP, @@ -123,6 +124,8 @@ class Logger { return "PS "; case LogSubsystem::F: return "F "; + case LogSubsystem::FIBRIL: + return "FIBRI"; case LogSubsystem::C: return "C "; case LogSubsystem::CM: diff --git a/emper/Dispatcher.hpp b/emper/Dispatcher.hpp index a090eba80c8e4a9640fc3b15ebe14ef1013cca6c..60e299aa6e8287add6d4d7ea12e34201619f5eba 100644 --- a/emper/Dispatcher.hpp +++ b/emper/Dispatcher.hpp @@ -32,7 +32,7 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { } static inline auto isRunnable(AbstractFiber* abstractFiber) -> bool { - auto* fiber = dynamic_cast<Fiber*>(abstractFiber); + auto* fiber = abstractFiber->asFiberIfPossible(); if (!fiber) return true; if (fiber->isMultiFiber()) { @@ -50,7 +50,7 @@ class Dispatcher : public Logger<LogSubsystem::DISP> { } void recycle(AbstractFiber* abstractFiber) { - auto* fiber = dynamic_cast<Fiber*>(abstractFiber); + auto* fiber = abstractFiber->asFiberIfPossible(); // We only recycle Fibers. 
if (!fiber) return; diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 668660c899018d7013bbc9acb0afd6580e3e6a4d..64fda316acfb8fbca8a4b151392aa755d37175e3 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -239,4 +239,36 @@ enum class CompleterSchedParam { static const CompleterSchedParam COMPLETER_SCHED_PARAM = CompleterSchedParam::EMPER_IO_COMPLETER_SCHED_PARAM; +enum class ContinuationStealingMode { + disabled, + locked, + waitfree, +}; + +static const ContinuationStealingMode CONTINUATION_STEALING_MODE = + ContinuationStealingMode::EMPER_CONTINUATION_STEALING_MODE; + +static const bool CONTINUATION_STEALING_MODE_LOCKED = + CONTINUATION_STEALING_MODE == ContinuationStealingMode::locked; + +static const bool CONTINUATION_STEALING = + CONTINUATION_STEALING_MODE != ContinuationStealingMode::disabled; + +enum class ContinuationStealingMadviseStack { + no, + dontneed, + free, +}; + +static const enum ContinuationStealingMadviseStack CONTINUATION_STEALING_MADVISE_STACK = + ContinuationStealingMadviseStack::EMPER_CONTINUATION_STEALING_MADVISE_STACK; + +static const bool BUILD_WITH_CLANG = +#ifdef EMPER_BUILD_WITH_CLANG + true +#else + false +#endif + ; + } // namespace emper diff --git a/emper/Fiber.cpp b/emper/Fiber.cpp index b5d6a3a1fce9d19c4db013defb4d34c92f7a64c8..4fade85cc1d21111794dc23773e200925fc8ff33 100644 --- a/emper/Fiber.cpp +++ b/emper/Fiber.cpp @@ -7,7 +7,7 @@ #include "emper-config.h" // IWYU pragma: keep -void Fiber::run() const { +void Fiber::run() { LOGD("run() calling " << function.target<FIBER_FUN_TEMPLATE_ARG>() << " (" << function.target_type().name() << ") with arg " << arg); function(arg); diff --git a/emper/Fiber.hpp b/emper/Fiber.hpp index 9da6f9ab1da69d091ce6e41e088a9dc00620235d..741c1bd1657da5795a8d8eec6e2c18352baca79a 100644 --- a/emper/Fiber.hpp +++ b/emper/Fiber.hpp @@ -71,14 +71,16 @@ class ALIGN_TO_CACHE_LINE Fiber : public AbstractFiber, public Logger<LogSubsyst virtual ~Fiber() = default; - virtual void run() const; + void run() 
override; private: inline void setMpscNext(Fiber* next) { mpscNext = next; } inline auto getMpscNext() -> Fiber* { return mpscNext; } - [[nodiscard]] inline auto getAffinityBuffer() const -> workeraffinity_t* { return affinity; } + [[nodiscard]] inline auto getAffinityBuffer() const -> workeraffinity_t* override { + return affinity; + } inline auto setRunnableFalse() -> bool { bool res = runnable.load(std::memory_order_relaxed); @@ -126,7 +128,9 @@ class ALIGN_TO_CACHE_LINE Fiber : public AbstractFiber, public Logger<LogSubsyst return *affinity; } - void printTo(std::ostream& strm, bool withPtr) const; + auto asFiberIfPossible() -> Fiber* override { return this; } + + void printTo(std::ostream& strm, bool withPtr) const override; /** * @brief check if this Fiber is runnable. @@ -138,7 +142,7 @@ class ALIGN_TO_CACHE_LINE Fiber : public AbstractFiber, public Logger<LogSubsyst * * @return 'true' if this fiber is runnable, 'false' otherwhise. */ - [[nodiscard]] auto isRunnable() const -> bool; + [[nodiscard]] auto isRunnable() const -> bool override; [[nodiscard]] auto isMultiFiber() const -> bool { return isMulti; } diff --git a/emper/Fibril.cpp b/emper/Fibril.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9cfe87a55be9a522a714d5a451b9bf9db266ee7d --- /dev/null +++ b/emper/Fibril.cpp @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2022 Nicolas Pfeiffer, Florian Schmaus +#include "Fibril.hpp" + +thread_local Fibril* Fibril::toResume = nullptr; + +void Fibril::run() { + Context* currentContext = Context::getCurrentContext(); + LOGD("run() of " << *this << " on context " << currentContext); + + if constexpr (LOCKED_CONTINUATION_STEALING) { + if (!activeChildrenCount) + activeChildrenCount = 2; + else + activeChildrenCount++; + + fibrilMutex.unlock(); + } else { + reverseStealCount -= 1; + } + + // TODO: Fibril stats: increase steal count by one here. 
+ + // This is the run() method that is executed on a fresh context. We + // must ensure that this context is in all cases freed again, via + // putFreeContext(). This run() method must eventually call + // Fibril::join(), i.e. perform an explicit join operation. Since we + // are in the run() method, a cactus stack was formed and join() + // will always call ContextManager::tryResumeFibril(), which leads + // to ContextManager::start() where we free (at some point in the + // future hopefully re-use) this context in case this Fibril could + // not yet be resumed. + + // Reserve 128 bytes at the bottom for the linking region. Basically + // this allows the compiler to pop args off the stack, but + // unfortunately on the new stack. If we do not reserve space for + // this, then it would lead to access outside (above) the stack + // area. See Nicolas Pfeiffer's Masters thesis [pfeiffer2020cactus] + // § 4.7 for more information. + cont.execute((void**)currentContext->getTos() - 16); +} + +void Fibril::printTo(std::ostream& strm, bool withPtr) const { + // clang-format off + strm << "Fibril ["; + if (withPtr) { + strm << "ptr=" << this << ", "; + } + strm << "initialContext=" << initialContext + << ", " << cont + << "]"; + // clang-format on +} diff --git a/emper/Fibril.hpp b/emper/Fibril.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b12150c1d5250be6c177f21a34cc8e8316659e04 --- /dev/null +++ b/emper/Fibril.hpp @@ -0,0 +1,361 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2022 Nicolas Pfeiffer, Florian Schmaus +#pragma once + +#include <atomic> +#include <cassert> +#include <cstddef> +#include <cstdint> +#include <functional> +#include <mutex> +#include <ostream> +#include <type_traits> + +#include "AbstractFiber.hpp" +#include "Common.hpp" +#include "Context.hpp" +#include "ContextManager.hpp" +#include "Continuation.hpp" +#include "Debug.hpp" +#include "Emper.hpp" +#include "Runtime.hpp" +#include "Scheduler.hpp" 
+#include "emper-config.h" + +#define emper_fibril __attribute__((optimize("no-omit-frame-pointer"))) + +namespace adt { +template <typename I, const uintptr_t SIZE> +class LockedQueue; // IWYU pragma: keep +} + +class Fibril : public AbstractFiber, public Logger<LogSubsystem::FIBRIL> { + private: + using activeChildrenCount_t = std::conditional<emper::CONTINUATION_STEALING_MODE == + emper::ContinuationStealingMode::waitfree, + std::atomic<uint32_t>, int>::type; + activeChildrenCount_t activeChildrenCount = 0; + + // Members for locked continuation stealing. + std::mutex fibrilMutex; + + // Members for wait-free continuation stealing. + + uint32_t reverseStealCount = 0; + + // Only used if continuation_stealing_madvise_stack is not 'no'. + std::atomic<bool> resumable = false; + + Continuation cont; + + Context* const initialContext; + + Runtime& runtime; + + static thread_local Fibril* toResume; + + static constexpr bool UNMAP_STACKS = + emper::CONTINUATION_STEALING_MADVISE_STACK != emper::ContinuationStealingMadviseStack::no; + static constexpr bool LOCKED_CONTINUATION_STEALING = + emper::CONTINUATION_STEALING_MODE == emper::ContinuationStealingMode::locked; + + inline void tryResume(uint32_t fibrilResumeValue) { + LOGD("tryResumeFiber() invoking tryResume(" << fibrilResumeValue << ")"); + + if constexpr (LOCKED_CONTINUATION_STEALING) { + tryResumeLocked(); + } else { + tryResumeWaitFree(fibrilResumeValue); + } + } + + inline void tryResumeLocked() { + fibrilMutex.lock(); + int c = --activeChildrenCount; + LOGD("tryResume() c=" << c << " after decrementing activeChildrenCount"); + + const bool syncConditionHolds = c == 0; + Context* const currentContext = Context::getCurrentContext(); + + if (syncConditionHolds) { + fibrilMutex.unlock(); + + if (initialContext != currentContext) { + LOGD("tryResume: current context " << currentContext << " != initialContext " + << initialContext << ", freeing current context"); + 
runtime.getContextManager().putFreeContext(currentContext); + + // After the cont.execute() below, the current context will be + // the initial context. + Context::currentContext = initialContext; + } + + // Resume this fibril, does *not* return. + cont.execute(cont.sp); + __builtin_unreachable(); + } + + if (initialContext == currentContext) { + if constexpr (UNMAP_STACKS) { + // Unmap unused stack pages. + currentContext->unmap(cont.sp); + } + + // Set currentContext to nullptr, as otherwise we would + // recycle/reuse it in ContextManager::start(). But this is the + // initial context of this Fibril, which we can not + // recycle/reuse at this point. + Context::currentContext = nullptr; + } + + fibrilMutex.unlock(); + + // Continue in the scheduler to find new work. + } + + inline void tryResumeWaitFree(uint32_t fibrilResumeValue) { + // We need to save a copy of the initial context value before + // performing the fetch_sub() below. As after the fetch_sub() + // performed below, this Fibril instance may become destructed. We + // know that it will not yet be destructed if we are the + // control-flow that finds that the sync condition holds, because + // then we are the one who is destructing it. But if the sync + // condition does not hold, then we can not assume that the Fibril + // instance is still alive, because a concurrent control flow may + // have observed the sync condition and destructed it. + Context* const savedInitialContext = initialContext; + + // Note that the memory order for the fetch_sub below is acq_rel, + // to ensure that the 'initialContext' member value is loaded into + // 'savedInitialContext' before the decrement. As right after the + // decrement, this Fibril instance could become invalid (if the + // sync condition holds). 
+ + uint32_t c; +#ifdef EMPER_CONTINUATION_STEALING_MODE_WAITFREE + c = activeChildrenCount.fetch_sub(fibrilResumeValue, std::memory_order_acq_rel) - + fibrilResumeValue; +#else + // "Dummy" implementation for the case where 'activeChildrenCount' is not an std::atomic. + c = activeChildrenCount -= fibrilResumeValue; +#endif + LOGD("tryResume() c=" << c << " after substracting " << fibrilResumeValue); + + const bool syncConditionHolds = c == 0; + Context* const currentContext = Context::getCurrentContext(); + + if (syncConditionHolds) { + if (initialContext != currentContext) { + if constexpr (UNMAP_STACKS) { + // Check the atomic resumable bool to see if the context + // owner is not unmapping in parallel here. + if (!resumable.exchange(true, std::memory_order_acq_rel)) { + // The context owner is unmapping pages. We do not wait + // for it to complete, instead we return to the + // scheduler. The unmapping owner will see that we tried + // to resume and will resume for us. + return; + } + // Otherwise continue with the resume operation. + } + LOGD("tryResume: current context " << Context::currentContext << " != initialContext " + << initialContext << ", freeing current context"); + runtime.getContextManager().putFreeContext(currentContext); + + // After the cont.execute() below, the current context will be + // the initial context. + Context::currentContext = initialContext; + } + + // Resume this fibril, does *not* return. + cont.execute(cont.sp); + __builtin_unreachable(); + } + + // At this point, we can not make any assumptions about the + // validity of this Fibril instance. It may be already + // destructed. + if (savedInitialContext == currentContext) { + // Here we know this Fibril instance is alive, because we are + // on the initial Context where the Fibril instance was + // constructed on. + if constexpr (UNMAP_STACKS) { + // The initial context, which is also the current context, is + // now blocked. 
Employ the "practical solution" to the Cactus + // Stack problem, as per Yang et al. in their SPAA '16 paper, + // and unmap, via madvise() MADV_FREE or MADV_DONTNEED, the + // unused pages of the context (i.e., stack). Note that the + // Context and hence the Fibril is not resumable while the + // unmap/madvise operation is in progress. + currentContext->unmap(cont.sp); + + if (true == resumable.exchange(true, std::memory_order_acq_rel)) { + // Someone was trying to resume this Fibril while we were + // unmapping/madvising the stack. We will resume it for them + // now. + cont.execute(cont.sp); + __builtin_unreachable(); + } + } + + // Set currentContext to nullptr, as otherwise we would + // recycle/reuse it in ContextManager::start(). But this is the + // initial context of this Fibril, which we can not + // recycle/reuse at this point. + Context::currentContext = nullptr; + } + + // Continue in the scheduler to find new work. + } + + public: + inline __attribute__((always_inline)) Fibril(Runtime& runtime = *Runtime::getRuntime()) + : cont(), initialContext(Context::getCurrentContext()), runtime(runtime) { + if constexpr (emper::CONTINUATION_STEALING_MODE != emper::ContinuationStealingMode::waitfree && + emper::CONTINUATION_STEALING_MODE != emper::ContinuationStealingMode::locked) { + DIE_MSG("Fibril can only be used if continuation_stealing_mode is enabled"); + } + if constexpr (emper::BUILD_WITH_CLANG) { + DIE_MSG("Fibril can not be used when build with clang"); + } + + LOGD("constructed " << *this); + } + + ~Fibril() override { join(); } + + auto asFibrilIfPossible() -> Fibril* override { return this; } + + void printTo(std::ostream& strm, bool withPtr) const override; + + private: + void run() override; + + inline static void tryResumeFibril(uint32_t fibrilResumeValue) { + Fibril* fibrilToResume = toResume; + if (!fibrilToResume) return; + + // Set toResume to nullptr, as otherwise, once we restart the + // current context in ContextManager::start() we 
would try to + // resume this Fibril again. Note that it is important that we + // set it to nullptr before we invoke tryResume(), as + // tryResume() may never return. + toResume = nullptr; + + fibrilToResume->tryResume(fibrilResumeValue); + } + // ContextManager::start() invokes tryResumeFibril(uint32_t), let's be friends. + friend void ContextManager::start(); + + __attribute__((noinline, hot, optimize(3))) void doJoin() { + cont.ip = __builtin_return_address(0); + + toResume = this; + + const uintptr_t fibrilResumeValue = [&] { + if constexpr (LOCKED_CONTINUATION_STEALING) { + return 1; + } else { + return reverseStealCount; + } + }(); + ContextManager::tryResumeFibril(fibrilResumeValue); + } + + void checkContinuationState() { + auto* poppedFiber = runtime.scheduler.popBottom(); + if (poppedFiber == this) { + // Nobody stole the continuation, simply return and execute the continuation ourselves. + LOGD("poppedFiber is continuation"); + return; + } + LOGD("poppedFiber=" << poppedFiber << " was not this continuation fibril"); + + if (poppedFiber) { + // TODO: Is this also safe with locked continuation stealing? + + // We popped an unrelated fiber, reschedule it. + [[maybe_unused]] bool res = runtime.scheduler.pushBottom(*poppedFiber); + // Since we were the ones who just popped this Fiber, there + // should be always enough space in the work-stealing queue to + // push it back. + assert(res); + } + + // If the continuation was stolen, or if we popped an unrelated + // fiber, then try to resume this Fibril. + toResume = this; + ContextManager::tryResumeFibril(1); + __builtin_unreachable(); + } + + public: + template <class RET, class... PARs, class... ARGs> + inline __attribute__((always_inline)) void fork(RET* ret, RET (*fun)(PARs...), ARGs... args) { + auto fork_func = [](ARGs... args, Fibril * fr, RET * ret, RET(*fun)(PARs...)) + __attribute__((noinline, hot, optimize(3))) { + fr->cont.ip = __builtin_return_address(0); + fr->runtime.scheduler.pushBottom(*fr); + *ret = fun(args...); + fr->checkContinuationState(); + }; + membar(fork_func(args..., this, ret, fun)); + } + + template <class... PARs, class... ARGs> + inline __attribute__((always_inline)) void fork(void (*fun)(PARs...), ARGs... args) { + auto fork_func = [](ARGs... 
args, Fibril * fr, RET * ret, RET(*fun)(PARs...)) + __attribute__((noinline, hot, optimize(3))) { + fr->cont.ip = __builtin_return_address(0); + fr->runtime.scheduler.pushBottom(*fr); + *ret = fun(args...); + fr->checkContinuationState(); + }; + membar(fork_func(args..., this, ret, fun)); + } + + template <class... PARs, class... ARGs> + inline __attribute__((always_inline)) void fork(void (*fun)(PARs...), ARGs... args) { + auto fork_func = [](ARGs... args, Fibril * fr, void (*fun)(PARs...)) + __attribute__((noinline, hot, optimize(3))) { + fr->cont.ip = __builtin_return_address(0); + fr->runtime.scheduler.pushBottom(*fr); + fun(args...); + fr->checkContinuationState(); + }; + membar(fork_func(args..., this, fun)); + } + + template <class T, class... ARGs> + inline __attribute__((always_inline)) void fork(std::function<T> fun, ARGs... args) { + auto fork_func = [](Fibril * fr, std::function<T> fun, ARGs... args) + __attribute__((noinline, hot, optimize(3))) { + fr->cont.ip = __builtin_return_address(0); + fr->runtime.scheduler.pushBottom(*fr); + fun(args...); + fr->checkContinuationState(); + }; + membar(fork_func(this, fun, args...)); + } + + inline __attribute__((always_inline)) void join() { + if constexpr (LOCKED_CONTINUATION_STEALING) { + joinLocked(); + } else { + joinWaitFree(); + } + } + + private: + inline __attribute__((always_inline)) void joinLocked() { + if (activeChildrenCount == 0) return; + + membar(doJoin()); + } + + inline __attribute__((always_inline)) void joinWaitFree() { + if (reverseStealCount == 0) return; + + membar(doJoin()); + + reverseStealCount = 0; + if constexpr (UNMAP_STACKS) { + resumable.store(false, std::memory_order_relaxed); + } + } + + // Disallow heap allocations of Fibril by declaring those private. 
+ static void* operator new(std::size_t); + static void* operator new[](std::size_t); + + template <typename, const uintptr_t> + friend class adt::LockedQueue; +}; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 97c2fede03df611f17e76e04c78999bf3db35bc3..c94ce71d5f7db05c8ffdb599510ead49af185662 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -306,7 +306,7 @@ auto Runtime::workerLoop(Worker* worker) -> void* { contextManager.start(); - // Threads return here if Context::switchToOriginalStack() is called. + // Threads return here if ContextManager::switchToOriginalStack() is called. // Ensure that all worker threads exit "at the same" time. Otherwise // it would be possible that one thread is work-stealing, @@ -514,3 +514,11 @@ auto Runtime::getDefaultPinningOffset() -> workerid_t { return 0; } + +void Runtime::maybeTerminateWorker() { + if (terminateWorkers.load(std::memory_order_relaxed)) { + LOGD("Worker terminating"); + // Switch to the original stack, which will simply terminate the thread. + ContextManager::switchToOriginalStack(); + } +} diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index d91d779dc8ab4694c86ddb445ebb92ced6e34e91..cf5d5acae5a8e6c272b4fbfb58da3c99b0c7e79e 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -18,7 +18,6 @@ #include "CallerEnvironment.hpp" #include "Common.hpp" -#include "Context.hpp" #include "Debug.hpp" #include "FiberHint.hpp" #include "FiberSource.hpp" @@ -157,13 +156,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { } } - void maybeTerminateWorker() { - if (terminateWorkers.load(std::memory_order_relaxed)) { - LOGD("Worker terminating"); - // Switch to the original stack, which will simply terminate the thread. 
- Context::switchToOriginalStack(); - } - } + void maybeTerminateWorker(); void dispatchLoopSleep() { bool canWake; @@ -262,6 +255,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend class MemoryManager; template <typename> friend class WorkerLocalData; + friend class Fibril; friend class emper::sleep_strategy::PipeSleepStrategy; friend void emper::log::log(const std::string& prefix, const std::string& message); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 665064d8a838ff0ec155f7340b083a6291f80051..007c0956eaca475269d5cd058f30ec7946ea857e 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -77,8 +77,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { void recycle(AbstractFiber* fiber) { dispatcher.recycle(fiber); }; - virtual auto nextFiber() -> std::optional<NextFiberResult> = 0; + virtual auto pushBottom(AbstractFiber& fiber) -> bool = 0; + virtual auto popBottom() -> AbstractFiber* = 0; + friend class Fibril; + virtual auto nextFiber() -> std::optional<NextFiberResult> = 0; friend class Runtime; public: diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp index d68919f4dd5e68abaf1d3af245c46eb027af6354..f703c5d1d8e60cd132967c5d70e0e39ca8817818 100644 --- a/emper/Semaphore.hpp +++ b/emper/Semaphore.hpp @@ -55,6 +55,8 @@ class Semaphore { auto releaseFromAnywhere() -> bool { return release<CallerEnvironment::ANYWHERE>(); } + auto getCount() -> unsigned int { return count; }; + void print(); }; diff --git a/emper/StealingMode.cpp b/emper/StealingMode.cpp new file mode 100644 index 0000000000000000000000000000000000000000..baf82ca0dbf48c871c778213acdce81c02f53439 --- /dev/null +++ b/emper/StealingMode.cpp @@ -0,0 +1,36 @@ +/// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#include "StealingMode.hpp" + +#include <string> + +namespace emper { + +auto operator<<(std::ostream& out, const StealingMode& stealingMode) -> std::ostream& { + switch (stealingMode) { + case StealingMode::child: + 
out << "child"; + break; + case StealingMode::continuation: + out << "continuation"; + break; + } + return out; +} + +auto operator>>(std::istream& in, StealingMode& stealingMode) -> std::istream& { + std::string token; + + in >> token; + if (token == "child") { + stealingMode = StealingMode::child; + } else if (token == "continuation") { + stealingMode = StealingMode::continuation; + } else { + in.setstate(std::ios_base::failbit); + } + + return in; +} + +} // namespace emper diff --git a/emper/StealingMode.hpp b/emper/StealingMode.hpp new file mode 100644 index 0000000000000000000000000000000000000000..88fceda60022d00867b9f9d5f9fc2bd33f6e5b15 --- /dev/null +++ b/emper/StealingMode.hpp @@ -0,0 +1,18 @@ +/// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#pragma once + +#include <iostream> + +namespace emper { + +enum class StealingMode { + child, + continuation, +}; + +auto operator<<(std::ostream& out, const StealingMode& stealingMode) -> std::ostream&; + +auto operator>>(std::istream& in, StealingMode& stealingMode) -> std::istream&; + +} // namespace emper diff --git a/emper/SynchronizedFiber.hpp b/emper/SynchronizedFiber.hpp index cca30c8e1ac611758646a8bbbb3c2641d6a85bdb..3358454e397341514306072d7284bf2ffc6e43d7 100644 --- a/emper/SynchronizedFiber.hpp +++ b/emper/SynchronizedFiber.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once #include <utility> @@ -30,7 +30,7 @@ class SynchronizedFiber : public Fiber { explicit SynchronizedFiber(const fiber_fun0_t& function, PrivateSemaphore& semaphore) : SynchronizedFiber(function, nullptr, semaphore) {} - void run() const override { + void run() override { Fiber::run(); semaphore.signalAndExit(); } diff --git a/emper/io.hpp b/emper/io.hpp index db52092befe20fda4dc727551448e99b526097cc..82867684380c12f046f1ca175487cb17550fc4aa 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -636,6 
+636,10 @@ inline void closeAndForget(int fd) { void recursive_directory_walk(const fs::path &dirpath, std::function<bool(const fs::directory_entry &dirent)> filter, std::function<void(fs::directory_entry dirent)> fn); + +void fibril_recursive_directory_walk(const fs::path dirpath, + std::function<bool(const fs::directory_entry &dirent)> filter, + std::function<void(fs::directory_entry dirent)> fn); #endif struct SockOpt { diff --git a/emper/io/io.cpp b/emper/io/io.cpp index f7ef9aa225acefd1d6e3111cc1c53e758d14b086..a58a29fb5a1429bb9da0afc7787e1458694c2e89 100644 --- a/emper/io/io.cpp +++ b/emper/io/io.cpp @@ -16,6 +16,7 @@ #include "Debug.hpp" #include "Emper.hpp" #include "Fiber.hpp" +#include "Fibril.hpp" #include "Runtime.hpp" #include "SynchronizedFiber.hpp" @@ -58,6 +59,28 @@ void recursive_directory_walk(const fs::path& dirpath, cps.wait(); } +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) +emper_fibril void fibril_recursive_directory_walk( + // NOLINTNEXTLINE(performance-unnecessary-value-param) + const fs::path dirpath, std::function<bool(const fs::directory_entry& dirent)> filter, + // NOLINTNEXTLINE(performance-unnecessary-value-param) + std::function<void(fs::directory_entry dirent)> fn) { + Fibril fibril; + + for (const auto& pathRef : fs::directory_iterator(dirpath)) { + // We must copy 'pathRef', to not hold a reference, which may + // become invalid at the end of the iteration. 
+ const fs::directory_entry path = pathRef; + if (filter(path)) { + fibril.fork(fn, path); + } + + if (pathRef.is_directory()) { + fibril.fork(fibril_recursive_directory_walk, path, filter, fn); + } + } +} + auto tcp_listener(const std::string& host, const std::string& port, const std::function<void(int)>& handler, int backlog, const std::vector<SockOpt>& sockopts) -> Fiber* { diff --git a/emper/lib/adt/LockedQueue.hpp b/emper/lib/adt/LockedQueue.hpp index f2f8b2fe3e57a805a291fad6ae831917ea8ad25d..eda6e980169bac8e0754dec5458f293ae85288e0 100644 --- a/emper/lib/adt/LockedQueue.hpp +++ b/emper/lib/adt/LockedQueue.hpp @@ -1,11 +1,13 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus, Nicolas Pfeiffer #pragma once #include <deque> #include <mutex> +#include <type_traits> #include "Common.hpp" +#include "Fibril.hpp" #include "StealingResult.hpp" namespace adt { @@ -53,6 +55,20 @@ class LockedQueue { *itemPtr = deque.front(); + // This is one of the ugliest places in EMPER. Here we specialize + // the LockedQueue for locked continuation stealing, where it is + // important that the Fibril gets locked *before* the queue lock + // is released. For more information, see the IPDPS 2021 paper + // by Schmaus et al. [schmaus2021nowa], Figure 6. and Listing 2. 
+ if constexpr (emper::CONTINUATION_STEALING_MODE_LOCKED && + std::is_same<AbstractFiber*, I>::value) { + AbstractFiber* abstractFiber = static_cast<AbstractFiber*>(*itemPtr); + Fibril* fibril = abstractFiber->asFibrilIfPossible(); + if (fibril) { + fibril->fibrilMutex.lock(); + } + } + deque.pop_front(); return emper::StealingResult::Stolen; diff --git a/emper/meson.build b/emper/meson.build index 876f76f4cc649b7468a1901680068ecfccadb3a2..1f957d0758f08012f335c9db7eca2df3d6cc1035 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -15,12 +15,14 @@ emper_asm_objects = nasm_gen.process(emper_asm_sources) emper_cpp_sources = [ 'AbstractFiber.cpp', 'CallerEnvironment.cpp', + 'Continuation.cpp', 'Runtime.cpp', 'Emper.cpp', 'Fiber.cpp', 'FiberHint.cpp', 'FiberSource.cpp', 'FiberManager.cpp', + 'Fibril.cpp', 'Context.cpp', 'Scheduler.cpp', 'Dispatcher.cpp', @@ -29,6 +31,7 @@ emper_cpp_sources = [ 'BinaryPrivateSemaphore.cpp', 'CountingPrivateSemaphore.cpp', 'Semaphore.cpp', + 'StealingMode.cpp', 'WakeupStrategy.cpp', 'Worker.cpp', ] diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index b452edeff6070d515215e4d4e4b9252c90c47137..b38489ad8314551d30649be236cfc9c08b248535 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -5,7 +5,10 @@ #include <cstddef> // for size_t #include <optional> +#include "CallerEnvironment.hpp" #include "Fiber.hpp" +#include "FiberHint.hpp" +#include "FiberSource.hpp" #include "Scheduler.hpp" #include "emper-common.h" #include "lib/adt/MpscQueue.hpp" @@ -54,6 +57,20 @@ class AbstractWorkStealingScheduler : public Scheduler { auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; auto nextFiberResultFromMpscQueueOrWorkStealing() -> std::optional<NextFiberResult>; + auto pushBottom(AbstractFiber& fiber) -> bool override { + bool res = queue.pushBottom(&fiber); + 
onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); + return res; + } + + auto popBottom() -> AbstractFiber* override { + AbstractFiber* fiber; + if (queue.popBottom(&fiber)) { + return fiber; + } + return nullptr; + } + public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); diff --git a/iwyu-mappings.imp b/iwyu-mappings.imp index 5d43192e0c50f1728b394b9e9127d96b4c3c8be2..8f05861256e080461c391b4fa9c8cbec38350c03 100644 --- a/iwyu-mappings.imp +++ b/iwyu-mappings.imp @@ -7,6 +7,17 @@ { include: ["<ext/alloc_traits.h>", "private", "<memory>", "public" ] }, { include: ["<bits/types/struct_sched_param.h>", "private", "<sched.h>", "public" ] }, { include: ["<bits/types/struct_tm.h>", "private", "<ctime>", "public" ] }, + { include: ["<boost/detail/basic_pointerbuf.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/lexical_cast/bad_lexical_cast.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/detail/parsers.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/detail/value_semantic.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/errors.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/options_description.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/value_semantic.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/variables_map.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/parsers.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/program_options/positional_options.hpp>", "private", "<boost/program_options.hpp>", "public"], }, + { include: ["<boost/type_index/type_index_facade.hpp>", 
"private", "<boost/program_options.hpp>", "public"], }, { symbol: ["__kernel_timespec", "private", "<liburing.h>", "public" ] }, { symbol: ["std::filesystem", "private", "<filesystem>", "public" ] }, diff --git a/meson.build b/meson.build index 71cb6c4ae58071702591b1ecd66f1cb74ec2ed53..6be8efa14373c4f1db2c1d1e0f64623862e3f5ac 100644 --- a/meson.build +++ b/meson.build @@ -46,11 +46,19 @@ cpp_compiler = meson.get_compiler('cpp') if cpp_compiler.has_header('compare') conf_data.set('EMPER_HAS_COMPARE_H', true) endif +cpp_is_clang = cpp_compiler.get_id() == 'clang' +if cpp_is_clang + # Clang does not know "__attribute__((optimize(-no-omit-frame-pointer)))". + add_project_arguments('-Wno-unknown-attributes', language: 'cpp') +endif + +continuation_stealing_mode = get_option('continuation_stealing_mode') +locked_ws_queue = get_option('locked_ws_queue') conf_data.set('EMPER_WORKER_SLEEP', get_option('worker_sleep')) conf_data.set('EMPER_WORKER_WAKEUP_STRATEGY', get_option('worker_wakeup_strategy')) conf_data.set('EMPER_WORKER_IGNORE_WAKEUP_HINT', get_option('worker_ignore_wakeup_hint')) -conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue')) +conf_data.set('EMPER_LOCKED_WS_QUEUE', locked_ws_queue) conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_STATS', get_option('stats')) @@ -58,6 +66,16 @@ conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set')) conf_data.set('EMPER_SET_AFFINITY_ON_BLOCK', get_option('set_affinity_on_block')) conf_data.set('EMPER_IO_COMPLETER_SCHED_PARAM', get_option('io_completer_sched_param')) +conf_data.set('EMPER_CONTINUATION_STEALING_MODE', continuation_stealing_mode) +conf_data.set('EMPER_CONTINUATION_STEALING_MADVISE_STACK', get_option('continuation_stealing_madvise_stack')) +conf_data.set('EMPER_BUILD_WITH_CLANG', cpp_is_clang) + 
+if continuation_stealing_mode == 'locked' and not locked_ws_queue + error('*Locked* continuation stealing only works with locked work-stealing queues (locked_ws_queue=true)') +endif +if continuation_stealing_mode == 'waitfree' + conf_data.set('EMPER_CONTINUATION_STEALING_MODE_WAITFREE', true) +endif semaphore_impl = get_option('wakeup_semaphore_implementation') conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) diff --git a/meson_options.txt b/meson_options.txt index 1b54e3de90bf82ae6625eb53800075b6d65cab06..ecd18786746ca25c71d62e45d5cca882b5bbbaa5 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -217,3 +217,17 @@ option( description: 'Memory ordering used for the lockless CQ algorithm', value: 'weak', ) +option( + 'continuation_stealing_mode', + type: 'combo', + choices: ['disabled', 'locked', 'waitfree'], + description: 'Continuation-stealing variant to use (disabled, locked, or wait-free)', + value: 'waitfree', +) +option( + 'continuation_stealing_madvise_stack', + type: 'combo', + choices: ['no', 'dontneed', 'free'], + description: 'Madvise kind to unmap unused stack pages on continuation stealing', + value: 'no', +) diff --git a/tests/continuation-stealing/ContinuationStealingAndPrivateSemaphoreTest.cpp b/tests/continuation-stealing/ContinuationStealingAndPrivateSemaphoreTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5120d1a3fd869958b6ae2b8d6c6da565556e9b27 --- /dev/null +++ b/tests/continuation-stealing/ContinuationStealingAndPrivateSemaphoreTest.cpp @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#include <iostream> + +#include "BinaryPrivateSemaphore.hpp" +#include "Fibril.hpp" + +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) +__attribute__((optimize("no-omit-frame-pointer"))) static void waitOnBps(BPS* bps) { bps->wait(); } + +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) 
+__attribute__((optimize("no-omit-frame-pointer"))) static void forkFirstThenSignal() { + Fibril fibril; + BPS bps; + + fibril.fork(waitOnBps, &bps); + + bps.signal(); + + fibril.join(); +} + +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) +__attribute__((optimize("no-omit-frame-pointer"))) static void signalFirstThenFork() { + Fibril fibril; + BPS bps; + + bps.signal(); + + fibril.fork(waitOnBps, &bps); + + fibril.join(); +} + +void emperTest() { + std::cout << "#### Fork first, then signal\n"; + forkFirstThenSignal(); + + std::cout << "#### Signal first, then fork\n"; + signalFirstThenFork(); + + std::cout << "#### Return from emperTest()\n"; +} diff --git a/tests/continuation-stealing/CppContinuationApiTest.cpp b/tests/continuation-stealing/CppContinuationApiTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9fac5ef2bdcf1d296e3b1291f7c07fa1f607857d --- /dev/null +++ b/tests/continuation-stealing/CppContinuationApiTest.cpp @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021-2022 Florian Schmaus +#include <atomic> +#include <cstdlib> + +#include "Fibril.hpp" + +static const unsigned int COUNT = 3; +static std::atomic_uint counter; + +static void increaseCounterByOne() { counter++; } + +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) +__attribute__((optimize("no-omit-frame-pointer"))) void emperTest() { + Fibril fibril; + + for (unsigned int i = 0; i < COUNT; ++i) { + fibril.fork(increaseCounterByOne); + } + + fibril.join(); + + if (counter != COUNT) { + exit(EXIT_FAILURE); + } +} diff --git a/tests/continuation-stealing/FibFibrilTest.cpp b/tests/continuation-stealing/FibFibrilTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f394d898484addf7014a2316a017fed24eea54e1 --- /dev/null +++ b/tests/continuation-stealing/FibFibrilTest.cpp @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#include <cstdint> +#include 
<cstdlib> +#include <iostream> + +#include "Fibril.hpp" + +static auto fib_fast(uint64_t n) -> uint64_t { + if (n < 2) return n; + + uint64_t i = 2, x = 0, y = 0, z = 1; + + do { + x = y; + y = z; + z = x + y; + } while (i++ < n); + + return z; +} + +// NOLINTNEXTLINE(clang-diagnostic-unknown-attributes) +__attribute__((optimize("no-omit-frame-pointer"))) static auto fib(uint64_t n) -> uint64_t { + if (n < 2) return n; + + Fibril fibril; + uint64_t a, b; + + fibril.fork(&a, fib, n - 1); + + b = fib(n - 2); + + fibril.join(); + + return a + b; +} + +void emperTest() { + const uint64_t fibNum = 24; + + uint64_t res = fib(fibNum); + + uint64_t expected = fib_fast(fibNum); + std::cout << "fib(" << fibNum << ") = " << res << " [expected=" << expected << "]" << std::endl; + + if (res != expected) { + exit(EXIT_FAILURE); + } +} diff --git a/tests/continuation-stealing/meson.build b/tests/continuation-stealing/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..01c78b09a81a02e9b78c64a83a4a4c66bbdeece0 --- /dev/null +++ b/tests/continuation-stealing/meson.build @@ -0,0 +1,25 @@ +tests += [ + { + 'source': files('CppContinuationApiTest.cpp'), + 'name': 'CppContinuationApiTest', + 'description': 'C++ continuation API (Fibril) test', + 'test_runner': 'continuation-stealing', + 'test_suite': 'continuation-stealing', + }, + + { + 'source': files('ContinuationStealingAndPrivateSemaphoreTest.cpp'), + 'name': 'ContinuationStealingAndPrivateSemaphoreTest', + 'description': 'Test Continuation Stealing in interaction with private semaphores', + 'test_runner': 'continuation-stealing', + 'test_suite': 'continuation-stealing', + }, + + { + 'source': files('FibFibrilTest.cpp'), + 'name': 'FibFibrilTest', + 'description': 'Compute the Fibonacci function using the Fibril API', + 'test_runner': 'continuation-stealing', + 'test_suite': 'continuation-stealing', + }, +] diff --git a/tests/fixtures/meson.build b/tests/fixtures/meson.build index 
0ba7b33d466f2f088789a60bcb04263e958c9492..bb216b395c9dbad1f6833e42640ef3abc3295c1b 100644 --- a/tests/fixtures/meson.build +++ b/tests/fixtures/meson.build @@ -1,6 +1,8 @@ test_fixtures_include_dir = include_directories('.') -test_fixtures_sources = ['network.cpp'] +test_fixtures_sources = [ + 'network.cpp', +] test_fixtures_lib = library( 'test-fixtures', diff --git a/tests/meson.build b/tests/meson.build index eb4e9607f4c6ef43e5acb1024efd20f38d6fdc4a..ef6ecdfd93925e98647f6a9038586610acae695b 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -128,6 +128,9 @@ tests = [ }, ] + +subdir('continuation-stealing') + subdir('io') subdir('lib') diff --git a/tests/test-runner/continuation-stealing-test-runner.cpp b/tests/test-runner/continuation-stealing-test-runner.cpp new file mode 100644 index 0000000000000000000000000000000000000000..452c0815d256bcddef4fd44e9f54127aec6b1df5 --- /dev/null +++ b/tests/test-runner/continuation-stealing-test-runner.cpp @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#include <cstdlib> + +#include "Emper.hpp" +#include "emper-common.h" +#include "test-runner.hpp" + +auto main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) -> int { + if constexpr (!emper::CONTINUATION_STEALING || emper::BUILD_WITH_CLANG) { + exit(77); + } + + return testMain(); +} diff --git a/tests/test-runner/meson.build b/tests/test-runner/meson.build index a949890e9ae87fda8bc020fdb3b2aebe84dee393..fea84cfbaeef9f16ad17bc08b402acea953f37c3 100644 --- a/tests/test-runner/meson.build +++ b/tests/test-runner/meson.build @@ -7,6 +7,9 @@ avail_test_runners = { 'io': { 'sources': ['io-test-runner.cpp'], }, + 'continuation-stealing': { + 'sources': ['continuation-stealing-test-runner.cpp'], + }, } foreach runner, runner_dict : avail_test_runners