Skip to content
Snippets Groups Projects
Commit 66dbc3eb authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Introduce PushBottomResult as return value of wsqueue's pushBottom()

This result also contains the observed queue size at the moment of the
push.
parent ad753436
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "Common.hpp" #include "Common.hpp"
#include "StealingResult.hpp" #include "StealingResult.hpp"
#include "lib/adt/PushBottomResult.hpp"
// Forward declaration, declared and defined in Fibril.hpp. // Forward declaration, declared and defined in Fibril.hpp.
namespace emper { namespace emper {
...@@ -41,14 +42,19 @@ class LockedQueue { ...@@ -41,14 +42,19 @@ class LockedQueue {
return deque.size(); return deque.size();
} }
auto pushBottom(const I item) -> bool { auto pushBottom(const I item) -> PushBottomResult {
bool pushed = true;
std::lock_guard<std::mutex> lock(queue_mutex); std::lock_guard<std::mutex> lock(queue_mutex);
if (deque.size() == SIZE) return false; if (deque.size() == SIZE) {
pushed = false;
goto out;
}
deque.push_back(item); deque.push_back(item);
return true; out:
return PushBottomResult{pushed, deque.size()};
} }
template <const int maxRetries> template <const int maxRetries>
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2022 Florian Schmaus
#pragma once
namespace adt {
struct PushBottomResult {
const bool pushed;
const uint64_t size;
operator bool() const { return pushed; }
};
} // namespace adt
// SPDX-License-Identifier: LGPL-3.0-or-later // SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus // Copyright © 2020-2022 Florian Schmaus
#pragma once #pragma once
#include <atomic> #include <atomic>
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "Common.hpp" #include "Common.hpp"
#include "StealingResult.hpp" #include "StealingResult.hpp"
#include "lib/adt/PushBottomResult.hpp"
namespace adt { namespace adt {
...@@ -47,7 +48,7 @@ class WsClQueue { ...@@ -47,7 +48,7 @@ class WsClQueue {
WsClQueue() : bottom(1), top(1) {} WsClQueue() : bottom(1), top(1) {}
// TODO: Decide what to do regarding the following suppressed lint. // TODO: Decide what to do regarding the following suppressed lint.
// NOLINTNEXTLINE(readability-avoid-const-params-in-decls) // NOLINTNEXTLINE(readability-avoid-const-params-in-decls)
auto pushBottom(const PAYLOAD item) -> bool; auto pushBottom(const PAYLOAD item) -> PushBottomResult;
template <const int maxRetries> template <const int maxRetries>
auto popTop(PAYLOAD *item) -> emper::StealingResult; auto popTop(PAYLOAD *item) -> emper::StealingResult;
auto popBottom(PAYLOAD *item) -> bool; auto popBottom(PAYLOAD *item) -> bool;
...@@ -60,8 +61,14 @@ class WsClQueue { ...@@ -60,8 +61,14 @@ class WsClQueue {
}; };
template <typename PAYLOAD, const uintptr_t CAPACITY> template <typename PAYLOAD, const uintptr_t CAPACITY>
auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> PushBottomResult {
if (isFull()) return false; bool pushed = true;
uint64_t newSize =
bottom.load(std::memory_order_relaxed) - top.load(std::memory_order_relaxed) + 1;
if (newSize > CAPACITY) {
pushed = false;
goto out;
}
queue[bottom % CAPACITY] = item; queue[bottom % CAPACITY] = item;
...@@ -69,7 +76,9 @@ auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool { ...@@ -69,7 +76,9 @@ auto WsClQueue<PAYLOAD, CAPACITY>::pushBottom(const PAYLOAD item) -> bool {
atomic_thread_fence(std::memory_order_release); atomic_thread_fence(std::memory_order_release);
++bottom; ++bottom;
return true;
out:
return PushBottomResult{pushed, newSize};
} }
template <typename PAYLOAD, const uintptr_t CAPACITY> template <typename PAYLOAD, const uintptr_t CAPACITY>
......
// SPDX-License-Identifier: LGPL-3.0-or-later // SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus // Copyright © 2020-2022 Florian Schmaus
#pragma once #pragma once
#include <atomic> #include <atomic>
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "Common.hpp" #include "Common.hpp"
#include "StealingResult.hpp" #include "StealingResult.hpp"
#include "lib/adt/PushBottomResult.hpp"
namespace adt { namespace adt {
...@@ -51,7 +52,7 @@ class WsClV2Queue { ...@@ -51,7 +52,7 @@ class WsClV2Queue {
// an underflow if bottom is '0'. The paper's queue uses Java // an underflow if bottom is '0'. The paper's queue uses Java
// 'long' for bottom and top and is thus safe since it's signed. // 'long' for bottom and top and is thus safe since it's signed.
WsClV2Queue() : top(1) {} WsClV2Queue() : top(1) {}
auto pushBottom(ITEM_TYPE item) -> bool; auto pushBottom(ITEM_TYPE item) -> PushBottomResult;
auto popBottom(ITEM_TYPE *item) -> bool; auto popBottom(ITEM_TYPE *item) -> bool;
template <const int maxRetries> template <const int maxRetries>
auto popTop(ITEM_TYPE *item) -> emper::StealingResult; auto popTop(ITEM_TYPE *item) -> emper::StealingResult;
...@@ -64,9 +65,13 @@ class WsClV2Queue { ...@@ -64,9 +65,13 @@ class WsClV2Queue {
}; };
template <typename ITEM_TYPE, const uintptr_t CAPACITY> template <typename ITEM_TYPE, const uintptr_t CAPACITY>
auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool { auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> PushBottomResult {
bool pushed = true;
// Check if queue is full. // Check if queue is full.
if (isFull()) return false; if (isFull()) {
pushed = false;
goto out;
}
queue[bottom % CAPACITY] = item; queue[bottom % CAPACITY] = item;
...@@ -76,7 +81,8 @@ auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool ...@@ -76,7 +81,8 @@ auto WsClV2Queue<ITEM_TYPE, CAPACITY>::pushBottom(const ITEM_TYPE item) -> bool
// Write fence / memory barrier // Write fence / memory barrier
atomic_thread_fence(std::memory_order_release); atomic_thread_fence(std::memory_order_release);
return true; out:
return PushBottomResult{pushed, bottom - top};
} }
template <typename ITEM_TYPE, const uintptr_t CAPACITY> template <typename ITEM_TYPE, const uintptr_t CAPACITY>
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "Worker.hpp" #include "Worker.hpp"
#include "emper-common.h" // for workerid_t #include "emper-common.h" // for workerid_t
#include "io/IoContext.hpp" #include "io/IoContext.hpp"
#include "lib/adt/PushBottomResult.hpp"
#include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp"
#include "strategies/AbstractWorkStealingWorkerStats.hpp" #include "strategies/AbstractWorkStealingWorkerStats.hpp"
...@@ -51,7 +52,7 @@ AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime, ...@@ -51,7 +52,7 @@ AbstractWorkStealingScheduler::AbstractWorkStealingScheduler(Runtime& runtime,
} }
void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) {
bool pushed = queue.pushBottom(&fiber); auto pushed = queue.pushBottom(&fiber);
if (unlikely(!pushed)) { if (unlikely(!pushed)) {
if constexpr (emper::OVERFLOW_QUEUE) { if constexpr (emper::OVERFLOW_QUEUE) {
enqueueInAnywhereQueue(fiber); enqueueInAnywhereQueue(fiber);
...@@ -61,7 +62,7 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) { ...@@ -61,7 +62,7 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) {
ABORT("Could not push fiber " << &fiber << " into queue"); ABORT("Could not push fiber " << &fiber << " into queue");
} }
} else if constexpr (emper::STATS) { } else if constexpr (emper::STATS) {
uint64_t queueLength = queue.usedSlots(); uint64_t queueLength = pushed.size;
awss::stats.recordScheduledToLocalAndQueueLength(queueLength); awss::stats.recordScheduledToLocalAndQueueLength(queueLength);
} }
...@@ -289,6 +290,10 @@ auto AbstractWorkStealingScheduler::pushBottom(AbstractFiber& fiber) -> bool { ...@@ -289,6 +290,10 @@ auto AbstractWorkStealingScheduler::pushBottom(AbstractFiber& fiber) -> bool {
auto pushed = queue.pushBottom(&fiber); auto pushed = queue.pushBottom(&fiber);
if (pushed) [[likely]] { if (pushed) [[likely]] {
onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local}); onNewWork<CallerEnvironment::EMPER>(emper::FiberHint{emper::FiberSource::local});
if constexpr (emper::STATS) {
uint64_t queueLength = pushed.size;
awss::stats.recordScheduledToLocalAndQueueLength(queueLength);
}
} }
return pushed; return pushed;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment