Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flow/emper
  • aj46ezos/emper
  • i4/manycore/emper
3 results
Show changes
Showing
with 742 additions and 152 deletions
fib_exe = executable(
'fib',
'Main.cpp',
dependencies: emper_dep,
)
worker_sleep_example_exe = executable(
'worker_sleep_example',
'WorkerSleepExample.cpp',
......@@ -16,6 +10,12 @@ echoserver_exe = executable(
dependencies: emper_dep,
)
netbarrier_exe = executable(
'coordinator',
'Coordinator.cpp',
dependencies: emper_dep,
)
echoserver_callback_exe = executable(
'echoserver_callback',
'EchoServerCallback.cpp',
......@@ -28,4 +28,64 @@ echoclient_exe = executable(
dependencies: emper_dep,
)
emper_info_exe = executable(
'emper-info',
'EmperInfo.cpp',
dependencies: emper_dep,
)
qsort = executable(
'qsort',
'qsort.cpp',
dependencies: emper_dep,
)
boost_program_options_dep = dependency('boost', modules: ['program_options'])
boost_program_options_code = '''
#include <boost/program_options.hpp>
int main(int argc, char* argv[]) {
boost::program_options::options_description desc("Allowed options");
}
'''
cpp_can_link_with_boost_program_options = cpp_compiler.links(
boost_program_options_code,
name: 'boost_progam_options',
dependencies: boost_program_options_dep,
)
boost_ublas_code = '''
#include <boost/numeric/ublas/matrix.hpp>
#include <boost/numeric/ublas/io.hpp>
int main (int argc, char* argv[]) {
using namespace boost::numeric::ublas;
matrix m (3, 3);
for (unsigned i = 0; i < m.size1 (); ++ i)
for (unsigned j = 0; j < m.size2 (); ++ j)
m (i, j) = 3 * i + j;
std::cout << m << std::endl;
}
'''
cpp_can_link_with_boost_ublas = cpp_compiler.links(
boost_ublas_code,
name: 'boost_ublas',
dependencies: boost_dep,
)
if (not automagic) or cpp_can_link_with_boost_program_options
fib_child_stealing_exe = executable(
'fib-child-stealing',
'FibChildStealing.cpp',
dependencies: [emper_dep, boost_program_options_dep],
)
if (not automagic) or cpp_can_link_with_boost_ublas
mat_mul_exe = executable(
'mat-mul',
'MatMul.cpp',
dependencies: [emper_dep, boost_dep, boost_program_options_dep],
)
endif
endif
subdir('fsearch')
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
/**
* qsort benchmark implementation similar to those used for this blog post:
* https://zig.news/kprotty/resource-efficient-thread-pools-with-zig-3291
* Comparables benchmark sources (rust, go, zig) can be found at:
* https://github.com/kprotty/zap/tree/blog/benchmarks
*/
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include "CountingPrivateSemaphore.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "emper.hpp"
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::milliseconds;
static void fill(int* arr, size_t s) {
for (int i = 0; static_cast<size_t>(i) < s; ++i) {
arr[i] = i;
}
}
static void swap(int* n, int* m) {
int tmp = *n;
*n = *m;
*m = tmp;
}
static void shuffle(int* arr, size_t s) {
uint32_t xs = 0xdeadbeef;
for (size_t i = 0; i < s; ++i) {
xs ^= xs << 13;
xs ^= xs >> 17;
xs ^= xs << 5;
size_t j = xs % (i + 1);
swap(&arr[i], &arr[j]);
}
}
static auto verify(const int* arr, size_t s) -> bool {
for (size_t i = 1; i < s; ++i) {
if (arr[i - 1] > arr[i]) {
return false;
}
}
return true;
}
static void insertion_sort(int* arr, size_t s) {
for (size_t i = 1; i < s; i++) {
size_t n = i;
while (n > 0 && arr[n] < arr[n - 1]) {
swap(&arr[n], &arr[n - 1]);
n -= 1;
}
}
}
static auto partition(int* arr, size_t s) -> size_t {
size_t pivot = s - 1;
size_t i = 0;
for (size_t j = 0; j < pivot; ++j) {
if (arr[j] <= arr[pivot]) {
swap(&arr[i], &arr[j]);
i += 1;
}
}
swap(&arr[i], &arr[pivot]);
return i;
}
static void qsort(int* arr, size_t s) {
if (s <= 32) {
insertion_sort(arr, s);
return;
}
size_t mid = partition(arr, s);
CPS cps;
spawn([&] { qsort(arr, mid); }, cps);
spawn([&] { qsort(&arr[mid], s - mid); }, cps);
cps.wait();
}
static const size_t ARR_SIZE = 10L * 1000 * 1000;
auto main() -> int {
int* arr = new int[ARR_SIZE];
std::cout << "filling" << std::endl;
fill(arr, ARR_SIZE);
std::cout << "shuffling" << std::endl;
shuffle(arr, ARR_SIZE);
Runtime runtime;
auto* sorter = Fiber::from([&]() {
const auto start = std::chrono::steady_clock::now();
qsort(arr, ARR_SIZE);
const auto end = std::chrono::steady_clock::now();
auto ms = duration_cast<milliseconds>(end - start);
std::cout << "sorting took " << ms.count() << "ms" << std::endl;
runtime.initiateTermination();
});
runtime.scheduleFromAnywhere(*sorter);
runtime.waitUntilFinished();
int exit_code = EXIT_SUCCESS;
if (!verify(arr, ARR_SIZE)) {
std::cerr << "Array is not sorted" << std::endl;
exit_code = EXIT_FAILURE;
}
delete[] arr;
return exit_code;
}
/compile_commands.json
......@@ -801,7 +801,7 @@ WARN_NO_PARAMDOC = YES
# a warning is encountered.
# The default value is: NO.
WARN_AS_ERROR = YES
WARN_AS_ERROR = NO
# The WARN_FORMAT tag determines the format of the warning messages that doxygen
# can produce. The string should contain the $file, $line, and $text tags, which
......
#!/usr/bin/env bash
# The directory of this script is also EMPER's root directory.
EMPER_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
IMAGE=$(sed --regexp-extended --quiet 's;^image: "([^"]*)"$;\1;p' "${EMPER_ROOT}/.gitlab-ci.yml")
docker run \
--volume="${EMPER_ROOT}:${EMPER_ROOT}" \
--interactive \
--tty \
--env USER_ID="${UID}" \
--env GROUP_ID="$(id -g ${USER})" \
--security-opt=seccomp:unconfined \
--entrypoint "${EMPER_ROOT}/tools/docker-prepare" \
"${IMAGE}" \
"${EMPER_ROOT}" "$@"
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Schmaus
#include "AbstractFiber.hpp"
auto operator<<(std::ostream& strm, const AbstractFiber& fiber) -> std::ostream& {
fiber.printTo(strm, false);
return strm;
}
auto operator<<=(std::ostream& strm, const AbstractFiber& fiber) -> std::ostream& {
fiber.printTo(strm, true);
return strm;
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Schmaus
#pragma once
#include <ostream>
#include "emper-common.h"
class Fiber;
class Fibril;
class AbstractFiber {
friend class Dispatcher;
protected:
virtual ~AbstractFiber() = default;
virtual void run() = 0;
public:
virtual auto isRunnable() const -> bool { return true; }
virtual auto getAffinityBuffer() const -> workeraffinity_t* { return nullptr; }
virtual auto asFiberIfPossible() -> Fiber* { return nullptr; };
virtual auto asFibrilIfPossible() -> Fibril* { return nullptr; };
virtual void printTo(std::ostream& strm, bool withPtr = true) const = 0;
friend auto operator<<(std::ostream& strm, const AbstractFiber& fiber) -> std::ostream&;
friend auto operator<<=(std::ostream& strm, const AbstractFiber& fiber) -> std::ostream&;
};
......@@ -9,6 +9,7 @@
#include "Fiber.hpp"
#include "UnboundedBlockingMpscQueue.hpp"
#include "emper.hpp"
#include "io/Future.hpp"
template <typename T>
class Actor {
......@@ -19,8 +20,10 @@ class Actor {
Running,
};
protected:
Runtime& runtime;
private:
std::atomic<State> state = {Stopped};
UnboundedBlockingMpscQueue<T> queue;
......@@ -73,9 +76,15 @@ class Actor {
void startFromAnywhere() { start<CallerEnvironment::ANYWHERE>(); }
void tell(T t) { queue.put(t); }
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void tell(T t) {
queue.template put<callerEnvironment>(t);
}
void tellFromAnywhere(T t) { queue.putFromAnywhere(t); }
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER, typename InputIt>
void tell(InputIt begin, InputIt end) {
queue.template put<callerEnvironment>(begin, end);
}
auto pendingMailboxItems() -> size_t { return queue.size(); }
......@@ -86,7 +95,7 @@ class Actor {
if constexpr (emper::IO) {
emper::sleep(1);
} else {
emper::yield();
runtime.yield();
}
// TODO: The suppressed linter error below may be a false positive
// reported by clang-tidy.
......
......@@ -3,6 +3,7 @@
#include "BinaryPrivateSemaphore.hpp"
#include <cassert> // for assert
#include <ostream>
#include "Common.hpp" // for unlikely
#include "Context.hpp" // for Context
......@@ -56,7 +57,7 @@ auto BinaryPrivateSemaphore::signalInternal() -> Context* {
// for the correctnes of the synchronization primitive.
bpsState.store(newState);
}
LOGDD("unblock in fast path");
LOGDD("unblock in fast path, as we found a blocked context " << blockedContext);
return blockedContext;
}
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus
#pragma once
#include <atomic> // for atomic
#include "BlockablePurpose.hpp"
#include "PrivateSemaphore.hpp" // for PrivateSemaphore
class Context;
namespace emper::io {
class Future;
}
class BinaryPrivateSemaphore final : public PrivateSemaphore {
private:
......@@ -28,17 +32,29 @@ class BinaryPrivateSemaphore final : public PrivateSemaphore {
// if the vcall is actually performed in a constructor.
Context* blockedContext = nullptr;
BinaryPrivateSemaphore(emper::BlockablePurpose blockablePurpose)
: PrivateSemaphore(blockablePurpose), bpsState(initial) {}
protected:
auto signalInternal() -> Context* override;
public:
BinaryPrivateSemaphore() : bpsState(initial) {}
BinaryPrivateSemaphore() : BinaryPrivateSemaphore(emper::BlockablePurpose::GENERIC) {}
~BinaryPrivateSemaphore() override = default;
void wait() override;
void reset() { bpsState = initial; };
// Workaround for doxygen choking on the following friend declaration with
// Parsing file emper/emper/BinaryPrivateSemaphore.hpp:54: error: no uniquely matching class
// member found for
// Future
// (warning treated as error, aborting now)
//! @cond Doxygen_Suppress
friend emper::io::Future;
//! @endcond
};
using BPS = BinaryPrivateSemaphore;
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#pragma once
#include <utility>
#include "BlockablePurpose.hpp"
#include "CallerEnvironment.hpp"
#include "Common.hpp"
#include "Context.hpp"
......@@ -12,6 +13,8 @@
#include "Emper.hpp"
#include "Runtime.hpp"
#include "lib/adt/LockedSet.hpp"
#include "stats/FromAnywhere.hpp"
#include "stats/Worker.hpp"
static emper::lib::adt::LockedSet<Context*> blockedContexts;
......@@ -22,6 +25,8 @@ class Blockable : public Logger<logSubsystem> {
ContextManager& contextManager;
const emper::BlockablePurpose blockablePurpose;
workeraffinity_t* affinity = nullptr;
// It would be OK to not initialize this member since it is only set
......@@ -30,23 +35,32 @@ class Blockable : public Logger<logSubsystem> {
// about clang-analyzer-optin.cplusplus.UninitializedObject.
workeraffinity_t affinity_buffer = Fiber::NOT_AFFINE;
Blockable(Runtime& runtime) : runtime(runtime), contextManager(runtime.getContextManager()) {}
Blockable(Runtime& runtime,
emper::BlockablePurpose blockablePurpose = emper::BlockablePurpose::GENERIC)
: runtime(runtime),
contextManager(runtime.getContextManager()),
blockablePurpose(blockablePurpose) {}
void maybeSetAffinity() {
if constexpr (!emper::SET_AFFINITY_ON_BLOCK) return;
if constexpr (!emper::SET_AFFINITY_ON_BLOCK && !emper::STATS_BLOCKED_CONTEXT) return;
// TODO: At some point we may want to have something like
// Runtime::isAffinityCapable() and return here if it is
// not. Although I am not sure if it is worth the overhead.
bool setAffinityBuffer = false;
auto* currentFiber = Context::getCurrentFiber();
auto* affinity = currentFiber->getAffinityBuffer();
if (affinity) {
this->affinity = affinity;
} else {
affinity_buffer = Runtime::getWorkerId();
setAffinityBuffer = true;
this->affinity = &affinity_buffer;
}
if (emper::STATS_BLOCKED_CONTEXT || setAffinityBuffer) {
affinity_buffer = Runtime::getWorkerId();
}
}
// Older clang-tidy versions show a
......@@ -55,14 +69,26 @@ class Blockable : public Logger<logSubsystem> {
// NOLINTNEXTLINE(performance-unnecessary-value-param)
void block(func_t freshContextHook) {
// Only contexts managed by EMPER's runtime can block.
assert(Runtime::inRuntime());
emper::assertInRuntime();
LOGD("block() blockedContext is " << Context::getCurrentContext());
LOGD("block() context " << Context::getCurrentContext() << " and fiber "
<< Context::getCurrentFiber());
maybeSetAffinity();
if constexpr (emper::STATS_BLOCKED_CONTEXT_COUNT) {
size_t blockedContextCount =
runtime.blockedContextCount.fetch_add(1, std::memory_order_relaxed) + 1;
emper::stats::worker->recordBlockedContextCount(blockedContextCount);
}
if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.insert(Context::getCurrentContext());
auto* currentContext = Context::getCurrentContext();
blockedContexts.insert(currentContext);
}
if constexpr (emper::STATS_BLOCKED_CONTEXT) {
emper::stats::worker->recordBlockedContext(blockablePurpose);
}
contextManager.saveAndStartNew(std::move(freshContextHook));
......@@ -70,10 +96,24 @@ class Blockable : public Logger<logSubsystem> {
auto unblockAndGetContinuation(Context* context) -> Fiber* {
assert(context != nullptr);
if constexpr (emper::STATS_BLOCKED_CONTEXT_COUNT) {
runtime.blockedContextCount.fetch_sub(1, std::memory_order_relaxed);
}
if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.erase(context);
}
if constexpr (emper::STATS_BLOCKED_CONTEXT) {
if (likely(emper::stats::worker)) {
emper::stats::worker->recordUnblockedContext(affinity_buffer, blockablePurpose);
} else {
// The unblock was triggered from outside the runtime system,
// e.g. the Completer calling signal() on a semaphore of a IO
// future.
runtime.fromAnywhereStats->recordUnblockedContext(affinity_buffer, blockablePurpose);
}
}
return Fiber::from([this, context]() { contextManager.discardAndResume(context); }, affinity);
}
......@@ -81,6 +121,8 @@ class Blockable : public Logger<logSubsystem> {
void unblock(Context* context) {
Fiber* unblockFiber = unblockAndGetContinuation(context);
LOGD("unblock() context " << context << " via unblock fiber " << unblockFiber);
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
runtime.schedule(*unblockFiber);
} else {
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Schmaus
#pragma once
namespace emper {
enum class BlockablePurpose {
GENERIC,
IO,
};
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "CallerEnvironment.hpp"
#include <iostream>
#include "Common.hpp"
auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream& {
switch (callerEnvironment) {
case OWNER:
return os << "OWNER";
case EMPER:
return os << "EMPER";
case ANYWHERE:
return os << "ANYWHERE";
default:
DIE_MSG("Unknown CallerEnvironment");
}
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <iostream>
/*! Enum representing the different environments where code can be executed */
enum CallerEnvironment {
EMPER,
ANYWHERE,
OWNER, /*!< indicate code executed by the worker owning the object */
EMPER, /*!< indicate code executed by any worker */
ANYWHERE, /*!< indicate code executed outside of any worker */
};
auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream&;
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2024 Florian Schmaus
#include "Common.hpp"
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <string>
#include "log/log.hpp"
void die(const char* message, bool usePerror) {
if (usePerror) {
// TODO: Include errno value and short description (e.g., ENOSYS).
std::perror(message);
} else {
std::cerr << message << std::endl;
emper::log::log("!!!!! PANIC", message);
}
exit(EXIT_FAILURE);
abort();
}
......@@ -2,8 +2,11 @@
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <errno.h>
#include <functional>
#include <sstream> // IWYU pragma: keep
#include <sstream> // IWYU pragma: keep
#include <type_traits> // IWYU pragma: keep
#include "emper-config.h" // IWYU pragma: keep
......@@ -20,16 +23,21 @@ using func_t = std::function<void()>;
#define DIE_MSG(x) do { std::stringstream sst; sst << __FILE__ ":" TOSTRING(__LINE__) " " << x; die(sst.str().c_str(), false); } while (false)
// NOLINTNEXTLINE(bugprone-macro-parentheses)
#define DIE_MSG_ERRNO(x) do { std::stringstream sst; sst << __FILE__ ":" TOSTRING(__LINE__) " " << x; die(sst.str().c_str(), true); } while (false)
#define DIE_MSG_ERRNO_IS(e, x) do { errno = -e; DIE_MSG_ERRNO(x); } while (false)
// clang-format on
// We compile with -fno-exceptions for the moment.
//#define THROW(x) do { throw std::runtime_error(x); } while (false)
// #define THROW(x) do { throw std::runtime_error(x); } while (false)
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#define ALIGN_TO_CACHE_LINE alignas(64)
#define CACHE_LINE_SIZE EMPER_ASSUME_CACHE_LINE_SIZE
#define ALIGN_TO_CACHE_LINE alignas(CACHE_LINE_SIZE)
#define CACHE_LINE_EXCLUSIVE(T, symbol) \
std::aligned_storage<CACHE_LINE_SIZE, CACHE_LINE_SIZE>::type __##symbol##_mem; \
T& symbol = *new (&__##symbol##_mem) T()
[[noreturn]] void die(const char* message, bool usePerror);
......@@ -37,6 +45,7 @@ using WORD = unsigned int;
#ifdef EMPER_STATS
#include <chrono> // IWYU pragma: keep
#define TIME_NS(code, record_func) \
auto _time_ns_start = std::chrono::high_resolution_clock::now(); \
code; \
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#include "Context.hpp"
#include <sys/mman.h>
#include <cerrno>
#include <cstdint>
#include <ostream>
#include "stats/Worker.hpp"
thread_local Context* Context::currentContext;
thread_local void* Context::originalStack;
auto Context::calcTos(char* context) -> void* {
auto contextBase = (uintptr_t)context;
uintptr_t contextTop = contextBase + CONTEXT_SIZE;
// Align the Top-of-Stack (tos) to 16 byte.
uintptr_t tos = contextTop & ~0xf;
uintptr_t tosOffset = tos - contextBase;
return context + tosOffset;
}
auto Context::calcTos() -> void* { return calcTos(context); }
auto Context::calcGuardPage(char* context) -> void* {
if constexpr (!emper::STACK_GUARD_PAGE) return nullptr;
auto contextBase = (uintptr_t)context;
uintptr_t guardPageBase = em ::roundUp(contextBase, PAGE_SIZE);
uintptr_t guardPageOffset = guardPageBase - contextBase;
return context + guardPageOffset;
}
auto Context::calcGuardPage() -> void* { return calcGuardPage(context); }
auto Context::inGuardPage(void* mem) -> bool {
if constexpr (!emper::STACK_GUARD_PAGE) return false;
const auto _guardPage = (uintptr_t)this->guardPage;
const auto _mem = (uintptr_t)mem;
return _mem >= _guardPage && _mem < _guardPage + PAGE_SIZE;
}
auto Context::inCurrentGuardPage(void* mem) -> bool {
if (!currentContext) return false;
return currentContext->inGuardPage(mem);
}
auto Context::calcBos(char* context) -> void* {
auto contextBase = (uintptr_t)context;
uintptr_t bosOffset = 0;
if constexpr (emper::STACK_GUARD_PAGE) {
uintptr_t guardPageBase = em ::roundUp(contextBase, PAGE_SIZE);
uintptr_t guardPageTop = guardPageBase + PAGE_SIZE;
bosOffset = guardPageTop - contextBase;
}
return context + bosOffset;
}
auto Context::calcBos() -> void* { return calcBos(context); }
// cppcheck-suppress noExplicitConstructor selfInitialization
Context::Context(func_t mainFunction)
: tos(calcTos()),
guardPage(calcGuardPage()),
bos(calcBos()),
usableStackSize((char*)tos - (char*)bos),
mainFunction(std::move(std::move(mainFunction))) {
// valgrindStackId = VALGRIND_STACK_REGISTER(context, context + CONTEXT_SI);
if constexpr (emper::STACK_GUARD_PAGE) {
int res = mprotect(guardPage, PAGE_SIZE, PROT_NONE);
if (res) DIE_MSG_ERRNO("Installaing guard page at " << guardPage << " failed");
}
maybeMarkStack();
setEmptyHook();
// TOS is 16-byte aligned, however we need the initial
// savedStackpointer to be 8-byte aligned because will later jmp to
// it. We also save a pointer to very first function this context is
// going to execute a this initial stackpointer value.
alphaFunctionIpLocation = savedStackpointer = (uintptr_t*)tos - 1;
void** alphaSavedIp = reinterpret_cast<void**>(savedStackpointer);
void (*f)() = &kickoff;
assert(f != nullptr);
*(alphaSavedIp) = reinterpret_cast<void*>(f);
}
Context::~Context() {
// VALGRIND_STACK_DEREGISTER(valgrindStackId);
if constexpr (emper::STACK_GUARD_PAGE) {
int res = mprotect(guardPage, PAGE_SIZE, PROT_READ | PROT_WRITE | PROT_EXEC);
if (res) DIE_MSG_ERRNO("Uninstallaing guard page at " << guardPage << " failed");
}
}
void Context::printTo(std::ostream& strm, bool withPtr) const {
// clang-format off
strm << "Context ";
if (withPtr) {
strm << "(" << this << ") ";
}
thread_local Context* Context::lastContextBeforeReturningToOriginalStack;
strm << "[fiber=" << currentFiber
<< ", tos=" << tos;
strm << ", bos=" << &context
<< ", saved-sp=" << savedStackpointer
<< ", *saved-sp=" << *static_cast<void**>(savedStackpointer);
if (savedStackpointer != alphaFunctionIpLocation) {
strm << ", alpha-fun-loc=" << alphaFunctionIpLocation
<< ", *alpha-func-loc=" << *static_cast<const void* const*>(alphaFunctionIpLocation);
}
strm << "]";
// clang-format off
}
auto operator<<(std::ostream& strm, const Context& context) -> std::ostream& {
strm << "Context " << &context << " [tos: " << context.tos << " bos: "
<< &context.context
/*
<< " alphaFunLoc=" << context.alphaFunctionIpLocation
<< " alphaFunValue=" << *((uintptr_t*) context.alphaFunctionIpLocation)
<< " main=" << context.mainFunction.target<void(void)>()
<< " hook=" << context.startAndResumeHook.target<void(void)>()
*/
<< "]";
context.printTo(strm, false);
return strm;
}
auto operator<<=(std::ostream& strm, const Context& context) -> std::ostream& {
context.printTo(strm, true);
return strm;
}
void Context::unmap(void* sp) const {
constexpr size_t PAGE_SIZE_MASK = PAGE_SIZE - 1;
const auto spBase = (uintptr_t)sp;
const uintptr_t spBasePage = spBase & ~PAGE_SIZE_MASK;
const auto bosBase = (uintptr_t)bos;
const uintptr_t bosBasePage = (bosBase + PAGE_SIZE_MASK) & ~PAGE_SIZE_MASK;
const size_t length = spBasePage - bosBasePage;
const uintptr_t bosBasePageOffset = bosBasePage - bosBase;
auto* const addr = (char*)bos + bosBasePageOffset;
const int advice = []{
if constexpr (emper::CONTINUATION_STEALING_MADVISE_STACK == emper::ContinuationStealingMadviseStack::dontneed) {
return MADV_DONTNEED;
} else {
return MADV_FREE;
}
}();
LOGD("madvise() addr=" << addr << ", length=" << length << ", advice=" << advice);
errno = madvise(addr, length, advice);
if (errno) {
DIE_MSG_ERRNO("Unmapping unused stack space failed");
}
}
void Context::maybeMarkStack() {
if constexpr (!emper::DEBUG && !emper::STATS_STACK_USAGE_ENABLED) return;
// Write the stack full of 0xcc bytes, which just happen to be
// the 'int3' instruction, which will trigger a breakpoint
// when executed.
void* res = memset(bos, 0xcc, usableStackSize);
if (!res) DIE;
// Mark the last valid 16 bytes just below the top of stack.
res = memset(((uintptr_t*)tos) - 1, 0xab, sizeof(uintptr_t));
if (!res) DIE;
// Mark the eventually existing unused top stack bytes
res = memset(tos, 0xee, &context[CONTEXT_SIZE] - (char*)tos);
if (!res) DIE;
}
void Context::recordStackUsage() {
constexpr uintptr_t stackMarker = 0xcccccccccccccccc;
size_t bosOffset;
for (bosOffset = 0; bosOffset < usableStackSize; bosOffset += sizeof(stackMarker)) {
auto* stackWord = (uintptr_t*)((char*)bos + bosOffset);
if (*stackWord != stackMarker) break;
}
size_t stackUsage = usableStackSize - bosOffset;
emper::stats::worker->recordStackUsage(stackUsage);
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#pragma once
#include <cassert> // for assert
#include <cstdint> // for uintptr_t
#include <cstring> // for memset
#include <functional> // for function
#include <iosfwd> // for ostream
#include <cassert> // for assert
#include <cstring> // for memset
#include <functional> // for function
#include <iosfwd> // for ostream
#include <ostream>
#include <type_traits> // for remove_reference<>::type // IWYU pragma: keep
#include <utility>
#include "Common.hpp" // for func_t, DIE, ALIGN_TO_CACHE_LINE
#include "Debug.hpp" // for LOGD, LogSubsystem, LogSubsystem::C, Logger
#include "Emper.hpp" // for Emper::DEBUG
#include "emper-config.h"
#include "lib/math.hpp"
class AbstractFiber;
class ContextManager;
class Dispatcher;
class Fiber;
class Fibril;
namespace em = emper::lib::math;
extern "C" [[noreturn]] void switch_and_load_context(void** toTos);
// *Not* marked as 'noreturn' because save_and_switch_context does
......@@ -25,19 +30,24 @@ extern "C" [[noreturn]] void switch_and_load_context(void** toTos);
extern "C" void save_and_switch_context(void** toTos, void** fromTos);
extern "C" [[noreturn]] void switch_context(void** toTos);
class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
private:
static const unsigned int CONTEXT_SIZE = 0xffff; // 1024 * 1024 * 4;
class EMPER_CONTEXT_ALIGNAS Context : Logger<LogSubsystem::C> {
public:
static constexpr size_t PAGE_SIZE = emper::ASSUME_PAGE_SIZE;
static constexpr size_t CONTEXT_SIZE = em::roundUp(emper::MIN_CONTEXT_STACK_SIZE, PAGE_SIZE);
private:
static thread_local Context* currentContext;
static thread_local void* originalStack;
AbstractFiber* currentFiber = nullptr;
void* const tos;
static thread_local Context* lastContextBeforeReturningToOriginalStack;
void* const guardPage;
Fiber* currentFiber = nullptr;
void* const bos;
void* const tos;
const size_t usableStackSize;
// unsigned valgrindStackId;
void* savedStackpointer;
......@@ -48,16 +58,17 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
func_t startAndResumeHook;
// Allign the stack to a cache line, so that it isn't shared with
// the other members of this class.
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
ALIGN_TO_CACHE_LINE char context[CONTEXT_SIZE];
char context[CONTEXT_SIZE];
void printTo(std::ostream& strm, bool withPtr = true) const;
friend auto operator<<(std::ostream& strm, const Context& context) -> std::ostream&;
friend auto operator<<=(std::ostream& strm, const Context& context) -> std::ostream&;
friend ContextManager;
auto getFiber() -> Fiber* { return currentFiber; }
auto getFiber() -> AbstractFiber* { return currentFiber; }
/**
* The first function that a newly started context will
......@@ -72,57 +83,48 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
friend Dispatcher;
static void setCurrentFiber(Fiber* fiber) {
assert(currentContext);
// Allow Fibril to access Context::currentContext.
friend Fibril;
static void setCurrentFiber(AbstractFiber* fiber) {
Context* context = getCurrentContext();
assert(context);
currentContext->currentFiber = fiber;
context->currentFiber = fiber;
DBG("Set context " << currentContext << " fiber to " << fiber);
}
public:
// cppcheck-suppress noExplicitConstructor selfInitialization
Context(func_t mainFunction)
// Align the Top-of-Stack (tos) to 16 byte.
: tos((void*)((uintptr_t)(context + CONTEXT_SIZE) & (~0xf))),
mainFunction(std::move(std::move(mainFunction))) {
// valgrindStackId = VALGRIND_STACK_REGISTER(context, context + CONTEXT_SI);
if constexpr (emper::DEBUG) {
// Write the stack full of 0xcc bytes, which just happen to be
// the 'int3' instruction, which will trigger a breakpoint
// when executed.
void* res = memset(context, 0xcc, (char*)tos - context);
if (!res) DIE;
// Mark the last valid 16 bytes just below the top of stack.
res = memset(((uintptr_t*)tos) - 1, 0xab, sizeof(uintptr_t));
if (!res) DIE;
// Mark the eventually existing unused top stack bytes
res = memset(tos, 0xee, &context[CONTEXT_SIZE] - (char*)tos);
if (!res) DIE;
}
void unmap(void* sp) const;
setEmptyHook();
void maybeMarkStack();
// We write the real kickoff function's address at
// TOS - 2 so it's called after this context has been actived.
// This also ensures that the stack is correctly aligned for
// SSE operations, which require the stack to be 8-byte, but
// not 16-byte aligned.
alphaFunctionIpLocation = savedStackpointer = (uintptr_t*)tos - 2;
void** alphaSavedIp = reinterpret_cast<void**>(savedStackpointer);
void (*f)() = &kickoff;
assert(f != nullptr);
*(alphaSavedIp) = reinterpret_cast<void*>(f);
}
void recordStackUsage();
// NOLINTNEXTLINE(modernize-use-equals-default)
~Context() {
// VALGRIND_STACK_DEREGISTER(valgrindStackId);
static auto calcTos(char* context) -> void*;
auto calcTos() -> void*;
static auto calcGuardPage(char* context) -> void*;
auto calcGuardPage() -> void*;
static auto calcBos(char* context) -> void*;
auto calcBos() -> void*;
/*
* Used by ContextManager to re-initialize a cached unused context.
*/
inline void reinit() {
currentFiber = nullptr;
savedStackpointer = const_cast<void*>(alphaFunctionIpLocation);
setEmptyHook();
}
static auto getCurrentFiber() -> Fiber* {
assert(currentContext);
public:
Context(func_t mainFunction);
~Context();
static auto getCurrentFiber() -> AbstractFiber* {
assert(currentContext);
return currentContext->currentFiber;
}
......@@ -135,22 +137,13 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
[[nodiscard]] inline auto getTos() const -> const void* { return tos; }
/**
* Start this context. This method is called from an unmanaged stack which is saved.
* Start this context. This method is called from an unmanaged stack.
*/
inline auto start() -> Context* {
LOGD("starting");
inline void start() {
LOGD("starting " << *this);
currentContext = this;
save_and_switch_context(&savedStackpointer, &originalStack);
return lastContextBeforeReturningToOriginalStack;
}
[[noreturn]] static void switchToOriginalStack() {
// Can't use new context hook to communicate the context to free.
lastContextBeforeReturningToOriginalStack = currentContext;
currentContext = nullptr;
switch_and_load_context(&originalStack);
switch_context(&savedStackpointer);
}
/**
......@@ -160,7 +153,7 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
* 'newContext' is a free context.
*/
inline void saveAndStart(Context* newContext) {
LOGD("saving and switching to " << *newContext);
LOGD("saving and switching to " <<= *newContext);
currentContext = newContext;
// Save the current context and start a new one.
......@@ -191,5 +184,19 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
switch_and_load_context(&(context->savedStackpointer));
}
inline static auto getCurrentContext() -> Context* { return currentContext; }
// Disallow this function from being inlined and add a compiler
// barrier in form of an empty "asm volatile" statement to prevent
// the compiler from optimizing the TLS access. We ran into this
// with clang-release (clang 18) builds and SimpleFibTest, where the
// compiler would cache the access of the thread local variable on
// the stack accross the dispatch loop, especially accross
// fiber->run() which potentially switches the stack.
// See also https://stackoverflow.com/q/75592038/194894
[[gnu::noinline]] static auto getCurrentContext() -> Context* {
asm volatile("");
return currentContext;
}
static auto inCurrentGuardPage(void* mem) -> bool;
auto inGuardPage(void* mem) -> bool;
};
......@@ -58,6 +58,5 @@ switch_context:
; which must point to a word which represent the memory address
; where we want to continue.
mov rsp, [rdi]
; Pop the memory address where to continue from the
; stack. Which means: Continue at [rsp].
ret
; Jump to the address rsp currently points at.
jmp [rsp]
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#include "ContextManager.hpp"
#include <cassert> // for assert
#include <new>
#include <ostream>
#include <utility>
#include "Context.hpp" // for Context
#include "Debug.hpp" // for LOGD
#include "Dispatcher.hpp" // for Dispatcher
#include "Runtime.hpp" // for Runtime
#include "emper-common.h"
#include "Context.hpp"
#include "Continuation.hpp"
#include "Debug.hpp"
#include "Dispatcher.hpp"
#include "Emper.hpp"
#include "Fibril.hpp"
#include "Runtime.hpp"
#include "emper-config.h" // // IWYU pragma: keep
class Fiber;
class AbstractFiber;
ContextManager::ContextManager(Runtime& runtime) : MemoryManager(runtime), runtime(runtime) {
auto newWorkerHook = [this](ATTR_UNUSED workerid_t workerId) {
for (unsigned int i = 0; i < CONTEXT_MANAGER_FIRST_LAYER_QUEUE_SIZE * 2; ++i) {
auto* context = new Context(this->runtime.dispatcher.getDispatchLoop());
putFreeContext(context);
}
};
// Note that it is important that this hook is executed *after*
// the one of the MemoryManager superclass.
runtime.addNewWorkerHook(newWorkerHook);
}
ContextManager::ContextManager(Runtime& runtime) : MemoryManager(runtime), runtime(runtime) {}
auto ContextManager::getFreeContext() -> Context* {
#ifdef CM_WITH_MEMORY_MANAGER
if constexpr (!emper::CONTEXT_MANAGER_WITH_MEMORY_MANAGER) {
return new Context(this->runtime.dispatcher.getDispatchLoop());
}
bool malloced;
void* memory = getMemory(&malloced);
if (malloced) {
return new (memory) Context(this->runtime.dispatcher.getDispatchLoop());
}
return static_cast<Context*>(memory);
#else
return new Context(this->runtime.dispatcher.getDispatchLoop());
#endif
auto* context = static_cast<Context*>(memory);
context->reinit();
if constexpr (emper::STATS_STACK_USAGE == emper::StatsStackUsage::accurate) {
// If we want accurate stack-usage stats, then we need to re-mark
// the stack if we got the memory from the memory manager but if
// it wasn't malloced.
context->maybeMarkStack();
}
return context;
}
void ContextManager::putFreeContext(Context* context) {
#ifdef CM_WITH_MEMORY_MANAGER
putMemory(context);
#else
delete context;
#endif
if constexpr (emper::STATS_STACK_USAGE_ENABLED) {
context->recordStackUsage();
}
if constexpr (emper::CONTEXT_MANAGER_WITH_MEMORY_MANAGER) {
putMemory(context);
} else {
delete context;
}
}
thread_local static Continuation* continuation;
void ContextManager::start() {
Context* freeContext = getFreeContext();
freeContext = freeContext->start();
putFreeContext(freeContext);
Continuation originalStackContinuation;
continuation = &originalStackContinuation;
uintptr_t fibrilResumeValue = originalStackContinuation.setJmp();
// We perform a longjmp with fibrilResumeValue == 0 to switch back
// to the original stack and terminate the runtime.
if (unlikely(!fibrilResumeValue && runtime.isTerminating())) {
Context* currentContext = Context::currentContext;
if (currentContext) [[likely]] {
Context::currentContext = nullptr;
putFreeContext(currentContext);
}
return;
}
Context* freeContext = nullptr;
if constexpr (emper::CONTINUATION_STEALING) {
// TODO: This LOGD causes segfaults with gcc-14, intestigate why.
// LOGD("Fibril::tryResumeFibril(" << fibrilResumeValue << ")");
Fibril::tryResumeFibril(fibrilResumeValue);
// Resuming the fibril was not (yet) possible. There may be a
// current context, even though we are currently here on the
// initial (pthread) context. Lookup the current context and
// reinitilize it so it can be re-used to start() it and continue
// in the dispatch loop.
freeContext = Context::getCurrentContext();
}
if (freeContext) {
freeContext->reinit();
} else {
freeContext = getFreeContext();
}
freeContext->start();
}
[[noreturn]] void ContextManager::switchToOriginalStack() {
DBG("Switching to original stack from " << Context::currentContext);
continuation->longJmp(0);
}
void ContextManager::tryResumeFibril(uintptr_t fibrilResumeValue) {
continuation->longJmp(fibrilResumeValue);
}
/**
......@@ -87,7 +141,9 @@ void ContextManager::discardAndResume(Context* context) {
// Since we are going to discard this context, it will never reach
// the end of its dispatch loop, and hence we need to ensure that
// its fiber is recycled.
Fiber* currentFiber = contextToFree->getFiber();
AbstractFiber* currentFiber = contextToFree->getFiber();
assert(currentFiber);
LOGD("Recycling fiber " << currentFiber << " of discard context " << contextToFree);
runtime.dispatcher.recycle(currentFiber);
contextToFree->discardAndResume(context);
......