diff --git a/.dir-locals.el b/.dir-locals.el-disabled similarity index 100% rename from .dir-locals.el rename to .dir-locals.el-disabled diff --git a/emper/CMakeLists.txt b/emper/CMakeLists.txt index 946a1042496ffab53ccf786c82f8364cc91fcb31..62472c71c7b5533d05e38243094a1a8330561472 100644 --- a/emper/CMakeLists.txt +++ b/emper/CMakeLists.txt @@ -12,6 +12,7 @@ add_files(EMPER_SOURCE ContextManager.cpp) add_files(EMPER_SOURCE BinaryPrivateSemaphore.cpp) add_files(EMPER_SOURCE CountingPrivateSemaphore.cpp) add_files(EMPER_SOURCE Fibril.cpp) +add_files(EMPER_SOURCE Semaphore.cpp) add_files(EMPER_INCLUDE ".") add_files(EMPER_INCLUDE "include") diff --git a/emper/Context.hpp b/emper/Context.hpp index 879687dc7b2965b446460bc813271404b3023e30..fecb69ac5f6ff5fe66406f46a4293338618551ba 100644 --- a/emper/Context.hpp +++ b/emper/Context.hpp @@ -15,7 +15,15 @@ class Dispatcher; class Fiber; #define PAGE_SIZE (4 * 1024) -#define STACK_SIZE (8 * 1024 * 1024) +#ifdef EMPER_BENCH_STACK_SIZE +#define STACK_SIZE EMPER_BENCH_STACK_SIZE +#else +#define STACK_SIZE (0x10000) +#endif +#ifdef EMPER_FIBRIL_STATS +#include <atomic> +extern std::atomic<uint64_t> statsUnmapp; +#endif extern "C" [[noreturn]] void switch_and_load_context(void** toTos); @@ -27,7 +35,7 @@ extern "C" [[noreturn]] void switch_context(void** toTos); class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> { private: - static const unsigned int CONTEXT_SIZE = STACK_SIZE - PAGE_SIZE; // 0xffff; + static const unsigned int CONTEXT_SIZE = STACK_SIZE; // 0xffff; static thread_local Context* currentContext; @@ -135,10 +143,14 @@ public: const size_t PAGE_SIZE_MASK = 4 * 1024 - 1; const uintptr_t start = ((uintptr_t) context + PAGE_SIZE_MASK) & ~PAGE_SIZE_MASK; const uintptr_t end = (uintptr_t) from & ~PAGE_SIZE_MASK; - if (madvise((void*) start, (end - start), MADV_DONTNEED)) { + //if (madvise((void*) start, (end - start), MADV_DONTNEED)) { + if (madvise((void*) start, (end - start), MADV_FREE)) { perror("madvise"); // 
die()? } +#ifdef EMPER_FIBRIL_STATS + statsUnmapp++; +#endif } #endif diff --git a/emper/ContextManager.cpp b/emper/ContextManager.cpp index d16cf58055429720395df199a76a4d228d0a6195..8d4b2f7cb40623a879c5ea590bd771c8e8467741 100644 --- a/emper/ContextManager.cpp +++ b/emper/ContextManager.cpp @@ -47,9 +47,13 @@ void ContextManager::putFreeContext(Context* context) { thread_local static Continuation *cont; void ContextManager::start() { + uintptr_t val; Continuation c; cont = &c; - uintptr_t val = cont->setJmp(); + + val = cont->setJmp(); + if (Runtime::getRuntime()->isShuttingDown()) + pthread_exit(nullptr); Fibril::tryResumeFiber(val); diff --git a/emper/Fibril.hpp b/emper/Fibril.hpp index 7c5cb3fd8bde37c04e48d31a593517e5f788e0c3..eddfa74222cf6b2e2f9a22a5e2a426b333ef2aa0 100644 --- a/emper/Fibril.hpp +++ b/emper/Fibril.hpp @@ -153,6 +153,10 @@ public: reserveStealCount -= 1; #endif +#ifdef EMPER_FIBRIL_STATS + statsSteals++; +#endif + /* Reserve 128 byte at the bottom. */ /* FIXME clean up, make nice looking */ cont.execute((void**)Context::currentContext->getTos() - 16); diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 517536be9a3405581938a4c1f75f9fd24d18e976..2c812bfdef033328139e868fd9cb76a7109578b3 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -20,6 +20,11 @@ thread_local unsigned int Runtime::seed; thread_local workerid_t Runtime::workerId; RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE; +#ifdef EMPER_FIBRIL_STATS +std::atomic<uint64_t> statsSteals = 0; +std::atomic<uint64_t> statsUnmapp = 0; +#endif + Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCount(workerCount) , workerLatch(workerCount) , strategy(strategy) @@ -74,9 +79,11 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCoun Runtime::~Runtime() { DBG("Runtime " << this << " is terminating"); + shutdown = true; + notifyAboutNewWork(); for (workerid_t i = 0; i < workerCount; ++i) { DBG("Runtime " << this << " 
is cancelling worker " << unsigned(i)); - errno = pthread_cancel(threads[i]); + errno = pthread_join(threads[i], nullptr); if (errno) { DIE_MSG_ERRNO("pthread_cancel() failed"); } @@ -85,6 +92,12 @@ Runtime::~Runtime() { std::lock_guard<std::mutex> lock(currentRuntimeMutex); currentRuntime = nullptr; } +#ifdef EMPER_FIBRIL_STATS + printf(" Statistics summary:\n"); + printf(" # of steals: %lu K\n", (statsSteals.load() + 500) / 1000); + printf(" # of unmapps: %lu K\n", (statsUnmapp.load() + 500) / 1000); + printf("===========================================\n"); +#endif DBG("Runtime " << this << " terminated"); } diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index e14a8e13a9b69d5f1cf7837fce952ed515dcbf80..0f6df3eb2b6e930687b8b9ff5032f4e9089f3926 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -13,6 +13,11 @@ class ContextManager; +#ifdef EMPER_FIBRIL_STATS +extern std::atomic<uint64_t> statsSteals; +extern std::atomic<uint64_t> statsUnmapp; +#endif + class Runtime : public Logger<LogSubsystem::RUNTI> { private: static std::mutex currentRuntimeMutex; @@ -44,6 +49,8 @@ private: static void printLastRuntimeStats(); + volatile bool shutdown = false; + protected: void addNewWorkerHook(std::function<void(void)> hook) { newWorkerHooks.push_back(hook); @@ -78,6 +85,10 @@ public: ~Runtime(); + inline bool isShuttingDown() { + return shutdown; + } + inline void schedule(Fiber& fiber) { scheduler.schedule(fiber); } diff --git a/emper/Semaphore.cpp b/emper/Semaphore.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d6df0b442c9b9d5eddec7056548929d8fce3a193 --- /dev/null +++ b/emper/Semaphore.cpp @@ -0,0 +1,17 @@ +#include "Semaphore.hpp" + +using namespace emper; + +void Semaphore::print() { + unsigned int count = this->count; + unsigned int waiterListSize; + { + const std::lock_guard<std::mutex> lock(mutex); + waiterListSize = waiterList.size(); + } + + std::cout << "Semaphore" + << " count=" << count + << " waiterListSize=" << 
waiterListSize
+		<< std::endl;
+}
diff --git a/emper/Semaphore.hpp b/emper/Semaphore.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..83b13d09892729cb2523a7d026d7f23225426dd0
--- /dev/null
+++ b/emper/Semaphore.hpp
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <queue>
+#include <mutex>
+
+#include "BinaryPrivateSemaphore.hpp"
+
+namespace emper {
+
+/**
+ * Counting semaphore whose blocked callers each wait on their own
+ * BinaryPrivateSemaphore; waiters are released in FIFO order.
+ */
+class Semaphore {
+private:
+
+	std::queue<BinaryPrivateSemaphore*> waiterList;
+	// Explicitly zero: the class has no constructor, so without an
+	// initializer acquire() would read an indeterminate value.
+	unsigned int count = 0;
+	std::mutex mutex;
+
+public:
+	/** Takes one unit; returns true iff the caller had to wait for it. */
+	bool acquire() {
+		bool blocked;
+		mutex.lock();
+		if (count > 0) {
+			count--;
+			mutex.unlock();
+			blocked = false;
+		} else {
+			BinaryPrivateSemaphore semaphore;
+			waiterList.push(&semaphore);
+			mutex.unlock();
+			semaphore.wait();
+			blocked = true;
+		}
+		return blocked;
+	}
+
+	/** Signals the oldest waiter, or increments the count if there is none; returns true iff nobody was waiting. */
+	bool release() {
+		mutex.lock();
+		bool waiterListEmpty = waiterList.empty();
+		if (waiterListEmpty) {
+			count++;
+			mutex.unlock();
+		} else {
+			BinaryPrivateSemaphore* semaphore = waiterList.front();
+			waiterList.pop();
+			mutex.unlock();
+			semaphore->signal();
+		}
+		return waiterListEmpty;
+	}
+
+	void print();
+};
+
+}
diff --git a/emper/include/emper-common.h b/emper/include/emper-common.h
index f73f8cc420f3f4ed238434a1a685f3e3226fe256..b86f3b3d7bf294b2c1505576a8471b5232cf09d5 100644
--- a/emper/include/emper-common.h
+++ b/emper/include/emper-common.h
@@ -10,7 +10,7 @@ #endif -typedef uint8_t workerid_t; +typedef uint16_t workerid_t; typedef int16_t workeraffinity_t; #define UNUSED_ARG __attribute__((unused))
diff --git a/emper/lib/adt/BoundedMpmcQueue.hpp b/emper/lib/adt/BoundedMpmcQueue.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..e3e195a3c526da3f567728eaa64b2f05419fc104
--- /dev/null
+++ b/emper/lib/adt/BoundedMpmcQueue.hpp
@@ -0,0 +1,101 @@
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+
+
+namespace adt {
+
+
+	/**
+	 * Fixed-capacity container shared by multiple producers and consumers
+	 * (per its name), backed by an array of CAPACITY slots.
+	 */
+	template<typename T, const uintptr_t CAPACITY>
+	class BoundedMpmcQueue {
+	private:
+
+		// alignas sits in the class-head so that it applies to every slot;
+		// written after the closing brace it would appertain to the type,
+		// where an alignment-specifier is not permitted.
+		struct alignas(128) {
+			std::atomic<uint64_t> next;
+			T value;
+		}
buf[CAPACITY];
+
+		// Slot 0 anchors the list of filled slots, slot CAPACITY - 1 the free list.
+		static constexpr uint64_t QueueHead = 0;
+		static constexpr uint64_t FreeListHead = CAPACITY - 1;
+
+		// Replaces the index bits of 'next' with 'value' and adds CAPACITY to the bits above them (presumably an ABA tag).
+		inline uint64_t updateNext(uint64_t next, uint64_t value) {
+			return ((next + CAPACITY) & ~(CAPACITY - 1)) | value;
+		}
+
+		// Extracts the slot index; the mask assumes CAPACITY is a power of two.
+		inline uint64_t getIdx(uint64_t value) {
+			return value & (CAPACITY - 1);
+		}
+
+	public:
+
+		BoundedMpmcQueue() {
+			buf[QueueHead].next.store(0, std::memory_order_relaxed);
+			// Chain slots 1 .. CAPACITY - 2 into the initial free list; the last one points at FreeListHead.
+			for (uint64_t i = 1; i < FreeListHead; i++) {
+				buf[i].next.store(i + 1, std::memory_order_relaxed);
+			}
+
+			buf[FreeListHead].next.store(1, std::memory_order_release);
+		}
+
+		// Stores 'item' in a slot taken from the free list and links that slot in at QueueHead; returns false when no free slot is left.
+		inline bool put(T item) {
+			uint64_t head, index, next; // 'next' is read in the while conditions, so it must outlive the do bodies
+
+			head = buf[FreeListHead].next.load(std::memory_order_acquire);
+			do {
+				index = getIdx(head);
+				if (index == FreeListHead)
+					return false;
+				next = updateNext(head, buf[index].next.load(std::memory_order_acquire));
+			} while (!buf[FreeListHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));
+
+			buf[index].value = item;
+			head = buf[QueueHead].next.load(std::memory_order_acquire);
+			do {
+				buf[index].next.store(getIdx(head), std::memory_order_relaxed);
+				next = updateNext(head, index);
+			} while (!buf[QueueHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));
+
+			return true;
+		}
+
+		// Unlinks the slot at QueueHead (the most recently put item), copies its value to *itemPtr and returns the slot to the free list; returns false when empty.
+		inline bool get(T *itemPtr) {
+			uint64_t head, index, next; // 'next' is read in the while conditions, so it must outlive the do bodies
+
+			head = buf[QueueHead].next.load(std::memory_order_acquire);
+			do {
+				index = getIdx(head);
+				if (!index)
+					return false;
+				next = updateNext(head, buf[index].next.load(std::memory_order_acquire));
+			} while (!buf[QueueHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));
+
+			*itemPtr = buf[index].value;
+			head = buf[FreeListHead].next.load(std::memory_order_acquire);
+
+			do {
+				buf[index].next.store(getIdx(head), std::memory_order_relaxed);
+				next = updateNext(head, index);
+			} while (!buf[FreeListHead].next.compare_exchange_weak(head, next,
std::memory_order_acq_rel, std::memory_order_acquire)); + + return true; + } + + }; + + +} diff --git a/emper/lib/adt/WsClQueue.hpp b/emper/lib/adt/WsClQueue.hpp index 881fff3745607d06aa240f2c3fca331438f1a4f1..ec90f9196eb9a047c4af193d471c9a95a6b7bd07 100644 --- a/emper/lib/adt/WsClQueue.hpp +++ b/emper/lib/adt/WsClQueue.hpp @@ -109,6 +109,11 @@ bool WsClQueue<_PAYLOAD, _CAPACITY>::popBottom(_PAYLOAD *item) { // bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed); bool res = top.compare_exchange_weak(localTop, localTop + 1); + // Either a popTop() removed the element ('res' is false) or we + // removed the element ('res' is true), but we need to increment + // the 'bottom' value, since the element bottom pointed at is now + // gone. N.B. bottom does point to the next free slot, the actual + // element we remove is bottom-1. bottom = localBottom + 1; return res; } diff --git a/emper/lib/adt/WsClV2Queue.hpp b/emper/lib/adt/WsClV2Queue.hpp index 64cb6e0aec40631fb2a58221a46a4a593fb2e003..102f8867570f48f724543f4f3ff26004556e70d1 100644 --- a/emper/lib/adt/WsClV2Queue.hpp +++ b/emper/lib/adt/WsClV2Queue.hpp @@ -100,8 +100,12 @@ bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popBottom(ITEM_TYPE *item) { if (localBottom > localTop) return true; bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed); - // TODO: Why do we reset bottom here? - bottom = localTop + 1; + // Either a popTop() removed the element ('res' is false) or we + // removed the element ('res' is true), but we need to increment + // the 'bottom' value, since the element bottom pointed at is now + // gone. N.B. bottom does point to the next free slot, the actual + // element we remove is bottom-1. 
+ bottom = localBottom + 1; return res; } diff --git a/emper/lib/adt/WsClV3Queue.hpp b/emper/lib/adt/WsClV3Queue.hpp new file mode 100644 index 0000000000000000000000000000000000000000..bc14299aab5027a7b54afaf50780bb1e1cb98950 --- /dev/null +++ b/emper/lib/adt/WsClV3Queue.hpp @@ -0,0 +1,79 @@ +#pragma once + +#include <atomic> + + + +namespace adt { + + template<typename T, const uintptr_t CAPACITY> + class WsClV3Queue { + protected: + + alignas(64) std::atomic<uint64_t> top; + alignas(64) std::atomic<uint64_t> bottom; + + alignas(64) T queue[CAPACITY]; + + + public: + WsClV3Queue() : top(1), bottom(1) { } + + + bool pushBottom(const T item) { + uint64_t localTop, localBottom; + + localBottom = bottom.load(std::memory_order_relaxed); + localTop = top.load(std::memory_order_acquire); + if ((localBottom - localTop) == CAPACITY) + return false; + + queue[localBottom % CAPACITY] = item; + bottom.store(localBottom + 1, std::memory_order_release); + + return true; + } + + + bool popBottom(T* itemPtr) { + bool ret; + uint64_t localTop, localBottom; + + localBottom = bottom.fetch_sub(1, std::memory_order_acq_rel) - 1; + localTop = top.load(std::memory_order_acquire); + + *itemPtr = queue[localBottom % CAPACITY]; + + if (localBottom < localTop) { + bottom.store(localTop, std::memory_order_relaxed); + return false; + } else if (localBottom > localTop) + return true; + + ret = top.compare_exchange_strong(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed); + bottom.store(localBottom + 1, std::memory_order_relaxed); + + return ret; + } + + + bool popTop(T* itemPtr) { + uint64_t localTop, localBottom; + + localTop = top.load(std::memory_order_relaxed); +again: + localBottom = bottom.load(std::memory_order_acquire); + if (localBottom <= localTop) + return false; + + *itemPtr = queue[localTop % CAPACITY]; + + if (!top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_acquire)) + goto again; + + return true; + } + 
+ }; + +} diff --git a/emper/lib/adt/WsClV4Queue.hpp b/emper/lib/adt/WsClV4Queue.hpp new file mode 100644 index 0000000000000000000000000000000000000000..9d58b2d5accc7236d556d2303256396f2a042ec7 --- /dev/null +++ b/emper/lib/adt/WsClV4Queue.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include <atomic> + + + +namespace adt { + + template<typename T, const uintptr_t CAPACITY> + class WsClV4Queue { + protected: + + alignas(64) std::atomic<uint64_t> top; + alignas(64) std::atomic<uint64_t> bottom; + alignas(64) uint64_t top_private; + + alignas(64) T queue[CAPACITY]; + + + public: + WsClV4Queue() : top(1), bottom(1), top_private(1) { } + + + bool pushBottom(const T item) { + uint64_t localTop, localBottom; + + localBottom = bottom.load(std::memory_order_relaxed); + localTop = top_private; + if ((localBottom - localTop) == CAPACITY) { + localTop = top.load(std::memory_order_acquire); + if ((localBottom - localTop) == CAPACITY) + return false; + top_private = localTop; + } + + queue[localBottom % CAPACITY] = item; + bottom.store(localBottom + 1, std::memory_order_release); + + return true; + } + + + bool popBottom(T* itemPtr) { + bool ret; + uint64_t localTop, localBottom; + + localBottom = bottom.fetch_sub(1, std::memory_order_acq_rel) - 1; + localTop = top.load(std::memory_order_acquire); + + *itemPtr = queue[localBottom % CAPACITY]; + + if (localBottom < localTop) { + bottom.store(localTop, std::memory_order_relaxed); + return false; + } else if (localBottom > localTop) + return true; + + ret = top.compare_exchange_strong(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed); + bottom.store(localBottom + 1, std::memory_order_relaxed); + + return ret; + } + + + bool popTop(T* itemPtr) { + uint64_t localTop, localBottom; + + localTop = top.load(std::memory_order_relaxed); +again: + localBottom = bottom.load(std::memory_order_acquire); + if (localBottom <= localTop) + return false; + + *itemPtr = queue[localTop % CAPACITY]; + + if 
(!top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_acquire)) + goto again; + + return true; + } + + }; + +} diff --git a/emper/lib/sync/Latch.hpp b/emper/lib/sync/Latch.hpp index 04e40e5fe5a8195096137834ecaf897f6f11b3f2..2eb144cd3f72a55e65c48111f16e62a1078d88bc 100644 --- a/emper/lib/sync/Latch.hpp +++ b/emper/lib/sync/Latch.hpp @@ -8,7 +8,7 @@ class Latch { private: const unsigned int num; std::atomic<unsigned int> counter; - Semaphore semaphore; + emper::lib::sync::Semaphore semaphore; public: Latch(unsigned int counter) : num(counter), counter(counter), semaphore(0) { diff --git a/emper/lib/sync/Semaphore.hpp b/emper/lib/sync/Semaphore.hpp index a8653daa4bc3583f450866f0da950993902db235..46be64ecdccf2b37bea7dfea2909b00b1640d59d 100644 --- a/emper/lib/sync/Semaphore.hpp +++ b/emper/lib/sync/Semaphore.hpp @@ -3,6 +3,12 @@ #include <mutex> #include <condition_variable> +namespace emper { + +namespace lib { + +namespace sync { + class Semaphore { private: std::mutex m; @@ -32,3 +38,8 @@ public: } }; +} + +} + +} diff --git a/emper/strategies/laws/LawsDispatcher.cpp b/emper/strategies/laws/LawsDispatcher.cpp index 66846f9776118f54c2c987d2c806a958daa03c4c..b135e73a3eb0dfb5f9f7512c5061d4f617855cd7 100644 --- a/emper/strategies/laws/LawsDispatcher.cpp +++ b/emper/strategies/laws/LawsDispatcher.cpp @@ -2,11 +2,15 @@ #include "Runtime.hpp" #include "LawsStrategy.hpp" +#include "ContextManager.hpp" void LawsDispatcher::dispatchLoop() { while (true) { Fiber* fiber = runtime.nextFiber(); if (!fiber) { + Runtime *runtime = Runtime::getRuntime(); + if (runtime->isShuttingDown()) + runtime->getContextManager().resume(0); #ifdef EMPER_WORKER_SLEEP putRuntimeWorkerToSleep(); #else diff --git a/emper/strategies/ws/WsDispatcher.cpp b/emper/strategies/ws/WsDispatcher.cpp index 2955a02292803fb9226d91787cc163cd7c7e8e89..d6864c467c2caa057431642cc841016663070ae3 100644 --- a/emper/strategies/ws/WsDispatcher.cpp +++ 
b/emper/strategies/ws/WsDispatcher.cpp @@ -2,11 +2,15 @@ #include "Runtime.hpp" #include "Debug.hpp" +#include "ContextManager.hpp" void WsDispatcher::dispatchLoop() { while (true) { Fiber* fiber = runtime.nextFiber(); if (!fiber) { + Runtime *runtime = Runtime::getRuntime(); + if (runtime->isShuttingDown()) + runtime->getContextManager().resume(0); #ifdef EMPER_WORKER_SLEEP putRuntimeWorkerToSleep(); #else