diff --git a/emper/FiberSource.cpp b/emper/FiberSource.cpp index 410813360996f82fda444e51571fe79c99acc707..06844f74470731f5a4ad897e9db43a983a3868cd 100644 --- a/emper/FiberSource.cpp +++ b/emper/FiberSource.cpp @@ -10,8 +10,8 @@ auto operator<<(std::ostream& os, const emper::FiberSource& fiberSource) -> std: switch (fiberSource) { case emper::FiberSource::local: return os << "local"; - case emper::FiberSource::inbox: - return os << "inbox"; + case emper::FiberSource::mpscQueue: + return os << "mpscQueue"; case emper::FiberSource::stolen: return os << "stolen"; case emper::FiberSource::io: diff --git a/emper/FiberSource.hpp b/emper/FiberSource.hpp index e721fe1b7542f65a52213a6598e4dfef9826fbbf..75412f41d81fbca90683e33b60840d67cf4de1fe 100644 --- a/emper/FiberSource.hpp +++ b/emper/FiberSource.hpp @@ -14,7 +14,7 @@ namespace emper { */ enum class FiberSource : uintptr_t { local, /*!< A worker's own work-stealing queue */ - inbox, /*!< A worker's own inbox or priority queue */ + mpscQueue, /*!< A worker's own mpsc queue (inbox / priority) */ stolen, /*!< A other worker's work-stealing queue */ io, /*!< A worker's own io_uring completion queue */ ioStolen, /*!< A other worker's io_uring completion queue */ diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index eea60e0d63e3a6a9a55acac85d0e04745b758916..8261c8d59f210291706550206d7b4fd4e25ef4eb 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -142,9 +142,9 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline void wakeupSleepingWorkers(emper::FiberHint hint) { LOGD("Wake sleepers from " << callerEnvironment << " cause new work: " << hint); switch (hint.getSource()) { - // We scheduled to an inbox where the fiber lives exclusively -> wake up the + // We scheduled to an mpsc queue where the fiber lives exclusively -> wake up the // specific worker if necessary - case emper::FiberSource::inbox: { + case emper::FiberSource::mpscQueue: { workerSleepStrategy.notifySpecific<callerEnvironment>(hint.getWorker()); } break; diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 00f174e15626d0b6901d72e4eb66c9de1ea9a4ce..028dd892ab742c8869ee5158c5d5cbfc4061cc95 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -33,18 +33,18 @@ using emper::io::IoContext; thread_local AbstractWorkStealingScheduler::WsQueue<AbstractWorkStealingScheduler::QUEUE_SIZE> AbstractWorkStealingScheduler::queue; -thread_local AbstractWorkStealingScheduler::InboxQueue AbstractWorkStealingScheduler::inbox; +thread_local AbstractWorkStealingScheduler::MpscQueue AbstractWorkStealingScheduler::mpscQueue; AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy) : Scheduler(runtime, strategy) { const workerid_t workerCount = runtime.getWorkerCount(); queues = new AbstractWorkStealingScheduler::WsQueue<QUEUE_SIZE>*[workerCount]; - inboxes = new AbstractWorkStealingScheduler::InboxQueue*[workerCount]; + mpscQueues = new AbstractWorkStealingScheduler::MpscQueue*[workerCount]; auto newWorkerHook = [this](workerid_t workerId) { queues[workerId] = &queue; - inboxes[workerId] = &inbox; + mpscQueues[workerId] = &mpscQueue; }; addNewWorkerHook(newWorkerHook); } @@ -69,13 +69,13 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); } -void AbstractWorkStealingScheduler::scheduleToInbox(Fiber& fiber, workerid_t workerId) { - inboxes[workerId]->enqueue(&fiber); - emper::statsIncr(awss::stats.scheduledFibersToInbox); +void AbstractWorkStealingScheduler::scheduleToMpscQueue(Fiber& fiber, workerid_t workerId) { + mpscQueues[workerId]->enqueue(&fiber); + emper::statsIncr(awss::stats.scheduledFibersToMpscQueue); - // Notify the runtime that we scheduled work to a specific workers' inbox + // Notify the runtime that we scheduled work to a specific workers' mpscQueue // the runtime is responsible to notify it if necessary. - onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{workerId, emper::FiberSource::inbox}); + onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{workerId, emper::FiberSource::mpscQueue}); } auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { @@ -263,16 +263,17 @@ popBottom: return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberResultFromInbox() -> std::optional<NextFiberResult> { - Fiber* fiber = inbox.dequeue(); - if (fiber != nullptr) return NextFiberResult{fiber, emper::FiberSource::inbox}; +auto AbstractWorkStealingScheduler::nextFiberResultFromMpscQueue() + -> std::optional<NextFiberResult> { + Fiber* fiber = mpscQueue.dequeue(); + if (fiber != nullptr) return NextFiberResult{fiber, emper::FiberSource::mpscQueue}; return std::nullopt; } -auto AbstractWorkStealingScheduler::nextFiberResultFromInboxOrWorkStealing() +auto AbstractWorkStealingScheduler::nextFiberResultFromMpscQueueOrWorkStealing() -> std::optional<NextFiberResult> { - auto result = nextFiberResultFromInbox(); + auto result = nextFiberResultFromMpscQueue(); if (result) return result; return nextFiberResultViaWorkStealing(); diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index 74f67a17afafdc0045365b4e387961b135f31fac..7dd3198bf0b5b70b958fbe67ad3be26e39f41c0a 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -27,7 +27,7 @@ class AbstractWorkStealingScheduler : public Scheduler { #else using WsQueue = adt::WsClQueue<Fiber*, SIZE>; #endif - using InboxQueue = adt::MpscQueue<Fiber>; + using MpscQueue = adt::MpscQueue<Fiber>; public: static const int QUEUE_SIZE = 1024; @@ -40,18 +40,18 @@ class AbstractWorkStealingScheduler : public Scheduler { WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; - InboxQueue** inboxes; - static thread_local InboxQueue inbox; + MpscQueue** mpscQueues; + static thread_local MpscQueue mpscQueue; void scheduleViaWorkStealing(Fiber& fiber); - void scheduleToInbox(Fiber& fiber, workerid_t workerId); + void scheduleToMpscQueue(Fiber& fiber, workerid_t workerId); auto maybeRecycle(Fiber* fiber) -> bool; - // This method is static because it only uses the thread_local inbox - static auto nextFiberResultFromInbox() -> std::optional<NextFiberResult>; + // This method is static because it only uses the thread_local mpscQueue + static auto nextFiberResultFromMpscQueue() -> std::optional<NextFiberResult>; auto nextFiberResultViaWorkStealing() -> std::optional<NextFiberResult>; - auto nextFiberResultFromInboxOrWorkStealing() -> std::optional<NextFiberResult>; + auto nextFiberResultFromMpscQueueOrWorkStealing() -> std::optional<NextFiberResult>; public: AbstractWorkStealingScheduler(Runtime& runtime, RuntimeStrategy& strategy); diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index 3a715ec2d7e1cff4fd4f40415c1cc9bf236644bb..af7b74ee83e34238c09833be21638f993d4c2428 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -21,8 +21,8 @@ void AbstractWorkStealingStats::print(std::ostream& out) { << std::to_string(comulatedWorkerStats.scheduledFibersToLocal) << std::endl << "total-scheduled-fibers-to-overflow-queue: " << std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl - << "total-scheduled-fibers-to-inbox: " - << std::to_string(comulatedWorkerStats.scheduledFibersToInbox) << std::endl + << "total-scheduled-fibers-to-mpsc-queue: " + << std::to_string(comulatedWorkerStats.scheduledFibersToMpscQueue) << std::endl << "global-max-queue-length: " << std::to_string(comulatedWorkerStats.maxQueueLength) << std::endl << "total-next-fiber-from-local: " << std::to_string(comulatedWorkerStats.nextFiberFromLocal) diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp index 68375ca77607d23e80d8649cd288c40649d51d98..ca3ef250b10adbb8a38dd09e9bf851b990b8c9ae 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.cpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -8,7 +8,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke -> AbstractWorkStealingWorkerStats& { scheduledFibersToLocal += other.scheduledFibersToLocal; scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; - scheduledFibersToInbox += other.scheduledFibersToInbox; + scheduledFibersToMpscQueue += other.scheduledFibersToMpscQueue; maxQueueLength = std::max(maxQueueLength, other.maxQueueLength); nextFiberFromLocal += other.nextFiberFromLocal; nextFiberFromHintLocal += other.nextFiberFromHintLocal; diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index a6ae44e7e493c4bb40b96828f8b3dbaff8d2f08b..e8e25bef9e692d550ec21a722cbf23adbb67d835 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -15,7 +15,7 @@ class AbstractWorkStealingWorkerStats { public: uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; - uint64_t scheduledFibersToInbox = 0; + uint64_t scheduledFibersToMpscQueue = 0; uint64_t maxQueueLength = 0; uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromHintLocal = 0; diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 0c8efb3f8e07175756d419e2d82526c80edc60b4..7980f7c732cec4f466648c0645dfcff62e7f423a 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -41,7 +41,7 @@ void LawsDispatcher::dispatchLoop() { if (isRunnable(fiber)) { if constexpr (emper::STATS) { switch (next->source) { - case emper::FiberSource::inbox: + case emper::FiberSource::mpscQueue: LawsStrategy::stats.dispatchedFibersFromPriority++; break; case emper::FiberSource::local: diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp index 82cb3c02216724d85a89807397b1d1b7f865016b..9edcf3cd2930711e6a345c15c61114787356cc55 100644 --- a/emper/strategies/laws/LawsScheduler.cpp +++ b/emper/strategies/laws/LawsScheduler.cpp @@ -63,5 +63,5 @@ void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) } auto LawsScheduler::nextFiber() -> std::optional<NextFiberResult> { - return nextFiberResultFromInboxOrWorkStealing(); + return nextFiberResultFromMpscQueueOrWorkStealing(); } diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp index 4a5f9bc9e2199a436260dc453f15532ba9780e62..cc69ab7ac57b41fc0ab39cd789972b6bdcefceef 100644 --- a/emper/strategies/laws/LawsScheduler.hpp +++ b/emper/strategies/laws/LawsScheduler.hpp @@ -18,7 +18,7 @@ class LawsScheduler : public AbstractWorkStealingScheduler { using LawsMpscQueue = adt::MpscQueue<Fiber>; private: - LawsMpscQueue**& priorityQueues = AbstractWorkStealingScheduler::inboxes; + LawsMpscQueue**& priorityQueues = AbstractWorkStealingScheduler::mpscQueues; template <CallerEnvironment callerEnvironment> void tryScheduleToPriorityQueue(Fiber& fiber); @@ -26,7 +26,7 @@ class LawsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override; void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { - scheduleToInbox(fiber, workerId); + scheduleToMpscQueue(fiber, workerId); } void scheduleFromAnywhereInternal(Fiber& fiber) override; void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override; diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp index 5aa22c1b0e06e25c655c0993ecd69241b68e260c..af94eaa074eecad47a17abd5ab36e749f1468933 100644 --- a/emper/strategies/ws/WsScheduler.hpp +++ b/emper/strategies/ws/WsScheduler.hpp @@ -19,7 +19,7 @@ class WsScheduler : public AbstractWorkStealingScheduler { protected: void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); } void scheduleOnInternal(Fiber& fiber, workerid_t workerId) override { - scheduleToInbox(fiber, workerId); + scheduleToMpscQueue(fiber, workerId); } void scheduleFromAnywhereInternal(Fiber& fiber) override { enqueueInAnywhereQueue(fiber); @@ -35,6 +35,6 @@ class WsScheduler : public AbstractWorkStealingScheduler { WsScheduler(Runtime& runtime, RuntimeStrategy& strategy); auto nextFiber() -> std::optional<NextFiberResult> override { - return nextFiberResultFromInboxOrWorkStealing(); + return nextFiberResultFromMpscQueueOrWorkStealing(); }; };