Skip to content
Snippets Groups Projects
Commit 2a4bd942 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Try to lift fiber batches from AnywhereQueue in WS layer

parent b8e2983f
No related branches found
No related tags found
1 merge request: !196 Batch dequeue from AnywhereQueue
Showing
with 229 additions and 24 deletions
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Copyright © 2020-2021 Florian Schmaus // Copyright © 2020-2021 Florian Schmaus
#pragma once #pragma once
#include <cstddef>
#include <functional> // for function #include <functional> // for function
#include <ostream> #include <ostream>
...@@ -53,7 +54,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { ...@@ -53,7 +54,10 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
scheduleAnywhereQueue.insert(fibers, count); scheduleAnywhereQueue.insert(fibers, count);
} }
auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } template <class OutputIt>
auto dequeueFibersFromAnywhereQueue(OutputIt begin, OutputIt end) -> size_t {
return scheduleAnywhereQueue.dequeue(begin, end);
}
virtual void scheduleInternal(Fiber& fiber) = 0; virtual void scheduleInternal(Fiber& fiber) = 0;
virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0; virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0;
......
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
#include <cstdlib> // for abort #include <cstdlib> // for abort
#include <iostream> // for operator<<, basic_ostream, basi... #include <iostream> // for operator<<, basic_ostream, basi...
#include <map> // for map, map<>::value_compare #include <map> // for map, map<>::value_compare
#include <string>
#include <vector> #include <vector>
#include "CallerEnvironment.hpp" #include "CallerEnvironment.hpp"
......
...@@ -46,5 +46,25 @@ class BoostSharedMutexUnboundedQueue { ...@@ -46,5 +46,25 @@ class BoostSharedMutexUnboundedQueue {
queue.pop(); queue.pop();
return res; return res;
} }
template <class OutputIt>
auto dequeue(OutputIt begin, OutputIt end) -> size_t {
OutputIt cur = begin;
{
boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
if (queue.empty()) {
return 0;
}
{
boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock);
for (; !queue.empty() && cur != end; ++cur) {
*cur = queue.front();
queue.pop();
}
}
}
return cur - begin;
}
}; };
} // namespace lib::adt } // namespace lib::adt
...@@ -24,6 +24,11 @@ class LockedQueue { ...@@ -24,6 +24,11 @@ class LockedQueue {
return deque.size() == SIZE; return deque.size() == SIZE;
} }
// Returns SIZE minus the number of elements currently stored in the
// deque, sampled while holding the queue mutex.
auto freeSlots() -> size_t {
	const std::lock_guard<std::mutex> guard(queue_mutex);
	const auto usedSlots = deque.size();
	return SIZE - usedSlots;
}
auto pushBottom(const I item) -> bool { auto pushBottom(const I item) -> bool {
std::lock_guard<std::mutex> lock(queue_mutex); std::lock_guard<std::mutex> lock(queue_mutex);
......
...@@ -44,5 +44,18 @@ class MutexUnboundedQueue { ...@@ -44,5 +44,18 @@ class MutexUnboundedQueue {
queue.pop(); queue.pop();
return res; return res;
} }
/**
 * Dequeue a batch of elements into the output range [begin, end).
 *
 * Elements are moved out of the queue in FIFO order while the queue
 * mutex is held, until either the queue is empty or the output range
 * is full.
 *
 * @param begin start of the output range
 * @param end   one past the last writable position of the output range
 * @return the number of elements written, starting at begin
 */
template <class OutputIt>
auto dequeue(OutputIt begin, OutputIt end) -> size_t {
	OutputIt out = begin;
	{
		std::lock_guard<std::mutex> guard(queue_mutex);
		while (!queue.empty() && out != end) {
			*out = queue.front();
			queue.pop();
			++out;
		}
	}
	return out - begin;
}
}; };
} // namespace lib::adt } // namespace lib::adt
...@@ -98,5 +98,42 @@ class RwLockUnboundedQueue { ...@@ -98,5 +98,42 @@ class RwLockUnboundedQueue {
pthread_rwlock_unlock(&lock); pthread_rwlock_unlock(&lock);
return res; return res;
} }
// Dequeue a batch of elements into the output range [begin, end).
//
// The queue is first inspected under the read lock; if it is empty the
// function returns 0 without ever requesting the write lock. Otherwise
// the write lock is obtained and elements are copied out in FIFO order
// until the queue is empty or the output range is full.
//
// Returns the number of elements written, starting at begin.
template <class OutputIt>
auto dequeue(OutputIt begin, OutputIt end) -> size_t {
int err = pthread_rwlock_rdlock(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err));
}
// Declared before the first goto so that no jump to 'unlock' bypasses
// its initialization.
OutputIt cur = begin;
if (queue.empty()) {
goto unlock;
}
// try to upgrade to wrlock
// NOTE(review): this thread still holds the read lock at this point.
// POSIX lets pthread_rwlock_trywrlock fail with EBUSY whenever the lock
// is held for reading or writing, so presumably the EBUSY path below is
// the one normally taken -- confirm. If the call ever succeeded, only a
// single pthread_rwlock_unlock would follow.
err = pthread_rwlock_trywrlock(&lock);
if (err) {
if (unlikely(err != EBUSY)) {
DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err));
}
// drop the read lock and acquire a write lock
// (aquire_wrlock is defined outside this view; behavior as described by
// the comment above -- TODO confirm)
aquire_wrlock(lock);
// Re-check emptiness now that the write lock has been taken via
// aquire_wrlock.
if (queue.empty()) {
goto unlock;
}
}
for (; !queue.empty() && cur != end; ++cur) {
*cur = queue.front();
queue.pop();
}
unlock:
pthread_rwlock_unlock(&lock);
return cur - begin;
}
}; };
} // namespace lib::adt } // namespace lib::adt
...@@ -51,5 +51,23 @@ class SharedMutexUnboundedQueue { ...@@ -51,5 +51,23 @@ class SharedMutexUnboundedQueue {
queue.pop(); queue.pop();
return res; return res;
} }
/**
 * Dequeue a batch of elements into the output range [begin, end).
 *
 * An emptiness probe under the shared lock lets callers return early
 * without requesting exclusive ownership. The shared lock is released
 * before the exclusive lock is taken, so the transfer loop checks for
 * emptiness again on every iteration.
 *
 * @param begin start of the output range
 * @param end   one past the last writable position of the output range
 * @return the number of elements written, starting at begin
 */
