diff --git a/emper/Actor.hpp b/emper/Actor.hpp index f2b1d57baeb154369acf2a324edb4accf5d848eb..80fa5f7b25c42cc748ddb53838c21d92ee3f28f5 100644 --- a/emper/Actor.hpp +++ b/emper/Actor.hpp @@ -8,6 +8,7 @@ #include "CallerEnvironment.hpp" #include "Fiber.hpp" #include "UnboundedBlockingMpscQueue.hpp" +#include "emper.hpp" template <typename T> class Actor { @@ -78,8 +79,13 @@ class Actor { auto waitUntilIdle(long timeout) -> bool { const auto start = std::chrono::steady_clock::now(); - const auto deadline = start + std::chrono::milliseconds(timeout); + const auto deadline = start + std::chrono::seconds(timeout); while (!(queue.size() == 0 && state.load(std::memory_order_acquire) == Retrieving)) { + if constexpr (emper::IO) { + emper::sleep(1); + } else { + emper::yield(); + } // TODO: The suppressed linter error below may be a false positive // reported by clang-tidy. // NOLINTNEXTLINE(modernize-use-nullptr) diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index e195d3e05f24a07876511c1beb6002b2cc56fc54..17ef6b9b0982a8d6921439403872736090a32f6c 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -14,7 +14,8 @@ #include <memory> // for __shared_ptr_access, shared_ptr #include <string> // for string -#include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG +#include "Common.hpp" // for DIE_MSG_ERRNO, DIE, DIE_MSG +#include "Context.hpp" #include "ContextManager.hpp" // for ContextManager #include "Debug.hpp" // for DBG, ABORT, LOGD, LOGE #include "Emper.hpp" @@ -209,6 +210,15 @@ auto Runtime::workerLoop(Worker* worker) -> void* { return nullptr; } +void Runtime::yield() { + Context* context = Context::getCurrentContext(); + contextManager.saveAndStartNew([context, this] { + Fiber* resumeFiber = + Fiber::from([context, this] { this->contextManager.discardAndResume(context); }); + this->scheduleFromAnywhere(*resumeFiber); + }); +} + auto Runtime::nextFiber() -> NextFiberResult { if constexpr (emper::IO) { // Schedule all fibers waiting on completed IO diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 803d935d38fb15e7a8a58ec24b8142985781373e..92487a336c3219860a1dafce3182c955df6113cd 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -153,6 +153,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { inline void scheduleFromAnywhere(Fiber& fiber) { scheduler.scheduleFromAnywhere(fiber); } + void yield(); + // TODO: This should probably not be a public method of Runtime. auto nextFiber() -> NextFiberResult; diff --git a/emper/include/emper.hpp b/emper/include/emper.hpp index 82e1f259a784354baa923ddd31a3cb65887bb88b..c5691e232354e9803ab2c7e978de0f503b1c74f5 100644 --- a/emper/include/emper.hpp +++ b/emper/include/emper.hpp @@ -6,9 +6,11 @@ #include <functional> #include <utility> +#include "Common.hpp" #include "Fiber.hpp" #include "Runtime.hpp" #include "SynchronizedFiber.hpp" +#include "io/Future.hpp" void async(Fiber* fiber) { assert(fiber != nullptr); @@ -59,3 +61,22 @@ void spawn(Fiber::fiber_fun0_t function, workeraffinity_t* affinity, S& semaphor Fiber* fiber = SynchronizedFiber::from(function, affinity, semaphore); async(fiber); } + +namespace emper { +void yield() { + Runtime* runtime = Runtime::getRuntime(); + runtime->yield(); +} + +auto sleep(unsigned int seconds) -> bool { + if constexpr (!emper::IO) { + DIE_MSG("sleep requires emper::io"); + } + + emper::io::AlarmFuture::Timespec ts = {.tv_sec = seconds, .tv_nsec = 0}; + emper::io::AlarmFuture alarm(ts); + int32_t res = alarm.submitAndWait(); + + return res == -ETIME; +} +} // namespace emper diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index 91f157a18c8a629b1781a11c6d84986f312ab9df..b3c5c5ccc5d6937c0740488228b45a6ecb7b7660 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -21,7 +21,6 @@ #include "Emper.hpp" // for DEBUG #include "io/Operation.hpp" // for Operation, operator<<, Operati... -struct __kernel_timespec; struct io_uring_sqe; namespace emper::io { @@ -564,29 +563,33 @@ class CloseFuture : public Future { * @endcode */ class TimeoutWrapper : public Future { - void prepareSqe(io_uring_sqe* sqe) override { - io_uring_prep_link_timeout(sqe, (struct ::__kernel_timespec*)buf, 0); - } - public: - TimeoutWrapper(Future& future, struct __kernel_timespec& ts) + using Timespec = struct __kernel_timespec; + + TimeoutWrapper(Future& future, Timespec& ts) : Future(Operation::LINK_TIMEOUT, 0, (void*)&ts, 0, 0) { setDependency(future); }; + + private: + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_link_timeout(sqe, (Timespec*)buf, 0); + } }; /* * @brief Arm a timeout which will signal the future when it is reached * - * If the timeout was reached it will wait() will return -ETIME. + * If the timeout was reached calling wait() will return -ETIME. */ class AlarmFuture : public Future { - void prepareSqe(io_uring_sqe* sqe) override { - io_uring_prep_timeout(sqe, (struct ::__kernel_timespec*)buf, 0, 0); - } - public: - AlarmFuture(struct __kernel_timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){}; + using Timespec = struct __kernel_timespec; + + AlarmFuture(Timespec& ts) : Future(Operation::TIMEOUT, 0, (void*)&ts, 0, 0){}; + + private: + void prepareSqe(io_uring_sqe* sqe) override { io_uring_prep_timeout(sqe, (Timespec*)buf, 0, 0); } }; /* diff --git a/tests/AlarmActorTest.cpp b/tests/AlarmActorTest.cpp index a0f962b60f994a2699a842b385ef24639a18c903..31c9c46a4c922817acb595a13d40598e704ca40f 100644 --- a/tests/AlarmActorTest.cpp +++ b/tests/AlarmActorTest.cpp @@ -61,7 +61,7 @@ auto main(int argc, char* argv[]) -> int { cps.wait(); // Wait for the actor to become idle. - bool actorIdle = alarmActor.waitUntilIdle(60 * 1000); + bool actorIdle = alarmActor.waitUntilIdle(60); if (!actorIdle) { std::cerr << "FAILURE: Actor did not went idle"; exit(EXIT_FAILURE); diff --git a/tests/AlarmFutureTest.cpp b/tests/AlarmFutureTest.cpp index 833ec08a7cec551a3fedc61c598afbf66fdc4baf..11dcad1c6d6fff5702ead2149d7b35970fa4d12c 100644 --- a/tests/AlarmFutureTest.cpp +++ b/tests/AlarmFutureTest.cpp @@ -1,7 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer -#include <liburing.h> - #include <cassert> // for assert #include <cerrno> // for ETIME #include <chrono> // for microseconds, duration_cast, operator- @@ -14,7 +12,7 @@ using emper::io::AlarmFuture; void emperTest() { - struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + AlarmFuture::Timespec ts = {.tv_sec = 1, .tv_nsec = 0}; AlarmFuture alarm(ts); auto start = std::chrono::high_resolution_clock::now(); diff --git a/tests/FutureCallbackTest.cpp b/tests/FutureCallbackTest.cpp index 066efe567f9c74a5f29bf8871f592e34256bfaa0..4e7e85bb2a903c6d0ac3ef138239968a08564813 100644 --- a/tests/FutureCallbackTest.cpp +++ b/tests/FutureCallbackTest.cpp @@ -1,7 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer -#include <liburing.h> - #include <cassert> // for assert #include <cerrno> // for ETIME #include <cstdint> // for int32_t @@ -18,7 +16,7 @@ void callback(int32_t res, BPS& bps) { } void emperTest() { - struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + AlarmFuture::Timespec ts = {.tv_sec = 1, .tv_nsec = 0}; AlarmFuture alarm(ts); BPS bps; diff --git a/tests/SimpleActorTest.cpp b/tests/SimpleActorTest.cpp index 3415863c08a9332fbe78fc7eda651f96ecbfaa27..76b4cfe3d08b717531c400ef348366367378c39f 100644 --- a/tests/SimpleActorTest.cpp +++ b/tests/SimpleActorTest.cpp @@ -63,7 +63,7 @@ static void mainFiber(void* runtime_ptr) { cps.wait(); // Wait for the actor to become idle. - bool actorIdle = sumActor.waitUntilIdle(60 * 1000); + bool actorIdle = sumActor.waitUntilIdle(60); if (!actorIdle) { std::cerr << "FAILURE: Actor did not went idle"; exit(EXIT_FAILURE); diff --git a/tests/TimeoutWrapperTest.cpp b/tests/TimeoutWrapperTest.cpp index 00a592439b703e336c239ceea4bf966aeadc7f52..8211b3766d8ccac16d1c4c632395e5cf91d39e81 100644 --- a/tests/TimeoutWrapperTest.cpp +++ b/tests/TimeoutWrapperTest.cpp @@ -1,6 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer -#include <liburing.h> #include <sys/eventfd.h> // for eventfd, EFD_SEMAPHORE #include <cassert> // for assert @@ -23,7 +22,7 @@ void emperTest() { uint64_t read_buf; ReadFuture readFuture(efd, &read_buf, sizeof(read_buf), 0); - struct __kernel_timespec ts = {.tv_sec = 1, .tv_nsec = 0}; + TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0}; TimeoutWrapper t(readFuture, ts); int32_t res = t.submitAndWait(); diff --git a/tests/TooLongFutureChain.cpp b/tests/TooLongFutureChain.cpp index 12b61e0324d5781fb6313d00d76f801597efa542..8f861e0a6cb0d82c7754ea2f9ea8930f7844f5db 100644 --- a/tests/TooLongFutureChain.cpp +++ b/tests/TooLongFutureChain.cpp @@ -1,7 +1,5 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer -#include <liburing.h> - #include <array> // for array #include <cstdlib> // for exit, EXIT_SUCCESS, size_t @@ -13,7 +11,7 @@ using emper::io::AlarmFuture; void emperTest() { const size_t links = EMPER_IO_WORKER_URING_ENTRIES * 2; std::array<AlarmFuture*, links> futures; - struct __kernel_timespec ts = {.tv_sec = 0, .tv_nsec = 100}; + AlarmFuture::Timespec ts = {.tv_sec = 0, .tv_nsec = 100}; for (unsigned i = 0; i < links; ++i) { auto* future = new AlarmFuture(ts); diff --git a/tests/UnblockOnMainActorTest.cpp b/tests/UnblockOnMainActorTest.cpp index 020db28f5dc8242054a729fb65b1613676f14a5d..6ae3883f27974b786f8df82b848c03096b375e5d 100644 --- a/tests/UnblockOnMainActorTest.cpp +++ b/tests/UnblockOnMainActorTest.cpp @@ -76,7 +76,7 @@ auto main(int argc, char* argv[]) -> int { cps.wait(); // Wait for the actor to become idle. - bool actorIdle = alarmActor.waitUntilIdle(60 * 1000); + bool actorIdle = alarmActor.waitUntilIdle(60); if (!actorIdle) { std::cerr << "FAILURE: Actor did not went idle"; quit = true; diff --git a/tests/YieldToAnywhereTest.cpp b/tests/YieldToAnywhereTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ee0fb8ef89552e30b4119c41e0017e25494e0a47 --- /dev/null +++ b/tests/YieldToAnywhereTest.cpp @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020 Florian Schmaus +#include <cstdlib> + +#include "CountingPrivateSemaphore.hpp" +#include "emper.hpp" + +void emperTest() { + const unsigned fiberCount = 100; + CPS cps; + for (unsigned i = 0; i < fiberCount; ++i) { + spawn( + [] { + for (unsigned i = 0; i < 100; ++i) { + emper::yield(); + } + }, + cps); + } + cps.wait(); + exit(EXIT_SUCCESS); +} diff --git a/tests/meson.build b/tests/meson.build index 89518c6bf6099a3d023c4434af28831f333fbb22..52662f38a5bcdb2ce515401a27afcbf3f56911c0 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -29,6 +29,12 @@ tests = { 'description': 'Test EMPER\'s C++ API', }, + 'YieldToAnywhereTest.cpp': + { + 'description': 'Test emper::yieldToAnywhere', + 'test_runner': 'emper', + }, + 'ReuseBpsTest.cpp': { 'description': 'Test resetting of BPSs',