diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index cadd6ed41ab6e3de4784f9fc320c5728ea7d3b44..5ab9afc2e437b17f0142e9975bbeec27ce2461dd 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -284,7 +284,7 @@ auto Runtime::nextFiber() -> NextFiberResult {
 	if (ncompletions > 0) {
 		// Keep the first and schedule the rest
 		Fiber* next = completions[0];
-		schedule(completions.begin() + 1, completions.begin() + ncompletions);
+		schedule(&completions[1], ncompletions - 1);
 
 		// TODO: hint that this fiber comes from the IO subsystem
 		return NextFiberResult{
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index c1868e77f67aba1571b68660a4cd963acb0516a3..fdd0783acfac5fb3730783ba331cbdfb8ae44625 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -136,19 +136,17 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 		scheduler.schedule(fiber);
 	}
 
-	template <class InputIt>
-	inline void schedule(InputIt begin, InputIt end) {
+	inline void schedule(Fiber** fibers, unsigned count) {
 		// Calling schedule() only works from within the EMPER runtime.
 		assert(inRuntime());
 
-		scheduler.schedule(begin, end);
+		scheduler.schedule(fibers, count);
 	}
 
 	inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); }
 
-	template <class InputIt>
-	inline void scheduleFromAnywhere(InputIt begin, InputIt end) {
-		scheduler.scheduleFromAnywhere(begin, end);
+	inline void scheduleFromAnywhere(Fiber** fibers, unsigned count) {
+		scheduler.scheduleFromAnywhere(fibers, count);
 	}
 
 	void yield();
diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp
index 6ea42bc8f38e4723c64413c1639902c90442ae0b..d71ba18b813d1be89113d6bff27520315366b1aa 100644
--- a/emper/Scheduler.hpp
+++ b/emper/Scheduler.hpp
@@ -3,6 +3,7 @@
 #pragma once
 
 #include <functional>	// for function
+#include <ostream>
 
 #include "CallerEnvironment.hpp"
 #include "Debug.hpp"	// for LogSubsystem, LogSubsystem::SCHED, Logger
@@ -44,26 +45,26 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 
 	void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); }
 
-	template <class InputIt>
-	void insertInAnywhereQueue(InputIt begin, InputIt end) {
-		scheduleAnywhereQueue.insert(begin, end);
+	void insertInAnywhereQueue(Fiber** fibers, unsigned count) {
+		scheduleAnywhereQueue.insert(fibers, count);
 	}
 
 	auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); }
 
 	virtual void scheduleInternal(Fiber& fiber) = 0;
+	virtual void scheduleFromAnywhereInternal(Fiber& fiber) = 0;
+	virtual void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) = 0;
 
  public:
 	void schedule(Fiber& fiber) {
 		LOGD("Scheduling fiber " << &fiber);
-
 		scheduleInternal(fiber);
 	}
 
-	template <class InputIt>
-	void schedule(InputIt begin, InputIt end) {
-		for (; begin != end; ++begin) {
-			Fiber& fiber = **begin;
+	// TODO: maybe this should also be a specialized function
+	void schedule(Fiber** fibers, unsigned count) {
+		for (unsigned i = 0; i < count; ++i) {
+			Fiber& fiber = *fibers[i];
 			LOGD("Scheduling batched fiber " << &fiber);
 			scheduleInternal(fiber);
 		}
@@ -72,17 +73,11 @@ class Scheduler : public Logger<LogSubsystem::SCHED> {
 	virtual auto nextFiber() -> NextFiberResult = 0;
 
 	void scheduleFromAnywhere(Fiber& fiber) {
-		enqueueInAnywhereQueue(fiber);
-
-		onNewWork<CallerEnvironment::ANYWHERE>();
+		LOGD("Scheduling fiber " << &fiber << " from anywhere");
+		scheduleFromAnywhereInternal(fiber);
 	}
 
-	// TODO: investigate if it is still a good idea to wakeup only a single
-	// worker maybe we want something like onNewWork(amountOfWork)
-	template <class InputIt>
-	void scheduleFromAnywhere(InputIt begin, InputIt end) {
-		insertInAnywhereQueue(begin, end);
-
-		onNewWork<CallerEnvironment::ANYWHERE>();
+	void scheduleFromAnywhere(Fiber** fibers, unsigned count) {
+		scheduleFromAnywhereInternal(fibers, count);
 	}
 };
diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp
index c559ba996e9e7b57ac5f6dd30f3cbd34b2a91822..4bac70ce22cf656239af8e6d1e6a41e76e2425c9 100644
--- a/emper/io/Future.hpp
+++ b/emper/io/Future.hpp
@@ -14,17 +14,19 @@
 #include <ostream>	// for operator<<, ostream, basic_ost...
 #include <stdexcept>
 #include <string>
+#include <utility>
 
 #include "BinaryPrivateSemaphore.hpp"	// for BPS
 #include "CallerEnvironment.hpp"	// for CallerEnvironment, ANYWHERE
 #include "Common.hpp"
 #include "Debug.hpp"	// for LOGD, LogSubsystem, LogSubsyst...
 #include "Emper.hpp"	// for DEBUG
+#include "Fiber.hpp"
 #include "Runtime.hpp"
+#include "emper-common.h"
 #include "io/Operation.hpp"	// for Operation, operator<<, Operati...
 
 struct io_uring_sqe;
-class Fiber;
 
 namespace emper::io {
 class Stats;
@@ -43,9 +45,22 @@ class Future : public Logger<LogSubsystem::IO> {
 	friend class Stats;
 
  public:
+	// User facing Callback type
 	using Callback = std::function<void(const int32_t&)>;
 
 protected:
+	// Internal Callback type
+	class CallbackInternal {
+		std::function<void(const int32_t&)> callback;
+
+	 public:
+		workeraffinity_t affinity;
+
+		CallbackInternal(Callback callback) : callback(std::move(callback)) {}
+
+		void operator()(const int32_t& res) { callback(res); }
+	};
+
 	using State = struct State {
 		uint8_t submitted : 1 = 0; /*!< An sqe for this Future was prepared */
 		uint8_t completed : 1 = 0; /*!< The semaphor was signaled */
@@ -58,6 +73,16 @@ class Future : public Logger<LogSubsystem::IO> {
 
 	BPS sem;
 
+	// It would be OK to not initialize this member, since it is only
+	// set after the Future is prepared and only read if the Future
+	// gets completed from anywhere.
+	// However clang-tidy then starts to complain
+	// about clang-analyzer-optin.cplusplus.UninitializedObject
+
+	// The workerId of the worker which submitted this Future
+	// This will be passed to the BPS when it is signalled from the completer thread
+	workeraffinity_t affinity = Fiber::NOT_AFFINE;
+
 	State state;
 
 	/* IO operation to perform */
@@ -90,7 +115,7 @@ class Future : public Logger<LogSubsystem::IO> {
 	 * time the request is in the io_uring.
 	 * The callback is called with the value causing the completion
 	 */
-	Callback* callback = nullptr;
+	CallbackInternal* callback = nullptr;
 
 	virtual void prepareSqe(io_uring_sqe* sqe) = 0;
 
@@ -108,7 +133,8 @@ class Future : public Logger<LogSubsystem::IO> {
 
 	virtual void completeFromAnywhere(int32_t res) {
 		setCompletion(res);
-		sem.signalFromAnywhere();
+		// pass our worker affinity set during IoContext::submit
+		sem.signalFromAnywhere(&affinity);
 	}
 
 	virtual auto completeAndGetContinuation(int32_t res) -> Fiber* {
@@ -218,16 +244,6 @@ class Future : public Logger<LogSubsystem::IO> {
 		this->dependency = &dependency;
 	}
 
- private:
-	inline void setCallback(Callback* callback) {
-		if (unlikely(this->callback)) {
-			delete this->callback;
-		}
-
-		this->callback = callback;
-	}
-
- public:
 	/*
 	 * @brief register a callback which is executed in a new Fiber on completion
 	 *
 	 * @param callback Callback which gets executed on completion.
 	 * It gets passed the value causing the completion.
 	 */
 	inline void setCallback(const Callback& callback) {
-		/* Call copy constructor */
-		setCallback(new Callback(callback));
-	}
-
-	/*
-	 * @brief register a callback which is executed in a new Fiber on completion
-	 *
-	 * @param callback Callback rvalue which gets moved and executed on completion.
-	 * It gets passed the value causing the completion.
-	 */
-	inline void setCallback(const Callback&& callback) {
-		/* Call move constructor */
-		setCallback(new Callback(callback));
+		if (unlikely(this->callback)) {
+			delete this->callback;
+		}
+		this->callback = new CallbackInternal(callback);
 	}
 
 	/*
diff --git a/emper/io/GlobalIoContext.hpp b/emper/io/GlobalIoContext.hpp
index b1ac413be0d530125d3e57a166d0b08a8e3a3049..78dc214d21492656a27b59530a8a5927eac21673 100644
--- a/emper/io/GlobalIoContext.hpp
+++ b/emper/io/GlobalIoContext.hpp
@@ -19,7 +19,9 @@ class GlobalIoContext : public IoContext {
 	friend class RecvFuture;
 
  private:
-	GlobalIoContext(Runtime& runtime, workerid_t workerCount) : IoContext(runtime, workerCount) {}
+	GlobalIoContext(Runtime& runtime, workerid_t workerCount) : IoContext(runtime, workerCount) {
+		worker = nullptr;
+	}
 
 	// This mutex is only used to protect the SQ during start up when all workers
 	// register their IoContext's eventfds in parallel
diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp
index eebd741f432a977c581c97a55b57db914e698c6d..4fe1e748e747c8327cff9e0aa1dc607b9ff55857 100644
--- a/emper/io/IoContext.cpp
+++ b/emper/io/IoContext.cpp
@@ -65,12 +65,17 @@ auto IoContext::prepareFutureChain(Future &future, unsigned chain_length) -> uns
 	// we should start a new Fiber executing callback on completion
 	if (future.callback) {
 		LOGD("prepare " << future << " Callback " << future.callback);
+		// set the Callback affinity if the callback is scheduled from anywhere
+		if (worker) future.callback->affinity = worker->getWorkerId();
 		io_uring_sqe_set_data(sqe,
													TaggedPtr(future.callback, static_cast<uint16_t>(PointerTags::Callback)));
 
 		// Someone wants to be notified about the completion of this Future
 	} else if (!future.isForgotten()) {
 		io_uring_sqe_set_data(sqe, TaggedPtr(&future, static_cast<uint16_t>(PointerTags::Future)));
+
+		// set the Future affinity passed to the BPS when signaled from anywhere
+		if (worker) future.affinity = worker->getWorkerId();
 	}
 
 	future.state.submitted = true;
@@ -274,12 +279,14 @@ reap_cqes:
 		auto tag = static_cast<PointerTags>(tptr.getTag());
 		switch (tag) {
 			case PointerTags::Callback: {
-				auto *callback = tptr.getPtr<Future::Callback>();
+				auto *callback = tptr.getPtr<Future::CallbackInternal>();
 				LOGD("Create new callback fiber for " << callback);
-				auto *callbackFiber = Fiber::from([&c = *callback, res] {
-					c(res);
-					delete &c;
-				});
+				auto *callbackFiber = Fiber::from(
+						[&c = *callback, res] {
+							c(res);
+							delete &c;
+						},
+						&callback->affinity);
 
 				continuationFibers[posInBuf++] = callbackFiber;
 			} break;
@@ -318,7 +325,7 @@ reap_cqes:
 
 	if (reReap) {
 		// schedule all already collected continuation fibers
-		runtime.schedule(continuationFibers.begin(), continuationFibers.begin() + posInBuf);
+		runtime.schedule(continuationFibers.data(), posInBuf);
 
 		reReapCount++;
 
diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp
index 31805c34e814c3d2f55ff3e18f5cdb4e2c9ea081..9c1183197b97b5c250c2602a27f30e6714e3bf3e 100644
--- a/emper/io/IoContext.hpp
+++ b/emper/io/IoContext.hpp
@@ -215,13 +215,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
 		ContinuationBuffer completionBuf;
 		unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf);
 		if (ncompletions > 0) {
-			auto *begin = completionBuf.begin();
-			auto *finish = completionBuf.begin() + ncompletions;
+			auto *fibers = completionBuf.data();
 
 			if constexpr (callerEnvironment == CallerEnvironment::EMPER) {
-				runtime.schedule(begin, finish);
+				runtime.schedule(fibers, ncompletions);
 			} else {
-				runtime.scheduleFromAnywhere(begin, finish);
+				runtime.scheduleFromAnywhere(fibers, ncompletions);
 			}
 		}
 	}
diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
index bd8465b47eb1bd21c5b6b2613acd2705751a624b..939a092c8c893ad7a438310d8c57722c93e94bbe 100644
--- a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
+++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class BoostSharedMutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
 		if (queue.empty()) {
diff --git a/emper/lib/adt/LockedSet.hpp b/emper/lib/adt/LockedSet.hpp
index 7c4cccfdd8c4324fbd67bb33f9242b660ade1857..c1f4a6928fe8c774f904d12719037caf10156489 100644
--- a/emper/lib/adt/LockedSet.hpp
+++ b/emper/lib/adt/LockedSet.hpp
@@ -27,6 +27,13 @@ class LockedSet {
 		_set.insert(first, last);
 	}
 
+	void insert(const Key* items, unsigned count) {
+		std::lock_guard<std::mutex> lock(_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			_set.insert(items[i]);
+		}
+	}
+
 	auto erase(const Key& key) -> size_t {
 		std::lock_guard<std::mutex> lock(_mutex);
 		return _set.erase(key);
diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp
index 5aa84ecbc9c76580ed6fb1a4604111606511f994..f146c007feed1ca827a4a1955036915f87fb9d4b 100644
--- a/emper/lib/adt/MutexUnboundedQueue.hpp
+++ b/emper/lib/adt/MutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class MutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		std::lock_guard<std::mutex> lock(queue_mutex);
 		if (queue.empty()) {
diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp
index 8a305b377affa1e811fcf64e0ea11c098659bd74..7e887ded45e5333e547c1ae2cbac2f98113d52b7 100644
--- a/emper/lib/adt/RwLockUnboundedQueue.hpp
+++ b/emper/lib/adt/RwLockUnboundedQueue.hpp
@@ -55,6 +55,14 @@ class RwLockUnboundedQueue {
 		pthread_rwlock_unlock(&lock);
 	}
 
+	void insert(I** items, unsigned count) {
+		aquire_wrlock(lock);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+		pthread_rwlock_unlock(&lock);
+	}
+
 	auto dequeue() -> I* {
 		I* res = nullptr;
 
diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
index 5a478da648aa4dc7ab3eb758ce823a0c48aa00bc..c3a7864453dc23f580a52dbf507ad20c9c4d42de 100644
--- a/emper/lib/adt/SharedMutexUnboundedQueue.hpp
+++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
@@ -28,6 +28,13 @@ class SharedMutexUnboundedQueue {
 		}
 	}
 
+	void insert(I** items, unsigned count) {
+		std::unique_lock lock(queue_mutex);
+		for (unsigned i = 0; i < count; ++i) {
+			queue.push(items[i]);
+		}
+	}
+
 	auto dequeue() -> I* {
 		{
 			std::shared_lock lock(queue_mutex);
diff --git a/emper/strategies/laws/LawsScheduler.cpp b/emper/strategies/laws/LawsScheduler.cpp
index 6ea84c23fef24c9f9ad52be0b17303c2718039df..3bf7079a8e0bc7d0ba625c792d91f53d8c1008be 100644
--- a/emper/strategies/laws/LawsScheduler.cpp
+++ b/emper/strategies/laws/LawsScheduler.cpp
@@ -4,6 +4,7 @@
 
 #include <cstdint>
 
+#include "CallerEnvironment.hpp"
 #include "Emper.hpp"
 #include "LawsStrategy.hpp"	// IWYU pragma: keep
 #include "NextFiberResult.hpp"
@@ -21,17 +22,17 @@ LawsScheduler::LawsScheduler(Runtime& runtime) : AbstractWorkStealingScheduler(r
 	addNewWorkerHook(newWorkerHook);
 }
 
-void LawsScheduler::scheduleInternal(Fiber& fiber) {
+void LawsScheduler::tryScheduleToPriorityQueue(Fiber& fiber) {
 	workeraffinity_t* const affinity_buffer = getAffinityBuffer(fiber);
 	if (affinity_buffer) {
 		workeraffinity_t affinity = *affinity_buffer;
 		workerid_t workerId = Runtime::getWorkerId();
 		if (affinity == workerId) {
-			goto scheduleViaWorkStealing;
+			return;
 		}
 
 		if (affinity == Fiber::NOT_AFFINE) {
-			goto scheduleViaWorkStealing;
+			return;
 		}
 
 		// We found a fiber to schedule on a remote prority queue.
@@ -40,11 +41,28 @@ void LawsScheduler::scheduleInternal(Fiber& fiber) {
 
 		emper::statsIncr(LawsStrategy::stats.scheduledFibersToPriority);
 	}
+}
 
-scheduleViaWorkStealing:
+void LawsScheduler::scheduleInternal(Fiber& fiber) {
+	tryScheduleToPriorityQueue(fiber);
 	scheduleViaWorkStealing(fiber);
 }
 
+void LawsScheduler::scheduleFromAnywhereInternal(Fiber& fiber) {
+	tryScheduleToPriorityQueue(fiber);
+	enqueueInAnywhereQueue(fiber);
+	onNewWork<CallerEnvironment::ANYWHERE>();
+}
+
+void LawsScheduler::scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) {
+	for (unsigned i = 0; i < count; ++i) {
+		Fiber& fiber = *fibers[i];
+		tryScheduleToPriorityQueue(fiber);
+	}
+	insertInAnywhereQueue(fibers, count);
+	onNewWork<CallerEnvironment::ANYWHERE>();
+}
+
 auto LawsScheduler::nextFiber() -> NextFiberResult {
 	Fiber* fiber = priorityQueue.dequeue();
 	if (fiber != nullptr) {
diff --git a/emper/strategies/laws/LawsScheduler.hpp b/emper/strategies/laws/LawsScheduler.hpp
index d95a4bf06146caaca0af83b35bfd0183108d4019..43f7f12b2cf71ac7cc79ef6b319dc208e5a154cc 100644
--- a/emper/strategies/laws/LawsScheduler.hpp
+++ b/emper/strategies/laws/LawsScheduler.hpp
@@ -17,8 +17,12 @@ class LawsScheduler : public AbstractWorkStealingScheduler {
 
 	static thread_local LawsMpscQueue priorityQueue;
 
+	void tryScheduleToPriorityQueue(Fiber& fiber);
+
 protected:
 	void scheduleInternal(Fiber& fiber) override;
+	void scheduleFromAnywhereInternal(Fiber& fiber) override;
+	void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override;
 
 public:
 	LawsScheduler(Runtime& runtime);
diff --git a/emper/strategies/ws/WsScheduler.hpp b/emper/strategies/ws/WsScheduler.hpp
index 392b0aff0c7f9742be48efaff3ef94ef0d2784a4..c122f301a6cc817a9fe567154bde5b8ac7ca1707 100644
--- a/emper/strategies/ws/WsScheduler.hpp
+++ b/emper/strategies/ws/WsScheduler.hpp
@@ -2,6 +2,7 @@
 // Copyright © 2020-2021 Florian Schmaus
 #pragma once
 
+#include "CallerEnvironment.hpp"
 #include "NextFiberResult.hpp"
 #include "strategies/AbstractWorkStealingScheduler.hpp"
 
@@ -11,6 +12,15 @@ class Runtime;
 class WsScheduler : public AbstractWorkStealingScheduler {
 protected:
 	void scheduleInternal(Fiber& fiber) override { scheduleViaWorkStealing(fiber); }
+	void scheduleFromAnywhereInternal(Fiber& fiber) override {
+		enqueueInAnywhereQueue(fiber);
+		onNewWork<CallerEnvironment::ANYWHERE>();
+	}
+
+	void scheduleFromAnywhereInternal(Fiber** fibers, unsigned count) override {
+		insertInAnywhereQueue(fibers, count);
+		onNewWork<CallerEnvironment::ANYWHERE>();
+	}
 
 public:
 	WsScheduler(Runtime& runtime);
diff --git a/tests/io/FutureCallbackTest.cpp b/tests/io/FutureCallbackTest.cpp
index 4e7e85bb2a903c6d0ac3ef138239968a08564813..a4a1dcd229d673d72302d6c5b588fa63c8822209 100644
--- a/tests/io/FutureCallbackTest.cpp
+++ b/tests/io/FutureCallbackTest.cpp
@@ -10,17 +10,15 @@
 using emper::io::AlarmFuture;
 using emper::io::Future;
 
-void callback(int32_t res, BPS& bps) {
-	assert(res == -ETIME);
-	bps.signal();
-}
-
 void emperTest() {
 	AlarmFuture::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
 	AlarmFuture alarm(ts);
 
 	BPS bps;
-	alarm.setCallback(Future::Callback([&bps](int32_t res) { callback(res, bps); }));
+	alarm.setCallback([&bps](int32_t res) {
+		assert(res == -ETIME);
+		bps.signal();
+	});
 
 	alarm.submit();
 	// wait till the callback was executed