template <class OutputIt>
auto dequeue(OutputIt begin, OutputIt end) -> size_t {
	{
		std::shared_lock sharedLock(queue_mutex);
		const bool nothingQueued = queue.empty();
		if (nothingQueued) return 0;
	}

	OutputIt out = begin;
	{
		std::unique_lock exclusiveLock(queue_mutex);
		while (!queue.empty() && out != end) {
			*out = queue.front();
			queue.pop();
			++out;
		}
	}
	return out - begin;
}
}; };
} // namespace lib::adt } // namespace lib::adt
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <cassert> #include <cassert>
#include <cstdint> #include <cstdint>
#include <iosfwd> #include <iosfwd>
#include <string>
#include <type_traits> #include <type_traits>
namespace emper::lib::math { namespace emper::lib::math {
...@@ -54,9 +55,9 @@ class RunningAverage { ...@@ -54,9 +55,9 @@ class RunningAverage {
calcRunningAvg(average, counter, value); calcRunningAvg(average, counter, value);
} }
auto getCounter() -> counter_type { return counter; } [[nodiscard]] auto getCounter() const -> counter_type { return counter; }
auto getAverage() -> average_type { return average; } [[nodiscard]] auto getAverage() const -> average_type { return average; }
friend auto operator<<(std::ostream& strm, const RunningAverage& avg) -> std::ostream& { friend auto operator<<(std::ostream& strm, const RunningAverage& avg) -> std::ostream& {
strm << std::to_string(avg.average) << " [" << std::to_string(avg.counter) << "]"; strm << std::to_string(avg.average) << " [" << std::to_string(avg.counter) << "]";
......
...@@ -2,6 +2,9 @@ ...@@ -2,6 +2,9 @@
// Copyright © 2021 Florian Schmaus // Copyright © 2021 Florian Schmaus
#include "AbstractWorkStealingScheduler.hpp" #include "AbstractWorkStealingScheduler.hpp"
#include <algorithm>
#include <array>
#include <cassert>
#include <ostream> // for operator<<, basic_ostream<>::__ostream_type #include <ostream> // for operator<<, basic_ostream<>::__ostream_type
#include "Common.hpp" // for unlikely, likely #include "Common.hpp" // for unlikely, likely
...@@ -55,6 +58,64 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool { ...@@ -55,6 +58,64 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool {
return true; return true;
} }
/**
 * Try to obtain a fiber from the "scheduled from anywhere" queue.
 *
 * Up to DEQUEUE_FROM_ANYWHERE_MAX fibers are dequeued in one batch. The
 * first fiber that maybeRecycle() does not consume is returned to the
 * caller; every further such fiber is pushed to the worker's local
 * work-stealing queue. The batch size is limited so that at least
 * KEEP_FREE_SLOTS slots of the local queue stay free; if the local
 * queue has no more than KEEP_FREE_SLOTS free slots, only a single
 * fiber is dequeued and nothing is pushed.
 *
 * @return the fiber to run next together with FiberSource::anywhereQueue,
 *         or std::nullopt if no fiber could be obtained.
 */
auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue()
		-> std::optional<std::pair<Fiber*, FiberSource>> {
	const size_t KEEP_FREE_SLOTS = 64;
	const size_t DEQUEUE_FROM_ANYWHERE_MAX = 128;

	const size_t remainingCapacityLocalWsQueue = queue.freeSlots();
	// Without spare capacity in the local queue we only fetch the one
	// fiber that is handed back to the caller.
	size_t maxFiberCount = 1;
	if (remainingCapacityLocalWsQueue > KEEP_FREE_SLOTS) {
		maxFiberCount =
				std::min(remainingCapacityLocalWsQueue - KEEP_FREE_SLOTS, DEQUEUE_FROM_ANYWHERE_MAX);
	}

	std::array<Fiber*, DEQUEUE_FROM_ANYWHERE_MAX> fibers;
	const size_t fiberCount =
			dequeueFibersFromAnywhereQueue(fibers.begin(), fibers.begin() + maxFiberCount);

	Fiber* res = nullptr;
	size_t fibersLiftedFromAnywhere = 0;
	for (size_t i = 0; i < fiberCount; i++) {
		Fiber* fiber = fibers[i];
		if (maybeRecycle(fiber)) continue;

		// We found a runnable fiber. First check if we already have one
		// fiber saved to be returned.
		if (!res) {
			res = fiber;
			continue;
		}

		// We already have a fiber saved which will be returned. Hence we
		// push the remaining runnable fibers to our work-stealing queue,
		// so that they can be stolen by others.
		[[maybe_unused]] bool pushed = queue.pushBottom(fiber);
		// We have ensured that there is capacity for this fiber, hence we
		// must be able to push it here.
		assert(pushed);
		fibersLiftedFromAnywhere++;
	}

	if (fibersLiftedFromAnywhere) {
		onNewWork();

		if constexpr (emper::STATS) {
			awss::stats.recordFibersLiftedFromAnywhereQueue(fibersLiftedFromAnywhere);
		}
	}

	if (!res) return std::nullopt;

	// Keep the "next fiber from anywhere queue" statistic alive: the code
	// this function replaced incremented it, and it is still printed.
	emper::statsIncr(awss::stats.nextFiberFromAnywhereQueue);
	return std::make_pair(res, FiberSource::anywhereQueue);
}
auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> { auto AbstractWorkStealingScheduler::nextFiberViaWorkStealing() -> std::pair<Fiber*, FiberSource> {
FiberSource fiberSource = FiberSource::local; FiberSource fiberSource = FiberSource::local;
Fiber* fiber; Fiber* fiber;
...@@ -93,17 +154,15 @@ popBottom: ...@@ -93,17 +154,15 @@ popBottom:
} }
} }
dequeueFromAnywhere:
// Try the "scheduled from anywhere" queue to get work as last resort. // Try the "scheduled from anywhere" queue to get work as last resort.
fiber = dequeueFiberFromAnywhereQueue(); {
if (fiber) { auto anywhereQueueFiber = nextFiberViaAnywhereQueue();
emper::statsIncr(awss::stats.nextFiberFromAnywhereQueue); if (anywhereQueueFiber) return *anywhereQueueFiber;
if (maybeRecycle(fiber)) goto dequeueFromAnywhere;
fiberSource = FiberSource::anywhereQueue;
} }
// We were not able to dequeue any fiber if we reach this point.
fiber = nullptr;
out: out:
return std::make_pair(fiber, fiberSource); return std::make_pair(fiber, fiberSource);
} }
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <cstddef> // for size_t #include <cstddef> // for size_t
#include <cstdint> #include <cstdint>
#include <optional>
#include <utility> #include <utility>
#include "NextFiberResult.hpp" #include "NextFiberResult.hpp"
...@@ -36,6 +37,9 @@ class AbstractWorkStealingScheduler : public Scheduler { ...@@ -36,6 +37,9 @@ class AbstractWorkStealingScheduler : public Scheduler {
anywhereQueue, anywhereQueue,
}; };
private:
auto nextFiberViaAnywhereQueue() -> std::optional<std::pair<Fiber*, FiberSource>>;
protected: protected:
WsQueue<QUEUE_SIZE>** queues; WsQueue<QUEUE_SIZE>** queues;
static thread_local WsQueue<QUEUE_SIZE> queue; static thread_local WsQueue<QUEUE_SIZE> queue;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <string> #include <string>
#include "WorkerLocalData.hpp" #include "WorkerLocalData.hpp"
#include "lib/math.hpp"
#include "strategies/AbstractWorkStealingStrategy.hpp" #include "strategies/AbstractWorkStealingStrategy.hpp"
AbstractWorkStealingStats::AbstractWorkStealingStats(AbstractWorkStealingStrategy& strategy) AbstractWorkStealingStats::AbstractWorkStealingStats(AbstractWorkStealingStrategy& strategy)
...@@ -25,5 +26,12 @@ void AbstractWorkStealingStats::print() { ...@@ -25,5 +26,12 @@ void AbstractWorkStealingStats::print() {
<< "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen) << "total-next-fiber-stolen: " << std::to_string(comulatedWorkerStats.nextFiberStolen)
<< std::endl << std::endl
<< "total-next-fiber-from-anywhere-queue: " << "total-next-fiber-from-anywhere-queue: "
<< std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl; << std::to_string(comulatedWorkerStats.nextFiberFromAnywhereQueue) << std::endl
<< "total-fibers-lifted-from-anywhere-queue: "
<< std::to_string(comulatedWorkerStats.totalFibersLiftedFromAnywhereQueue) << std::endl
<< "max-fibers-lifted-from-anywhere-queue: "
<< std::to_string(comulatedWorkerStats.maxFibersLiftedFromAnywhereQueue) << std::endl
<< "avg-fibers-lifted-from-anywhere-queue: "
<< std::to_string(comulatedWorkerStats.avgFibersLiftedFromAnywhereQueue.getAverage())
<< std::endl;
} }
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
class AbstractWorkStealingScheduler; class AbstractWorkStealingScheduler;
class AbstractWorkStealingStats; class AbstractWorkStealingStats;
class AbstractWorkStealingWorkerStats;
class Runtime; class Runtime;
struct AbstractWorkStealingWorkerStats;
class AbstractWorkStealingStrategy : public RuntimeStrategy { class AbstractWorkStealingStrategy : public RuntimeStrategy {
public: public:
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Schmaus
#include "AbstractWorkStealingWorkerStats.hpp"
#include <algorithm>
/**
 * Accumulate the statistics of another worker into this instance.
 *
 * Counters are summed and the maximum is the larger of both maxima.
 * The other worker's average is folded into the running average as a
 * single sample, so the merge is not weighted by sample count. Workers
 * that never recorded a sample are skipped, so their empty average does
 * not distort the result.
 *
 * @param other the statistics to add
 * @return a reference to this instance
 */
auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorkerStats& other)
		-> AbstractWorkStealingWorkerStats& {
	scheduledFibersToLocal += other.scheduledFibersToLocal;
	scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue;
	nextFiberFromLocal += other.nextFiberFromLocal;
	nextFiberStolen += other.nextFiberStolen;
	nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue;
	totalFibersLiftedFromAnywhereQueue += other.totalFibersLiftedFromAnywhereQueue;
	maxFibersLiftedFromAnywhereQueue =
			std::max(maxFibersLiftedFromAnywhereQueue, other.maxFibersLiftedFromAnywhereQueue);

	// Only merge averages that are backed by at least one sample.
	if (other.avgFibersLiftedFromAnywhereQueue.getCounter() > 0) {
		avgFibersLiftedFromAnywhereQueue.update(other.avgFibersLiftedFromAnywhereQueue.getAverage());
	}

	return *this;
}
/**
 * Record that a batch of fibers was lifted from the anywhere queue.
 *
 * Updates the total, the maximum batch size and the running average.
 *
 * @param fibersLiftedFromAnywhereQueue number of fibers lifted in this batch
 */
void AbstractWorkStealingWorkerStats::recordFibersLiftedFromAnywhereQueue(
		size_t fibersLiftedFromAnywhereQueue) {
	totalFibersLiftedFromAnywhereQueue += fibersLiftedFromAnywhereQueue;
	// The explicit template argument keeps this compiling where size_t and
	// uint64_t are distinct types; plain std::max needs both arguments to
	// have the same type.
	maxFibersLiftedFromAnywhereQueue =
			std::max<uint64_t>(maxFibersLiftedFromAnywhereQueue, fibersLiftedFromAnywhereQueue);
	avgFibersLiftedFromAnywhereQueue.update(fibersLiftedFromAnywhereQueue);
}
...@@ -2,22 +2,29 @@ ...@@ -2,22 +2,29 @@
// Copyright © 2020-2021 Florian Schmaus // Copyright © 2020-2021 Florian Schmaus
#pragma once #pragma once
#include <cstddef>
#include <cstdint> #include <cstdint>
struct AbstractWorkStealingWorkerStats { #include "lib/math.hpp"
namespace math = emper::lib::math;
class AbstractWorkStealingWorkerStats {
friend class AbstractWorkStealingScheduler;
public:
uint64_t scheduledFibersToLocal = 0; uint64_t scheduledFibersToLocal = 0;
uint64_t scheduledFibersToOverflowQueue = 0; uint64_t scheduledFibersToOverflowQueue = 0;
uint64_t nextFiberFromLocal = 0; uint64_t nextFiberFromLocal = 0;
uint64_t nextFiberStolen = 0; uint64_t nextFiberStolen = 0;
uint64_t nextFiberFromAnywhereQueue = 0; uint64_t nextFiberFromAnywhereQueue = 0;
auto operator+=(const AbstractWorkStealingWorkerStats& other) uint64_t totalFibersLiftedFromAnywhereQueue = 0;
-> AbstractWorkStealingWorkerStats& { uint64_t maxFibersLiftedFromAnywhereQueue = 0;
scheduledFibersToLocal += other.scheduledFibersToLocal; math::RunningAverage<double, uint32_t> avgFibersLiftedFromAnywhereQueue;
scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue;
nextFiberFromLocal += other.nextFiberFromLocal; auto operator+=(const AbstractWorkStealingWorkerStats& other) -> AbstractWorkStealingWorkerStats&;
nextFiberStolen += other.nextFiberStolen;
nextFiberFromAnywhereQueue = other.nextFiberFromAnywhereQueue; private:
return *this; void recordFibersLiftedFromAnywhereQueue(size_t fibersLiftedFromAnywhere);
}
}; };
...@@ -2,6 +2,7 @@ emper_cpp_sources += files( ...@@ -2,6 +2,7 @@ emper_cpp_sources += files(
'AbstractWorkStealingScheduler.cpp', 'AbstractWorkStealingScheduler.cpp',
'AbstractWorkStealingStats.cpp', 'AbstractWorkStealingStats.cpp',
'AbstractWorkStealingStrategy.cpp', 'AbstractWorkStealingStrategy.cpp',
'AbstractWorkStealingWorkerStats.cpp',
) )
subdir('ws') subdir('ws')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment