diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index bb1d34bfed6dbf92d228c75265ec0c25ff986788..38b9e084665cb787acdc50cfdab59d0a5b4a7491 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -2,6 +2,7 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include <cstddef> #include <functional> // for function #include <ostream> @@ -53,7 +54,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { scheduleAnywhereQueue.insert(fibers, count); } - auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } + template <class OutputIt> + auto dequeueFibersFromAnywhereQueue(OutputIt begin, OutputIt end) -> size_t { + return scheduleAnywhereQueue.dequeue(begin, end); + } virtual void scheduleInternal(Fiber& fiber) = 0; virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; diff --git a/emper/io/Stats.hpp b/emper/io/Stats.hpp index 5114800ee26424b71ef80f2312087a45c55bd126..f1854391b5d81eac0d9b94f1c97217f5d9159f06 100644 --- a/emper/io/Stats.hpp +++ b/emper/io/Stats.hpp @@ -12,7 +12,6 @@ #include <cstdlib> // for abort #include <iostream> // for operator<<, basic_ostream, basi... #include <map> // for map, map<>::value_compare -#include <string> #include <vector> #include "CallerEnvironment.hpp" diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp index 939a092c8c893ad7a438310d8c57722c93e94bbe..fb1ab1b508146eaf404d1a758f0635d6f1777b71 100644 --- a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp +++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp @@ -46,5 +46,25 @@ class BoostSharedMutexUnboundedQueue { queue.pop(); return res; } + + template <class OutputIt> + auto dequeue(OutputIt begin, OutputIt end) -> size_t { + OutputIt cur = begin; + { + boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex); + if (queue.empty()) { + return 0; + } + + { + boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock); + for (; !queue.empty() && cur != end; ++cur) { + *cur = queue.front(); + queue.pop(); + } + } + } + return cur - begin; + } }; } // namespace lib::adt diff --git a/emper/lib/adt/LockedQueue.hpp b/emper/lib/adt/LockedQueue.hpp index 53264aff6d90525f419c63450b6ac410e21b00ec..a1a86180f0d320d8298c2deaa12d1c45ede42b41 100644 --- a/emper/lib/adt/LockedQueue.hpp +++ b/emper/lib/adt/LockedQueue.hpp @@ -24,6 +24,11 @@ class LockedQueue { return deque.size() == SIZE; } + auto freeSlots() -> size_t { + std::lock_guard<std::mutex> lock(queue_mutex); + return SIZE - deque.size(); + } + auto pushBottom(const I item) -> bool { std::lock_guard<std::mutex> lock(queue_mutex); diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp index f146c007feed1ca827a4a1955036915f87fb9d4b..fe0557e126825a3d69dc7ac807fc1ed10fc3f807 100644 --- a/emper/lib/adt/MutexUnboundedQueue.hpp +++ b/emper/lib/adt/MutexUnboundedQueue.hpp @@ -44,5 +44,18 @@ class MutexUnboundedQueue { queue.pop(); return res; } + + template <class OutputIt> + auto dequeue(OutputIt begin, OutputIt end) -> size_t { + OutputIt cur = begin; + { + std::lock_guard<std::mutex> lock(queue_mutex); + for (; !queue.empty() && cur != end; ++cur) { + *cur = queue.front(); + queue.pop(); + } + } + return cur - begin; + } }; } // namespace lib::adt diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp index 7e887ded45e5333e547c1ae2cbac2f98113d52b7..b52b938e0270a57332f670f9576ef57e43482e92 100644 --- a/emper/lib/adt/RwLockUnboundedQueue.hpp +++ b/emper/lib/adt/RwLockUnboundedQueue.hpp @@ -98,5 +98,42 @@ class RwLockUnboundedQueue { pthread_rwlock_unlock(&lock); return res; } + + template <class OutputIt> + auto dequeue(OutputIt begin, OutputIt end) -> size_t { + int err = pthread_rwlock_rdlock(&lock); + if (unlikely(err)) { + DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err)); + } + + OutputIt cur = begin; + if (queue.empty()) { + goto unlock; + } + + // try to upgrade to wrlock + err = pthread_rwlock_trywrlock(&lock); + if (err) { + if (unlikely(err != EBUSY)) { + DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err)); + } + + // drop the read lock and aquire a write lock + aquire_wrlock(lock); + + if (queue.empty()) { + goto unlock; + } + } + + for (; !queue.empty() && cur != end; ++cur) { + *cur = queue.front(); + queue.pop(); + } + + unlock: + pthread_rwlock_unlock(&lock); + return cur - begin; + } }; } // namespace lib::adt diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp index c3a7864453dc23f580a52dbf507ad20c9c4d42de..5f98ff0c60caf780f80621b1060847186f1d440e 100644 --- a/emper/lib/adt/SharedMutexUnboundedQueue.hpp +++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp @@ -51,5 +51,23 @@ class SharedMutexUnboundedQueue { queue.pop(); return res; } + + template <class OutputIt> + auto dequeue(OutputIt begin, OutputIt end) -> size_t { + { + std::shared_lock lock(queue_mutex); + if (queue.empty()) return 0; + } + + OutputIt cur = begin; + { + std::unique_lock lock(queue_mutex); + for (; !queue.empty() && cur != end; ++cur) { + *cur = queue.front(); + queue.pop(); + } + } + return cur - begin; + } }; } // namespace lib::adt diff --git a/emper/lib/math.hpp b/emper/lib/math.hpp index 81c3d90ee4f6c6d120c9c63bec1966f47a503de3..953d74450bb34974820a12f7bbd93d133cc5c857 100644 --- a/emper/lib/math.hpp +++ b/emper/lib/math.hpp @@ -5,6 +5,7 @@ #include <cassert> #include <cstdint> #include <iosfwd> +#include <string> #include <type_traits> namespace emper::lib::math { @@ -54,9 +55,9 @@ class RunningAverage { calcRunningAvg(average, counter, value); } - auto getCounter() -> counter_type { return counter; } + [[nodiscard]] auto getCounter() const -> counter_type { return counter; } - auto getAverage() -> average_type { return average; } + [[nodiscard]] auto getAverage() const -> average_type { return average; } friend auto operator<<(std::ostream& strm, const RunningAverage& avg) -> std::ostream& { strm << std::to_string(avg.average) << " [" << std::to_string(avg.counter) << "]"; diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 62ecb93cc08a6dd3512a68181f2de127adc57c5f..8e2bd3fd773c33a8ce1cbab62cba735e7925aaf4 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -2,6 +2,9 @@ // Copyright © 2021 Florian Schmaus #include "AbstractWorkStealingScheduler.hpp" +#include <algorithm> +#include <array> +#include <cassert> #include <ostream> // for operator<<, basic_ostream<>::__ostream_type #include "Common.hpp" // for unlikely, likely @@ -55,6 +58,64 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { return true; } +auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue() + -> std::optional<std::pair<Fiber*, FiberSource>> { + const size_t KEEP_FREE_SLOTS = 64; + const size_t DEQUEUE_FROM_ANYWHERE_MAX = 128; + + const uintptr_t remainingCapacityLocalWsQueue = queue.freeSlots(); + size_t maxFiberCount; + if (remainingCapacityLocalWsQueue > KEEP_FREE_SLOTS) { + maxFiberCount = remainingCapacityLocalWsQueue - KEEP_FREE_SLOTS; + maxFiberCount = std::min(maxFiberCount, DEQUEUE_FROM_ANYWHERE_MAX); + } else { + maxFiberCount = 1; + } + + std::array<Fiber*, DEQUEUE_FROM_ANYWHERE_MAX> fibers; + + size_t fiberCount = + dequeueFibersFromAnywhereQueue(fibers.begin(), fibers.begin() + maxFiberCount); + + Fiber* res = nullptr; + size_t fibersLiftedFromAnywhere = 0; + for (size_t i = 0; i < fiberCount; i++) { + Fiber* fiber = fibers[i]; + + if (maybeRecycle(fiber)) continue; + + // We found a runnable fiber. First check if we already have one + // fiber saved to be returned. + if (!res) { + res = fiber; + continue; + } + + // We already have a fiber saved which will be returned. Hence we + // push the remaining runnable fibers to our work-stealing queue, + // so that they can be stolen by others. + [[maybe_unused]] bool pushed = queue.pushBottom(fiber); + + // We have ensured that there is capacity for this fiber, hence we + // must be able to push it here. + assert(pushed); + + fibersLiftedFromAnywhere++; + } + + if (fibersLiftedFromAnywhere) { + onNewWork(); + + if constexpr (emper::STATS) { + awss::stats.recordFibersLiftedFromAnywhereQueue(fibersLiftedFromAnywhere); + } + } + + if (res) return std::make_pair(res, FiberSource::anywhereQueue); + + return std::nullopt; +} + auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { FiberSource fiberSource = FiberSource::local; Fiber* fiber; @@ -93,17 +154,15 @@ popBottom: } } -dequeueFromAnywhere: // Try the "scheduled from anywhere" queue to get work as last resort. - fiber = dequeueFiberFromAnywhereQueue(); - if (fiber) { - emper::statsIncr(awss::stats.nextFiberFromAnywhereQueue); - - if (maybeRecycle(fiber)) goto dequeueFromAnywhere; - - fiberSource = FiberSource::anywhereQueue; + { + auto anywhereQueueFiber = nextFiberViaAnywhereQueue(); + if (anywhereQueueFiber) return *anywhereQueueFiber; } + // We where not able to dequeue any fiber if we reach this point. + fiber = nullptr; + out: return std::make_pair(fiber, fiberSource); } diff --git a/emper/strategies/AbstractWorkStealingScheduler.hpp b/emper/strategies/AbstractWorkStealingScheduler.hpp index e482243b50e4cbced073810220b028427248afbb..f162da5c0d85e2c73450921a7ab038bd6246f4b4 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.hpp +++ b/emper/strategies/AbstractWorkStealingScheduler.hpp @@ -4,6 +4,7 @@ #include <cstddef> // for size_t #include <cstdint> +#include <optional> #include <utility> #include "NextFiberResult.hpp" @@ -36,6 +37,9 @@ class AbstractWorkStealingScheduler : public Scheduler { anywhereQueue, }; + private: + auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>; + protected: WsQueue<QUEUE_SIZE>** queues; static thread_local WsQueue<QUEUE_SIZE> queue; diff --git a/emper/strategies/AbstractWorkStealingStats.cpp b/emper/strategies/AbstractWorkStealingStats.cpp index a8e833eebb641485b69a61799a8e7adcfe09ff5d..1052d9dc174f9a7863a97f90a3260edb58053b00 100644 --- a/emper/strategies/AbstractWorkStealingStats.cpp +++ b/emper/strategies/AbstractWorkStealingStats.cpp @@ -6,6 +6,7 @@ #include <string> #include "WorkerLocalData.hpp" +#include "lib/math.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" AbstractWorkStealingStats::AbstractWorkStealingStats(AbstractWorkStealingStrategy& strategy) @@ -25,5 +26,12 @@ void AbstractWorkStealingStats::print() { << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << std::endl << "total-next-fiber-from-anywhere-queue: " - << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl; + << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl + << "total-fibers-lifted-from-anywhere-queue: " + << std::to_string(comulatedWorkerStats.totalFibersLiftedFromAnywhereQueue) << std::endl + << "max-fibers-lifted-from-anywhere-queue: " + << std::to_string(comulatedWorkerStats.maxFibersLiftedFromAnywhereQueue) << std::endl + << "avg-fibers-lifted-from-anywhere-queue: " + << std::to_string(comulatedWorkerStats.avgFibersLiftedFromAnywhereQueue.getAverage()) + << std::endl; } diff --git a/emper/strategies/AbstractWorkStealingStrategy.hpp b/emper/strategies/AbstractWorkStealingStrategy.hpp index 597ab09b6061eaae9488d0612b03a37e33a1982d..6a8857bbb9e654dcc2fdb04209c84943613db310 100644 --- a/emper/strategies/AbstractWorkStealingStrategy.hpp +++ b/emper/strategies/AbstractWorkStealingStrategy.hpp @@ -9,8 +9,8 @@ class AbstractWorkStealingScheduler; class AbstractWorkStealingStats; +class AbstractWorkStealingWorkerStats; class Runtime; -struct AbstractWorkStealingWorkerStats; class AbstractWorkStealingStrategy : public RuntimeStrategy { public: diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.cpp b/emper/strategies/AbstractWorkStealingWorkerStats.cpp new file mode 100644 index 0000000000000000000000000000000000000000..260e5d4fbeced409950dea2f5343b11cc7fb906e --- /dev/null +++ b/emper/strategies/AbstractWorkStealingWorkerStats.cpp @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Schmaus +#include "AbstractWorkStealingWorkerStats.hpp" + +#include <algorithm> + +auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorkerStats& other) + -> AbstractWorkStealingWorkerStats& { + scheduledFibersToLocal += other.scheduledFibersToLocal; + scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; + nextFiberFromLocal += other.nextFiberFromLocal; + nextFiberStolen += other.nextFiberStolen; + nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue; + + totalFibersLiftedFromAnywhereQueue += other.totalFibersLiftedFromAnywhereQueue; + maxFibersLiftedFromAnywhereQueue = + std::max(maxFibersLiftedFromAnywhereQueue, other.maxFibersLiftedFromAnywhereQueue); + avgFibersLiftedFromAnywhereQueue.update(other.avgFibersLiftedFromAnywhereQueue.getAverage()); + + return *this; +} + +void AbstractWorkStealingWorkerStats::recordFibersLiftedFromAnywhereQueue( + size_t fibersLiftedFromAnywhereQueue) { + totalFibersLiftedFromAnywhereQueue += fibersLiftedFromAnywhereQueue; + maxFibersLiftedFromAnywhereQueue = + std::max(maxFibersLiftedFromAnywhereQueue, fibersLiftedFromAnywhereQueue); + avgFibersLiftedFromAnywhereQueue.update(fibersLiftedFromAnywhereQueue); +} diff --git a/emper/strategies/AbstractWorkStealingWorkerStats.hpp b/emper/strategies/AbstractWorkStealingWorkerStats.hpp index 19394a5addc712609dcafae4ad4eef1bc63019c9..a07af68ccbdcfbdd55b315d945597217b982a4d7 100644 --- a/emper/strategies/AbstractWorkStealingWorkerStats.hpp +++ b/emper/strategies/AbstractWorkStealingWorkerStats.hpp @@ -2,22 +2,29 @@ // Copyright © 2020-2021 Florian Schmaus #pragma once +#include <cstddef> #include <cstdint> -struct AbstractWorkStealingWorkerStats { +#include "lib/math.hpp" + +namespace math = emper::lib::math; + +class AbstractWorkStealingWorkerStats { + friend class AbstractWorkStealingScheduler; + + public: uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToOverflowQueue = 0; uint64_t nextFiberFromLocal = 0; uint64_t nextFiberStolen = 0; uint64_t nextFiberFromAnywhereQueue = 0; - auto operator+=(const AbstractWorkStealingWorkerStats& other) - -> AbstractWorkStealingWorkerStats& { - scheduledFibersToLocal += other.scheduledFibersToLocal; - scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue; - nextFiberFromLocal += other.nextFiberFromLocal; - nextFiberStolen += other.nextFiberStolen; - nextFiberFromAnywhereQueue = other.nextFiberFromAnywhereQueue; - return *this; - } + uint64_t totalFibersLiftedFromAnywhereQueue = 0; + uint64_t maxFibersLiftedFromAnywhereQueue = 0; + math::RunningAverage<double, uint32_t> avgFibersLiftedFromAnywhereQueue; + + auto operator+=(const AbstractWorkStealingWorkerStats& other) -> AbstractWorkStealingWorkerStats&; + + private: + void recordFibersLiftedFromAnywhereQueue(size_t fibersLiftedFromAnywhere); }; diff --git a/emper/strategies/meson.build b/emper/strategies/meson.build index 6858f902a1a8e6ad9295a9d8d25f012ef9bf44a9..92ceafa22714f9b4a51c235c75a4088c8ff56786 100644 --- a/emper/strategies/meson.build +++ b/emper/strategies/meson.build @@ -2,6 +2,7 @@ emper_cpp_sources += files( 'AbstractWorkStealingScheduler.cpp', 'AbstractWorkStealingStats.cpp', 'AbstractWorkStealingStrategy.cpp', + 'AbstractWorkStealingWorkerStats.cpp', ) subdir('ws')