diff --git a/emper/lib/adt/LockedQueue.hpp b/emper/lib/adt/LockedQueue.hpp index 1fcd6bd1c4ab1fae72d6cc0fec42b1db5f405dc5..544db34a3039694a5196b818016da677fa6664bb 100644 --- a/emper/lib/adt/LockedQueue.hpp +++ b/emper/lib/adt/LockedQueue.hpp @@ -8,6 +8,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" // Forward declaration, declared and defined in Fibril.hpp. namespace emper { @@ -41,14 +42,19 @@ class LockedQueue { return deque.size(); } - auto pushBottom(const I item) -> bool { + auto pushBottom(const I item) -> PushBottomResult { + bool pushed = true; std::lock_guard<std::mutex> lock(queue_mutex); - if (deque.size() == SIZE) return false; + if (deque.size() == SIZE) { + pushed = false; + goto out; + } deque.push_back(item); - return true; + out: + return PushBottomResult{pushed, deque.size()}; } template <const int maxRetries> diff --git a/emper/lib/adt/PushBottomResult.hpp b/emper/lib/adt/PushBottomResult.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b65236ce624f3b41d807c774787d81a0bcdc27bb --- /dev/null +++ b/emper/lib/adt/PushBottomResult.hpp @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Schmaus +#pragma once + +namespace adt { + +struct PushBottomResult { + const bool pushed; + const uint64_t size; + + operator bool() const { return pushed; } +}; + +} // namespace adt diff --git a/emper/lib/adt/WsClQueue.hpp b/emper/lib/adt/WsClQueue.hpp index 34112f0e10d7f6f02158c313692539a6804e8f50..c79dd42e6cf859813513c48daf6ee9f4375cdfdd 100644 --- a/emper/lib/adt/WsClQueue.hpp +++ b/emper/lib/adt/WsClQueue.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once #include <atomic> @@ -7,6 +7,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" namespace adt { @@ -47,7 +48,7 @@ class WsClQueue { WsClQueue() : bottom(1), top(1) {} // TODO: Decide what to do regarding the following suppressed lint. // NOLINTNEXTLINE(readability-avoid-const-params-in-decls) - auto pushBottom(const PAYLOAD item) -> bool; + auto pushBottom(const PAYLOAD item) -> PushBottomResult; template <const int maxRetries> auto popTop(PAYLOAD *item) -> emper::StealingResult; auto popBottom(PAYLOAD *item) -> bool; @@ -60,8 +61,14 @@ class WsClQueue { }; template <typename PAYLOAD, const uintptr_t CAPACITY> -auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { - if (isFull()) return false; +auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> PushBottomResult { + bool pushed = true; + uint64_t newSize = + bottom.load(std::memory_order_relaxed) - top.load(std::memory_order_relaxed) + 1; + if (newSize > CAPACITY) { + pushed = false; + goto out; + } queue[bottom % CAPACITY] = item; @@ -69,7 +76,9 @@ auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { atomic_thread_fence(std::memory_order_release); ++bottom; - return true; + +out: + return PushBottomResult{pushed, newSize}; } template <typename PAYLOAD, const uintptr_t CAPACITY> diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp index 943f41c8bbc6c434c01ce25cb13e60fb76224053..6d9c43ac0d261ae58b4923bd27b829e34340c79e 100644 --- a/emper/lib/adt/WsClV2Queue.hpp +++ b/emper/lib/adt/WsClV2Queue.hpp @@ -1,5 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2022 Florian Schmaus #pragma once #include <atomic> @@ -7,6 +7,7 @@ #include "Common.hpp" #include "StealingResult.hpp" +#include "lib/adt/PushBottomResult.hpp" namespace adt { @@ -51,7 +52,7 @@ class WsClV2Queue { // an underflow if bottom is '0'. The paper's queue uses Java // 'long' for bottom and top and is thus safe since it's signed. WsClV2Queue() : top(1) {} - auto pushBottom(ITEM_TYPE item) -> bool; + auto pushBottom(ITEM_TYPE item) -> PushBottomResult; auto popBottom(ITEM_TYPE *item) -> bool; template <const int maxRetries> auto popTop(ITEM_TYPE *item) -> emper::StealingResult; @@ -64,9 +65,13 @@ class WsClV2Queue { }; template <typename ITEM_TYPE, const uintptr_t CAPACITY> -auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool { +auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> PushBottomResult { + bool pushed = true; // Check if queue is full. - if (isFull()) return false; + if (isFull()) { + pushed = false; + goto out; + } queue[bottom % CAPACITY] = item; @@ -76,7 +81,8 @@ auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool // Write fence / memory barrier atomic_thread_fence(std::memory_order_release); - return true; +out: + return PushBottomResult{pushed, bottom - top}; } template <typename ITEM_TYPE, const uintptr_t CAPACITY> diff --git a/emper/strategies/AbstractWorkStealingScheduler.cpp b/emper/strategies/AbstractWorkStealingScheduler.cpp index 7cd422d56c0fe7b84267e8d01f4c2a10327f3d1f..0dbfb91c619057a030131c791710359ea76d052c 100644 --- a/emper/strategies/AbstractWorkStealingScheduler.cpp +++ b/emper/strategies/AbstractWorkStealingScheduler.cpp @@ -23,6 +23,7 @@ #include "Worker.hpp" #include "emper-common.h" // for workerid_t #include "io/IoContext.hpp" +#include "lib/adt/PushBottomResult.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingWorkerStats.hpp" @@ -51,7 +52,7 @@ AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, } void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { - bool pushed = queue.pushBottom(&fiber); + auto pushed = queue.pushBottom(&fiber); if (unlikely(!pushed)) { if constexpr (emper::OVERFLOW_QUEUE) { enqueueInAnywhereQueue(fiber); @@ -61,7 +62,7 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { ABORT("Could not push fiber " << &fiber << " into queue"); } } else if constexpr (emper::STATS) { - uint64_t queueLength = queue.usedSlots(); + uint64_t queueLength = pushed.size; awss::stats.recordScheduledToLocalAndQueueLength(queueLength); } @@ -289,6 +290,10 @@ auto AbstractWorkStealingScheduler::pushBottom(AbstractFiber& fiber) -> bool { auto pushed = queue.pushBottom(&fiber); if (pushed) [[likely]] { onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); + if constexpr (emper::STATS) { + uint64_t queueLength = pushed.size; + awss::stats.recordScheduledToLocalAndQueueLength(queueLength); + } } return pushed; }