Skip to content
Snippets Groups Projects
Commit 92239853 authored by Florian Fischer's avatar Florian Fischer
Browse files

[IO] parallelize IO startup

GlobalIoContext::registerWorkerIo() now protects the GlobalIoContext's SQ
with a mutex and the globalCompleter waits till all worker's registered
their IoContext with the new Sempahore Runtime.ioReadySem.
parent bc6e4139
No related branches found
No related tags found
1 merge request!122[IO] parallelize IO startup
......@@ -76,6 +76,7 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
threads(new pthread_t[workerCount]),
workers(new Worker*[workerCount]),
ioContexts(emper::IO ? workerCount : 0),
ioReadySem(0),
randomEngine(seed) {
const int nprocs = get_nprocs();
......@@ -93,24 +94,12 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
// for each worker's IoContext one eventfd read is prepared before the
// globalCompleter is started and submits all previously prepared sqes.
globalIo = new GlobalIoContext(*this, workerCount);
globalIo->startGlobalCompleter();
if constexpr (emper::STATS) {
globalIo->stats.workerId = -1;
std::atexit(emper::io::Stats::printWorkerStats);
}
// submit the workers' CQ eventfds to the global IoContext
for (workerid_t i = 0; i < workerCount; ++i) {
ioContexts[i] = new IoContext(*this);
globalIo->registerWorkerIo(*ioContexts[i]);
if constexpr (emper::STATS) {
ioContexts[i]->stats.workerId = i;
}
}
// start the globalCompleter after all eventfd are submitted so we don't need to
// synchronize the globalIo's SQ
globalIo->startGlobalCompleter();
}
// Core id we start the worker pinning
......@@ -207,7 +196,19 @@ auto Runtime::workerLoop(Worker* worker) -> void* {
worker->setWorker();
if constexpr (emper::IO) {
ioContexts[worker->workerId]->setWorkerIo();
auto* workerIo = new IoContext(*this);
// submit the workers' CQ eventfds to the global IoContext
globalIo->registerWorkerIo(*workerIo);
// notify the globalCompleter that we have registered our eventfd
ioReadySem.notify();
ioContexts[worker->workerId] = workerIo;
workerIo->setWorkerIo();
if constexpr (emper::STATS) {
workerIo->stats.workerId = worker->workerId;
}
}
LOGD("Worker loop started by thread " << syscall(SYS_gettid));
......
......@@ -23,6 +23,7 @@
#include "Worker.hpp"
#include "emper-common.h" // for workerid_t
#include "lib/sync/Latch.hpp" // for Latch
#include "lib/sync/Semaphore.hpp"
#include "lib/sync/WorkerWakeupSemaphore.hpp"
class ContextManager;
......@@ -59,6 +60,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
Worker** workers;
GlobalIoContext* globalIo = nullptr;
std::vector<IoContext*> ioContexts;
emper::lib::sync::Semaphore ioReadySem;
std::default_random_engine randomEngine;
std::uniform_int_distribution<unsigned int> uniformIntDistribution;
......
......@@ -10,6 +10,7 @@
#include <cerrno>
#include <cstdio>
#include <memory>
#include <mutex>
#include "CallerEnvironment.hpp"
#include "Common.hpp"
......@@ -17,16 +18,20 @@
#include "io/Future.hpp"
#include "io/IoContext.hpp"
#include "lib/TaggedPtr.hpp"
#include "lib/sync/Semaphore.hpp"
using emper::lib::TaggedPtr;
namespace emper::io {
// The globalCompleter must be started after all worker io_uring eventfds are submitted
// so we don't have to synchronize the global SQ
auto GlobalIoContext::globalCompleterFunc(void* arg) -> void* {
auto* globalIoContext = reinterpret_cast<GlobalIoContext*>(arg);
// wait till all workers registered their IoContext's eventfd
for (workerid_t i = 0; i < globalIoContext->runtime.workerCount; ++i) {
globalIoContext->runtime.ioReadySem.wait();
}
globalIoContext->logD("submit all worker io_uring eventfds");
// submit all eventfds in the SQ inserted by IoContext::submit_efd calls
......@@ -158,6 +163,7 @@ void GlobalIoContext::registerWorkerIo(IoContext& workerIo) {
DIE_MSG_ERRNO("io_uring_register_eventfd failed");
}
std::lock_guard<std::mutex> lock(registerLock);
prepareWorkerNotification(workerIo);
// The sqe we prepared will be submitted to io_uring when the globalCompleter starts.
}
......
......@@ -5,6 +5,7 @@
#include <pthread.h> // for pthread_t
#include <cstdint>
#include <mutex>
#include "emper-common.h"
#include "io/IoContext.hpp"
......@@ -20,6 +21,10 @@ class GlobalIoContext : public IoContext {
private:
GlobalIoContext(Runtime& runtime, workerid_t workerCount) : IoContext(runtime, workerCount) {}
// This mutex is only used to protect the SQ during start up when all workers
// register their IoContext's eventfds in parallel
std::mutex registerLock;
enum class PointerTags : uint16_t { Future, IoContext, TerminationEvent };
// pthread used to monitor the CQs from worker io_urings
......@@ -30,8 +35,6 @@ class GlobalIoContext : public IoContext {
static auto globalCompleterFunc(void* arg) -> void*;
// start the global completer thread
// this must be called after all worker IoContexts' eventfds are submitted
// to the global IoContext
void startGlobalCompleter();
void initiateTermination();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment