Commit 71791914 authored by Nicolas Pfeiffer's avatar Nicolas Pfeiffer
Browse files

Merge branch 'cactus_stack_devel' of gitlab.cs.fau.de:uh15efil/emper into cactus_stack_devel

parents 6c076baf ac7e5b7e
......@@ -12,6 +12,7 @@ add_files(EMPER_SOURCE ContextManager.cpp)
add_files(EMPER_SOURCE BinaryPrivateSemaphore.cpp)
add_files(EMPER_SOURCE CountingPrivateSemaphore.cpp)
add_files(EMPER_SOURCE Fibril.cpp)
add_files(EMPER_SOURCE Semaphore.cpp)
add_files(EMPER_INCLUDE ".")
add_files(EMPER_INCLUDE "include")
......
......@@ -15,7 +15,15 @@ class Dispatcher;
class Fiber;
#define PAGE_SIZE (4 * 1024)
#define STACK_SIZE (8 * 1024 * 1024)
#ifdef EMPER_BENCH_STACK_SIZE
#define STACK_SIZE EMPER_BENCH_STACK_SIZE
#else
#define STACK_SIZE (0x10000)
#endif
#ifdef EMPER_FIBRIL_STATS
#include <atomic>
extern std::atomic<uint64_t> statsUnmapp;
#endif
extern "C" [[noreturn]] void switch_and_load_context(void** toTos);
......@@ -27,7 +35,7 @@ extern "C" [[noreturn]] void switch_context(void** toTos);
class ALIGN_TO_CACHE_LINE Context : Logger<LogSubsystem::C> {
private:
static const unsigned int CONTEXT_SIZE = STACK_SIZE - PAGE_SIZE; // 0xffff;
static const unsigned int CONTEXT_SIZE = STACK_SIZE; // 0xffff;
static thread_local Context* currentContext;
......@@ -135,10 +143,14 @@ public:
const size_t PAGE_SIZE_MASK = 4 * 1024 - 1;
const uintptr_t start = ((uintptr_t) context + PAGE_SIZE_MASK) & ~PAGE_SIZE_MASK;
const uintptr_t end = (uintptr_t) from & ~PAGE_SIZE_MASK;
if (madvise((void*) start, (end - start), MADV_DONTNEED)) {
//if (madvise((void*) start, (end - start), MADV_DONTNEED)) {
if (madvise((void*) start, (end - start), MADV_FREE)) {
perror("madvise");
// die()?
}
#ifdef EMPER_FIBRIL_STATS
statsUnmapp++;
#endif
}
#endif
......
......@@ -47,9 +47,13 @@ void ContextManager::putFreeContext(Context* context) {
thread_local static Continuation *cont;
void ContextManager::start() {
uintptr_t val;
Continuation c;
cont = &c;
uintptr_t val = cont->setJmp();
val = cont->setJmp();
if (Runtime::getRuntime()->isShuttingDown())
pthread_exit(nullptr);
Fibril::tryResumeFiber(val);
......
......@@ -153,6 +153,10 @@ public:
reserveStealCount -= 1;
#endif
#ifdef EMPER_FIBRIL_STATS
statsSteals++;
#endif
/* Reserve 128 byte at the bottom. */
/* FIXME clean up, make nice looking */
cont.execute((void**)Context::currentContext->getTos() - 16);
......
......@@ -20,6 +20,11 @@ thread_local unsigned int Runtime::seed;
thread_local workerid_t Runtime::workerId;
RuntimeStrategy& Runtime::DEFAULT_STRATEGY = WsStrategy::INSTANCE;
#ifdef EMPER_FIBRIL_STATS
std::atomic<uint64_t> statsSteals = 0;
std::atomic<uint64_t> statsUnmapp = 0;
#endif
Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCount(workerCount)
, workerLatch(workerCount)
, strategy(strategy)
......@@ -74,9 +79,11 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategy& strategy) : workerCoun
Runtime::~Runtime() {
DBG("Runtime " << this << " is terminating");
shutdown = true;
notifyAboutNewWork();
for (workerid_t i = 0; i < workerCount; ++i) {
DBG("Runtime " << this << " is cancelling worker " << unsigned(i));
errno = pthread_cancel(threads[i]);
errno = pthread_join(threads[i], nullptr);
if (errno) {
DIE_MSG_ERRNO("pthread_cancel() failed");
}
......@@ -85,6 +92,12 @@ Runtime::~Runtime() {
std::lock_guard<std::mutex> lock(currentRuntimeMutex);
currentRuntime = nullptr;
}
#ifdef EMPER_FIBRIL_STATS
printf(" Statistics summary:\n");
printf(" # of steals: %lu K\n", (statsSteals.load() + 500) / 1000);
printf(" # of unmapps: %lu K\n", (statsUnmapp.load() + 500) / 1000);
printf("===========================================\n");
#endif
DBG("Runtime " << this << " terminated");
}
......
......@@ -13,6 +13,11 @@
class ContextManager;
#ifdef EMPER_FIBRIL_STATS
extern std::atomic<uint64_t> statsSteals;
extern std::atomic<uint64_t> statsUnmapp;
#endif
class Runtime : public Logger<LogSubsystem::RUNTI> {
private:
static std::mutex currentRuntimeMutex;
......@@ -44,6 +49,8 @@ private:
static void printLastRuntimeStats();
volatile bool shutdown = false;
protected:
void addNewWorkerHook(std::function<void(void)> hook) {
newWorkerHooks.push_back(hook);
......@@ -78,6 +85,10 @@ public:
~Runtime();
inline bool isShuttingDown() {
return shutdown;
}
inline void schedule(Fiber& fiber) {
scheduler.schedule(fiber);
}
......
#include "Semaphore.hpp"
using namespace emper;
void Semaphore::print() {
	// Snapshot both fields under the same lock so the printed values are
	// mutually consistent. The original read 'count' before taking the
	// mutex, racing with acquire()/release(), which modify it under lock.
	unsigned int count;
	unsigned int waiterListSize;
	{
		const std::lock_guard<std::mutex> lock(mutex);
		count = this->count;
		waiterListSize = waiterList.size();
	}
	std::cout << "Semaphore"
		  << " count=" << count
		  << " waiterListSize=" << waiterListSize
		  << std::endl;
}
#pragma once
#include <queue>
#include <mutex>
#include "BinaryPrivateSemaphore.hpp"
namespace emper {

/**
 * Counting semaphore whose blocked waiters park on their own
 * BinaryPrivateSemaphore instead of a condition variable.
 *
 * All state ('count', 'waiterList') is protected by 'mutex'. The
 * private semaphore of a waiter lives on the waiter's stack; it stays
 * valid because acquire() does not return until signal() was called.
 */
class Semaphore {
private:
	// FIFO of blocked acquirers; each entry points to a stack-allocated
	// private semaphore the owner is (about to be) waiting on.
	std::queue<BinaryPrivateSemaphore*> waiterList;

	// Number of available permits. Protected by 'mutex'.
	unsigned int count;

	std::mutex mutex;

public:
	// The original class had no constructor, leaving 'count'
	// indeterminate on default construction; initialize it explicitly.
	explicit Semaphore(unsigned int count = 0) : count(count) {}

	/**
	 * Acquire one permit, blocking if none is available.
	 * @return true iff the caller had to block.
	 */
	bool acquire() {
		std::unique_lock<std::mutex> lock(mutex);
		if (count > 0) {
			count--;
			return false;
		}
		// No permit: enqueue our private semaphore, then wait on it
		// *after* dropping the mutex so release() can make progress.
		BinaryPrivateSemaphore semaphore;
		waiterList.push(&semaphore);
		lock.unlock();
		semaphore.wait();
		return true;
	}

	/**
	 * Release one permit, waking the oldest waiter if any.
	 * @return true iff no waiter was pending (the permit was banked).
	 */
	bool release() {
		std::unique_lock<std::mutex> lock(mutex);
		if (waiterList.empty()) {
			count++;
			return true;
		}
		BinaryPrivateSemaphore* semaphore = waiterList.front();
		waiterList.pop();
		// Signal outside the lock to avoid holding it across the wakeup.
		lock.unlock();
		semaphore->signal();
		return false;
	}

	void print();
};

}
......@@ -10,7 +10,7 @@
#endif
typedef uint8_t workerid_t;
typedef uint16_t workerid_t;
typedef int16_t workeraffinity_t;
#define UNUSED_ARG __attribute__((unused))
......
#pragma once
#include <atomic>
namespace adt {

/**
 * Bounded lock-free multi-producer/multi-consumer LIFO container.
 *
 * Slot 0 (QueueHead) and slot CAPACITY-1 (FreeListHead) are sentinels
 * whose 'next' fields hold the heads of the element stack and of the
 * free list, so up to CAPACITY-2 elements can be stored. 'next' values
 * carry an ABA tag in the bits above the index (see updateNext()).
 *
 * Fixes over the original: 'using QueueHead = 0;' was an ill-formed
 * alias-declaration (now constants); the constructor loop incremented
 * an undeclared 'k'; 'next' was scoped inside the do-body but used in
 * the while condition; put() fell off the end without returning; and
 * get() had a misplaced parenthesis around the memory order.
 */
template<typename T, const uintptr_t CAPACITY>
class BoundedMpmcQueue {
	// The tag/index mask arithmetic requires a power-of-two capacity.
	static_assert((CAPACITY & (CAPACITY - 1)) == 0,
	              "CAPACITY must be a power of two");

private:
	static constexpr uint64_t QueueHead = 0;               // element-stack head sentinel
	static constexpr uint64_t FreeListHead = CAPACITY - 1; // free-list head sentinel

	struct {
		std::atomic<uint64_t> next;
		T value;
	} alignas(128) buf[CAPACITY]; // per-slot alignment to avoid false sharing

	// Combine a fresh ABA tag (old tag + CAPACITY) with the new index.
	inline uint64_t updateNext(uint64_t next, uint64_t value) {
		return ((next + CAPACITY) & ~(CAPACITY - 1)) | value;
	}

	// Strip the ABA tag, leaving only the slot index.
	inline uint64_t getIdx(uint64_t value) {
		return value & (CAPACITY - 1);
	}

public:
	BoundedMpmcQueue() {
		buf[QueueHead].next.store(0, std::memory_order_relaxed);
		// Chain all usable slots (1 .. FreeListHead-1) into the free list.
		for (uint64_t i = 1; i < FreeListHead; i++) {
			buf[i].next.store(i + 1, std::memory_order_relaxed);
		}
		buf[FreeListHead].next.store(1, std::memory_order_release);
	}

	/// Insert 'item'. Returns false when the container is full.
	inline bool put(T item) {
		uint64_t head, index, next;

		// Pop a free slot off the free list.
		head = buf[FreeListHead].next.load(std::memory_order_acquire);
		do {
			index = getIdx(head);
			if (index == FreeListHead)
				return false; // free list empty -> full
			next = updateNext(head, buf[index].next.load(std::memory_order_acquire));
		} while (!buf[FreeListHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));

		buf[index].value = item;

		// Push the filled slot onto the element stack.
		head = buf[QueueHead].next.load(std::memory_order_acquire);
		do {
			buf[index].next.store(getIdx(head), std::memory_order_relaxed);
			next = updateNext(head, index);
		} while (!buf[QueueHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));

		return true;
	}

	/// Remove the most recently inserted element. Returns false when empty.
	inline bool get(T *itemPtr) {
		uint64_t head, index, next;

		// Pop a slot off the element stack.
		head = buf[QueueHead].next.load(std::memory_order_acquire);
		do {
			index = getIdx(head);
			if (!index)
				return false; // index 0 is the sentinel -> empty
			next = updateNext(head, buf[index].next.load(std::memory_order_acquire));
		} while (!buf[QueueHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));

		*itemPtr = buf[index].value;

		// Return the slot to the free list.
		head = buf[FreeListHead].next.load(std::memory_order_acquire);
		do {
			buf[index].next.store(getIdx(head), std::memory_order_relaxed);
			next = updateNext(head, index);
		} while (!buf[FreeListHead].next.compare_exchange_weak(head, next, std::memory_order_acq_rel, std::memory_order_acquire));

		return true;
	}
};

}
......@@ -109,6 +109,11 @@ bool WsClQueue<_PAYLOAD, _CAPACITY>::popBottom(_PAYLOAD *item) {
// bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed);
bool res = top.compare_exchange_weak(localTop, localTop + 1);
// Either a popTop() removed the element ('res' is false) or we
// removed the element ('res' is true), but we need to increment
// the 'bottom' value, since the element bottom pointed at is now
// gone. N.B. bottom does point to the next free slot, the actual
// element we remove is bottom-1.
bottom = localBottom + 1;
return res;
}
......
......@@ -100,8 +100,12 @@ bool WsClV2Queue<ITEM_TYPE, CAPACITY>::popBottom(ITEM_TYPE *item) {
if (localBottom > localTop) return true;
bool res = top.compare_exchange_weak(localTop, localTop + 1, std::memory_order_release, std::memory_order_relaxed);
// TODO: Why do we reset bottom here?
bottom = localTop + 1;
// Either a popTop() removed the element ('res' is false) or we
// removed the element ('res' is true), but we need to increment
// the 'bottom' value, since the element bottom pointed at is now
// gone. N.B. bottom does point to the next free slot, the actual
// element we remove is bottom-1.
bottom = localBottom + 1;
return res;
}
......
#pragma once
#include <atomic>
namespace adt {

/**
 * Lock-free work-stealing deque (Chase-Lev style): the owning worker
 * pushes and pops at the bottom, thieves steal from the top.
 */
template<typename T, const uintptr_t CAPACITY>
class WsClV3Queue {
protected:
	// 'top' and 'bottom' sit on separate cache lines so owner and
	// thieves do not falsely share.
	alignas(64) std::atomic<uint64_t> top;
	alignas(64) std::atomic<uint64_t> bottom;
	alignas(64) T queue[CAPACITY];

public:
	WsClV3Queue() : top(1), bottom(1) { }

	/// Owner-only: append an item at the bottom. Returns false when full.
	bool pushBottom(const T item) {
		const uint64_t b = bottom.load(std::memory_order_relaxed);
		const uint64_t t = top.load(std::memory_order_acquire);

		if ((b - t) == CAPACITY)
			return false;

		queue[b % CAPACITY] = item;
		bottom.store(b + 1, std::memory_order_release);
		return true;
	}

	/// Owner-only: remove the most recently pushed item.
	bool popBottom(T* itemPtr) {
		const uint64_t b = bottom.fetch_sub(1, std::memory_order_acq_rel) - 1;
		uint64_t t = top.load(std::memory_order_acquire);
		*itemPtr = queue[b % CAPACITY];

		if (b < t) {
			// Deque was empty; undo the speculative decrement.
			bottom.store(t, std::memory_order_relaxed);
			return false;
		}

		if (b > t)
			return true; // more than one element left, no race with thieves

		// Exactly one element: arbitrate with concurrent popTop() via CAS.
		const bool won = top.compare_exchange_strong(t, t + 1, std::memory_order_release, std::memory_order_relaxed);
		bottom.store(b + 1, std::memory_order_relaxed);
		return won;
	}

	/// Thief: remove the oldest item from the top.
	bool popTop(T* itemPtr) {
		uint64_t t = top.load(std::memory_order_relaxed);
		for (;;) {
			const uint64_t b = bottom.load(std::memory_order_acquire);
			if (b <= t)
				return false; // empty

			*itemPtr = queue[t % CAPACITY];
			// On failure the weak CAS reloads 't' (acquire) and we retry.
			if (top.compare_exchange_weak(t, t + 1, std::memory_order_release, std::memory_order_acquire))
				return true;
		}
	}
};

}
#pragma once
#include <atomic>
namespace adt {

/**
 * Work-stealing deque like WsClV3Queue, except the owner keeps a
 * private (possibly stale) snapshot of 'top' so pushBottom() usually
 * skips the atomic load of the contended 'top' counter.
 */
template<typename T, const uintptr_t CAPACITY>
class WsClV4Queue {
protected:
	alignas(64) std::atomic<uint64_t> top;
	alignas(64) std::atomic<uint64_t> bottom;
	// Owner-local stale copy of 'top'; refreshed only when the deque
	// looks full against it.
	alignas(64) uint64_t top_private;
	alignas(64) T queue[CAPACITY];

public:
	WsClV4Queue() : top(1), bottom(1), top_private(1) { }

	/// Owner-only: append an item at the bottom. Returns false when full.
	bool pushBottom(const T item) {
		const uint64_t b = bottom.load(std::memory_order_relaxed);
		uint64_t t = top_private;

		if ((b - t) == CAPACITY) {
			// Full against the cached top; refresh and check again.
			t = top.load(std::memory_order_acquire);
			if ((b - t) == CAPACITY)
				return false;
			top_private = t;
		}

		queue[b % CAPACITY] = item;
		bottom.store(b + 1, std::memory_order_release);
		return true;
	}

	/// Owner-only: remove the most recently pushed item.
	bool popBottom(T* itemPtr) {
		const uint64_t b = bottom.fetch_sub(1, std::memory_order_acq_rel) - 1;
		uint64_t t = top.load(std::memory_order_acquire);
		*itemPtr = queue[b % CAPACITY];

		if (b < t) {
			// Deque was empty; restore bottom.
			bottom.store(t, std::memory_order_relaxed);
			return false;
		}

		if (b > t)
			return true; // no race possible with thieves

		// Last element: arbitrate with popTop() via CAS on 'top'.
		const bool won = top.compare_exchange_strong(t, t + 1, std::memory_order_release, std::memory_order_relaxed);
		bottom.store(b + 1, std::memory_order_relaxed);
		return won;
	}

	/// Thief: remove the oldest item from the top.
	bool popTop(T* itemPtr) {
		uint64_t t = top.load(std::memory_order_relaxed);
		for (;;) {
			const uint64_t b = bottom.load(std::memory_order_acquire);
			if (b <= t)
				return false; // empty

			*itemPtr = queue[t % CAPACITY];
			// Failed weak CAS reloads 't' with acquire; retry the steal.
			if (top.compare_exchange_weak(t, t + 1, std::memory_order_release, std::memory_order_acquire))
				return true;
		}
	}
};

}
......@@ -8,7 +8,7 @@ class Latch {
private:
const unsigned int num;
std::atomic<unsigned int> counter;
Semaphore semaphore;
emper::lib::sync::Semaphore semaphore;
public:
Latch(unsigned int counter) : num(counter), counter(counter), semaphore(0) {
......
......@@ -3,6 +3,12 @@
#include <mutex>
#include <condition_variable>
namespace emper {
namespace lib {
namespace sync {
class Semaphore {
private:
std::mutex m;
......@@ -32,3 +38,8 @@ public:
}
};
}
}
}
......@@ -2,11 +2,15 @@
#include "Runtime.hpp"
#include "LawsStrategy.hpp"
#include "ContextManager.hpp"
void LawsDispatcher::dispatchLoop() {
while (true) {
Fiber* fiber = runtime.nextFiber();
if (!fiber) {
Runtime *runtime = Runtime::getRuntime();
if (runtime->isShuttingDown())
runtime->getContextManager().resume(0);
#ifdef EMPER_WORKER_SLEEP
putRuntimeWorkerToSleep();
#else
......
......@@ -2,11 +2,15 @@
#include "Runtime.hpp"
#include "Debug.hpp"
#include "ContextManager.hpp"
void WsDispatcher::dispatchLoop() {
while (true) {
Fiber* fiber = runtime.nextFiber();
if (!fiber) {
Runtime *runtime = Runtime::getRuntime();
if (runtime->isShuttingDown())
runtime->getContextManager().resume(0);
#ifdef EMPER_WORKER_SLEEP
putRuntimeWorkerToSleep();
#else
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment