diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 82736087477b0d137cf05b4d83c6d2228209f330..c9ec2e99cb39f9aefea6f6e7ef42c5a07bfe5cc2 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -31,6 +31,7 @@ #include "io/IoContext.hpp" // for IoContext #include "io/Stats.hpp" // for emper::io::Stats #include "lib/DebugUtil.hpp" +#include "strategies/AbstractWorkStealingScheduler.hpp" #ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING #include "strategies/ws/WsStrategyFactory.hpp" @@ -253,7 +254,16 @@ void Runtime::yield() { auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO) { // Schedule all fibers waiting on completed IO - IoContext::getWorkerIo()->reapAndScheduleCompletions(); + std::vector<Fiber*> completions = IoContext::getWorkerIo()->reapCompletions(); + if (!completions.empty()) { + // Keep the first and schedule the rest + Fiber* next = completions[0]; + schedule(completions.begin() + 1, completions.end()); + + // TODO: hint that this fiber comes from the IO subsystem + return NextFiberResult{ + next, static_cast<uintptr_t>(AbstractWorkStealingScheduler::FiberSource::local)}; + } } return scheduler.nextFiber(); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 2edd7aa9c3c0c23314699f4f0c6e4b273b32050b..787c7c113c7be34767ea64289cf0a6dc2bc88cc2 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -150,6 +150,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.schedule(fiber); } + template <class InputIt> + inline void schedule(InputIt begin, InputIt end) { + // Calling schedule() only works from within the EMPER runtime. + assert(inRuntime()); + + scheduler.schedule(begin, end); + } + inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } template <class InputIt> diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 85fb8d80dc8b67d1b586c68764991221da6d67e6..9263f1e7416c6e7c5eacbf3079c8b60bf2885b84 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -60,6 +60,15 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { scheduleInternal(fiber); } + template <class InputIt> + void schedule(InputIt begin, InputIt end) { + for (; begin != end; ++begin) { + Fiber& fiber = **begin; + LOGD("Scheduling batched fiber " << &fiber); + scheduleInternal(fiber); + } + } + virtual auto nextFiber() -> NextFiberResult = 0; void scheduleFromAnywhere(Fiber& fiber) { diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 0a2e92f8fd9ad3b9934dea8c1c5f941a910472d8..100a11a30f04c3b1332b1723365cf38576bb4e83 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -134,12 +134,11 @@ class IoContext : public Logger<LogSubsystem::IO> { template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER> void reapAndScheduleCompletions() { auto completions = reapCompletions(); - for (auto it = completions.begin(); it != completions.end(); ++it) { - Fiber *fiber = *it; + if (!completions.empty()) { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - runtime.schedule(*fiber); + runtime.schedule(completions.begin(), completions.end()); } else { - runtime.scheduleFromAnywhere(*fiber); + runtime.scheduleFromAnywhere(completions.begin(), completions.end()); } } }