Skip to content
Snippets Groups Projects
Commit 5ac424cc authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Add Actor API and improve debug log output

parent 6ee3bc62
No related branches found
No related tags found
No related merge requests found
#pragma once
#include "UnboundedBlockingMpscQueue.hpp"
#include "Fiber.hpp"
#include <atomic>
#include <chrono>
template<typename T>
class Actor {
private:
	// Lifecycle states of the actor.
	enum State {
		Stopped,    // Not running; start() may be called.
		Retrieving, // Blocked in queue.get(), mailbox drained.
		Running,    // Executing receive() on a message.
	};

	Runtime& runtime;

	// Atomic (resolves the former "TODO Make atomic"): written by the
	// actor's own fiber in actorLoop() and by stop(), and read
	// concurrently by start() and the waitUntilIdle() spin loop.
	std::atomic<State> state{Stopped};

	// The actor's mailbox.
	UnboundedBlockingMpscQueue<T> queue;

	// Main loop run by the actor's fiber: fetch one message at a time
	// and hand it to receive(). Messages are processed sequentially.
	void actorLoop() {
		state = Running;
		while (state == Running) {
			state = Retrieving;
			T t = queue.get();
			state = Running;
			receive(t);
		}
		state = Stopped;
	}

protected:
	Actor(Runtime& runtime) : runtime(runtime), queue(runtime) {
	}

	// Actor is a polymorphic base (receive() is virtual); a virtual
	// destructor makes destruction through a base pointer well defined.
	virtual ~Actor() = default;

	// Handle a single message; invoked sequentially by the actor fiber.
	virtual void receive(T t) = 0;

	// Request termination of the actor loop.
	// NOTE(review): if the actor is currently blocked in queue.get()
	// the loop overwrites the state with Running afterwards, so the
	// stop request only takes effect after one more message — confirm
	// this is the intended semantics.
	void stop() {
		state = Stopped;
	}

public:
	// Schedule a fresh fiber running actorLoop(). No-op unless the
	// actor is currently Stopped.
	void start() {
		if (state != Stopped) return;
		Fiber* actorFiber = Fiber::from(std::bind(&Actor::actorLoop, this));
		runtime.schedule(*actorFiber);
	}

	// Enqueue a message into the actor's mailbox (safe to call from
	// multiple producers concurrently).
	void tell(T t) {
		queue.put(t);
	}

	// Number of messages currently waiting in the mailbox.
	size_t pendingMailboxItems() {
		return queue.size();
	}

	// Busy-wait until the mailbox is empty and the actor is blocked
	// retrieving, or until 'timeout' milliseconds elapsed. Returns
	// true iff the actor became idle within the timeout.
	// NOTE(review): this spins on the calling worker; acceptable for
	// tests, but consider a blocking wait for production use.
	bool waitUntilIdle(long timeout) {
		const auto start = std::chrono::steady_clock::now();
		const auto deadline = start + std::chrono::milliseconds(timeout);
		while (!(queue.size() == 0 && state == Retrieving)) {
			if (std::chrono::steady_clock::now() > deadline) {
				return false;
			}
		}
		return true;
	}
};
#pragma once
#include "Common.hpp"
#include "Debug.hpp"
#include "Runtime.hpp"
#include "ContextManager.hpp"
#include "Context.hpp"
// Mixin for classes whose operations may block the current fiber's
// context and later resume it from another fiber.
class Blockable {
protected:
	Runtime& runtime;
	ContextManager& contextManager;

	Blockable(Runtime& runtime) : runtime(runtime),
	    contextManager(runtime.getContextManager()) {
	}

	// Polymorphic base (logD() is virtual): virtual destructor makes
	// deletion through a Blockable* well defined.
	virtual ~Blockable() = default;

	// Save the current context and switch to a fresh one which runs
	// freshContextHook. The saved (blocked) context can later be
	// resumed via unblock().
	void block(func_t freshContextHook) {
		LOGD("block() blockedContext is " << Context::getCurrentContext());
		contextManager.saveAndStartNew(freshContextHook);
	}

	// Schedule a fiber that discards its own context and resumes the
	// previously blocked 'context'. Must not be called with nullptr.
	void unblock(Context* context) {
		assert(context != nullptr);
		// cppcheck-suppress unsafeClassCanLeak
		Fiber* unblockFiber = Fiber::from([this, context]() {
			contextManager.discardAndResume(context);
		});
		runtime.schedule(*unblockFiber);
	}

