Skip to content
Snippets Groups Projects
Commit a54045cf authored by Nicolas Pfeiffer's avatar Nicolas Pfeiffer
Browse files

locked Fibrils and clean up (first working impl)

parent d18e0047
Branches
Tags
No related merge requests found
...@@ -48,7 +48,7 @@ void ContextManager::start() { ...@@ -48,7 +48,7 @@ void ContextManager::start() {
cont = new Continuation(); cont = new Continuation();
cont->setJmp(); cont->setJmp();
Fibril::schedule(); Fibril::tryResumeFiber();
Context* currentContext = Context::getCurrentContext(); Context* currentContext = Context::getCurrentContext();
if (currentContext) { if (currentContext) {
......
...@@ -10,70 +10,119 @@ ...@@ -10,70 +10,119 @@
class Fibril : public Fiber { class Fibril : public Fiber {
private: private:
std::atomic<int> count; /* TODO use lock, or make lock free thread-safe */ int count; /* TODO make lock free thread-safe */
int lock; std::atomic<int> lock_var;
Continuation cont; Continuation cont;
Context *stack; Context *stack;
static thread_local Fibril *toResume; static thread_local Fibril *toResume;
inline void tryResume() {
int c;
lock();
c = --count;
unlock();
if (c > 0) {
if (stack == Context::currentContext) {
Context::currentContext = nullptr;
}
/* randomSteal() */;
return;
} else {
if (stack != Context::currentContext) {
Runtime::getRuntime()->getContextManager().putFreeContext(Context::currentContext);
Context::currentContext = stack;
}
cont.execute(cont.sp);
}
}
public: public:
inline __attribute__((always_inline)) inline __attribute__((always_inline))
Fibril() : Fiber(Fiber::fiber_fun0_t(nullptr)), count(0), lock(0), cont() { Fibril() : Fiber(Fiber::fiber_fun0_t(nullptr)), count(0), lock_var(1), cont() {
stack = Context::currentContext; // TODO check if this is correct stack = Context::currentContext; // TODO check if this is correct
}; };
~Fibril() { ~Fibril() {
assert(count == 0 && lock == 0); /* XXX DEBUG */ assert(count == 0 && lock_var == 1); /* XXX DEBUG */
}; };
void run() override { void run() override {
int expected = 0; if (!count) count += 2;
int c = count; else count++;
fprintf(stderr, "[%d] run(): c = %d\n", Runtime::getWorkerId(), c); unlock();
fflush(stderr);
if (c == 0 && count.compare_exchange_strong(expected, 2)) {
//stack.ptr = /* TODO victim->stack */ nullptr;
} else {
count += 1;
}
cont.execute(Context::currentContext->getTos()); cont.execute(Context::currentContext->getTos());
}; };
inline void lock() {
while (true) {
if (lock_var && lock_var.exchange(0))
return;
}
};
inline void unlock() {
lock_var = 1;
};
inline void resume() { inline void resume() {
toResume = this; toResume = this;
Runtime::getRuntime()->getContextManager().resume(); Runtime::getRuntime()->getContextManager().resume();
}; };
inline static void schedule() { inline static void tryResumeFiber() {
if (toResume == nullptr) { if (toResume != nullptr) {
return; toResume->tryResume();
} }
Fibril *fr = toResume;
int c = --fr->count;
fprintf(stderr, "[%d] schedule(): c = %d\n", Runtime::getWorkerId(), c);
fflush(stderr);
if (c > 0) {
if (fr->stack == Context::currentContext) {
Context::currentContext = nullptr;
}
/* randomSteal() */;
return;
} else {
if (fr->stack != Context::currentContext) {
Runtime::getRuntime()->getContextManager().putFreeContext(Context::currentContext);
Context::currentContext = fr->stack;
} }
fr->cont.execute(fr->cont.sp);
inline __attribute__((always_inline))
void fork(Fiber::fiber_fun0_t fun) {
auto fork_func = [this, fun] () __attribute__((noinline, hot, optimize(3))) {
cont.ip = __builtin_return_address(0);
Runtime* runtime = Runtime::getRuntime();
runtime->pushBottom(*this);
fun();
if (!runtime->popBottom()) { /* TODO laws Scheduler pushes to queues of other threads, handle that */
resume();
} }
};
membar(fork_func());
};
inline __attribute__((always_inline))
void fork(Fiber::fiber_fun_t fun, void *arg) {
auto fork_func = [this, fun, arg] () __attribute__((noinline, hot, optimize(3))) {
cont.ip = __builtin_return_address(0);
Runtime* runtime = Runtime::getRuntime();
runtime->pushBottom(*this);
fun(arg);
if (!runtime->popBottom()) { /* TODO laws Scheduler pushes to queues of other threads, handle that */
resume();
} }
};
membar(fork_func());
};
inline __attribute__((always_inline))
void join() {
if (count == 0)
return;
auto join_func = [this] () __attribute__((noinline, hot, optimize(3))) {
cont.ip = __builtin_return_address(0);
resume();
};
membar(join_func());
};
friend void fibril_fork(Fibril*, Fiber::fiber_fun0_t);
friend void fibril_fork(Fibril*, Fiber::fiber_fun_t, void*);
friend void fibril_join(Fibril*);
}; };
...@@ -62,57 +62,8 @@ void spawn(Fiber::fiber_fun0_t function, workeraffinity_t* affinity, S& semaphor ...@@ -62,57 +62,8 @@ void spawn(Fiber::fiber_fun0_t function, workeraffinity_t* affinity, S& semaphor
/* VVVVVVVVVVVVVVVV CONTINUATION VVVVVVVVVVVVVVVVVVVV */ /* VVVVVVVVVVVVVVVV CONTINUATION VVVVVVVVVVVVVVVVVVVV */
#include "ContextManager.hpp"
#include "Context.hpp"
#include "Fibril.hpp" #include "Fibril.hpp"
#define fibril __attribute__((noinline, sysv_abi, optimize("no-omit-frame-pointer"))) #define fibril __attribute__((sysv_abi, optimize("no-omit-frame-pointer")))
inline __attribute__((always_inline))
void fibril_fork(Fibril* fr, Fiber::fiber_fun0_t fun) {
auto fork_func = [fr, fun] () __attribute__((noinline, hot, optimize(3))) {
fr->cont.ip = __builtin_return_address(0);
Runtime* runtime = Runtime::getRuntime();
runtime->pushBottom(*fr);
fun();
if (!runtime->popBottom()) { /* TODO laws Scheduler pushes to queues of other threads, handle that */
fr->resume();
}
};
membar(fork_func());
}
inline __attribute__((always_inline))
void fibril_fork(Fibril* fr, Fiber::fiber_fun_t fun, void* arg) {
auto fork_func = [fr, fun, arg] () __attribute__((noinline, hot, optimize(3))) {
fr->cont.ip = __builtin_return_address(0);
Runtime* runtime = Runtime::getRuntime();
runtime->pushBottom(*fr);
fun(arg);
if (!runtime->popBottom()) { /* TODO laws Scheduler pushes to queues of other threads, handle that */
fr->resume();
}
};
membar(fork_func());
}
inline __attribute__((always_inline))
void fibril_join(Fibril* fr) {
if (fr->count == 0)
return;
auto join_func = [fr] () __attribute__((noinline, hot, optimize(3))) {
fr->cont.ip = __builtin_return_address(0);
fr->resume();
};
membar(join_func());
}
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#include <mutex> #include <mutex>
#include <deque> #include <deque>
#include "Fibril.hpp"
namespace adt { namespace adt {
template<typename I, const uintptr_t SIZE> template<typename I, const uintptr_t SIZE>
...@@ -37,6 +39,9 @@ namespace adt { ...@@ -37,6 +39,9 @@ namespace adt {
if (deque.empty()) return false; if (deque.empty()) return false;
*itemPtr = deque.front(); *itemPtr = deque.front();
if (Fibril* fr = dynamic_cast<Fibril*>(*itemPtr)) {
fr->lock();
}
deque.pop_front(); deque.pop_front();
......
...@@ -12,7 +12,8 @@ class WsScheduler: public Scheduler { ...@@ -12,7 +12,8 @@ class WsScheduler: public Scheduler {
#ifdef EMPER_LOCKED_WS_QUEUE #ifdef EMPER_LOCKED_WS_QUEUE
using WsQueue = adt::LockedQueue<Fiber*, SIZE>; using WsQueue = adt::LockedQueue<Fiber*, SIZE>;
#else #else
using WsQueue = adt::WsClQueue<Fiber*, SIZE>; // using WsQueue = adt::WsClQueue<Fiber*, SIZE>;
using WsQueue = adt::LockedQueue<Fiber*, SIZE>;
#endif #endif
public: public:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment