Commits (99)
......@@ -10,3 +10,11 @@ insert_final_newline = true
[*.{c,h,cpp,hpp}]
indent_style = tab
indent_size = 2
[*.yml]
indent_style = space
indent_size = 2
[meson_options.txt]
indent_style = space
indent_size = 2
image: "flowdalic/debian-testing-dev:1.15"
image: "flowdalic/debian-testing-dev:1.19"
before_script:
- ulimit -a
- nproc
- |
readarray TOOLS <<EOF
c++
......@@ -22,6 +23,10 @@ before_script:
$tool --version
done
cache:
paths:
- subprojects/packagecache
variables:
BUILDDTYPE: debugoptimized
CC: gcc
......@@ -52,6 +57,12 @@ fast-static-analysis:
variables:
EMPER_IO: "true"
.fast-variant-check:
stage: test
script: make fast-static-analysis smoke-test-suite
variables:
EMPER_IO: "true"
iwyu:
stage: smoke-test
script: IWYU_TOOL="${CI_PROJECT_DIR}/tools/iwyu_tool.py" make iwyu
......@@ -64,6 +75,11 @@ clang-tidy:
variables:
EMPER_IO: "true"
.build:
stage: test
script:
- make
.test:
extends:
- .meson-test
......@@ -81,6 +97,14 @@ clang-tidy:
CC: clang
CXX: clang++
.libc++:
extends:
- .clang
variables:
EMPER_USE_BUNDLED_DEPS: "always"
EMPER_CPP_ARGS: "-stdlib=libc++"
EMPER_CPP_LINK_ARGS: "-stdlib=libc++"
.emper-ws-scheduling:
variables:
EMPER_DEFAULT_SCHEDULING_STRATEGY: "work_stealing"
......@@ -101,6 +125,10 @@ clang-tidy:
variables:
EMPER_USERSPACE_RCU: 'true'
.emper-no-io:
variables:
EMPER_IO: 'false'
.emper-pipe-sleep-strategy:
variables:
EMPER_WORKER_SLEEP_STRATEGY: 'pipe'
......@@ -109,6 +137,18 @@ clang-tidy:
variables:
EMPER_IO_COMPLETER_BEHAVIOR: 'none'
.emper-single-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'one'
.emper-numa-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'numa'
.emper-each-poller:
variables:
EMPER_IO_URING_SQ_POLLER: 'each'
.emper-single-uring:
variables:
EMPER_IO_SINGLE_URING: 'true'
......@@ -179,10 +219,18 @@ clang-tidy:
variables:
EMPER_LOCKED_WS_QUEUE: "true"
.waitfree-ws:
variables:
EMPER_WAITFREE_WORK_STEALING: "true"
.futex-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex"
.futex2-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "futex2"
.locked-wakeup-semaphore:
variables:
EMPER_WAKEUP_SEMAPHORE_IMPLEMENTATION: "locked"
......@@ -221,6 +269,17 @@ test-clang-debug:
- test-clang
- .debug-build
smoke-test-libc++:
stage: smoke-test
extends:
- .fast-variant-check
- .libc++
test-libc++:
extends:
- .test
- .libc++
test-worker-no-sleep:
extends:
- .test
......@@ -267,10 +326,11 @@ test-worker-wakeup-strategy-all:
- .test
- .worker-wakeup-strategy-all
test-worker-wakeup-strategy-throttle:
extends:
- .test
- .emper-worker-wakeup-strategy-throttle
# Disable throttle test till throttle works with notifySpecific
#test-worker-wakeup-strategy-throttle:
# extends:
# - .test
# - .emper-worker-wakeup-strategy-throttle
test-do-not-log-timestamp:
extends:
......@@ -282,11 +342,22 @@ test-locked-ws-queues:
- .test
- .locked-ws-queues
test-waitfree-ws:
extends:
- .test
- .waitfree-ws
test-futex-wakeup-semaphore:
extends:
- .test
- .futex-wakeup-semaphore
# TODO: enable this if the CI has linux >= 5.16
build-futex-wakeup-semaphore:
extends:
- .build
- .futex2-wakeup-semaphore
test-locked-wakeup-semaphore:
extends:
- .test
......@@ -302,6 +373,11 @@ test-mmapped-log:
- .meson-test
script: make && EMPER_LOG_FILE=emper.log make test
test-no-io:
extends:
- .test
- .emper-no-io
test-single-uring:
extends:
- .test
......@@ -348,3 +424,44 @@ test-io-stealing-pipe-no-completer-lockless:
- .emper-no-completer
- .emper-io-stealing
- .emper-lockless-cq
smoke-test-locked-queue-rwlock:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "rwlock"
smoke-test-locked-queue-shared-mutex:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "shared_mutex"
smoke-test-locked-queue-boost-shared-mutex:
extends:
- .fast-variant-check
variables:
EMPER_LOCKED_UNBOUNDED_QUEUE_IMPLEMENTATION: "boost_shared_mutex"
smoke-test-locked-queue-boost-userspace-rcu:
extends:
- .fast-variant-check
variables:
EMPER_USERSPACE_RCU: "true"
# Only build the poller variants because sqpoll needs linux >= 5.15
# TODO: also test those variants if the CI uses linux >= 5.15
build-single-poller:
extends:
- .build
- .emper-single-poller
build-numa-poller:
extends:
- .build
- .emper-single-poller
build-each-poller:
extends:
- .build
- .emper-each-poller
......@@ -34,6 +34,15 @@ debug:
rm -f build
$(MAKE) build BUILDTYPE=$@
libc++:
rm -f build
$(MAKE) build \
CC=clang CXX=clang++ \
EMPER_CPP_ARGS="-stdlib=libc++" \
EMPER_CPP_LINK_ARGS="-stdlib=libc++" \
EMPER_USE_BUNDLED_DEPS="always" \
BUILDDIR="build-libc++"
.PHONY: fast-static-analysis
fast-static-analysis: all check-format check-license doc
......
......@@ -41,6 +41,7 @@ void notify() {
for (int conn : conns) emper::io::closeAndForget(conn);
}
// NOLINTNEXTLINE(bugprone-exception-escape)
auto main(int argc, char* argv[]) -> int {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
......@@ -55,7 +56,7 @@ auto main(int argc, char* argv[]) -> int {
<< " connections" << std::endl;
Runtime runtime;
auto* listener = emper::io::tcp_listener(HOST, PORT, [&](int socket) {
auto coordinator_func = [&](int socket) {
{
std::lock_guard<std::mutex> l(lock);
conns.push_back(socket);
......@@ -70,7 +71,10 @@ auto main(int argc, char* argv[]) -> int {
notify();
runtime.initiateTermination();
}
});
};
auto* listener = emper::io::tcp_listener(HOST, PORT, coordinator_func, scount,
{emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Fischer
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <cerrno>
......
......@@ -91,7 +91,7 @@ static void qsort(int* arr, size_t s) {
cps.wait();
}
static const size_t ARR_SIZE = 10 * 1000 * 1000;
static const size_t ARR_SIZE = 10L * 1000 * 1000;
auto main() -> int {
int* arr = new int[ARR_SIZE];
......
......@@ -69,9 +69,9 @@ class Blockable : public Logger<logSubsystem> {
// NOLINTNEXTLINE(performance-unnecessary-value-param)
void block(func_t freshContextHook) {
// Only contexts managed by EMPER's runtime can block.
assert(Runtime::inRuntime());
emper::assertInRuntime();
LOGD("block() blockedContext is " << Context::getCurrentContext());
LOGD("block() blockedContext=" << Context::getCurrentContext());
maybeSetAffinity();
......
......@@ -30,9 +30,10 @@ using func_t = std::function<void()>;
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#define ALIGN_TO_CACHE_LINE alignas(64)
#define CACHE_LINE_EXCLUSIVE(T, symbol) \
std::aligned_storage<64, 64>::type __##symbol##_mem; \
#define CACHE_LINE_SIZE 64
#define ALIGN_TO_CACHE_LINE alignas(CACHE_LINE_SIZE)
#define CACHE_LINE_EXCLUSIVE(T, symbol) \
std::aligned_storage<CACHE_LINE_SIZE, CACHE_LINE_SIZE>::type __##symbol##_mem; \
T& symbol = *new (&__##symbol##_mem) T()
[[noreturn]] void die(const char* message, bool usePerror);
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#include "Context.hpp"
#include <ostream>
......@@ -22,3 +22,11 @@ auto operator<<(std::ostream& strm, const Context& context) -> std::ostream& {
<< "]";
return strm;
}
[[noreturn]] void Context::switchToOriginalStack() {
// Can't use new context hook to communicate the context to free.
lastContextBeforeReturningToOriginalStack = currentContext;
currentContext = nullptr;
DBG("Switchting to original stack " << &originalStack << " from " << currentContext);
switch_and_load_context(&originalStack);
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus
// Copyright © 2020-2022 Florian Schmaus
#pragma once
#include <cassert> // for assert
......@@ -147,12 +147,7 @@ class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
return lastContextBeforeReturningToOriginalStack;
}
[[noreturn]] static void switchToOriginalStack() {
// Can't use new context hook to communicate the context to free.
lastContextBeforeReturningToOriginalStack = currentContext;
currentContext = nullptr;
switch_and_load_context(&originalStack);
}
[[noreturn]] static void switchToOriginalStack();
/**
* Save the current context and switch to a new one. This method is
......
......@@ -134,9 +134,9 @@ class Logger {
case LogSubsystem::RUNTI:
return "RUNTI";
case LogSubsystem::SLEEP_S:
return "SLEEP_S";
return "SLEEP";
case LogSubsystem::WAKE_S:
return "WAKE_S";
return "WAKE ";
case LogSubsystem::U_B_MPSC_Q:
return "UBSCQ";
case LogSubsystem::IO:
......
......@@ -2,7 +2,7 @@
// Copyright © 2020 Florian Schmaus
#include "Dispatcher.hpp"
#include <pthread.h> // for pthread_yield
#include <sched.h>
#include "Emper.hpp"
#include "Runtime.hpp" // for Runtime
......@@ -17,6 +17,6 @@ void Dispatcher::dispatchLoopDoSleep() {
if constexpr (emper::WORKER_SLEEP) {
runtime.dispatchLoopSleep();
} else {
pthread_yield();
sched_yield();
}
}
......@@ -2,13 +2,23 @@
// Copyright © 2020-2021 Florian Schmaus
#pragma once
#include <cassert>
#include <cstddef>
#include <string>
#include "Worker.hpp"
#include "emper-config.h"
namespace emper {
struct Emper {
static auto inRuntime() -> bool { return Worker::isWorkerThread(); }
static void assertInRuntime() { assert(inRuntime()); }
};
constexpr auto assertInRuntime = Emper::assertInRuntime;
constexpr auto inRuntime = Emper::inRuntime;
static const size_t WS_VICTIM_COUNT = EMPER_WS_VICTIM_COUNT;
static const size_t WS_VICTIM_DENOMINATOR = EMPER_WS_VICTIM_DENOMINATOR;
......@@ -51,6 +61,14 @@ enum class WorkerWakeupStrategy {
static const enum WorkerWakeupStrategy WORKER_WAKEUP_STRATEGY =
WorkerWakeupStrategy::EMPER_WORKER_WAKEUP_STRATEGY;
static const bool WORKER_IGNORE_WAKEUP_HINT =
#ifdef EMPER_WORKER_IGNORE_WAKEUP_HINT
true
#else
false
#endif
;
static const bool LIBURCU =
#ifdef EMPER_LIBURCU
true
......@@ -59,6 +77,14 @@ static const bool LIBURCU =
#endif
;
static const bool WAITFREE_WORK_STEALING =
#ifdef EMPER_WAITFREE_WORK_STEALING
true
#else
false
#endif
;
static const bool DEBUG =
#ifndef NDEBUG
true
......@@ -151,14 +177,13 @@ static const bool IO_TRY_SYSCALL =
static const bool IO_WORKER_URING = IO && !IO_SINGLE_URING;
static const bool IO_URING_SQPOLL =
#ifdef EMPER_IO_URING_SQPOLL
static const bool WAITFREE_IO_STEALING =
#ifdef EMPER_IO_WAITFREE_STEALING
true
#else
false
#endif
;
// Initialize this bool in Emper.cpp because it needs code evaluation
// (LinuxVersion::compare) during runtime.
// Using a static variable here means EACH object file including this header has to
......@@ -169,13 +194,14 @@ static const bool IO_URING_SQPOLL =
// warnings during the comparison use not yet initialized components is reduced.
extern const bool IO_MUST_INVALIDATE_BROKEN_CHAIN;
static const bool IO_URING_SHARED_WQ =
#ifdef EMPER_IO_URING_SHARED_WQ
true
#else
false
#endif
;
enum class IoSqPoller {
off,
one,
each,
numa,
};
static const enum IoSqPoller IO_SQ_POLLER = IoSqPoller::EMPER_IO_SQ_POLLER;
enum class IoCompleterBehavior {
schedule,
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "FiberHint.hpp"
#include "CallerEnvironment.hpp"
#include "Worker.hpp"
namespace emper {
template <CallerEnvironment callerEnvironment>
auto FiberHint::createNewWorkHint() -> FiberHint {
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
return {emper::FiberSource::hintAq};
}
return {Worker::getCurrentWorkerId(), emper::FiberSource::hintWsq};
}
template auto FiberHint::createNewWorkHint<CallerEnvironment::EMPER>() -> FiberHint;
template auto FiberHint::createNewWorkHint<CallerEnvironment::ANYWHERE>() -> FiberHint;
} // namespace emper
auto operator<<(std::ostream& os, const emper::FiberHint& hint) -> std::ostream& {
return os << "FiberHint{" << hint.getWorker() << ", " << hint.getSource() << "}";
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#pragma once
#include <cstdint>
#include <iostream>
#include "CallerEnvironment.hpp"
#include "FiberSource.hpp"
#include "emper-common.h"
#include "lib/TaggedPtr.hpp"
namespace emper {
/**
* @brief A descriptor where to find a fiber
*
* A FiberHint consists of a FiberSource combined with a workerId.
* Valid FiberSources for now are: Fiber::Source::hint{Wsq, Aq}.
*
* This is used during work-stealing to find work faster than stealing
* round-robin from a random starting point.
* The sleep strategies also use FiberHints to decide who and how to notify
* on new work.
*/
class FiberHint {
emper::lib::TaggedPtr _tptr;
public:
FiberHint() : _tptr((nullptr)) {}
FiberHint(emper::FiberSource source) : _tptr(0, static_cast<uint16_t>(source)) {}
FiberHint(workerid_t workerId, emper::FiberSource source)
: _tptr(static_cast<uintptr_t>(workerId), static_cast<uint16_t>(source)) {}
FiberHint(const FiberHint& other) : _tptr(other._tptr){};
[[nodiscard]] auto getSource() const -> emper::FiberSource {
return static_cast<emper::FiberSource>(_tptr.getTag());
}
[[nodiscard]] auto getWorker() const -> workerid_t {
return static_cast<workerid_t>(_tptr.getRawPtrValue());
}
void clear() { _tptr = nullptr; }
template <CallerEnvironment callerEnvironment>
[[nodiscard]] static auto createNewWorkHint() -> FiberHint;
inline operator bool() const { return _tptr; }
inline operator void*() const { return _tptr; }
};
} // namespace emper
auto operator<<(std::ostream& os, const emper::FiberHint& hint) -> std::ostream&;
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "FiberSource.hpp"
#include <iostream>
#include "Common.hpp"
auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream& {
switch (fiberSource) {
case emper::FiberSource::local:
return os << "local";
case emper::FiberSource::mpscQueue:
return os << "mpscQueue";
case emper::FiberSource::stolen:
return os << "stolen";
case emper::FiberSource::io:
return os << "io";
case emper::FiberSource::ioStolen:
return os << "ioStolen";
case emper::FiberSource::anywhereQueue:
return os << "anywhereQueue";
case emper::FiberSource::hintWsq:
return os << "hintWsq";
case emper::FiberSource::hintAq:
return os << "hintAq";
default:
DIE_MSG("Unknown FiberSource");
}
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Schmaus, Florian Fischer
#pragma once
#include <cstdint>
#include <iostream>
namespace emper {
/**
* @brief Descriptor for all the possible locations a Fiber can be obtained from
*
* This enum is used to collect stats, create hints where fibers are and
* make informed notification or scheduling decisions.
*/
enum class FiberSource : uintptr_t {
local, /*!< A worker's own work-stealing queue */
mpscQueue, /*!< A worker's own mpsc queue (inbox / priority) */
stolen, /*!< A other worker's work-stealing queue */
io, /*!< A worker's own io_uring completion queue */
ioStolen, /*!< A other worker's io_uring completion queue */
anywhereQueue, /*!< The anywhere queue */
hintWsq, /*!< A known other worker's work-stealing queue */
hintAq, /*!< Straight from the anywhere queue */
};
} // namespace emper
auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std::ostream&;
......@@ -4,9 +4,11 @@
#include <cstdint>
#include "FiberSource.hpp"
class Fiber;
struct NextFiberResult {
Fiber* const fiber;
const uintptr_t metadata;
const emper::FiberSource source;
};
......@@ -17,7 +17,7 @@ class PrivateSemaphore : protected Blockable<LogSubsystem::PS> {
// cppcheck-suppress uninitMemberVar
PrivateSemaphore(emper::BlockablePurpose blockablePurpose = emper::BlockablePurpose::GENERIC)
: Blockable(*Runtime::getRuntime(), blockablePurpose) {
LOGD("constructed by fiber " << Dispatcher::getCurrentFiber());
LOGD("constructed by " << Dispatcher::getCurrentFiber());
}
virtual ~PrivateSemaphore() = default;
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
// Copyright © 2020-2022 Florian Schmaus, Florian Fischer
#include "Runtime.hpp"
#include <numa.h>
#include <pthread.h> // for pthread_t, pthread_attr_init
#include <cerrno> // for errno
......@@ -24,6 +25,7 @@
#include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE
#include "Emper.hpp"
#include "Fiber.hpp" // for Fiber
#include "FiberSource.hpp"
#include "NextFiberResult.hpp"
#include "RuntimeStrategy.hpp" // for RuntimeStrategy
#include "RuntimeStrategyFactory.hpp"
......@@ -34,10 +36,10 @@
#include "io/IoContext.hpp"
#include "io/Stats.hpp"
#include "lib/DebugUtil.hpp"
#include "lib/env.hpp"
#include "log/LogBuffer.hpp"
#include "stats/FromAnywhere.hpp"
#include "stats/Worker.hpp"
#include "strategies/AbstractWorkStealingScheduler.hpp"
#ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING
#include "strategies/ws/WsStrategyFactory.hpp"
......@@ -140,6 +142,40 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo
DIE_MSG("pinningOffset and not pinning workers are mutually exclusive");
}
// When sharing SQ poll threads the IoContext which the other attach to
// must be initialized first
if constexpr (emper::IO_SQ_POLLER == emper::IoSqPoller::numa) {
if (numa_available() < 0) {
DIE_MSG("numa support not available for numa based sq poller sharing");
}
struct bitmask* nodeCpus = numa_allocate_cpumask();
if (!nodeCpus) {
DIE_MSG_ERRNO("numa_allocate_cpumask failed");
}
for (unsigned node = 0; node < numa_all_nodes_ptr->size; ++node) {
if (!numa_bitmask_isbitset(numa_all_nodes_ptr, node)) {
continue;
}
int err = numa_node_to_cpus(static_cast<int>(node), nodeCpus);
if (err) {
DIE_MSG_ERRNO("numa_node_to_cpu failed");
}
for (unsigned cpu = 0; cpu < nodeCpus->size; ++cpu) {
if (!numa_bitmask_isbitset(nodeCpus, cpu)) {
continue;
}
workerid_t workerId = cpuToWorkerId(cpu);
ioContexts[workerId] = new IoContext(*this);
break;
}
}
}
for (workerid_t i = 0; i < workerCount; ++i) {
pthread_attr_t attr;
errno = pthread_attr_init(&attr);
......@@ -242,17 +278,13 @@ auto Runtime::workerLoop(Worker* worker) -> void* {
worker->setWorker();
if constexpr (emper::IO_WORKER_URING) {
auto* workerIo = new IoContext(*this);
if constexpr (emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) {
// submit the workers' CQ eventfds to the global IoContext
globalIo->registerWorkerIo(*workerIo);
if (!ioContexts[worker->workerId]) {
ioContexts[worker->workerId] = new IoContext(*this);
}
// notify the globalCompleter that we have registered our eventfd
ioReadySem.notify();
ioContexts[worker->workerId] = workerIo;
workerIo->setWorkerIo(worker);
// notify the globalCompleter that we have initialized our IoContext (registered our eventfd)
ioContexts[worker->workerId]->setWorkerIo(worker);
ioReadySem.notify();
}
LOGD("Worker loop started by thread " << gettid());
......@@ -292,9 +324,12 @@ auto Runtime::workerLoop(Worker* worker) -> void* {
}
auto Runtime::getDefaultWorkerCount() -> workerid_t {
auto workerCountEnv = emper::lib::env::getUnsignedFromEnv<workerid_t>("EMPER_WORKER_COUNT");
static const std::string workerCountEnvVarName = "EMPER_WORKER_COUNT";
auto workerCountEnv = emper::lib::env::getUnsignedFromEnv<workerid_t>(workerCountEnvVarName);
if (workerCountEnv) {
return workerCountEnv.value();
auto value = workerCountEnv.value();
DBG("Processed " << workerCountEnvVarName << "=" << value);
return value;
}
// The CPU count reported by sysconf(_SC_NPROCESSORS_ONLN), sysconf(_SC_NPROCESSORS_CONF)
......@@ -326,7 +361,7 @@ void Runtime::yield() {
});
}
auto Runtime::nextFiber() -> NextFiberResult {
auto Runtime::nextFiber() -> std::optional<NextFiberResult> {
if constexpr (emper::IO_WORKER_URING) {
// Schedule all fibers waiting on completed IO
IoContext::ContinuationBuffer completions;
......@@ -337,9 +372,7 @@ auto Runtime::nextFiber() -> NextFiberResult {
Fiber* next = completions[0];
schedule(&completions[1], ncompletions - 1);
// TODO: hint that this fiber comes from the IO subsystem
return NextFiberResult{
next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)};
return NextFiberResult{next, emper::FiberSource::io};
}
}
......@@ -436,7 +469,7 @@ void Runtime::printLastRuntimeStats(std::ostream& out) {
}
void Runtime::executeAndWait(std::function<void()> f) {
if (inRuntime()) {
if (emper::inRuntime()) {
ABORT("Ca not use executeAndWait() from within the Runtime");
}
......@@ -453,3 +486,27 @@ void Runtime::executeAndWait(std::function<void()> f) {
fiberFinished.lock();
}
auto Runtime::shouldPinWorkers() -> bool {
static const std::string pinWorkersEnvVarName = "EMPER_PIN_WORKERS";
auto pinWorkersEnv = emper::lib::env::getBoolFromEnv(pinWorkersEnvVarName);
if (pinWorkersEnv) {
auto value = pinWorkersEnv.value();
DBG("Processed " << pinWorkersEnvVarName << "=" << value);
return value;
}
return true;
}
auto Runtime::getDefaultPinningOffset() -> workerid_t {
static const std::string pinningOffsetEnvVarName = "EMPER_PINNING_OFFSET";
auto pinningOffsetEnv = emper::lib::env::getUnsignedFromEnv<workerid_t>(pinningOffsetEnvVarName);
if (pinningOffsetEnv) {
auto value = pinningOffsetEnv.value();
DBG("Processed " << pinningOffsetEnvVarName << "=" << value);
return value;
}
return 0;
}