Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flow/emper
  • aj46ezos/emper
  • i4/manycore/emper
3 results
Show changes
Commits on Source (12)
Showing
with 859 additions and 443 deletions
......@@ -153,6 +153,22 @@ const bool IO =
#endif
;
const bool IO_NOTIFICATION =
#ifdef EMPER_IO_NOTIFICATION
true
#else
false
#endif
;
const bool IO_PUNT_WAITFD_READ =
#ifdef EMPER_IO_PUNT_WAITFD_READ
true
#else
false
#endif
;
const bool IO_STEALING =
#ifdef EMPER_IO_STEALING
true
......
......@@ -20,6 +20,7 @@ enum class FiberSource : uintptr_t {
ioStolen, /*!< A other worker's io_uring completion queue */
anywhereQueue, /*!< The anywhere queue */
hintWsq, /*!< A known other worker's work-stealing queue */
hintIo, /*!< A known worker's io_uring completion queue */
hintAq, /*!< Straight from the anywhere queue */
};
......
......@@ -53,8 +53,10 @@ namespace log {
class LogBuffer;
}
namespace sleep_strategy {
class AbstractIoSleepStrategy;
class PipeSleepStrategy;
}
class WaitfdSleepStrategy;
} // namespace sleep_strategy
namespace stats {
class Worker;
class FromAnywhere;
......@@ -262,8 +264,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
template <typename>
friend class WorkerLocalData;
friend class Fibril;
friend class emper::sleep_strategy::AbstractIoSleepStrategy;
friend class emper::sleep_strategy::PipeSleepStrategy;
friend class emper::lib::ShardedFileBuffer;
friend class emper::sleep_strategy::WaitfdSleepStrategy;
friend void emper::log::log(const std::string& prefix, const std::string& message);
};
......@@ -11,9 +11,8 @@
class Runtime;
namespace emper::sleep_strategy {
class PipeSleepStrategy;
class AbstractIoSleepStrategy;
}
using emper::sleep_strategy::PipeSleepStrategy;
class Worker {
private:
......@@ -39,7 +38,7 @@ class Worker {
friend Runtime;
friend class AbstractWorkStealingScheduler;
friend class PipeSleepStrategy;
friend class emper::sleep_strategy::AbstractIoSleepStrategy;
public:
[[nodiscard]] auto getWorkerId() const -> workerid_t { return workerId; }
......
......@@ -33,11 +33,15 @@
#include "io/Stats.hpp"
#include "io/SubmitActor.hpp" // IWYU pragma: keep
#include "lib/TaggedPtr.hpp"
#include "lib/adt/LockedSet.hpp" // for LockedSet
#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep
#include "lib/adt/LockedSet.hpp"
#include "sleep_strategy/AbstractIoSleepStrategy.hpp" // IWYU pragma: keep
class AbstractWorkStealingScheduler;
namespace emper::sleep_strategy {
class WaitfdSleepStrategy;
}
namespace emper::io {
class IoContext : public Logger<LogSubsystem::IO> {
friend class ::Runtime;
......@@ -51,7 +55,8 @@ class IoContext : public Logger<LogSubsystem::IO> {
// IoContext pointer in GlobalIoContext::globalCompleterFunc
friend class GlobalIoContext;
friend class emper::sleep_strategy::PipeSleepStrategy;
friend class emper::sleep_strategy::AbstractIoSleepStrategy;
friend class emper::sleep_strategy::WaitfdSleepStrategy;
// Debug functions to access the mmaped memory of ring.
// gdb is not allowed to access the io mmaped memory of the io_uring fd.
......@@ -223,8 +228,8 @@ class IoContext : public Logger<LogSubsystem::IO> {
auto tag = static_cast<PointerTags>(tptr.getTag());
switch (tag) {
case PointerTags::NewWorkNotification: {
auto &sleepStrategy =
reinterpret_cast<PipeSleepStrategy &>(runtime.getWorkerSleepStrategy());
auto &sleepStrategy = reinterpret_cast<emper::sleep_strategy::AbstractIoSleepStrategy &>(
runtime.getWorkerSleepStrategy());
sleepStrategy.onNewWorkNotification<callerEnvironment>(*this, tptr);
break;
}
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Fischer
#include "sleep_strategy/AbstractIoSleepStrategy.hpp"
#include <liburing.h>
#include <sys/eventfd.h>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include "CallerEnvironment.hpp"
#include "Common.hpp"
#include "Runtime.hpp"
#include "Worker.hpp"
#include "io/IoContext.hpp"
#include "lib/TaggedPtr.hpp"
using emper::io::IoContext;
using emper::lib::TaggedPtr;
namespace emper::sleep_strategy {
thread_local AbstractIoSleepStrategy::SleepState AbstractIoSleepStrategy::sleepState;
thread_local int AbstractIoSleepStrategy::specificEvfd;
/**
 * Set up the shared sleeper counter and the per-worker lookup tables.
 *
 * The tables are filled lazily: a hook registered at the runtime runs for each
 * new worker, creates that worker's eventfd and publishes both the eventfd and
 * the worker's thread-local sleep state under the worker's id.
 */
AbstractIoSleepStrategy::AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount)
		: workerCount(workerCount), stats(runtime) {
	sleepers.store(0, std::memory_order_relaxed);

	sleepStates = new SleepState*[workerCount];
	evfds = new int[workerCount];

	// The hook assigns the thread-local members specificEvfd and sleepState
	// and stores them in the slot belonging to workerId.
	auto registerWorker = [this](workerid_t workerId) {
		const int notificationFd = eventfd(0, 0);
		specificEvfd = notificationFd;
		if (notificationFd == -1) {
			DIE_MSG_ERRNO("Creating worker specific notification eventfd failed");
		}

		evfds[workerId] = notificationFd;
		sleepStates[workerId] = &sleepState;
	};
	runtime.addNewWorkerHook(registerWorker);
}
/**
 * Release the per-worker lookup tables allocated by the constructor.
 */
AbstractIoSleepStrategy::~AbstractIoSleepStrategy() {
	delete[] sleepStates;
	// evfds is allocated with new[] in the constructor as well and was leaked before.
	delete[] evfds;
	// NOTE(review): the eventfds stored in evfds are not closed here; whether they are
	// closed elsewhere is not visible from this file - confirm.
}
/**
 * Decide whether the calling worker should sleep and prepare the reads it sleeps on.
 *
 * The worker's sleep state is locked for the whole call.
 *
 * @param io The IoContext in which the specific read is prepared
 * @param customLogic Strategy specific logic, only invoked if no global read is marked
 *        as in flight. Returning false aborts the sleep attempt.
 *
 * @return false if the worker was already notified or customLogic vetoed the sleep,
 *         true otherwise
 */
auto AbstractIoSleepStrategy::prepareSleep(IoContext& io,
																					 const std::function<bool(void)>& customLogic) -> bool {
	std::lock_guard<std::mutex> stateGuard(sleepState.lock);

	// A specific notification arrived before we went to sleep: consume it and skip sleeping
	if (sleepState.isNotified()) {
		sleepState.markRunning();
		LOGD("Reset notified state to running -> skip sleeping");
		return false;
	}

	// Only consult the strategy specific logic if no global read is in flight
	if (!sleepState.readingGlobal) {
		if (!customLogic()) return false;
		sleepState.readingGlobal = true;
	}

	// A specific read is already prepared -> nothing more to do
	if (sleepState.isSleeping()) return true;

	sleepState.markSleeping();

	struct io_uring_sqe* const specificSqe = io.getSqe();
	io_uring_prep_read(specificSqe, specificEvfd, &specificReadBuf, sizeof(specificReadBuf), 0);

	// The mark (third argument) distinguishes the specific read from the global one
	// when the resulting cqe is handled in onNewWorkNotification.
	const auto notificationTag =
			static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification);
	io_uring_sqe_set_data(specificSqe,
												TaggedPtr(static_cast<void*>(nullptr), notificationTag, true));

	io.trackReqsInUring(1);
	LOGD("prepared specificEvfd read");
	return true;
}
/**
 * Submit the SQEs prepared in io, wait for completions and count the wakeup.
 *
 * NOTE(review): the argument 1 is presumably the minimum number of completions
 * to wait for - confirm against IoContext::submitPreparedSqesAndWait.
 *
 * @param io The IoContext whose prepared requests are submitted
 */
void AbstractIoSleepStrategy::submitAndWaitForCompletionsIn(IoContext& io) {
LOGD("sleep until IO completions occur");
io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1);
LOGD("wakeup due to available IO completions");
// Every return from the wait is counted as a wakeup
stats.incWakeup();
}
/**
 * Prepare an unmarked new work notification read from fd.
 *
 * The read targets the dispatchHint of the worker owning io.
 *
 * @param io The IoContext where the read request is prepared
 * @param fd The file descriptor to read from
 * @param sqe_flags The flags set on the prepared SQE
 */
void AbstractIoSleepStrategy::prepareSleepRead(IoContext& io, int fd, uint8_t sqe_flags) {
	struct io_uring_sqe* const readSqe = io.getSqe();

	// The result is read directly into the worker's dispatchHint
	// TODO: Think about the race between the two reads
	auto& hintBuffer = io.worker->dispatchHint;
	io_uring_prep_read(readSqe, fd, &hintBuffer, sizeof(hintBuffer), 0);
	readSqe->flags = sqe_flags;

	// No mark is passed here, unlike for the specific read prepared in prepareSleep
	const auto notificationTag =
			static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification);
	io_uring_sqe_set_data(readSqe, TaggedPtr(static_cast<void*>(nullptr), notificationTag));

	io.trackReqsInUring(1);
}
/**
 * Handle a reaped new work notification completion.
 *
 * Dies if instantiated for any caller environment other than OWNER.
 *
 * @param io The IoContext the completion was reaped from
 * @param data The tagged user data of the completion. A marked pointer denotes the
 *        specific notification, an unmarked one the global notification.
 */
template <CallerEnvironment callerEnvironment>
void AbstractIoSleepStrategy::onNewWorkNotification(IoContext& io,
																										const emper::lib::TaggedPtr& data) {
	if constexpr (callerEnvironment != CallerEnvironment::OWNER) {
		DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ");
	}

	assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification));

	if (!data.isMarked()) {
		LOGD("Got new work notification");
		stats.incWakeupDueToNotify();
		// The global read was consumed: a new one must be prepared before the next sleep
		sleepState.readingGlobal = false;
		return;
	}

	std::lock_guard<std::mutex> stateGuard(sleepState.lock);
	LOGD("Got specific notification");
	stats.incWakeupDueToNotify();
	// The specific notification was consumed: back to running, so that a new specific
	// read gets prepared and later notifySpecific calls notify again.
	sleepState.markRunning();
}
template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(
IoContext& io, const TaggedPtr& data);
template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(
IoContext& io, const TaggedPtr& data);
template void AbstractIoSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(
IoContext& io, const TaggedPtr& data);
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Fischer
#pragma once
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include "CallerEnvironment.hpp"
#include "Common.hpp"
#include "Debug.hpp"
#include "FiberHint.hpp"
#include "emper-common.h"
#include "lib/TaggedPtr.hpp"
#include "sleep_strategy/SleeperState.hpp"
#include "sleep_strategy/Stats.hpp"
class Runtime;
namespace emper::io {
class IoContext;
}
namespace emper::sleep_strategy {
/**
* @brief An abstract IO based sleep strategy combining the IO subsystem with the sleep strategy
*
* Design goals
* ============
*
* * Wakeup either on external new work notifications or on local IO completions
* -> Sleep strategy is sound without the IO completer
* * Do as little as possible in a system saturated with work
* * Pass a hint where to find new work to suspended workers
*
* Algorithm
* =========
*
*```
* Data:
* Global:
* global IO object
* sleepers count
* Per worker:
* dispatch hint buffer
* specific IO object
* lockable sleep state
*
* PrepareSleep(customLogic):
* Lock state
* Defer unlock state
* If state is notified
* Reset to running
* Return
* If state is not reading global
* Return false if customLogic returns false
* Set state reading global
* If state is not sleeping
* Mark state sleeping
* Prepare read from specificEvfd
* Return true
*
* WriteNotifications : implementation specific
*
* NotifyEmper(n):
* if observed sleepers <= 0
* return
*
* // Determine how many we are responsible to wake
* do
* toWakeup = min(observed sleepers, n)
* while (!CAS(sleepers, toWakeup))
*
* write toWakeup hints
*
* NotifyAnywhere(n):
* // Ensure all n notifications take effect
* while (!CAS(sleepers, observed sleepers - n))
* if observed sleeping <= -n
* return
*
* toWakeup = min(observed sleeping, n)
* write toWakeup hints
*
* NotifySpecific(w):
* Get w's state
* Lock state
* Defer unlock state
* Return if already notified
* Mark state notified
* If state was sleeping
* Write specific notification
*
* onNewWorkCompletion(data):
* If data is marked
* lock state
* Mark state running
* Else
* Reset readingGlobal
*```
*
* Notes
* =====
*
* * We must decrement the sleepers count on the notifier side to
* prevent multiple notifiers from all observing the same number of sleepers.
* Otherwise they would try to wake up the same sleepers, and writing to e.g. a pipe
* would jam it up with unconsumed hints and thus block in writeNotifications, resulting
* in a deadlock.
* * The CAS loops on the notifier side are needed because decrementing
* and incrementing separately is racy: two notifiers can both observe the
* sum of their excess decrements and each increment too much, resulting in a
* broken counter.
* * Add the dispatch hint code in AbstractWorkStealingScheduler::nextFiber.
* This allows workers to check the dispatch hint after there
* was no local work to execute.
* This is a trade-off where we trade slower wakeup - a just awoken worker
* will check for local work - against a faster dispatch hot path when
* we have work to do in our local WSQ.
* * Other threads must not reap new work notifications because this
* would introduce a race for cqes and a possible
* lost wakeup if the other thread consumes the completions before the worker
* is actually waiting for them.
* * When notifying sleeping workers from anywhere we must ensure that all
* notifications take effect. This is needed for example when terminating
* the runtime to prevent sleep attempts from worker threads which are
* about to sleep but have not incremented the sleeper count yet.
* We achieve this by always decrementing the sleeper count by the notification
* count.
*/
class AbstractIoSleepStrategy : public Logger<LogSubsystem::SLEEP_S> {
friend class emper::io::IoContext;
protected:
/**
 * @brief Claim up to count sleepers and return how many notifications must be written
 *
 * The sleepers count is decremented on the notifier side so that concurrent
 * notifiers do not overcommit notifications. The CAS loop ensures the counter
 * is never decremented by more than the number of sleepers actually observed.
 *
 * @param sleeping The observed sleepers count, refreshed whenever the CAS fails
 * @param count The maximum number of sleepers to wake up
 *
 * @return The number of sleepers claimed, 0 if nobody is sleeping
 */
virtual auto getSleeperCountToWakeupEmper(int64_t& sleeping, int64_t count) -> uint64_t {
	int64_t claimed = 0;
	for (;;) {
		// No one is sleeping -> there is nothing to notify
		if (sleeping <= 0) {
			return 0;
		}

		// Never claim more sleepers than were observed
		claimed = std::min(sleeping, count);

		const bool swapped = sleepers.compare_exchange_weak(
				sleeping, sleeping - claimed, std::memory_order_release, std::memory_order_acquire);
		if (swapped) break;
	}

	LOGD("notifyFromEmper writes "
			 << claimed << " notifications and set sleepers count to: " << sleeping - claimed);
	assert(claimed >= 0);
	return static_cast<uint64_t>(claimed);
}
/**
 * @brief decrement the sleep count and return how many workers should be notified
 *
 * If we observe nobody sleeping we need to prevent sleep locks when
 * everybody is about to sleep but none has incremented the sleepers count yet.
 * We prevent the next sleep from blocking by setting the sleeper count to
 * the negative amount we want to prevent from sleeping.
 *
 * More generally, if we notify from anywhere we need all notifications to take
 * effect. Only notifying the sleepers we observe may not be enough.
 * For example, when terminating the runtime we notify all workers. If we
 * only woke up the ones we observe sleeping we might never terminate, because
 * new workers could go to sleep after we notified and would never be notified.
 * To prevent this we make sure that all count notifications take effect
 * by making the sleepers count negative and thus preventing at least count
 * sleep attempts.
 *
 * @param sleeping The observed sleepers count, refreshed whenever the CAS fails
 * @param count The number of notifications that must take effect
 *
 * @return The number of notifications to write, 0 if the counter already prevents
 *         enough sleep attempts or nobody was observed sleeping
 */
virtual auto getSleeperCountToWakeupAnywhere(int64_t& sleeping, int64_t count) -> uint64_t {
	int64_t newSleeping;
	do {
		// We already prevent enough sleep attempts
		if (sleeping <= -count) {
			LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts");
			return 0;
		}

		// Don't decrease the sleeper count in the CAS loop further than -count,
		// which is the threshold we need to ensure that the notifications will be
		// observed.
		// Decreasing it further than this threshold is not faulty it just
		// results in unnecessary skipped sleeps.
		newSleeping = std::max(-count, sleeping - count);
	} while (!sleepers.compare_exchange_weak(sleeping, newSleeping, std::memory_order_release,
																					 std::memory_order_acquire));

	// toWakeup should not be negative but could be if sleeping was already negative.
	// Use an int64_t zero instead of 0L: std::max needs both arguments to have the same
	// type and int64_t is not long on every platform.
	const int64_t toWakeup = std::max(std::min(sleeping, count), int64_t{0});

	LOGD("notifyFromAnywhere writes "
			 << toWakeup << " notifications and set sleepers count to: " << newSleeping);

	assert(toWakeup >= 0);
	return static_cast<uint64_t>(toWakeup);
}
/**
 * @brief Lockable sleep state of a single worker
 *
 * All members are private; only AbstractIoSleepStrategy may access them.
 */
class SleepState {
	friend class AbstractIoSleepStrategy;

	// Set by prepareSleep after the custom logic agreed to sleep and cleared when
	// an unmarked new work notification is handled
	bool readingGlobal = false;
	std::mutex lock;
	emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running;

	auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; }

	auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; }

	void markRunning() { s = emper::sleep_strategy::SleeperState::Running; }

	void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; }

	// Switch to Notified and report whether the state was Sleeping before
	auto markNotified() -> bool {
		const bool wasSleeping = isSleeping();
		s = emper::sleep_strategy::SleeperState::Notified;
		return wasSleeping;
	}
};
SleepState** sleepStates;
static thread_local SleepState sleepState;
int* evfds;
static thread_local int specificEvfd;
// Buffer used by all specific reads since their results are meaningless
uint64_t specificReadBuf;
// Make sure the shared counter lives in an exclusive cache line
CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers);
workerid_t workerCount;
Stats stats;
AbstractIoSleepStrategy(Runtime& runtime, workerid_t workerCount);
virtual ~AbstractIoSleepStrategy();
virtual void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) = 0;
virtual void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) = 0;
/**
 * @brief Notify a specific worker by writing to its eventfd
 *
 * Dies if the write fails.
 *
 * @param workerId The id of the worker whose eventfd is written
 */
void writeSpecificNotification(workerid_t workerId) {
	stats.addNotifications(1);

	const uint64_t b = 1;
	const ssize_t res = write(evfds[workerId], &b, sizeof(b));
	// The comparison must live inside unlikely(). The previous `unlikely(res) < 0`
	// compared the result of the branch hint instead of res, so (assuming unlikely()
	// is the usual __builtin_expect wrapper) a failed write was never detected.
	if (unlikely(res < 0)) {
		DIE_MSG_ERRNO("writing to the worker " << workerId << " evfd failed");
	}
}
using CustomLogic = std::function<bool(void)>;
/**
* @brief Prepare everything to sleep until new work is available
*
* @param io The IoContext used to sleep
* @param customLogic Code specific to the actual IO sleep strategy. Reads are only prepared
* if customLogic returns true.
*
* @return True if any read was prepared and IO completions should be awaited
*/
auto prepareSleep(emper::io::IoContext& io, const CustomLogic& customLogic) -> bool;
/**
* @brief prepare a new work notification read request
*
* @param io The IoContext where the read request will be prepared
* @param fd The file descriptor to read from
* @param sqe_flags The flags set on the issued SQE
*/
static void prepareSleepRead(emper::io::IoContext& io, int fd, uint8_t sqe_flags = 0);
void submitAndWaitForCompletionsIn(emper::io::IoContext& io);
template <CallerEnvironment callerEnvironment>
void onNewWorkNotification(emper::io::IoContext& io, const emper::lib::TaggedPtr& data);
/**
 * @brief Dispatch to the caller environment specific sleeper accounting
 *
 * @param sleeping The observed sleepers count, passed on by reference
 * @param count The number of requested notifications
 *
 * @return The number of notifications which should be written
 */
template <CallerEnvironment callerEnvironment>
auto getSleeperCountToWakeup(int64_t& sleeping, unsigned count) -> unsigned {
	const auto requested = static_cast<int64_t>(count);

	// Everything not running in EMPER context is treated as notifying from anywhere
	if constexpr (callerEnvironment != CallerEnvironment::EMPER) {
		return getSleeperCountToWakeupAnywhere(sleeping, requested);
	} else {
		return getSleeperCountToWakeupEmper(sleeping, requested);
	}
}
/**
 * @brief Notify up to count sleeping workers about new work
 *
 * @param hint The hint passed on to the strategy specific notification writer
 * @param count The number of workers which should be notified
 */
template <CallerEnvironment callerEnvironment>
void notify(emper::FiberHint hint, unsigned count) {
	stats.incOnNewWork();

	int64_t observedSleepers = getSleeping();
	const unsigned toWakeup = getSleeperCountToWakeup<callerEnvironment>(observedSleepers, count);
	// Nothing to write -> neither count a notify nor call the writer
	if (toWakeup == 0) return;

	stats.incNotify();
	if constexpr (callerEnvironment != CallerEnvironment::EMPER) {
		writeNotificationsFromAnywhere(hint, toWakeup);
	} else {
		writeNotificationsFromEmper(hint, toWakeup);
	}
}
/**
 * @brief Mark a specific worker as notified and wake it up if it is sleeping
 *
 * The target worker's sleep state is locked for the whole call. A worker which
 * is already marked notified is left untouched.
 *
 * @param workerId The id of the worker to notify
 */
