diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index de4ff849689cdedb5d1a81146365f6c50fefbe89..bec6efd465b7dd67da1587f8ffa2cca7132c1849 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -4,21 +4,19 @@ #include <algorithm> // for copy, copy_backward #include <functional> // for function -#include <mutex> -#include <queue> #include "CallerEnvironment.hpp" #include "Debug.hpp" // for LogSubsystem, LogSubsystem::SCHED, Logger #include "Emper.hpp" #include "Fiber.hpp" // for Fiber #include "emper-common.h" // for workeraffinity_t +#include "lib/adt/LockedUnboundedQueue.hpp" class Runtime; class Scheduler : public Logger<LogSubsystem::SCHED> { private: - std::queue<Fiber*> scheduleAnywhereQueue; - std::mutex scheduleAnywhereQueueMutex; + lib::adt::LockedUnboundedQueue<Fiber> scheduleAnywhereQueue; void wakeupSleepingWorkersIfSleepingObserved(); @@ -47,14 +45,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { } } - auto dequeFiberFromAnywhereQueue() -> Fiber* { - std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); - if (scheduleAnywhereQueue.empty()) return nullptr; - - Fiber* res = scheduleAnywhereQueue.front(); - scheduleAnywhereQueue.pop(); - return res; - } + auto dequeFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } public: virtual void schedule(Fiber& fiber) = 0; @@ -62,10 +53,7 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { virtual auto nextFiber() -> Fiber* = 0; void scheduleFromAnywhere(Fiber& fiber) { - { - std::lock_guard<std::mutex> lock(scheduleAnywhereQueueMutex); - scheduleAnywhereQueue.push(&fiber); - } + scheduleAnywhereQueue.enqueue(&fiber); onNewWork<CallerEnvironment::ANYWHERE>(); } diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp index 3a33dc62aae654df274924ef040e5361f33b8c7d..1d76cd742c1631edaceea5e9c1f3edc4b2f17fdb 100644 --- a/emper/lib/adt/LockedUnboundedQueue.hpp +++ b/emper/lib/adt/LockedUnboundedQueue.hpp @@ -2,32 +2,32 @@ // Copyright © 2020 Florian Schmaus #pragma once -#include <deque> #include <mutex> +#include <queue> -namespace adt { +namespace lib::adt { template <typename I> class LockedUnboundedQueue { private: std::mutex queue_mutex; - std::deque<I*> deque; + std::queue<I*> queue; public: void enqueue(I* item) { std::lock_guard<std::mutex> lock(queue_mutex); - deque.push_front(item); + queue.push(item); } auto dequeue() -> I* { std::lock_guard<std::mutex> lock(queue_mutex); - if (deque.empty()) { + if (queue.empty()) { return nullptr; } - I* res = deque.back(); - deque.pop_back(); + I* res = queue.front(); + queue.pop(); return res; } }; -} // namespace adt +} // namespace lib::adt