Commit 9527d2f4 authored by Florian Schmaus

Merge branch 'io-stealing' into 'master'

[RFC] IO-stealing analogue to work-stealing

See merge request i4/manycore/emper!260
parents a8cc5c53 67b0c77a
Pipeline #70165 passed with stages
in 38 minutes and 40 seconds
......@@ -6,6 +6,8 @@
/.cache/
/.clangd/
tools/gdb/__pycache__/
subprojects/packagecache/
subprojects/googletest*
subprojects/liburing*
......@@ -113,6 +113,14 @@ clang-tidy:
variables:
EMPER_IO_SINGLE_URING: 'true'
.emper-io-stealing:
variables:
EMPER_IO_STEALING: 'true'
.emper-lockless-cq:
variables:
EMPER_IO_LOCKLESS_CQ: 'true'
.default-library-static:
variables:
EMPER_DEFAULT_LIBRARY: 'static'
......@@ -309,3 +317,34 @@ test-pipe-sleep-strategy-no-completer:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
test-lockless-cq:
extends:
- .test
- .emper-lockless-cq
test-io-stealing:
extends:
- .test
- .emper-io-stealing
test-lockless-io-stealing:
extends:
- .test
- .emper-io-stealing
- .emper-lockless-cq
test-io-stealing-pipe-no-completer:
extends:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
- .emper-io-stealing
test-io-stealing-pipe-no-completer-lockless:
extends:
- .test
- .emper-pipe-sleep-strategy
- .emper-no-completer
- .emper-io-stealing
- .emper-lockless-cq
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include "CallerEnvironment.hpp"
#include <iostream>
#include "Common.hpp"
auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream& {
switch (callerEnvironment) {
case OWNER:
return os << "OWNER";
case EMPER:
return os << "EMPER";
case ANYWHERE:
return os << "ANYWHERE";
default:
DIE_MSG("Unknown CallerEnvironment");
}
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <iostream>
/*! Enum representing the different environments where code can be executed */
enum CallerEnvironment {
OWNER, /*!< indicates code executed by the worker owning the object */
EMPER, /*!< indicates code executed by any worker */
ANYWHERE, /*!< indicates code executed outside of any worker */
};
auto operator<<(std::ostream& os, const CallerEnvironment& callerEnvironment) -> std::ostream&;
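// Usage sketch (illustrative, not part of the diff): unscoped enumerators
// stream through the operator<< declared above, e.g.
//   std::cout << CallerEnvironment::EMPER << std::endl;  // prints "EMPER"
// IoContext.cpp below relies on the same mechanism in its LOGD messages.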
......@@ -105,6 +105,30 @@ static const bool IO =
#endif
;
static const bool IO_STEALING =
#ifdef EMPER_IO_STEALING
true
#else
false
#endif
;
static const bool IO_LOCKLESS_CQ =
#ifdef EMPER_IO_LOCKLESS_CQ
true
#else
false
#endif
;
enum class IoLocklessMemoryOrder {
weak,
strong,
};
static const enum IoLocklessMemoryOrder IO_LOCKLESS_MEMORY_ORDER =
IoLocklessMemoryOrder::EMPER_IO_LOCKLESS_MEMORY_ORDER;
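// Note (assumption, not from the diff): emper-config.h presumably defines the
// macro to one of the enumerator names, e.g.
//   #define EMPER_IO_LOCKLESS_MEMORY_ORDER weak
// so the initializer above expands to IoLocklessMemoryOrder::weak.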
static const bool IO_SINGLE_URING =
#ifdef EMPER_IO_SINGLE_URING
true
......
......@@ -340,7 +340,8 @@ auto Runtime::nextFiber() -> NextFiberResult {
if constexpr (emper::IO_WORKER_URING) {
// Schedule all fibers waiting on completed IO
IoContext::ContinuationBuffer completions;
unsigned ncompletions =
IoContext::getWorkerIo()->reapCompletions<CallerEnvironment::OWNER>(completions);
if (ncompletions > 0) {
// Keep the first and schedule the rest
Fiber* next = completions[0];
......
......@@ -213,6 +213,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
void executeAndWait(std::function<void()> f);
friend class AbstractWorkStealingScheduler;
template <LogSubsystem>
friend class Blockable;
friend ContextManager;
......
......@@ -7,6 +7,7 @@
#include <sys/eventfd.h> // for eventfd
#include <unistd.h> // for close
#include <algorithm>
#include <array>
#include <atomic> // for atomic, __atomic_base
#include <cassert> // for assert
......@@ -22,7 +23,6 @@
#include "Common.hpp" // for unlikely, DIE_MSG_ERRNO, DIE_MSG
#include "Debug.hpp" // for LOGD
#include "Emper.hpp" // for DEBUG, IO_URING_SQPOLL
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "emper-common.h"
#include "io/Future.hpp" // for Future, operator<<, Future::State
......@@ -122,7 +122,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned {
TIME_NS(
{
do {
reapAndScheduleCompletions<callerEnvironment>();
} while ((submitted = io_uring_submit(&ring)) == -EBUSY);
},
stats.record_io_submit_full_cq);
......@@ -218,36 +218,155 @@ template void IoContext::submitAndWait<CallerEnvironment::ANYWHERE>(Future &futu
unsigned wait_nr);
template <CallerEnvironment callerEnvironment>
auto IoContext::reapCompletionsLockless(Fiber **continuations, unsigned toReap) -> unsigned {
// Configurable memory order for the atomic operations
constexpr auto LL_READ_MEM_ORDER =
emper::IO_LOCKLESS_MEMORY_ORDER == emper::IoLocklessMemoryOrder::weak
? std::memory_order_acquire
: std::memory_order_seq_cst;
constexpr auto LL_WRITE_MEM_ORDER =
emper::IO_LOCKLESS_MEMORY_ORDER == emper::IoLocklessMemoryOrder::weak
? std::memory_order_release
: std::memory_order_seq_cst;
std::array<Completion, CQE_BATCH_COUNT> reapedCompletions;
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
stats.recordStealAttempt();
}
struct io_uring_cq *cq = &ring.cq;
const unsigned mask = *cq->kring_mask;
auto *atail = reinterpret_cast<std::atomic<unsigned> *>(cq->ktail);
auto *ahead = reinterpret_cast<std::atomic<unsigned> *>(cq->khead);
// NOTE: just using head for the CAS introduces a possible ABA problem
// if the unsigned head counter overflows during the read and the CAS.
// Load possibly concurrently used userspace written head pointer
unsigned head = ahead->load(LL_READ_MEM_ORDER);
unsigned count;
do {
// Load concurrently used kernel written tail pointer
unsigned tail = atail->load(LL_READ_MEM_ORDER);
// NOTE: this number may already be outdated by the time it is computed
unsigned ready = tail - head;
if (!ready) return 0;
count = std::min(toReap, ready);
for (unsigned i = 0; i < count; ++i) {
const struct io_uring_cqe *cqe = &cq->cqes[(head + i) & mask];
void *cqe_data = io_uring_cqe_get_data(cqe);
// Only the owner is allowed to reap new work notifications
if constexpr (callerEnvironment != CallerEnvironment::OWNER) {
const TaggedPtr tptr(cqe_data);
const auto tag = static_cast<PointerTags>(tptr.getTag());
if (tag == PointerTags::NewWorkWsq || tag == PointerTags::NewWorkAq) {
// don't consume the new work notification
if (i == 0) return 0;
// Since i starts at 0, using i as the count is correct:
// if i == 1 we are at the second CQE, and count = i = 1
// will consume only the first one.
count = i;
break;
}
}
auto &reapedCompletion = reapedCompletions[i];
reapedCompletion.first = cqe->res;
reapedCompletion.second = cqe_data;
}
// TODO: think about the correct memory ordering constraints
} while (
!ahead->compare_exchange_weak(head, head + count, LL_WRITE_MEM_ORDER, LL_READ_MEM_ORDER));
LOGD("got " << count << " cqes from worker " << worker->getWorkerId() << "'s io_uring");
if constexpr (emper::DEBUG) {
assert(count <= reqs_in_uring);
reqs_in_uring -= count;
}
// Reaps done by other worker threads are counted in the AbstractWorkStealingStats
// as stolenIoFibers.
stats.record_reaps<callerEnvironment>(count);
return getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(), count,
continuations);
}
// Show the compiler our template instantiations
template auto IoContext::reapCompletionsLockless<CallerEnvironment::OWNER>(Fiber **continuations,
unsigned toReap)
-> unsigned;
template auto IoContext::reapCompletionsLockless<CallerEnvironment::EMPER>(Fiber **continuations,
unsigned toReap)
-> unsigned;
template auto IoContext::reapCompletionsLockless<CallerEnvironment::ANYWHERE>(Fiber **continuations,
unsigned toReap)
-> unsigned;
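// Call-site sketch (hypothetical, not part of this excerpt): a worker stealing
// IO continuations from another worker's CQ; victimIo and the visibility of
// CQE_BATCH_COUNT at the call site are assumptions.
//   Fiber *stolen[CQE_BATCH_COUNT];
//   unsigned n = victimIo->reapCompletionsLockless<CallerEnvironment::EMPER>(
//       stolen, CQE_BATCH_COUNT);
//   if (n > 0) runtime.schedule(stolen, n);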
template <CallerEnvironment callerEnvironment>
auto IoContext::reapCompletionsLocked(Fiber **continuations, unsigned toReap) -> unsigned {
// TODO: should only the owner possibly rereap?
constexpr bool checkForRereap = callerEnvironment == CallerEnvironment::OWNER;
unsigned reReapCount = 0;
// array to store cqe* needed by liburing
std::array<struct io_uring_cqe *, CQE_BATCH_COUNT> cqes;
// array to store seen cqes to make the critical section
// where cq_lock is held as small as possible
std::array<Completion, CQE_BATCH_COUNT> reapedCompletions;
// never reap completions on the global IoContext
assert(this != runtime.globalIo);
LOGD("Reaping completions for worker " << std::to_string(worker->getWorkerId()));
// This label is only used when checkForRereap is true (currently only for the
// OWNER); for the other caller environments it is unused and needs ATTR_UNUSED.
reap_cqes:
ATTR_UNUSED;
// Number of actual continuation fibers resulting from the reaped CQEs
unsigned continuationsCount = 0;
// TODO: Is using a try lock and the waitInflight flag here even sound?
// Couldn't it be possible to have a lost wakeup with an unconsumed new-work
// notification CQE in our CQ?
//
// State: only a single worker does work involving IO, and another thread
// (completer or io-stealing worker) accesses its CQ.
// Other               Owner
// |                   submit IO
// |                   lock
// |                   prepare to sleep
// |                   set flag
// |                   unlock
// |                   sleep
// lock                |
// |                   try lock unsuccessful
// |                   sleep again
// check flag          |
// unlock              |
if constexpr (needsCqLock) {
// The Owner always takes the lock to reap all completions, especially
// new work notifications, preventing the problem described above.
if constexpr (callerEnvironment == CallerEnvironment::OWNER) {
cq_lock.lock();
} else {
// Someone else is currently reaping completions
if (unlikely(!cq_lock.try_lock())) {
LOGD("unsuccessful try_lock from " << callerEnvironment);
return 0;
}
// We have to check the waitInflight flag with the cq_lock held to
// ensure we observe an update by the worker holding the lock.
// Otherwise this could happen:
// Other               Owner
// | |
// | lock
// | prepare to sleep
// check flag |
......@@ -255,10 +374,10 @@ reap_cqes:
// | unlock
// lock |
// Which could result in the Other possibly consuming new work notifications.
// We must not reap completions of this IoContext to not race
// with the sleeping owner.
if (waitInflight.load(std::memory_order_acquire)) {
LOGD("Not reaping worker " << std::to_string(worker->getWorkerId())
<< " since this worker is already waiting for its CQEs");
......@@ -268,7 +387,22 @@ reap_cqes:
}
}
if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
stats.recordStealAttempt();
}
unsigned count = io_uring_peek_batch_cqe(&ring, cqes.data(), toReap);
if (!count) {
if constexpr (needsCqLock) {
cq_lock.unlock();
}
if constexpr (checkForRereap) {
goto check_for_rereap;
}
return 0;
}
for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe *cqe = cqes[i];
......@@ -292,78 +426,31 @@ reap_cqes:
reqs_in_uring -= count;
}
// Reaps done by other worker threads are counted in the AbstractWorkStealingStats
// as stolenIoFibers.
stats.record_reaps<callerEnvironment>(count);
continuationsCount = getContinuationsFromCompletions<callerEnvironment>(reapedCompletions.data(),
count, continuations);
// A naive try lock protecting a worker's IoContext's cq is racy.
// While a worker is holding the lock additional completions could arrive
// which the worker does not observe because it could be already finished iterating.
// In the case that the worker still holds the lock preventing others
// from reaping the additional completions we have a lost wakeup possibly leading
// to a completely sleeping runtime with runnable completions in a worker's IoContext.
// To prevent this race we check the CQ again for new cqes after we already
// dropped the CQ lock and possibly reap again.
// Check if we missed new cqes.
check_for_rereap:
ATTR_UNUSED;
if constexpr (checkForRereap) {
bool reReap = cqeCount() != 0;
if (reReap) {
// schedule all already collected continuation fibers
runtime.schedule(continuations, continuationsCount);
reReapCount++;
......@@ -374,15 +461,19 @@ reap_cqes:
stats.record_reReapCount(reReapCount);
}
return continuationsCount;
}
// Show the compiler our template instantiations
template auto IoContext::reapCompletionsLocked<CallerEnvironment::OWNER>(Fiber **continuations,
unsigned toReap)
-> unsigned;
template auto IoContext::reapCompletionsLocked<CallerEnvironment::EMPER>(Fiber **continuations,
unsigned toReap)
-> unsigned;
template auto IoContext::reapCompletionsLocked<CallerEnvironment::ANYWHERE>(Fiber **continuations,
unsigned toReap)
-> unsigned;
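// How callers choose between the two implementations is not shown in this
// excerpt; a plausible dispatch on the locklessCq flag from IoContext.hpp
// (sketch, signature assumed):
//   template <CallerEnvironment callerEnvironment>
//   auto IoContext::reapCompletions(Fiber **continuations, unsigned toReap) -> unsigned {
//     if constexpr (locklessCq)
//       return reapCompletionsLockless<callerEnvironment>(continuations, toReap);
//     return reapCompletionsLocked<callerEnvironment>(continuations, toReap);
//   }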
IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) {
struct io_uring_params params;
......@@ -443,4 +534,31 @@ IoContext::~IoContext() {
delete submitter;
}
auto IoContext::getSqHead() const -> unsigned { return *ring.sq.khead; }
auto IoContext::getSqTail() const -> unsigned { return *ring.sq.ktail; }
auto IoContext::getSqEntries() const -> unsigned { return *ring.sq.kring_entries; }
auto IoContext::getSqFlags() const -> unsigned { return *ring.sq.kflags; }
auto IoContext::getCqHead() const -> unsigned { return *ring.cq.khead; }
auto IoContext::getCqHeadSafe() const -> unsigned {
return reinterpret_cast<std::atomic<unsigned> *>(ring.cq.khead)->load();
}
auto IoContext::getCqTail() const -> unsigned { return *ring.cq.ktail; }
auto IoContext::getCqTailSafe() const -> unsigned {
return reinterpret_cast<std::atomic<unsigned> *>(ring.cq.ktail)->load();
}
auto IoContext::getCqEntries() const -> unsigned { return *ring.cq.kring_entries; }
auto IoContext::getCqe(unsigned i) const -> struct io_uring_cqe {
const unsigned mask = *ring.cq.kring_mask;
const unsigned head = getCqHead();
return ring.cq.cqes[(head + i) & mask];
}
auto IoContext::getCqFlags() const -> unsigned { return *ring.cq.kflags; }
} // namespace emper::io
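// Hypothetical debugging helper (not part of the diff) built on the accessors
// above; assumes it runs with access to these private members and <iostream>:
//   void dumpCqState(const emper::io::IoContext &io) {
//     std::cerr << "cq head=" << io.getCqHeadSafe()
//               << " tail=" << io.getCqTailSafe()
//               << " entries=" << io.getCqEntries() << std::endl;
//   }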
......@@ -2,7 +2,8 @@
// Copyright © 2020-2021 Florian Fischer
#pragma once
#include <liburing.h>
#include <liburing/io_uring.h>
#include <array>
#include <atomic> // for atomic
......@@ -12,26 +13,27 @@
#include <functional> // for less
#include <iterator>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include "CallerEnvironment.hpp" // for CallerEnvironment, EMPER
#include "Common.hpp"
#include "Debug.hpp" // for LogSubsystem, LogSubsystem::IO, Logger
#include "Emper.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp" // for Runtime
#include "Worker.hpp"
#include "emper-config.h"
#include "io/Future.hpp"
#include "io/Stats.hpp"
#include "io/SubmitActor.hpp" // IWYU pragma: keep
#include "lib/adt/LockedSet.hpp" // for LockedSet
#include "io/SubmitActor.hpp" // IWYU pragma: keep
#include "lib/TaggedPtr.hpp"
#include "lib/adt/LockedSet.hpp" // for LockedSet
#include "sleep_strategy/PipeSleepStrategy.hpp" // IWYU pragma: keep
class AbstractWorkStealingScheduler;
class Fiber;
namespace emper::sleep_strategy {
class PipeSleepStrategy;
}
namespace emper::io {
class IoContext : public Logger<LogSubsystem::IO> {
......@@ -47,13 +49,36 @@ class IoContext : public Logger<LogSubsystem::IO> {
friend class emper::sleep_strategy::PipeSleepStrategy;
// Debug functions to access the mmapped memory of ring.
// gdb is not allowed to access the mmapped IO memory of the io_uring fd.
// https://stackoverflow.com/questions/67451177/why-cant-gdb-read-io-uring-cqe-contents
auto getSqHead() const -> unsigned;
auto getSqTail() const -> unsigned;
auto getSqEntries() const -> unsigned;
auto getSqFlags() const -> unsigned;
auto getCqHead() const -> unsigned;
auto getCqHeadSafe() const -> unsigned;
auto getCqTail() const -> unsigned;
auto getCqTailSafe() const -> unsigned;
auto getCqEntries() const -> unsigned;
auto getCqe(unsigned i) const -> struct io_uring_cqe;
auto getCqFlags() const -> unsigned;
protected:
// Remember the Runtime which created the IoContext
Runtime &runtime;
static thread_local IoContext *workerIo;
// We must synchronize the CQ if it is accessed by multiple threads.
// This is the case if we use a completer or if other workers try to steal IO.
static constexpr bool needsCqLock =
(emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) || (emper::IO_STEALING);
// Whether we synchronize the CQs lock-free
static constexpr bool locklessCq = needsCqLock && emper::IO_LOCKLESS_CQ;
// TryLock protecting the completion queue of ring.
CACHE_LINE_EXCLUSIVE(std::mutex, cq_lock);
struct io_uring ring;
......@@ -164,6 +189,82 @@ class IoContext : public Logger<LogSubsystem::IO> {
}
}
/** The currently relevant data contained in a CQE: result and user data */
using Completion = std::pair<int32_t, void *>;
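// Illustration (not part of the diff): a Completion captures a CQE's result
// and user data, as reapCompletionsLockless fills it above:
//   Completion c{cqe->res, io_uring_cqe_get_data(cqe)};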