template <CallerEnvironment callerEnvironment>
void notifySpecificInternal(workerid_t workerId) {
	SleepState& target = *sleepStates[workerId];
	std::lock_guard<std::mutex> targetGuard(target.lock);

	LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment);

	if (target.isNotified()) {
		LOGD(workerId << " already marked notified");
		return;
	}

	// markNotified reports whether the worker was sleeping, only then a write is needed
	if (target.markNotified()) {
		LOGD(workerId << " has specific read -> write notification");
		writeSpecificNotification(workerId);
	}
}
public:
// Print the collected sleep strategy statistics to out (std::cout by default).
void printStats(std::ostream& out = std::cout) { stats.print(out); }
// Return the current value of the shared sleepers counter. The notifyFromAnywhere
// accounting may drive the counter negative to prevent upcoming sleep attempts.
[[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); }
// Strategy specific sleep implementation, to be provided by the concrete subclass.
virtual void sleep() = 0;
};
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
// Copyright © 2021-2022 Florian Fischer
#include "sleep_strategy/PipeSleepStrategy.hpp"
#include <liburing.h>
#include <atomic>
#include <cassert>
#include <cstdint>
#include "CallerEnvironment.hpp"
#include "Runtime.hpp"
#include "Worker.hpp"
#include "io/IoContext.hpp"
#include "lib/TaggedPtr.hpp"
using emper::io::IoContext;
using emper::lib::TaggedPtr;
namespace emper::sleep_strategy {
thread_local PipeSleepStrategy::Pipe PipeSleepStrategy::pipe;
thread_local PipeSleepStrategy::SleepState PipeSleepStrategy::sleepState;
PipeSleepStrategy::PipeSleepStrategy(Runtime& runtime, workerid_t workerCount)
: workerCount(workerCount), stats(runtime) {
LOGD("init pipe sleep startegy");
sleepers.store(0, std::memory_order_relaxed);
sleepStates = new SleepState*[workerCount];
pipes = new Pipe*[workerCount];
runtime.addNewWorkerHook([this](workerid_t workerId) {
sleepStates[workerId] = &sleepState;
pipes[workerId] = &pipe;
});
}
PipeSleepStrategy::~PipeSleepStrategy() {
delete[] sleepStates;
delete[] pipes;
}
void PipeSleepStrategy::sleep() {
LOGD("going to sleep");
IoContext& io = *IoContext::getWorkerIo();
{
std::lock_guard<std::mutex> lock(sleepState.lock);
// Check if we were notified specifically and should skip the sleep attempt
if (sleepState.isNotified()) {
sleepState.markRunning();
LOGD("Reset notified state to running -> skip sleeping");
return;
auto customSleepLogic = [this, &io] {
// increment the sleeper count if it was negative we should skip sleeping
int64_t sleeping = this->sleepers.fetch_add(1, std::memory_order_acquire);
if (sleeping < 0) {
LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping");
return false;
}
if (!sleepState.globalReadInflight) {
// increment the sleeper count if it was negative we should skip sleeping
int64_t sleeping = sleepers.fetch_add(1, std::memory_order_acquire);
if (sleeping < 0) {
LOGD("observed sleeper count as: " << sleeping << " -> skip sleeping");
return;
}
assert(sleeping <= workerCount);
sleepState.globalReadInflight = true;
struct io_uring_sqe* sqe = io.getSqe();
// We read directly into the workers dispatchHint
io_uring_prep_read(sqe, global.sleepFd, &io.worker->dispatchHint,
sizeof(io.worker->dispatchHint), 0);
// Mark the sqe as a new work notification to reset the Global flag when reaping the
// resulting cqe
io_uring_sqe_set_data(
sqe, TaggedPtr(0, static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification)));
io.trackReqsInUring(1);
LOGD("prepared global.sleepFd read and set sleepers count to: " << sleeping + 1);
}
if (!sleepState.isSleeping()) {
sleepState.markSleeping();
struct io_uring_sqe* sqe = io.getSqe();
// We read directly into the workers dispatchHint
// TODO: Think about the race between the two reads
io_uring_prep_read(sqe, pipe.sleepFd, &io.worker->dispatchHint,
sizeof(io.worker->dispatchHint), 0);
// Tag the sqe with a marked new work notification to reset the specific state
// when reaping the resulting cqe.
io_uring_sqe_set_data(
sqe, TaggedPtr((void*)nullptr,
static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification), true));
io.trackReqsInUring(1);
LOGD("prepared pipe.sleepFd read");
}
}
// Wait for IO completions
LOGD("sleep until IO completions occur");
io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1);
LOGD("wakeup due to available IO completions");
stats.incWakeup();
prepareSleepRead(io, this->global.sleepFd);
LOGD("prepared sleepFd read and set sleepers count to: " << sleeping + 1);
return true;
};
const bool shouldSleep = prepareSleep(io, customSleepLogic);
if (!shouldSleep) return;
submitAndWaitForCompletionsIn(io);
}
template <CallerEnvironment callerEnvironment>
void PipeSleepStrategy::notifySpecific(workerid_t workerId) {
auto& specificState = *sleepStates[workerId];
std::lock_guard<std::mutex> lock(specificState.lock);
LOGD("Specifically notify worker " << workerId << " from " << callerEnvironment);
if (specificState.isNotified()) {
LOGD(workerId << " already marked notified");
return;
}
const bool isSleeping = specificState.markNotified();
if (isSleeping) {
LOGD(workerId << " has specific read -> write notification");
writeSpecificNotification(workerId);
}
}
template void PipeSleepStrategy::notifySpecific<CallerEnvironment::EMPER>(workerid_t workerId);
template void PipeSleepStrategy::notifySpecific<CallerEnvironment::ANYWHERE>(workerid_t workerId);
template <CallerEnvironment callerEnvironment>
void PipeSleepStrategy::onNewWorkNotification(IoContext& io, TaggedPtr data) {
if constexpr (callerEnvironment != CallerEnvironment::OWNER) {
DIE_MSG("Others reaping new work notification from " << io.worker->getWorkerId() << " CQ");
}
assert(data.getTag() == static_cast<uint16_t>(IoContext::PointerTags::NewWorkNotification));
if (data.isMarked()) {
std::lock_guard<std::mutex> lock(sleepState.lock);
LOGD("Got specific notification");
stats.incWakeupDueToNotify();
// Reset specific and notified flag to indicate that a new specific notification
// was consumed, a new specific read must be prepared and other notifySpecific
// calls must notify again.
sleepState.markRunning();
} else {
LOGD("Got new work notification");
stats.incWakeupDueToNotify();
// Reset global flag to indicate that a new sleep cqe must be prepared
// and allow the completer to reap completions again
sleepState.globalReadInflight = false;
}
}
template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::OWNER>(IoContext& io,
TaggedPtr data);
template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io,
TaggedPtr data);
template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io,
TaggedPtr data);
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
// Copyright © 2021-2022 Florian Fischer
#pragma once
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <vector>
#include "CallerEnvironment.hpp"
......@@ -16,158 +12,20 @@
#include "Debug.hpp"
#include "FiberHint.hpp"
#include "emper-common.h"
#include "lib/TaggedPtr.hpp"
#include "sleep_strategy/AbstractIoSleepStrategy.hpp"
#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp"
#include "sleep_strategy/SleeperState.hpp"
#include "sleep_strategy/Stats.hpp"
class Runtime;
namespace emper::io {
class IoContext;
}
namespace emper::sleep_strategy {
/**
* @brief A pipe(2) based sleep strategy combining the IO subsystem with the sleep strategy
*
* implement a pipe based sleep strategy using the IO subsystem
*
* Design goals
* ============
*
* * Wakeup either on external newWork notifications or on local IO completions
* -> Sleep strategy is sound without the IO completer
* * Do as less as possible in a system saturated with work
* * Pass a hint where to find new work to suspended workers
*
* Algorithm
* =========
*
*```
* Data:
* Global:
* global pipe
* sleepers count
* Per worker:
* dispatch hint buffer
* specific pipe
* state lock
* sleep state
*
* Sleep:
* Lock state
* If state == notified
* Set state = running
* return
*
* If we have no global sleep request in flight
* Atomic increment sleep count
* Skip sleeping if sleep count was < 0
* Mark sleep requests in flight
* Prepare read cqe from the global pipe to dispatch hint buffer
*
* If state == running
* Set state = sleeping
* Prepare marked read cqe from the specific pipe to dispatch hint buffer
*
* Unlock state
* Wait until IO completions occurred
*
* NotifyEmper(n):
* if observed sleepers <= 0
* return
*
* // Determine how many we are responsible to wake
* do
* toWakeup = min(observed sleepers, n)
* while (!CAS(sleepers, toWakeup))
*
* write toWakeup hints to the hint pipe
*
* NotifyAnywhere(n):
* // Ensure all n notifications take effect
* while (!CAS(sleepers, observed sleepers - n))
* if observed sleeping <= -n
* return
*
* toWakeup = min(observed sleeping, n)
* write toWakeup hints to the hint pipe
*
* NotifySpecific(w):
* Get w's state
* Lock state
* Return if already notified
* Mark state notified
* Write hint if was stat was sleeping
*
* onNewWorkCompletion:
* If data is marked
* lock state
* set state = running
* return
*
* Reset global read inflight
*```
*
* Notes
* =====
*
* * We must decrement the sleepers count on the notifier side to
* prevent multiple notifiers to observe all the same amount of sleepers,
* trying to wake up the same sleepers by writing to the pipe and jamming it up
* with unconsumed hints and thus blocking in the notify write resulting
* in a deadlock.
* * The CAS loops on the notifier side are needed because decrementing
* and incrementing the excess is racy: Two notifier can observe the
* sum of both their excess decrement and increment to much resulting in a
* broken counter.
* * Add the dispatch hint code in AbstractWorkStealingScheduler::nextFiber.
* This allows workers to check the dispatch hint after there
* where no local work to execute.
* This is a trade-off where we trade slower wakeup - a just awoken worker
* will check for local work - against a faster dispatch hot path when
* we have work to do in our local WSQ.
* * Other thread must not reap new work notifications because this
* would introduces a race for cqes and a possible
* lost wakeup if the other consumes the completions before the worker
* is actually waiting for them.
* * When notifying sleeping workers from anywhere we must ensure that all
* notifications take effect. This is needed for example when terminating
* the runtime to prevent sleep attempt from worker thread which are
* about to sleep but have not incremented the sleeper count yet.
* We achieve this by always decrementing the sleeper count by the notification
* count.
*/
class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>,
public Logger<LogSubsystem::SLEEP_S> {
friend class emper::io::IoContext;
/**
* @brief State of a worker
*/
class SleepState {
friend class PipeSleepStrategy;
bool globalReadInflight = false;
std::mutex lock;
emper::sleep_strategy::SleeperState s = emper::sleep_strategy::SleeperState::Running;
auto markNotified() -> bool {
auto oldS = s;
s = emper::sleep_strategy::SleeperState::Notified;
return oldS == emper::sleep_strategy::SleeperState::Sleeping;
}
void markSleeping() { s = emper::sleep_strategy::SleeperState::Sleeping; }
void markRunning() { s = emper::sleep_strategy::SleeperState::Running; }
auto isNotified() const -> bool { return s == emper::sleep_strategy::SleeperState::Notified; }
auto isSleeping() const -> bool { return s == emper::sleep_strategy::SleeperState::Sleeping; }
};
SleepState** sleepStates;
static thread_local SleepState sleepState;
class PipeSleepStrategy : public AbstractWorkerSleepStrategy<PipeSleepStrategy>,
public AbstractIoSleepStrategy {
class Pipe {
friend class PipeSleepStrategy;
int sleepFd;
......@@ -186,21 +44,9 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>,
}
};
const workerid_t workerCount;
Pipe global;
/**
* @brief Per worker pipe to notify specific workers
*/
Pipe** pipes;
static thread_local Pipe pipe;
Stats stats;
// Make sure the shared counter lives in an exlusive cache line
CACHE_LINE_EXCLUSIVE(std::atomic<int64_t>, sleepers);
void writeNotifications(emper::FiberHint hint, int64_t count) {
void writeNotifications(emper::FiberHint hint, unsigned count) {
stats.addNotifications(count);
std::vector<void*> hints(count, hint);
......@@ -211,113 +57,18 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>,
}
}
void writeSpecificNotification(workerid_t workerId) {
stats.addNotifications(1);
const void* b = nullptr;
ssize_t res = write(pipes[workerId]->notifyFd, &b, sizeof(b));
if (unlikely(res) < 0) {
DIE_MSG_ERRNO("writing to the notifyFd of worker " << workerId << " failed");
}
void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) override {
writeNotifications(hint, count);
}
void notifyFromEmper(int64_t& sleeping, emper::FiberHint hint, int64_t count) {
int64_t toWakeup;
// We need to decrement the sleeper count on the notification side
// to prevent overcommiting notifications, filling up the pipe which results
// in notify calls to block on write.
// And the CAS loop is needed because we don't want to decrement the sleepers
// count to much.
// Decrement the sleeper count for each sleeper we are going to wakeup
do {
// No one is sleeping -> we don't need to notify
if (sleeping <= 0) {
return;
}
// Count of sleepers we notify
toWakeup = std::min(sleeping, count);
} while (!sleepers.compare_exchange_weak(sleeping, sleeping - toWakeup,
std::memory_order_release, std::memory_order_acquire));
// increment the notify stat if we actually have to notify someone
stats.incNotify();
writeNotifications(hint, toWakeup);
LOGD("notifyFromEmper written "
<< toWakeup << " notifications and set sleepers count to: " << sleeping - toWakeup);
}
void notifyFromAnywhere(int64_t& sleeping, emper::FiberHint hint, int64_t count) {
// If we observe nobody sleeping, we need to prevent sleep locks when
// everybody is about to sleep but none has incremented the sleepers count yet.
// We prevent the next sleep from blocking by setting the sleeper count to
// the negative amount we want to prevent from sleeping.
// More generally, if we notify from anywhere we need all notifications to take
// effect. Only notifying the sleepers we observe may not be enough.
// For example, when terminating the runtime we notify all workers. If we
// only woke up the ones we observe sleeping, we might never terminate, because
// after we notified all observed sleepers a new one could go to sleep which
// would never be notified.
// To prevent this, we make sure that all count notifications take effect
// by making the sleepers count negative and thus preventing at least count
// sleep attempts.
int64_t newSleeping;
do {
// We already prevent enough sleep attempts
if (sleeping <= -count) {
LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts");
return;
}
// Don't decrease the sleeper count in the CAS loop further than -count,
// which is the threshold we need to ensure that the notifications will be
// observed.
// Decreasing it further than this threshold is not faulty it just
// results in unnecessary skipped sleeps.
newSleeping = std::max(-count, sleeping - count);
} while (!sleepers.compare_exchange_weak(sleeping, newSleeping, std::memory_order_release,
std::memory_order_acquire));
stats.incNotify();
// To wakeup should not be negative but could be if sleeping was already negative
int64_t toWakeup = std::max(std::min(sleeping, count), 0L);
if (toWakeup > 0) {
writeNotifications(hint, toWakeup);
}
LOGD("notifyFromAnywhere written "
<< toWakeup << " notifications and set sleepers count to: " << newSleeping);
}
template <CallerEnvironment callerEnvironment>
void notify(emper::FiberHint hint, uint32_t count) {
// The hint must be != nullptr so sleep() knows when to prepare and submit
// a sleepFd read sqe.
stats.incOnNewWork();
int64_t sleeping = getSleeping();
auto signedCount = static_cast<int64_t>(count);
if constexpr (callerEnvironment == CallerEnvironment::ANYWHERE) {
notifyFromAnywhere(sleeping, hint, signedCount);
} else {
notifyFromEmper(sleeping, hint, signedCount);
}
void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) override {
writeNotifications(hint, count);
}
public:
PipeSleepStrategy(Runtime& runtime, workerid_t workerCount);
~PipeSleepStrategy();
PipeSleepStrategy(Runtime& runtime, workerid_t workerCount)
: AbstractIoSleepStrategy(runtime, workerCount) {}
void printStats(std::ostream& out = std::cout) { stats.print(out); }
[[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); }
void printStats(std::ostream& out = std::cout) { AbstractIoSleepStrategy::printStats(out); }
template <CallerEnvironment callerEnvironment>
inline void notifyOne() {
......@@ -336,10 +87,10 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>,
}
template <CallerEnvironment callerEnvironment>
void notifySpecific(workerid_t workerId);
inline void notifySpecific(workerid_t workerId) {
notifySpecificInternal<callerEnvironment>(workerId);
}
template <CallerEnvironment callerEnvironment>
void onNewWorkNotification(emper::io::IoContext& io, emper::lib::TaggedPtr data);
void sleep();
void sleep() override;
};
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "sleep_strategy/WaitfdSleepStrategy.hpp"
#include <liburing.h>
#include <syscall.h>
#include "Emper.hpp"
#include "FiberSource.hpp"
#include "Runtime.hpp"
#include "io/IoContext.hpp"
using emper::io::IoContext;
// Opcode handed to the io_uring_register syscall when a worker registers the
// waitfd with its ring (see the worker hook installed by the constructor).
// NOTE(review): this opcode is defined locally rather than taken from a system
// header; presumably it must match the waitfd-enabled kernel in use - confirm.
static const unsigned IOURING_REGISTER_WAITFD = 20;
// Argument block passed together with IOURING_REGISTER_WAITFD.
// The layout is consumed by the kernel, so the fields must not be reordered.
struct io_uring_waitfd_registration {
// Filled with the worker's FiberHint (converted to a pointer-sized value).
void* hint;
// The waitfd file descriptor to register.
int fd;
};
namespace emper::sleep_strategy {
// Value placed in WaitfdNotification::flags by writeNotificationsFromAnywhere();
// notifications written from within EMPER use 0 instead.
uint32_t WaitfdSleepStrategy::WAITFD_NOTIFY_AND_PREVENT_SLEEP = 1;
/**
 * @brief Create the waitfd and, if IO notifications are enabled, arrange for
 *        every worker to register it with its io_uring.
 *
 * @param runtime      Runtime the strategy belongs to; also receives the worker hook.
 * @param workerCount  Number of workers, forwarded to AbstractIoSleepStrategy.
 * @param waitfdFlags  Flags passed to the waitfd syscall.
 */
WaitfdSleepStrategy::WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCount,
                                         unsigned waitfdFlags)
    : AbstractIoSleepStrategy(runtime, workerCount), waitfdFlags(waitfdFlags) {
  // The waitfd is created on top of the sleepers counter.
  const long createdFd = syscall(__NR_waitfd, &sleepers, waitfdFlags);
  if (createdFd < 0) {
    DIE_MSG_ERRNO("creating waitfd failed");
  }
  waitfd = static_cast<int>(createdFd);

  if constexpr (emper::IO_NOTIFICATION) {
    // Runs once per worker: hand the waitfd plus an IO hint identifying the
    // worker to the worker's own ring.
    auto registerWithWorkerRing = [this](workerid_t workerId) {
      IoContext& workerIo = *IoContext::getWorkerIo();
      emper::FiberHint ioHint(workerId, emper::FiberSource::hintIo);
      struct io_uring_waitfd_registration registration {
        ioHint, this->waitfd
      };

      const long registerResult =
          syscall(__NR_io_uring_register, workerIo.ring.ring_fd, IOURING_REGISTER_WAITFD,
                  &registration, sizeof(registration));
      if (registerResult != 0) {
        DIE_MSG_ERRNO("failed to register waitfd for worker " << workerId);
      }
    };

    runtime.addNewWorkerHook(registerWithWorkerRing);
  }
}
/**
 * @brief Close the per-worker descriptors in evfds, free the array and close the waitfd.
 */
WaitfdSleepStrategy::~WaitfdSleepStrategy() {
  // NOTE(review): evfds is not declared in the visible WaitfdSleepStrategy class;
  // presumably it is provided by a base class - confirm it exists and is owned here.
  for (unsigned worker = 0; worker < workerCount; ++worker) {
    close(evfds[worker]);
  }
  delete[] evfds;

  close(waitfd);
}
void WaitfdSleepStrategy::sleep() {
IoContext& io = *IoContext::getWorkerIo();
auto customLogic = [this, &io] {
// waitfd non-blocking reads will always return -EAGAIN and waitfd is not pollable
// therefore we can skip that and immediately issue the read in an async manner.
uint8_t sqe_flags = 0;
if constexpr (emper::IO_PUNT_WAITFD_READ) sqe_flags |= IOSQE_ASYNC;
prepareSleepRead(io, waitfd, sqe_flags);
LOGD("prepared waitfd read");
return true;
};
const bool shouldSleep = prepareSleep(io, customLogic);
if (!shouldSleep) return;
submitAndWaitForCompletionsIn(io);
}
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#pragma once
#include <unistd.h>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include "CallerEnvironment.hpp"
#include "Common.hpp"
#include "Debug.hpp"
#include "FiberHint.hpp"
#include "emper-common.h"
#include "emper-config.h" // IWYU pragma: keep
#include "lib/LinuxVersion.hpp"
#include "sleep_strategy/AbstractIoSleepStrategy.hpp"
#include "sleep_strategy/AbstractWorkerSleepStrategy.hpp"
class Runtime;
#ifndef __NR_waitfd
// Fallback used when the system headers do not provide __NR_waitfd:
// choose the syscall number from the Linux version reported by EMPER_LINUX_GE.
static const unsigned __NR_waitfd = []() -> unsigned {
  // Check the newest version first; each guard returns as soon as it matches.
  if (EMPER_LINUX_GE("5.17")) return 451;
  if (EMPER_LINUX_GE("5.16")) return 450;
  return 449;
}();
#endif
namespace emper::sleep_strategy {
/**
 * @brief Worker sleep strategy which notifies workers by writing to a waitfd.
 *
 * Notifications are written to the waitfd as WaitfdNotification records.
 * When the waitfd was created with the accounting flag, the userspace sleeper
 * counter is not decremented by this class (see getSleeperCountToWakeupEmper()
 * and getSleeperCountToWakeupAnywhere()); otherwise the AbstractIoSleepStrategy
 * implementations are used.
 */
class WaitfdSleepStrategy : public AbstractWorkerSleepStrategy<WaitfdSleepStrategy>,
                            public AbstractIoSleepStrategy {
  // File descriptor returned by the waitfd syscall; set in the constructor.
  int waitfd;

  // Flag values understood by the waitfd syscall.
  enum class WaitfdFlags {
    none = 0,
    accounting = 1,
    waitFree = 2,
    bestEffort = 4,
  };

  // Flags the waitfd was created with.
  const unsigned waitfdFlags;

  // Record written to the waitfd for each notify call.
  struct WaitfdNotification {
    void* d;
    uint32_t flags;
    uint32_t count;
  };

  /**
   * @brief Determine the waitfd flags to use when none are passed explicitly.
   *
   * Starts with the build-time EMPER_WAITFD_FLAGS enumerator. If the environment
   * variable EMPER_WAITFD_FLAGS is set it overrides the build-time value; accepted
   * values are "accounting", "wait-free" and "best-effort", anything else aborts.
   *
   * @return The selected flag value.
   */
  static auto getDefaultWaitfdFlags() -> unsigned {
    auto flags = static_cast<unsigned>(WaitfdFlags::EMPER_WAITFD_FLAGS);

    const auto* flagsEnvVal = std::getenv("EMPER_WAITFD_FLAGS");
    if (flagsEnvVal) {
      const std::string envValStr(flagsEnvVal);
      if (envValStr == "accounting")
        flags = static_cast<unsigned>(WaitfdFlags::accounting);
      else if (envValStr == "wait-free")
        flags = static_cast<unsigned>(WaitfdFlags::waitFree);
      else if (envValStr == "best-effort")
        flags = static_cast<unsigned>(WaitfdFlags::bestEffort);
      else
        DIE_MSG("Unknown waitfd flags specified in environment: " << envValStr);
    }
    return flags;
  }

  /**
   * @brief Write a single notification record to the waitfd.
   *
   * @param hint   Hint delivered to the woken workers.
   * @param flags  Value for WaitfdNotification::flags.
   * @param count  Number of notifications requested.
   */
  void writeNotifications(emper::FiberHint hint, uint32_t flags, int32_t count) {
    struct WaitfdNotification notification = {hint, flags, static_cast<uint32_t>(count)};

    LOGD("write: {" << notification.d << ", " << notification.flags << ", " << notification.count
                    << "}");

    ssize_t res = write(waitfd, reinterpret_cast<void*>(&notification), sizeof(notification));
    // The whole comparison belongs inside unlikely(): the macro is a branch hint for
    // a condition, and comparing its result with 0 is not a reliable way to detect
    // the negative return value of a failed write().
    if (unlikely(res < 0)) {
      DIE_MSG_ERRNO("writing to waitfd failed");
    }
  }

  // Flag value used for notifications written from outside of EMPER.
  static uint32_t WAITFD_NOTIFY_AND_PREVENT_SLEEP;

  // Notifications from within EMPER carry no flags.
  void writeNotificationsFromEmper(emper::FiberHint hint, unsigned count) override {
    writeNotifications(hint, 0, count);
  }

  // Notifications from anywhere additionally carry WAITFD_NOTIFY_AND_PREVENT_SLEEP.
  void writeNotificationsFromAnywhere(emper::FiberHint hint, unsigned count) override {
    writeNotifications(hint, WAITFD_NOTIFY_AND_PREVENT_SLEEP, count);
  }

  // If the waitfd implementation does the sleeper accounting in kernel space
  // we must not decrement the sleeper count in userspace.
  auto getSleeperCountToWakeupEmper(int64_t& sleeping, int64_t count) -> uint64_t override {
    if (waitfdFlags != static_cast<unsigned>(WaitfdFlags::accounting))
      return AbstractIoSleepStrategy::getSleeperCountToWakeupEmper(sleeping, count);

    // Never report more wakeups than observed sleepers, and never a negative amount.
    const int64_t toWakeup = std::min(sleeping, count);
    LOGD("notifyFromEmper writes " << toWakeup << " notifications");
    return toWakeup > 0 ? static_cast<uint64_t>(toWakeup) : 0;
  }

  auto getSleeperCountToWakeupAnywhere(int64_t& sleeping, int64_t count) -> uint64_t override {
    if (waitfdFlags != static_cast<unsigned>(WaitfdFlags::accounting))
      return AbstractIoSleepStrategy::getSleeperCountToWakeupAnywhere(sleeping, count);

    if (sleeping <= -count) {
      LOGD("notifyFromAnywhere sleeper count already preventing enough sleep attempts");
      return 0;
    }

    LOGD("notifyFromAnywhere writes " << count << " notifications");
    return static_cast<uint64_t>(count);
  }

 public:
  /**
   * @param runtime      Runtime this strategy belongs to.
   * @param workerCount  Number of workers.
   * @param waitfdFlags  Flags for the waitfd; defaults to getDefaultWaitfdFlags().
   */
  WaitfdSleepStrategy(Runtime& runtime, workerid_t workerCount,
                      unsigned waitfdFlags = getDefaultWaitfdFlags());
  ~WaitfdSleepStrategy();

  // Notify a single worker.
  template <CallerEnvironment callerEnvironment>
  inline void notifyOne() {
    notifyMany<callerEnvironment>(1);
  }

  // Notify up to count workers using a new-work hint for the caller environment.
  template <CallerEnvironment callerEnvironment>
  inline void notifyMany(unsigned count) {
    if (count > 1) LOGD("notify(hint, " << count << ")");
    notify<callerEnvironment>(emper::FiberHint::createNewWorkHint<callerEnvironment>(), count);
  }

  // Notify as many workers as the runtime has.
  template <CallerEnvironment callerEnvironment>
  inline void notifyAll() {
    notifyMany<callerEnvironment>(workerCount);
  }

  // Notify the worker with the given id.
  template <CallerEnvironment callerEnvironment>
  inline void notifySpecific(ATTR_UNUSED workerid_t workerId) {
    notifySpecificInternal<callerEnvironment>(workerId);
  }

  void printStats(std::ostream& out = std::cout) { AbstractIoSleepStrategy::printStats(out); }

  void sleep() override;
};
} // namespace emper::sleep_strategy
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
// Copyright © 2021-2022 Florian Fischer
#pragma once
#ifdef EMPER_SEMAPHORE_SLEEP_STRATEGY
......@@ -8,6 +8,9 @@
#elif defined EMPER_PIPE_SLEEP_STRATEGY
#include "sleep_strategy/PipeSleepStrategy.hpp"
#elif defined EMPER_WAITFD_SLEEP_STRATEGY
#include "sleep_strategy/WaitfdSleepStrategy.hpp"
#else
#error Unknown WorkerSleepStrategy implementation
#endif
......@@ -19,5 +22,8 @@ using WorkerSleepStrategy = SemaphoreWorkerSleepStrategy;
#elif defined EMPER_PIPE_SLEEP_STRATEGY
using WorkerSleepStrategy = PipeSleepStrategy;
#elif defined EMPER_WAITFD_SLEEP_STRATEGY
using WorkerSleepStrategy = WaitfdSleepStrategy;
#endif
} // namespace emper::sleep_strategy
emper_cpp_sources += files(
'AbstractIoSleepStrategy.cpp',
'PipeSleepStrategy.cpp',
'SemaphoreWorkerSleepStrategy.cpp',
'Stats.cpp',
'WaitfdSleepStrategy.cpp',
'WorkerStats.cpp',
)
......@@ -149,11 +149,24 @@ auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() -> std::optional
return std::nullopt;
}
auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim)
/**
 * @brief Try to reap a single IO completion from another worker's IO context.
 *
 * @param victim  Id of the worker whose IO context is inspected.
 * @return The reaped fiber tagged as FiberSource::ioStolen, or std::nullopt if
 *         no completion was reaped.
 */
auto AbstractWorkStealingScheduler::tryStealIoCompletionFrom(workerid_t victim)
    -> std::optional<NextFiberResult> {
  auto* victimIo = runtime.ioContexts[victim];
  Fiber* const reaped = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>();

  // Nothing to steal from this victim.
  if (reaped == nullptr) {
    return std::nullopt;
  }

  emper::statsIncr(awss::stats.nextIoFiberStolen);
  return NextFiberResult{reaped, emper::FiberSource::ioStolen};
}
auto AbstractWorkStealingScheduler::tryStealWorkFrom(workerid_t victim)
-> std::optional<NextFiberResult> {
constexpr int maxRetries = emper::WAITFREE_WORK_STEALING ? 0 : -1;
AbstractFiber* fiber;
popTop:
StealingResult res = queues[victim]->popTop<maxRetries>(&fiber);
if (res == StealingResult::Stolen) {
emper::statsIncr(awss::stats.nextFiberStolen);
......@@ -163,13 +176,16 @@ popTop:
return NextFiberResult{fiber, emper::FiberSource::stolen};
}
return std::nullopt;
}
auto AbstractWorkStealingScheduler::tryStealFiberFrom(workerid_t victim)
-> std::optional<NextFiberResult> {
auto result = tryStealWorkFrom(victim);
if (result) return result;
if constexpr (emper::IO_STEALING) {
auto* victimIo = runtime.ioContexts[victim];
fiber = victimIo->reapSingleCompletion<CallerEnvironment::EMPER>();
if (fiber) {
emper::statsIncr(awss::stats.nextIoFiberStolen);
return NextFiberResult{fiber, emper::FiberSource::ioStolen};
}
return tryStealIoCompletionFrom(victim);
}
return std::nullopt;
......@@ -204,7 +220,7 @@ popBottom:
switch (source) {
case emper::FiberSource::hintWsq: {
const auto victim = dispatchHint.getWorker();
const auto stolen = tryStealFiberFrom(victim);
const auto stolen = tryStealWorkFrom(victim);
if (stolen) {
emper::statsIncr(awss::stats.nextFiberFromHintLocal);
onWorkStolen();
......@@ -220,6 +236,15 @@ popBottom:
}
break;
}
case emper::FiberSource::hintIo: {
const auto victim = dispatchHint.getWorker();
const auto stolenIo = tryStealIoCompletionFrom(victim);
if (stolenIo) {
emper::statsIncr(awss::stats.nextFiberFromHintIo);
return NextFiberResult{(*stolenIo).fiber, emper::FiberSource::hintIo};
}
break;
}
default:
DIE_MSG("invalid dispatch hint");
}
......
......@@ -26,6 +26,8 @@ class AbstractWorkStealingScheduler : public Scheduler {
private:
auto nextFiberViaAnywhereQueue() -> std::optional<NextFiberResult>;
auto tryStealWorkFrom(workerid_t victim) -> std::optional<NextFiberResult>;
auto tryStealIoCompletionFrom(workerid_t victim) -> std::optional<NextFiberResult>;
auto tryStealFiberFrom(workerid_t victim) -> std::optional<NextFiberResult>;
bool queueFullWarningEmitted = false;
......
......@@ -31,6 +31,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) {
<< std::to_string(comulatedWorkerStats.nextFiberFromHintLocal) << std::endl
<< "total-next-fiber-hint-anywherequeue: "
<< std::to_string(comulatedWorkerStats.nextFiberFromHintAnywhere) << std::endl
<< "total-next-fiber-hint-io: " << std::to_string(comulatedWorkerStats.nextFiberFromHintIo)
<< std::endl
<< "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen)
<< std::endl
<< "total-next-io-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextIoFiberStolen)
......
......@@ -13,6 +13,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke
nextFiberFromLocal += other.nextFiberFromLocal;
nextFiberFromHintLocal += other.nextFiberFromHintLocal;
nextFiberFromHintAnywhere += other.nextFiberFromHintAnywhere;
nextFiberFromHintIo += other.nextFiberFromHintIo;
nextFiberStolen += other.nextFiberStolen;
nextIoFiberStolen += other.nextIoFiberStolen;
nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue;
......
......@@ -20,6 +20,7 @@ class AbstractWorkStealingWorkerStats {
uint64_t nextFiberFromLocal = 0;
uint64_t nextFiberFromHintLocal = 0;
uint64_t nextFiberFromHintAnywhere = 0;
uint64_t nextFiberFromHintIo = 0;
uint64_t nextFiberStolen = 0;
uint64_t nextIoFiberStolen = 0;
uint64_t nextFiberFromAnywhereQueue = 0;
......
......@@ -113,6 +113,7 @@ conf_data.set('EMPER_STATS_STACK_USAGE', stats_stack_usage)
conf_data.set('EMPER_STATS_WORKER_SLEEP', stats_worker_sleep)
conf_data.set('EMPER_STATS_BLOCKED_CONTEXT', stats_blocked_context)
conf_data.set('EMPER_STATS_BLOCKED_CONTEXT_COUNT', stats_blocked_context_count)
conf_data.set('EMPER_WAITFD_FLAGS', get_option('waitfd_flags'))
context_alignment = get_option('context_alignment')
if context_alignment == 'none'
......@@ -142,14 +143,9 @@ endif
semaphore_impl = get_option('wakeup_semaphore_implementation')
conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true)
sleep_stratey = get_option('worker_sleep_strategy')
if sleep_stratey == 'semaphore'
conf_data.set('EMPER_SEMAPHORE_SLEEP_STRATEGY', true)
elif sleep_stratey == 'pipe'
conf_data.set('EMPER_PIPE_SLEEP_STRATEGY', true)
else
error('Unsupported sleep strategy')
endif
sleep_strategy = get_option('worker_sleep_strategy')
sleep_strategy_var_name = sleep_strategy.to_upper().replace('-', '_')
conf_data.set('EMPER_' + sleep_strategy_var_name + '_SLEEP_STRATEGY', true)
locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation')
if locked_unbounded_queue_impl == 'boost_shared_mutex'
......@@ -195,7 +191,11 @@ io_bool_options = [
{'option': 'try_syscall'},
{'option': 'waitfree_stealing',
'dependencies': {'io_stealing': true, 'io_lockless_cq': true}},
{'option': 'notification',
'dependencies': {'worker_sleep_strategy': 'waitfd'}},
{'option': 'synchronous'},
{'option': 'punt_waitfd_read',
'dependencies': {'worker_sleep_strategy': 'waitfd'}},
]
io_raw_options = [
......
......@@ -63,9 +63,22 @@ option(
'worker_sleep_strategy',
type: 'combo',
description: 'The used algorithm to suspend and wakeup workers',
choices: ['semaphore', 'pipe'],
choices: ['semaphore', 'pipe', 'waitfd'],
value: 'semaphore',
)
option(
'waitfd_flags',
type: 'combo',
description: 'The flags of the used waitfd',
choices: ['none', 'accounting', 'waitFree', 'bestEffort'],
value: 'none',
)
# Boolean switch for the waitfd sleep strategy's sleep read.
# NOTE(review): presumably surfaces in the code as EMPER_IO_PUNT_WAITFD_READ - confirm
# against the io_bool_options handling in the build file.
option(
  'io_punt_waitfd_read',
  type: 'boolean',
  description: 'Punt the sleep waitfd read to the io_wq',
  value: false,
)
option(
'wakeup_semaphore_implementation',
type: 'combo',
......@@ -314,6 +327,12 @@ option(
description: 'Use synchronous blocking systemcalls instead of the emper::io subsystem',
value: false,
)
option(
'io_notification',
type: 'boolean',
description: 'Let the kernel notify suspended EMPER workers about new IO completions',
value: false,
)
option(
'continuation_stealing_mode',
type: 'combo',
......