	// Debug-log hook implemented by the concrete subclass (routes the
	// message to its Logger subsystem).
	virtual void logD(const std::string& string) const = 0;
};
......@@ -12,5 +12,11 @@ void worker_log(const std::string& prefix, const std::string& message) {
const workerid_t workerId = Runtime::getWorkerId();
std::unique_lock<std::mutex> lock(worker_log_mutex);
std::cerr << (unsigned int) workerId << ": " << prefix << " " << message << std::endl;
std::cerr << (unsigned int) workerId;
if (!prefix.empty()) {
std::cerr << " " << prefix << " ";
} else {
std::cerr << " ";
}
std::cerr << message << std::endl;
}
......@@ -29,7 +29,10 @@ enum class LogSubsystem {
F,
C,
CM,
D,
DISP,
SCHED,
RUNTI,
U_B_MPSC_Q,
};
enum LogLevel {
......@@ -50,7 +53,10 @@ static const std::map<LogSubsystem, LogLevel> LOG_CONFIG = {
{ LogSubsystem::F, ALL },
{ LogSubsystem::C, ALL },
{ LogSubsystem::CM, ALL },
{ LogSubsystem::D, ALL },
{ LogSubsystem::DISP, ALL },
{ LogSubsystem::SCHED, ALL },
{ LogSubsystem::RUNTI, ALL },
{ LogSubsystem::U_B_MPSC_Q, ALL },
};
template <LogSubsystem logSubsystem>
......@@ -60,20 +66,36 @@ private:
static constexpr char const * getTagFor(LogSubsystem system) {
switch (system) {
case LogSubsystem::PS:
return "PS";
return "PS ";
case LogSubsystem::F:
return "F";
return "F ";
case LogSubsystem::C:
return "C";
return "C ";
case LogSubsystem::CM:
return "CM";
case LogSubsystem::D:
return "D";
return "CM ";
case LogSubsystem::DISP:
return "DISP ";
case LogSubsystem::SCHED:
return "SCHED";
case LogSubsystem::RUNTI:
return "RUNTI";
case LogSubsystem::U_B_MPSC_Q:
return "UBSCQ";
default:
return "UNKNOWN SUBSYSTEM (Add it *now*)";
}
}
static constexpr bool shouldPrefixThis(LogSubsystem system) {
switch (system) {
case LogSubsystem::RUNTI:
case LogSubsystem::SCHED:
return false;
default:
return true;
}
}
protected:
inline void log(LogLevel level, const std::string& string) const {
......@@ -86,7 +108,10 @@ protected:
std::string subSystemTag = getTagFor(logSubsystem);;
std::ostringstream sst;
sst << subSystemTag << ": " << this;
sst << subSystemTag;
if (shouldPrefixThis(logSubsystem)) {
sst << " " << this;
}
worker_log(sst.str(), string);
}
......
......@@ -7,7 +7,7 @@
class Runtime;
class ContextManager;
class Dispatcher : public Logger<LogSubsystem::D> {
class Dispatcher : public Logger<LogSubsystem::DISP> {
protected:
static thread_local const Fiber* currentFiber;
......@@ -44,7 +44,12 @@ public:
}
static const Fiber* getCurrentFiber() {
static const Fiber& getCurrentFiber() {
const Fiber* fiber = getCurrentFiberPtr();
return *fiber;
}
static const Fiber* getCurrentFiberPtr() {
return currentFiber;
}
......
......@@ -80,7 +80,7 @@ Runtime::~Runtime() {
void* Runtime::workerLoop(void* voidWorkerId) {
workerId = *(workerid_t*) voidWorkerId;
WDBG("workerLoop started by thread " << syscall(SYS_gettid));
LOGD("Worker loop started by thread " << syscall(SYS_gettid));
int oldType;
int res = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldType);
......
......@@ -3,6 +3,7 @@
#include <thread>
#include <vector>
#include "Debug.hpp"
#include "Dispatcher.hpp"
#include "Common.hpp"
#include "Latch.hpp"
......@@ -12,7 +13,7 @@
class ContextManager;
class Runtime {
class Runtime : public Logger<LogSubsystem::RUNTI> {
private:
static std::mutex currentRuntimeMutex;
static Runtime* currentRuntime;
......
#pragma once
#include "Debug.hpp"
#include <functional>
#include "Fiber.hpp"
#include "Common.hpp"
class Runtime;
class Scheduler {
class Scheduler : public Logger<LogSubsystem::SCHED>{
protected:
Runtime& runtime;
Scheduler(Runtime& runtime);
......
#pragma once
#include "Blockable.hpp"
#include "Context.hpp"
#include <queue>
#include <mutex>
// Unbounded multi-producer single-consumer queue. put() may be called
// from many producers concurrently; get() blocks the calling fiber's
// context (via Blockable) when the queue is empty and is resumed by
// the next put(). Only a single consumer may call get() at a time:
// the members 'tPopped' and 't' carry state across the blocking
// protocol and are unsynchronized consumer-side scratch space.
template<typename T>
class UnboundedBlockingMpscQueue : protected Logger<LogSubsystem::U_B_MPSC_Q>
, protected Blockable {
private:
// Context of the consumer currently blocked in get(), or nullptr.
// Exchanged atomically so exactly one party wakes it up.
std::atomic<Context*> blockedContext;
// Set by tryToGetElement() when it popped an element into 't'.
bool tPopped;
// Holding area for the element handed from the queue to get().
T t;
std::queue<T> mpscQueue;
// Protects mpscQueue.
std::mutex queueMutex;
// Atomically claim the blocked consumer context (if any) and schedule
// a fiber resuming it. The exchange guarantees only one caller wins.
void tryToWakeupBlockedContext() {
Context* context = blockedContext.exchange(nullptr);
if (context) {
unblock(context);
}
}
// Pop the front element into 't' if the queue is non-empty, setting
// tPopped accordingly. Called with consumer-side exclusivity.
void tryToGetElement() {
std::lock_guard<std::mutex> lock(queueMutex);
if (!mpscQueue.empty()) {
t = mpscQueue.front();
mpscQueue.pop();
tPopped = true;
}
}
// Blockable's logging hook: forward to this class's Logger subsystem.
virtual void logD(const std::string& string) const {
Logger::logD(string);
}
public:
UnboundedBlockingMpscQueue(Runtime& runtime) : Blockable(runtime) {
}
// Enqueue an element; wakes the blocked consumer if there is one.
void put(T t) {
{
std::lock_guard<std::mutex> lock(queueMutex);
mpscQueue.push(t);
}
// Micro optimization, see if there is a blocked context
// before performing the atomic exchange operation.
if (blockedContext.load() != nullptr) {
tryToWakeupBlockedContext();
}
}
// Dequeue an element, blocking the current context if the queue is
// empty. Single consumer only.
T get() {
tPopped = false;
tryToGetElement();
if (!tPopped) {
Context* context = Context::getCurrentContext();
// Publish our context and re-check the queue from the fresh
// context; a concurrent put() may have raced with us, in which
// case we immediately wake ourselves back up.
block([this, context] {
blockedContext = context;
tryToGetElement();
if (tPopped) {
tryToWakeupBlockedContext();
}
});
// Resumed by a producer (or by the self-wakeup above).
if (!tPopped) {
std::lock_guard<std::mutex> lock(queueMutex);
// If 't' wasn't popped yet, the queue MUST contain an
// element now, because a put() woke us up. Note that a
// non-linearizable queue may break this invariant.
assert(!mpscQueue.empty());
t = mpscQueue.front();
mpscQueue.pop();
}
}
return t;
}
// Current number of queued elements (racy snapshot under the lock).
size_t size() {
std::lock_guard<std::mutex> lock(queueMutex);
return mpscQueue.size();
}
};
#pragma once
// NOTE(review): this class is an unfinished sketch and does not
// compile: 'Node' is used before its declaration, 'heads'/'tails' are
// flexible-array-style members with no size, 'Node next' should
// presumably be 'Node*', enqueue() never uses 'ts'/'ltail', the stray
// token "ltial" looks like a typo for "ltail", and the function body
// is never closed. Either finish or remove it — TODO confirm intent.
template<typename T>
class MultiList_UnboundedWaitFreeMpscQueue {
private:
// Logical clock used to timestamp enqueued nodes — presumably for a
// timestamp-ordered multi-list MPSC design; verify against the paper
// or design notes this is based on.
std::atomic<uint64_t> currentTimestamp;
// Per-worker list heads/tails; sizes are missing (broken as written).
Node* heads[];
Node* tails[];
struct Node {
T item;
// NOTE(review): likely intended to be 'Node* next'.
Node next;
// Timestamp; UINT64_MAX marks a node not yet published.
std::atomic<uint64_t> ts;
Node() : ts(UINT64_MAX) {};
};
public:
// Append 'node' to the calling worker's local list.
// NOTE(review): incomplete — the old tail's next pointer is never
// linked and the body is unterminated.
void enqueue(Node* node) {
const workerid_t workerId = Runtime::getWorkerId();
uint64_t ts = currentTimestamp.load();
Node* ltail = tails[workerId];
tails[workerId] = node;
ltial
};
......@@ -22,7 +22,7 @@ LawsScheduler::LawsScheduler(Runtime& runtime) : Scheduler(runtime) {
}
void LawsScheduler::schedule(Fiber& fiber) {
WDBG("Scheduling fiber " << &fiber);
LOGD("Scheduling fiber " << &fiber);
workeraffinity_t* const affinity_buffer = getAffinityBuffer(fiber);
if (affinity_buffer) {
workeraffinity_t affinity = *affinity_buffer;
......
......@@ -16,7 +16,7 @@ WsScheduler::WsScheduler(Runtime& runtime) : Scheduler(runtime) {
}
void WsScheduler::schedule(Fiber& fiber) {
WDBG("Scheduling fiber " << &fiber);
LOGD("Scheduling fiber " << &fiber);
queue.pushBottom(&fiber);
}
......
......@@ -13,3 +13,7 @@ add_test(CApiTest c_api_test)
add_executable(cpp_api_test CppApiTest.cpp)
target_link_libraries(cpp_api_test emper)
add_test(CppApiTest cpp_api_test)
add_executable(simple_actor_test SimpleActorTest.cpp)
target_link_libraries(simple_actor_test emper)
add_test(SimpleActorTest simple_actor_test)
#include "Actor.hpp"
#include "Runtime.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Dispatcher.hpp"
#include "Debug.hpp"
#include "emper.hpp"
#include <mutex>
// Actor that accumulates the sum of all uint64_t messages it receives.
class SumActor : public Actor<uint64_t> {
public:
	SumActor(Runtime& runtime) : Actor(runtime) {
	}

	// The total accumulated so far. Only meaningful once the actor is
	// idle (no pending mailbox items).
	uint64_t getSum() {
		return sum;
	}

protected:
	// Fold the received value into the running total.
	virtual void receive(uint64_t t) override {
		sum += t;
	}

private:
	// Running total of all received messages.
	uint64_t sum = 0;
};
// Test body run as the first fiber: spawn FIBER_COUNT producer fibers
// which each tell the numbers 1..FIBERS_COUNT_TO to a SumActor, wait
// for the producers and the actor to finish, then verify the sum.
// Terminates the process with EXIT_SUCCESS/EXIT_FAILURE.
static void mainFiber(void* runtime_ptr) {
	Runtime& runtime = * (Runtime*) runtime_ptr;

	const unsigned int FIBER_COUNT = 10;
	const uint64_t FIBERS_COUNT_TO = 100;
	// Gauss sum 1..FIBERS_COUNT_TO per producer.
	const uint64_t PER_FIBER_SUM = (FIBERS_COUNT_TO * (FIBERS_COUNT_TO + 1)) / 2;
	const uint64_t EXPECTED_SUM = FIBER_COUNT * PER_FIBER_SUM;

	SumActor sumActor(runtime);
	sumActor.start();

	CPS cps;
	for (unsigned int i = 0; i < FIBER_COUNT; ++i) {
		// Fix: the lambda previously captured the (unused) outer 'i'
		// and shadowed it with its own loop variable.
		spawn([&sumActor] {
			// getCurrentFiber() now returns a reference; use the
			// pointer accessor since the address is what we log.
			WDBG(Dispatcher::getCurrentFiberPtr() << " starts to count to " << FIBERS_COUNT_TO);
			for (uint64_t v = 1; v <= FIBERS_COUNT_TO; ++v) {
				sumActor.tell(v);
			}
		}, cps);
	}

	// Wait for the producer fibers to finish.
	cps.wait();

	// Wait for the actor to drain its mailbox and become idle.
	bool actorIdle = sumActor.waitUntilIdle(60 * 1000);
	if (!actorIdle) {
		std::cerr << "FAILURE: Actor did not become idle" << std::endl;
		exit(EXIT_FAILURE);
	}

	if (sumActor.getSum() != EXPECTED_SUM) {
		std::cerr << "FAILURE: Actor sum " << sumActor.getSum() << " is not equal to expected sum " << EXPECTED_SUM << std::endl;
		exit(EXIT_FAILURE);
	}

	exit(EXIT_SUCCESS);
}
// Test driver: start the runtime, schedule mainFiber and wait.
// mainFiber terminates the process via exit(); reaching the return
// statement therefore means the runtime finished without the test
// reporting a result, which is a failure.
int main(UNUSED_ARG int argc, UNUSED_ARG char *argv[]) {
	Runtime runtime;

	Fiber* fiber = Fiber::from(mainFiber, (void*) &runtime);
	runtime.schedule(*fiber);

	runtime.waitUntilFinished();

	return EXIT_FAILURE;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment