diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index cadd6ed41ab6e3de4784f9fc320c5728ea7d3b44..5ab9afc2e437b17f0142e9975bbeec27ce2461dd 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -284,7 +284,7 @@ auto Runtime::nextFiber() -> NextFiberResult { if (ncompletions > 0) { // Keep the first and schedule the rest Fiber* next = completions[0]; - schedule(completions.begin() + 1, completions.begin() + ncompletions); + schedule(&completions[1], ncompletions - 1); // TODO: hint that this fiber comes from the IO subsystem return NextFiberResult{ diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c1868e77f67aba1571b68660a4cd963acb0516a3..fdd0783acfac5fb3730783ba331cbdfb8ae44625 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -136,19 +136,17 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { scheduler.schedule(fiber); } - template <class InputIt> - inline void schedule(InputIt begin, InputIt end) { + inline void schedule(Fiber** fibers, unsigned count) { // Calling schedule() only works from within the EMPER runtime. assert(inRuntime()); - scheduler.schedule(begin, end); + scheduler.schedule(fibers, count); } inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } - template <class InputIt> - inline void scheduleFromAnywhere(InputIt begin, InputIt end) { - scheduler.scheduleFromAnywhere(begin, end); + inline void scheduleFromAnywhere(Fiber** fibers, unsigned count) { + scheduler.scheduleFromAnywhere(fibers, count); } void yield(); diff --git a/emper/Scheduler.hpp b/emper/Scheduler.hpp index 6ea42bc8f38e4723c64413c1639902c90442ae0b..13157322348beca93d27049c3eb50c0b101a8690 100644 --- a/emper/Scheduler.hpp +++ b/emper/Scheduler.hpp @@ -44,9 +44,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { void enqueueInAnywhereQueue(Fiber& fiber) { scheduleAnywhereQueue.enqueue(&fiber); } - template <class InputIt> - void insertInAnywhereQueue(InputIt begin, InputIt end) { - scheduleAnywhereQueue.insert(begin, end); + void insertInAnywhereQueue(Fiber** fibers, unsigned count) { + scheduleAnywhereQueue.insert(fibers, count); } auto dequeueFiberFromAnywhereQueue() -> Fiber* { return scheduleAnywhereQueue.dequeue(); } @@ -60,10 +59,9 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { scheduleInternal(fiber); } - template <class InputIt> - void schedule(InputIt begin, InputIt end) { - for (; begin != end; ++begin) { - Fiber& fiber = **begin; + void schedule(Fiber** fibers, unsigned count) { + for (unsigned i = 0; i < count; ++i) { + Fiber& fiber = *fibers[i]; LOGD("Scheduling batched fiber " << &fiber); scheduleInternal(fiber); } @@ -79,9 +77,8 @@ class Scheduler : public Logger<LogSubsystem::SCHED> { // TODO: investigate if it is still a good idea to wakeup only a single // worker maybe we want something like onNewWork(amountOfWork) - template <class InputIt> - void scheduleFromAnywhere(InputIt begin, InputIt end) { - insertInAnywhereQueue(begin, end); + void scheduleFromAnywhere(Fiber** fibers, unsigned count) { + insertInAnywhereQueue(fibers, count); onNewWork<CallerEnvironment::ANYWHERE>(); } diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index eebd741f432a977c581c97a55b57db914e698c6d..307dce058a2fc79a95d6090d1375863a538a9534 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -318,7 +318,7 @@ reap_cqes: if (reReap) { // schedule all already collected continuation fibers - runtime.schedule(continuationFibers.begin(), continuationFibers.begin() + posInBuf); + runtime.schedule(continuationFibers.data(), posInBuf); reReapCount++; diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index 31805c34e814c3d2f55ff3e18f5cdb4e2c9ea081..9c1183197b97b5c250c2602a27f30e6714e3bf3e 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -215,13 +215,12 @@ class IoContext : public Logger<LogSubsystem::IO> { ContinuationBuffer completionBuf; unsigned ncompletions = reapCompletions<callerEnvironment>(completionBuf); if (ncompletions > 0) { - auto *begin = completionBuf.begin(); - auto *finish = completionBuf.begin() + ncompletions; + auto *fibers = completionBuf.data(); if constexpr (callerEnvironment == CallerEnvironment::EMPER) { - runtime.schedule(begin, finish); + runtime.schedule(fibers, ncompletions); } else { - runtime.scheduleFromAnywhere(begin, finish); + runtime.scheduleFromAnywhere(fibers, ncompletions); } } } diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp index bd8465b47eb1bd21c5b6b2613acd2705751a624b..939a092c8c893ad7a438310d8c57722c93e94bbe 100644 --- a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp +++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp @@ -28,6 +28,13 @@ class BoostSharedMutexUnboundedQueue { } } + void insert(I** items, unsigned count) { + boost::unique_lock<boost::shared_mutex> lock(queue_mutex); + for (unsigned i = 0; i < count; ++i) { + queue.push(items[i]); + } + } + auto dequeue() -> I* { boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex); if (queue.empty()) { diff --git a/emper/lib/adt/LockedSet.hpp b/emper/lib/adt/LockedSet.hpp index 7c4cccfdd8c4324fbd67bb33f9242b660ade1857..c1f4a6928fe8c774f904d12719037caf10156489 100644 --- a/emper/lib/adt/LockedSet.hpp +++ b/emper/lib/adt/LockedSet.hpp @@ -27,6 +27,13 @@ class LockedSet { _set.insert(first, last); } + void insert(const Key* items, unsigned count) { + std::lock_guard<std::mutex> lock(_mutex); + for (unsigned i = 0; i < count; ++i) { + _set.insert(items[i]); + } + } + auto erase(const Key& key) -> size_t { std::lock_guard<std::mutex> lock(_mutex); return _set.erase(key); diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp index 5aa84ecbc9c76580ed6fb1a4604111606511f994..f146c007feed1ca827a4a1955036915f87fb9d4b 100644 --- a/emper/lib/adt/MutexUnboundedQueue.hpp +++ b/emper/lib/adt/MutexUnboundedQueue.hpp @@ -28,6 +28,13 @@ class MutexUnboundedQueue { } } + void insert(I** items, unsigned count) { + std::lock_guard<std::mutex> lock(queue_mutex); + for (unsigned i = 0; i < count; ++i) { + queue.push(items[i]); + } + } + auto dequeue() -> I* { std::lock_guard<std::mutex> lock(queue_mutex); if (queue.empty()) { diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp index 8a305b377affa1e811fcf64e0ea11c098659bd74..7e887ded45e5333e547c1ae2cbac2f98113d52b7 100644 --- a/emper/lib/adt/RwLockUnboundedQueue.hpp +++ b/emper/lib/adt/RwLockUnboundedQueue.hpp @@ -55,6 +55,14 @@ class RwLockUnboundedQueue { pthread_rwlock_unlock(&lock); } + void insert(I** items, unsigned count) { + aquire_wrlock(lock); + for (unsigned i = 0; i < count; ++i) { + queue.push(items[i]); + } + pthread_rwlock_unlock(&lock); + } + auto dequeue() -> I* { I* res = nullptr; diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp index 5a478da648aa4dc7ab3eb758ce823a0c48aa00bc..c3a7864453dc23f580a52dbf507ad20c9c4d42de 100644 --- a/emper/lib/adt/SharedMutexUnboundedQueue.hpp +++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp @@ -28,6 +28,13 @@ class SharedMutexUnboundedQueue { } } + void insert(I** items, unsigned count) { + std::unique_lock lock(queue_mutex); + for (unsigned i = 0; i < count; ++i) { + queue.push(items[i]); + } + } + auto dequeue() -> I* { { std::shared_lock lock(queue_mutex);