Skip to content
Snippets Groups Projects
Commit 058ba7f3 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'check-runnable' into 'master'

Already check if fiber is runnable in scheduliing subsystem

See merge request !197
parents 2a7615a9 8c5ed0a1
No related branches found
No related tags found
1 merge request!197Already check if fiber is runnable in scheduliing subsystem
Pipeline #62264 passed
......@@ -57,4 +57,5 @@ class Dispatcher : public Logger<LogSubsystem::DISP> {
static auto isDispatchedControlFlow() -> bool { return getCurrentFiberPtr() != nullptr; }
friend ContextManager;
friend class Scheduler;
};
......@@ -112,6 +112,8 @@ class ALIGN_TO_CACHE_LINE Fiber : public Logger<LogSubsystem::F> {
void print() const;
[[nodiscard]] auto isRunnable() const -> bool { return runnable; }
friend auto operator<<(std::ostream& strm, const Fiber& fiber) -> std::ostream&;
static inline auto from(fiber_fun_t function, void* arg) -> Fiber* {
......
......@@ -11,6 +11,7 @@ class RuntimeStrategyStats;
class RuntimeStrategy {
friend class Runtime;
friend class Scheduler;
private:
virtual auto getScheduler() -> Scheduler& = 0;
......
......@@ -4,8 +4,10 @@
#include "CallerEnvironment.hpp"
#include "Runtime.hpp"
#include "RuntimeStrategy.hpp"
Scheduler::Scheduler(Runtime& runtime) : runtime(runtime) {}
Scheduler::Scheduler(Runtime& runtime, RuntimeStrategy& strategy)
: runtime(runtime), dispatcher(strategy.getDispatcher()) {}
void Scheduler::addNewWorkerHook(const std::function<void(workerid_t)>& hook) {
runtime.addNewWorkerHook(hook);
......
......@@ -7,12 +7,14 @@
#include "CallerEnvironment.hpp"
#include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger
#include "Dispatcher.hpp"
#include "Emper.hpp"
#include "Fiber.hpp" // for Fiber
#include "emper-common.h" // for workeraffinity_t
#include "lib/adt/LockedUnboundedQueue.hpp"
class Runtime;
class RuntimeStrategy;
struct NextFiberResult;
class Scheduler : public Logger<LogSubsystem::SCHED> {
......@@ -24,7 +26,9 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
protected:
Runtime& runtime;
Scheduler(Runtime& runtime);
Dispatcher& dispatcher;
Scheduler(Runtime& runtime, RuntimeStrategy& strategy);
virtual ~Scheduler() = default;
......@@ -55,6 +59,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0;
virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0;
void recycle(Fiber* fiber) { dispatcher.recycle(fiber); };
public:
void schedule(Fiber& fiber) {
LOGD("Scheduling fiber " << &fiber);
......
......@@ -7,21 +7,21 @@
#include "Common.hpp" // for unlikely, likely
#include "Debug.hpp" // for ABORT
#include "Emper.hpp" // for OVERFLOW_QUEUE
#include "Fiber.hpp"
#include "NextFiberResult.hpp"
#include "Runtime.hpp" // for Runtime
#include "emper-common.h" // for workerid_t
#include "strategies/AbstractWorkStealingStrategy.hpp"
#include "strategies/AbstractWorkStealingWorkerStats.hpp"
class Fiber;
using awss = AbstractWorkStealingStrategy;
thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE>
AbstractWorkStealingScheduler::queue;
AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime)
: Scheduler(runtime) {
AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime,
RuntimeStrategy& strategy)
: Scheduler(runtime, strategy) {
const workerid_t workerCount = runtime.getWorkerCount();
queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount];
......@@ -48,14 +48,24 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) {
onNewWork();
}
auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool {
if (fiber->isRunnable()) return false;
recycle(fiber);
return true;
}
auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> {
FiberSource fiberSource = FiberSource::local;
Fiber* fiber;
popBottom:
bool poped = queue.popBottom(&fiber);
if (likely(poped)) {
emper::statsIncr(awss::stats.nextFiberFromLocal);
if (maybeRecycle(fiber)) goto popBottom;
goto out;
}
......@@ -70,21 +80,27 @@ auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fibe
// Don't steal from ourselves.
if (unlikely(victim == myWorkerId)) continue;
popTop:
poped = queues[victim]->popTop(&fiber);
if (poped) {
emper::statsIncr(awss::stats.nextFiberStolen);
if (maybeRecycle(fiber)) goto popTop;
fiberSource = FiberSource::stolen;
goto out;
}
}
}
dequeueFromAnywhere:
// Try the "scheduled from anywhere" queue to get work as last resort.
fiber = dequeueFiberFromAnywhereQueue();
if (fiber) {
emper::statsIncr(awss::stats.nextFiberFromAnywhereQueue);
if (maybeRecycle(fiber)) goto dequeueFromAnywhere;
fiberSource = FiberSource::anywhereQueue;
}
......
......@@ -17,6 +17,7 @@
class Fiber;
class Runtime;
class RuntimeStrategy;
class AbstractWorkStealingScheduler : public Scheduler {
template <size_t SIZE>
......@@ -41,10 +42,12 @@ class AbstractWorkStealingScheduler : public Scheduler {
void scheduleViaWorkStealing(Fiber& fiber);
auto maybeRecycle(Fiber* fiber) -> bool;
auto nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource>;
auto nextFiberResultViaWorkStealing() -> NextFiberResult;
public:
AbstractWorkStealingScheduler(Runtime& runtime);
AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy);
};
......@@ -14,7 +14,8 @@
thread_local LawsScheduler::LawsMpscQueue LawsScheduler::priorityQueue;
LawsScheduler::LawsScheduler(Runtime& runtime) : AbstractWorkStealingScheduler(runtime) {
LawsScheduler::LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy)
: AbstractWorkStealingScheduler(runtime, strategy) {
const workerid_t workerCount = runtime.getWorkerCount();
priorityQueues = new LawsScheduler::LawsMpscQueue*[workerCount];
......
......@@ -8,6 +8,7 @@
#include "strategies/AbstractWorkStealingScheduler.hpp"
class Runtime;
class RuntimeStrategy;
struct NextFiberResult;
class LawsScheduler : public AbstractWorkStealingScheduler {
......@@ -27,7 +28,7 @@ class LawsScheduler : public AbstractWorkStealingScheduler {
void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override;
public:
LawsScheduler(Runtime& runtime);
LawsScheduler(Runtime& runtime, RuntimeStrategy& strategy);
auto nextFiber() -> NextFiberResult override;
};
......@@ -13,7 +13,7 @@ thread_local LawsWorkerStats LawsStrategy::stats;
LawsStrategy::LawsStrategy(Runtime& runtime)
: AbstractWorkStealingStrategy(runtime),
scheduler(runtime),
scheduler(runtime, *this),
dispatcher(runtime),
allWorkerStats([] { return &stats; }, runtime) {}
......
......@@ -2,4 +2,5 @@
// Copyright © 2020-2021 Florian Schmaus
#include "WsScheduler.hpp"
WsScheduler::WsScheduler(Runtime& runtime) : AbstractWorkStealingScheduler(runtime) {}
WsScheduler::WsScheduler(Runtime& runtime, RuntimeStrategy& strategy)
: AbstractWorkStealingScheduler(runtime, strategy) {}
......@@ -8,6 +8,7 @@
class Fiber;
class Runtime;
class RuntimeStrategy;
class WsScheduler : public AbstractWorkStealingScheduler {
protected:
......@@ -23,7 +24,7 @@ class WsScheduler : public AbstractWorkStealingScheduler {
}
public:
WsScheduler(Runtime& runtime);
WsScheduler(Runtime& runtime, RuntimeStrategy& strategy);
auto nextFiber() -> NextFiberResult override { return nextFiberResultViaWorkStealing(); };
};
......@@ -10,7 +10,7 @@ class Runtime;
class RuntimeStrategyStats;
WsStrategy::WsStrategy(Runtime& runtime)
: AbstractWorkStealingStrategy(runtime), scheduler(runtime), dispatcher(runtime) {}
: AbstractWorkStealingStrategy(runtime), scheduler(runtime, *this), dispatcher(runtime) {}
auto WsStrategy::getScheduler() -> WsScheduler& { return scheduler; }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment