From 713c0f047d25be1c1f5ccf7eaf3c18a0af6918d2 Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fischer@muhq.space> Date: Tue, 24 Aug 2021 16:39:22 +0200 Subject: [PATCH] [sleep_strategy] introduce thread-local generic stats --- emper/Runtime.cpp | 2 +- emper/sleep_strategy/PipeSleepStrategy.cpp | 37 ++++++---- emper/sleep_strategy/PipeSleepStrategy.hpp | 57 ++++----------- .../SemaphoreWorkerSleepStrategy.hpp | 20 +++++- emper/sleep_strategy/Stats.cpp | 42 +++++++++++ emper/sleep_strategy/Stats.hpp | 71 +++++++++++++++++++ emper/sleep_strategy/WorkerStats.cpp | 19 +++++ emper/sleep_strategy/WorkerStats.hpp | 32 +++++++++ emper/sleep_strategy/meson.build | 4 +- 9 files changed, 222 insertions(+), 62 deletions(-) create mode 100644 emper/sleep_strategy/Stats.cpp create mode 100644 emper/sleep_strategy/Stats.hpp create mode 100644 emper/sleep_strategy/WorkerStats.cpp create mode 100644 emper/sleep_strategy/WorkerStats.hpp diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 0ca865bd..73d39a29 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -84,7 +84,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory ioContexts(emper::IO ? workerCount : 0), ioReadySem(0), randomEngine(seed), - workerSleepStrategy(workerCount) { + workerSleepStrategy(*this, workerCount) { const int nprocs = get_nprocs(); { diff --git a/emper/sleep_strategy/PipeSleepStrategy.cpp b/emper/sleep_strategy/PipeSleepStrategy.cpp index 31e5c128..3c02694e 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.cpp +++ b/emper/sleep_strategy/PipeSleepStrategy.cpp @@ -6,7 +6,6 @@ #include <atomic> #include <cassert> -#include <string> #include "CallerEnvironment.hpp" #include "Emper.hpp" @@ -14,11 +13,32 @@ #include "io/IoContext.hpp" #include "lib/TaggedPtr.hpp" +class Runtime; + using emper::io::IoContext; using emper::lib::TaggedPtr; namespace emper::sleep_strategy { +PipeSleepStrategy::PipeSleepStrategy(Runtime& runtime, workerid_t workerCount) + : workerCount(workerCount), stats(runtime) { + LOGD("init pipe sleep startegy"); + sleepers.store(0, std::memory_order_relaxed); + + // NOLINTNEXTLINE(modernize-avoid-c-arrays) + int fds[2]; + if (pipe(fds)) { + DIE_MSG_ERRNO("pipe failed"); + } + sleepFd = fds[0]; + notifyFd = fds[1]; +} + +PipeSleepStrategy::~PipeSleepStrategy() { + close(sleepFd); + close(notifyFd); +} + template <CallerEnvironment callerEnvironment> [[nodiscard]] auto PipeSleepStrategy::createHint() -> TaggedPtr { if constexpr (callerEnvironment == CallerEnvironment::EMPER) { @@ -121,7 +141,7 @@ void PipeSleepStrategy::sleep() { io.submitPreparedSqesAndWait<CallerEnvironment::EMPER>(1); LOGD("wakeup due to available IO completions"); - statsIncr(stats->wakeup); + stats.incWakeup(); } template <CallerEnvironment callerEnvironment> @@ -131,7 +151,7 @@ void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { } LOGD("Got new work notification"); - statsIncr(stats->wakeupDueToNotify); + stats.incWakeupDueToNotify(); // Reset flag to indicate that a new sleep cqe must be prepared // and allow the completer to reap completions again @@ -140,15 +160,4 @@ void PipeSleepStrategy::onNewWorkNotification(IoContext& io) { template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::EMPER>(IoContext& io); template void PipeSleepStrategy::onNewWorkNotification<CallerEnvironment::ANYWHERE>(IoContext& io); - -auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream& { - os << "PipeSleepStrategy Stats:" << std::endl; - os << "total-onNewWork: " << std::to_string(s.onNewWork) << std::endl; - os << "total-notify: " << std::to_string(s.notify) << std::endl; - os << "total-notifications: " << std::to_string(s.notifications) << std::endl; - os << "total-skip: " << std::to_string(s.skip) << std::endl; - os << "total-wakeup: " << std::to_string(s.wakeup) << std::endl; - os << "total-wakeup-due-to-notify: " << std::to_string(s.wakeupDueToNotify) << std::endl; - return os; -} } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/PipeSleepStrategy.hpp b/emper/sleep_strategy/PipeSleepStrategy.hpp index 4bacee4e..d96e6955 100644 --- a/emper/sleep_strategy/PipeSleepStrategy.hpp +++ b/emper/sleep_strategy/PipeSleepStrategy.hpp @@ -6,10 +6,8 @@ #include <algorithm> #include <atomic> -#include <cstddef> #include <cstdint> #include <iostream> -#include <memory> #include <stdexcept> #include <type_traits> #include <vector> @@ -17,20 +15,21 @@ #include "CallerEnvironment.hpp" #include "Common.hpp" #include "Debug.hpp" -#include "Emper.hpp" #include "Worker.hpp" #include "emper-common.h" #include "lib/TaggedPtr.hpp" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" +#include "sleep_strategy/Stats.hpp" using emper::lib::TaggedPtr; +class Runtime; + namespace emper::io { class IoContext; } namespace emper::sleep_strategy { - /** * @brief A pipe(2) based sleep strategy combining the IO subsystem with the sleep strategy * @@ -122,22 +121,12 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, public Logger<LogSubsystem::SLEEP_S> { friend class emper::io::IoContext; - class Stats { - public: - std::atomic<size_t> onNewWork = 0; - std::atomic<size_t> notify = 0; - std::atomic<size_t> notifications = 0; - std::atomic<size_t> skip = 0; - std::atomic<size_t> wakeup = 0; - std::atomic<size_t> wakeupDueToNotify = 0; - }; - - std::unique_ptr<Stats> stats; - workerid_t workerCount; int sleepFd; int notifyFd; + Stats stats; + // Make sure the shared counter lives in an exlusive cache line std::aligned_storage<64, 64>::type sleepers_mem; std::atomic<int64_t>& sleepers = *reinterpret_cast<std::atomic<int64_t>*>(&sleepers_mem); @@ -146,7 +135,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, [[nodiscard]] auto createHint() -> TaggedPtr; void writeNotifications(TaggedPtr hint, int64_t count) { - statsAdd(stats->notifications, count); + stats.addNotifications(count); std::vector<void*> hints(count, hint); @@ -178,7 +167,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, std::memory_order_release, std::memory_order_acquire)); // increment the notify stat if we actually have to notify someone - statsIncr(stats->notify); + stats.incNotify(); writeNotifications(hint, toWakeup); LOGD("notifyFromEmper written " @@ -201,10 +190,6 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, // by making the sleepers count negative and thus preventing at least count // sleep attempts. - // Always increment the notify count because notifications from anywhere always - // take effect - statsIncr(stats->notify); - do { // We already prevent enough sleep attempts if (sleeping <= -count) { @@ -214,6 +199,8 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } while (!sleepers.compare_exchange_weak(sleeping, sleeping - count, std::memory_order_release, std::memory_order_acquire)); + stats.incNotify(); + int64_t toWakeup = std::min(sleeping, count); if (sleeping > 0) { writeNotifications(hint, toWakeup); @@ -228,7 +215,7 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, // The hint must be != nullptr so sleep() knows when to prepare and submit // a sleepFd read sqe. - statsIncr(stats->onNewWork); + stats.incOnNewWork(); int64_t sleeping = getSleeping(); auto signedCount = static_cast<int64_t>(count); @@ -241,24 +228,10 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, } public: - PipeSleepStrategy(workerid_t workerCount) : workerCount(workerCount) { - LOGD("init pipe sleep startegy"); - sleepers.store(0, std::memory_order_relaxed); - - // NOLINTNEXTLINE(modernize-avoid-c-arrays) - int fds[2]; - if (pipe(fds)) { - DIE_MSG_ERRNO("pipe failed"); - } - sleepFd = fds[0]; - notifyFd = fds[1]; - - if constexpr (emper::STATS) { - stats = std::make_unique<Stats>(); - } - } + PipeSleepStrategy(Runtime& runtime, workerid_t workerCount); + ~PipeSleepStrategy(); - void printStats() { std::cout << *stats; } + void printStats(std::ostream& out = std::cout) { stats.print(out); } [[nodiscard]] inline auto getSleeping() const -> long { return sleepers.load(); } @@ -289,9 +262,5 @@ class PipeSleepStrategy : AbstractWorkerSleepStrategy<PipeSleepStrategy>, template <CallerEnvironment callerEnvironment> void onNewWorkNotification(emper::io::IoContext& io); void sleep(); - - friend auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&; }; - -auto operator<<(std::ostream& os, const PipeSleepStrategy::Stats& s) -> std::ostream&; } // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp index 47349fc6..0a9f6f6a 100644 --- a/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp +++ b/emper/sleep_strategy/SemaphoreWorkerSleepStrategy.hpp @@ -3,12 +3,14 @@ #pragma once #include <atomic> +#include <iostream> #include "CallerEnvironment.hpp" #include "Worker.hpp" #include "emper-common.h" #include "emper-config.h" #include "sleep_strategy/AbstractWorkerSleepStrategy.hpp" +#include "sleep_strategy/Stats.hpp" #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE #include "lib/sync/LockedSemaphore.hpp" @@ -22,6 +24,8 @@ #include "lib/sync/SpuriousFutexSemaphore.hpp" #endif +class Runtime; + namespace emper::sleep_strategy { // Needed to stop waiting for a worker to change its flag if it may be already @@ -34,6 +38,8 @@ class AbstractSemaphoreWorkerSleepStrategy const workerid_t workerCount; Sem wakeupSem; + Stats stats; + // check if the used Semaphore provides a notifySpecific implementation static constexpr bool semHasNotifySpecific = requires(Sem s) { s.notifySpecific(0); }; @@ -49,10 +55,12 @@ class AbstractSemaphoreWorkerSleepStrategy auto& notified = notifiedFlags[workerId]; if (notified.load(std::memory_order_relaxed)) { notified.store(false, std::memory_order_relaxed); + stats.incSkip(); return; } wakeupSem.wait(); + stats.incWakeup(); if (notified.load(std::memory_order_relaxed)) { notified.store(false, std::memory_order_relaxed); @@ -131,7 +139,8 @@ class AbstractSemaphoreWorkerSleepStrategy } public: - AbstractSemaphoreWorkerSleepStrategy(workerid_t workerCount) : workerCount(workerCount) { + AbstractSemaphoreWorkerSleepStrategy(Runtime& runtime, workerid_t workerCount) + : workerCount(workerCount), stats(runtime) { if constexpr (useGenericNotifySpecificImpl) { notifiedFlags = new std::atomic<bool>[workerCount]; } @@ -156,6 +165,8 @@ class AbstractSemaphoreWorkerSleepStrategy inline void notifyMany(unsigned count) { if (mustNotify<callerEnvironment>()) { wakeupSem.notify_many(count); + stats.incNotify(); + stats.addNotifications(count); } } @@ -166,6 +177,8 @@ class AbstractSemaphoreWorkerSleepStrategy inline void notifyAll() { if (mustNotify<callerEnvironment>()) { wakeupSem.notify_many(workerCount); + stats.incNotify(); + stats.addNotifications(workerCount); } } @@ -176,6 +189,8 @@ class AbstractSemaphoreWorkerSleepStrategy } else { genericNotifySpecific<callerEnvironment>(workerId); } + stats.incNotify(); + stats.incNotifications(); } inline void sleep() { @@ -183,10 +198,11 @@ class AbstractSemaphoreWorkerSleepStrategy genericNotifySpecificSleep(); } else { wakeupSem.wait(); + stats.incWakeup(); } } - void printStats(){}; + void printStats(std::ostream& out = std::cout) { stats.print(out); }; }; #ifdef EMPER_LOCKED_WAKEUP_SEMAPHORE diff --git a/emper/sleep_strategy/Stats.cpp b/emper/sleep_strategy/Stats.cpp new file mode 100644 index 00000000..6859b709 --- /dev/null +++ b/emper/sleep_strategy/Stats.cpp @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "sleep_strategy/Stats.hpp" + +#include <memory> +#include <vector> + +#include "WorkerLocalData.hpp" +#include "sleep_strategy/WorkerStats.hpp" + +namespace emper::sleep_strategy { + +thread_local WorkerStats* Stats::stats; + +Stats::Stats(Runtime& runtime) { + if constexpr (emper::STATS) { + allStats = std::make_unique<WorkerLocalData<WorkerStats>>( + [] { + stats = new WorkerStats(); + return stats; + }, + runtime); + } +} + +Stats::~Stats() { + if constexpr (emper::STATS) { + allStats->forEach([](auto* stats) { delete stats; }); + } +} + +void Stats::print(std::ostream& out) { + WorkerStats totalStats; + for (auto& stats : allStats->getSnapshot()) { + totalStats += stats; + } + + totalStats += anywhereStats; + + out << totalStats; +} +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/Stats.hpp b/emper/sleep_strategy/Stats.hpp new file mode 100644 index 00000000..e96393fc --- /dev/null +++ b/emper/sleep_strategy/Stats.hpp @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <cstddef> +#include <iostream> +#include <memory> +#include <mutex> + +#include "Emper.hpp" +#include "sleep_strategy/WorkerStats.hpp" + +class Runtime; + +template <typename D> +class WorkerLocalData; + +// macro to safely increment the worker local or the anywhere stats +#define __EMPER_SS_STATS_ADD(__field__, count) \ + if constexpr (emper::STATS) { \ + if (stats) { \ + stats->__field__ += count; \ + } else { \ + std::lock_guard<std::mutex> lock(anywhereStatsMutex); \ + anywhereStats.__field__ += count; \ + } \ + } + +namespace emper::sleep_strategy { + +// Somehow iwyu insists on this rather silly looking forward declaration. +// I assume because of the friend declaration in WorkerStats.hpp. +// But using a forward declaration there did not please iwyu. +class Stats; + +class Stats { + friend class PipeSleepStrategy; + + size_t onNewWork = 0; + size_t notify = 0; + size_t notifications = 0; + size_t skip = 0; + size_t wakeup = 0; + size_t wakeupDueToNotify = 0; + + WorkerStats anywhereStats; + std::mutex anywhereStatsMutex; + + static thread_local WorkerStats* stats; + std::unique_ptr<WorkerLocalData<WorkerStats>> allStats; + + public: + Stats(Runtime& runtime); + ~Stats(); + + void print(std::ostream& out); + + void addOnNewWork(size_t count) { __EMPER_SS_STATS_ADD(onNewWork, count); } + void incOnNewWork() { addOnNewWork(1); } + void addNotify(size_t count) { __EMPER_SS_STATS_ADD(notify, count); } + void incNotify() { addNotify(1); } + void addNotifications(size_t count) { __EMPER_SS_STATS_ADD(notifications, count); } + void incNotifications() { addNotifications(1); } + void addSkip(size_t count) { __EMPER_SS_STATS_ADD(skip, count); } + void incSkip() { addSkip(1); } + void addWakeup(size_t count) { __EMPER_SS_STATS_ADD(wakeup, count); } + void incWakeup() { addWakeup(1); } + void addWakeupDueToNotify(size_t count) { __EMPER_SS_STATS_ADD(wakeupDueToNotify, count); } + void incWakeupDueToNotify() { addWakeupDueToNotify(1); } +}; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WorkerStats.cpp b/emper/sleep_strategy/WorkerStats.cpp new file mode 100644 index 00000000..da268226 --- /dev/null +++ b/emper/sleep_strategy/WorkerStats.cpp @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include "WorkerStats.hpp" + +#include <iostream> +#include <string> + +namespace emper::sleep_strategy { +auto operator<<(std::ostream& os, const WorkerStats& s) -> std::ostream& { + os << "SleepStrategy Stats:" << std::endl; + os << "total-onNewWork: " << std::to_string(s.onNewWork) << std::endl; + os << "total-notify: " << std::to_string(s.notify) << std::endl; + os << "total-notifications: " << std::to_string(s.notifications) << std::endl; + os << "total-skip: " << std::to_string(s.skip) << std::endl; + os << "total-wakeup: " << std::to_string(s.wakeup) << std::endl; + os << "total-wakeup-due-to-notify: " << std::to_string(s.wakeupDueToNotify) << std::endl; + return os; +} +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/WorkerStats.hpp b/emper/sleep_strategy/WorkerStats.hpp new file mode 100644 index 00000000..e37112a6 --- /dev/null +++ b/emper/sleep_strategy/WorkerStats.hpp @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <cstddef> +#include <iostream> + +namespace emper::sleep_strategy { +class WorkerStats { + friend class Stats; + friend auto operator<<(std::ostream& os, const WorkerStats& s) -> std::ostream&; + + size_t onNewWork = 0; + size_t notify = 0; + size_t notifications = 0; + size_t skip = 0; + size_t wakeup = 0; + size_t wakeupDueToNotify = 0; + + auto operator+=(const WorkerStats& other) -> WorkerStats& { + onNewWork += other.onNewWork; + notify += other.notify; + notifications += other.notifications; + skip += other.skip; + wakeup += other.wakeup; + wakeupDueToNotify += other.wakeupDueToNotify; + return *this; + } +}; + +auto operator<<(std::ostream& os, const WorkerStats& s) -> std::ostream&; +} // namespace emper::sleep_strategy diff --git a/emper/sleep_strategy/meson.build b/emper/sleep_strategy/meson.build index 6f04a329..819c5412 100644 --- a/emper/sleep_strategy/meson.build +++ b/emper/sleep_strategy/meson.build @@ -1,4 +1,6 @@ emper_cpp_sources += files( - 'SemaphoreWorkerSleepStrategy.cpp', 'PipeSleepStrategy.cpp', + 'SemaphoreWorkerSleepStrategy.cpp', + 'Stats.cpp', + 'WorkerStats.cpp', ) -- GitLab