Commit f3d39a5c authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'worker_exclusive_uring' into 'master'

Worker exclusive uring

See merge request i4/manycore/emper!54
parents a2587f7f 07782b4f
Pipeline #56689 passed with stages
in 10 minutes and 14 seconds
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <netdb.h> //for getaddrinfo
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <algorithm> // for find
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iostream> // for operator<<, basic_ostream, endl
#include <string> // for allocator, string, char_traits
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
// Defaults
// Base used when parsing numeric command line arguments with strtol.
const int DECIMAL = 10;
// Address and port the echo server is expected to listen on.
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
// Number of echo round trips performed by each client.
const size_t ITERATIONS = 10000;
// Number of concurrent client connections.
const size_t CLIENTS = 10000;
// Number of bytes sent and received per round trip.
const size_t SIZE = 32;
// Run a single echo client: connect to the server described by `server`,
// perform `iterations` send/receive round trips of `size` bytes each and
// return the mean round-trip latency in nanoseconds.
//
// readySem is released once the connection is established; startSem is
// acquired before the first iteration so all clients start at the same time.
static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, Semaphore& readySem,
                       Semaphore& startSem) -> uint64_t {
	int client_sock = socket(server.ai_family, server.ai_socktype, server.ai_protocol);
	if (client_sock < 0) {
		DIE_MSG_ERRNO("creating new client socket failed");
	}

	int err = emper::io::connectAndWait(client_sock, server.ai_addr, server.ai_addrlen);
	if (err) {
		DIE_MSG_ERRNO("connecting new client socket failed");
	}

	// Two buffers swapped each iteration: one holds the data just sent, the
	// other receives the echo, so the echo can be compared with the sent data.
	// NOLINTNEXTLINE(modernize-avoid-c-arrays)
	char* buf[2];
	buf[0] = new char[size];
	buf[1] = new char[size];

	// Sum of all measured round-trip times, divided by `iterations` at the end.
	// The previous code halved the running value every iteration
	// (avg = (avg + ns) / 2) which yields an exponentially weighted average
	// biased toward the most recent samples, not the arithmetic mean.
	uint64_t total_ns = 0;

	readySem.release();
	startSem.acquire();

	for (size_t i = 0; i < iterations; ++i) {
		char* outBuf = buf[i % 2];
		char* inBuf = buf[(i + 1) % 2];

		SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
		RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
		recvFuture.setDependency(sendFuture);

		auto start = std::chrono::high_resolution_clock::now();
		int32_t res = recvFuture.submitAndWait();
		auto end = std::chrono::high_resolution_clock::now();

		if (res <= 0) {
			// recv failed
			if (res != -ECANCELED) {
				if (res == 0) {
					DIE_MSG("server closed the connection");
				}
				errno = -res;
				DIE_MSG_ERRNO("receive failed");
			}
			// recv was canceled -> its send dependency must have failed
			int32_t send_res = sendFuture.wait();
			if (send_res < 0) {
				errno = -send_res;
				DIE_MSG_ERRNO("send failed");
			}
			DIE_MSG("short send" << send_res);
		}

		if (memcmp(outBuf, inBuf, size) != 0) {
			DIE_MSG("got unexpected echo from server");
		}

		auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
		total_ns += static_cast<uint64_t>(duration.count());
	}

	// Shutdown and close the client socket.
	shutdown(client_sock, SHUT_RDWR);
	// ShutdownFuture shut_f(client_sock, SHUT_RDWR);
	CloseFuture cf(client_sock);
	// cf.setDependency(shut_f);
	cf.submit();

	delete[] buf[0];
	delete[] buf[1];

	cf.wait();

	return iterations ? total_ns / iterations : 0;
}
// Look up `option` in argv and return a pointer to the argument that follows
// it, or nullptr when the option is absent or is the last element (i.e. has
// no value).
auto getOption(int argc, char** argv, const std::string& option) -> char* {
	char** argvEnd = argv + argc;
	char** pos = std::find(argv, argvEnd, option);
	if (pos == argvEnd) {
		return nullptr;
	}
	++pos;
	return (pos == argvEnd) ? nullptr : *pos;
}
// Report whether `option` occurs anywhere in argv.
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
	for (int i = 0; i < argc; ++i) {
		if (option == argv[i]) {
			return true;
		}
	}
	return false;
}
// Print the command line usage to stderr.
// Fix: the original concatenated the program name directly with "[-h]",
// printing e.g. "Usage: echoclient[-h] ..." without a separating space.
static void printUsage(char* name) {
	std::cerr << "Usage: " << name
						<< " [-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>]"
						<< std::endl;
}
// Parse the command line, resolve the server address and spawn `clients`
// echo client fibers which each perform `iterations` round trips of `size`
// bytes. The mean round-trip latency over all clients is printed as CSV.
auto main(int argc, char* argv[]) -> int {
	std::string host = HOST;
	std::string port = PORT;
	size_t iterations = ITERATIONS;
	size_t clients = CLIENTS;
	size_t size = SIZE;

	if (existsOption(argc, argv, "-h")) {
		printUsage(argv[0]);
		exit(EXIT_FAILURE);
	}

	char* port_s = getOption(argc, argv, "-p");
	if (port_s) {
		port = std::string(port_s);
	}

	char* addr_s = getOption(argc, argv, "-a");
	if (addr_s) {
		host = std::string(addr_s);
	}

	char* client_s = getOption(argc, argv, "-c");
	if (client_s) {
		clients = strtol(client_s, nullptr, DECIMAL);
	}

	char* iterations_s = getOption(argc, argv, "-i");
	if (iterations_s) {
		iterations = strtol(iterations_s, nullptr, DECIMAL);
	}

	char* size_s = getOption(argc, argv, "-s");
	if (size_s) {
		size = strtol(size_s, nullptr, DECIMAL);
	}

	// Reject zero values: the original read client_avgs[0] even for "-c 0"
	// (out-of-bounds access), and zero iterations or size make no sense for
	// the benchmark.
	if (clients == 0 || iterations == 0 || size == 0) {
		printUsage(argv[0]);
		exit(EXIT_FAILURE);
	}

	struct addrinfo* server;
	int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
	if (err) {
		if (err == EAI_SYSTEM) {
			DIE_MSG_ERRNO("getaddrinfo failed");
		} else {
			LOGE("error in getaddrinfo: " << gai_strerror(err));
			exit(EXIT_FAILURE);
		}
	}

	Runtime runtime;

	Fiber* alphaFiber = Fiber::from([&] {
		auto* client_avgs = new uint64_t[clients];
		CPS cps;
		Semaphore readySemaphore;
		Semaphore startSemaphore;

		// establish client connections
		for (size_t i = 0; i < clients; ++i) {
			spawn(
					[&, i] {
						client_avgs[i] = clientFunc(*server, iterations, size, readySemaphore, startSemaphore);
					},
					cps);
		}

		// await the clients being connected and ready
		for (size_t i = 0; i < clients; ++i) {
			readySemaphore.acquire();
		}

		// start the clients
		for (size_t i = 0; i < clients; ++i) {
			startSemaphore.release();
		}

		// await the clients' completion
		cps.wait();

		// Arithmetic mean over the per-client averages. The previous pairwise
		// halving (avg = (avg + x) / 2) produced an exponentially weighted
		// value dominated by the last clients, not the mean.
		uint64_t avg_ns = 0;
		for (size_t i = 0; i < clients; ++i) {
			avg_ns += client_avgs[i];
		}
		avg_ns /= clients;

		std::cout << "clients,iterations,size,time" << std::endl;
		std::cout << clients << "," << iterations << "," << size << "," << avg_ns << std::endl;

		delete[] client_avgs;
		// Release the addrinfo list obtained by getaddrinfo (was leaked before).
		freeaddrinfo(server);
		exit(EXIT_SUCCESS);
	});
	runtime.scheduleFromAnywhere(*alphaFiber);

	runtime.waitUntilFinished();
	return EXIT_FAILURE;
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Fischer
#include <sys/types.h>
#include <cassert>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include "Common.hpp"
#include "Runtime.hpp"
#include "emper-common.h"
#include "io.hpp"
// Base used when parsing the port command line argument with strtol.
const int DECIMAL = 10;
// Default listen address and port.
const std::string HOST = "0.0.0.0";
const int PORT = 12345;
// Start an echo server listening on HOST:PORT (or the port given as the
// single optional command line argument). Each connection is served by a
// fiber that echoes back everything it receives until the peer shuts the
// socket down; receiving exactly "quit\n" terminates the whole server.
auto main(int argc, char* argv[]) -> int {
	int port = PORT;
	std::string host = HOST;

	if (argc > 2) {
		std::cerr << "Usage: " << argv[0] << " [port]" << std::endl;
		exit(EXIT_FAILURE);
	}

	if (argc > 1) {
		long aport = strtol(argv[1], nullptr, DECIMAL);
		// Validate explicitly instead of assert(): asserts are compiled out in
		// NDEBUG builds, which let an out-of-range value be silently truncated.
		// TCP ports must fit into 16 bits and 0 is not a usable listen port.
		if (aport <= 0 || aport > 65535) {
			std::cerr << "Invalid port: " << argv[1] << std::endl;
			exit(EXIT_FAILURE);
		}
		port = static_cast<int>(aport);
	}

	std::cout << "Echoserver listening on " << host << ":" << port << std::endl;

	Runtime runtime;

	auto* listener = emper::io::tcp_listener(host, port, [](int socket) {
		// NOLINTNEXTLINE(modernize-avoid-c-arrays)
		char buf[1024];
		for (;;) {
			ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0);
			if (unlikely(bytes_recv <= 0)) {
				// 0 means the peer shut the connection down; negative is an error
				if (bytes_recv < 0) {
					DIE_MSG_ERRNO("server read failed");
				}
				emper::io::closeAndForget(socket);
				return;
			}

			// A bare "quit\n" message shuts the whole server down.
			if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf, bytes_recv) == 0)) {
				exit(EXIT_SUCCESS);
			}

			// Echo the received bytes back. NOTE(review): a short send is only
			// caught by the assert in debug builds — presumably acceptable for
			// an example program.
			ATTR_UNUSED ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, 0);
			assert(bytes_recv == bytes_send);
		}
	});
	if (!listener) {
		exit(EXIT_FAILURE);
	}
	runtime.scheduleFromAnywhere(*listener);

	runtime.waitUntilFinished();
	return EXIT_FAILURE;
}
......@@ -9,3 +9,15 @@ worker_sleep_example_exe = executable(
'WorkerSleepExample.cpp',
dependencies: emper_dep,
)
echoserver_exe = executable(
'echoserver',
'EchoServer.cpp',
dependencies: emper_dep,
)
echoclient_exe = executable(
'echoclient',
'EchoClient.cpp',
dependencies: emper_dep,
)
......@@ -35,6 +35,8 @@ class BinaryPrivateSemaphore : public PrivateSemaphore {
BinaryPrivateSemaphore() : bpsState(initial) {}
void wait() override;
void reset() { bpsState = initial; };
};
using BPS = BinaryPrivateSemaphore;
......@@ -9,7 +9,11 @@
#include "Context.hpp"
#include "ContextManager.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "Runtime.hpp"
#include "lib/adt/LockedSet.hpp"
static emper::lib::adt::LockedSet<Context*> blockedContexts;
template <LogSubsystem logSubsystem>
class Blockable : public Logger<logSubsystem> {
......@@ -29,12 +33,21 @@ class Blockable : public Logger<logSubsystem> {
assert(Runtime::inRuntime());
LOGD("block() blockedContext is " << Context::getCurrentContext());
if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.insert(Context::getCurrentContext());
}
contextManager.saveAndStartNew(std::move(freshContextHook));
}
template <CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
void unblock(Context* context) {
assert(context != nullptr);
if constexpr (emper::BLOCKED_CONTEXT_SET) {
blockedContexts.erase(context);
}
// cppcheck-suppress unsafeClassCanLeak
Fiber* unblockFiber =
Fiber::from([this, context]() { contextManager.discardAndResume(context); });
......
......@@ -23,3 +23,17 @@ using func_t = std::function<void()>;
[[noreturn]] void die(const char* message, bool usePerror);
using WORD = unsigned int;
#ifdef EMPER_STATS
#include <chrono>
#define TIME_NS(code, record_func) \
auto _time_ns_start = std::chrono::high_resolution_clock::now(); \
code; \
auto _time_ns_end = std::chrono::high_resolution_clock::now(); \
auto _time_ns_duration = \
std::chrono::duration_cast<std::chrono::nanoseconds>(_time_ns_end - _time_ns_start); \
record_func(_time_ns_duration);
#else
#define TIME_NS(code, record) code
#endif
......@@ -63,6 +63,7 @@ enum class LogSubsystem {
SCHED,
RUNTI,
U_B_MPSC_Q,
IO,
};
enum LogLevel {
......@@ -99,6 +100,7 @@ class Logger {
case LogSubsystem::SCHED:
case LogSubsystem::RUNTI:
case LogSubsystem::U_B_MPSC_Q:
case LogSubsystem::IO:
default:
return ALL;
}
......@@ -122,6 +124,8 @@ class Logger {
return "RUNTI";
case LogSubsystem::U_B_MPSC_Q:
return "UBSCQ";
case LogSubsystem::IO:
return "IO ";
default:
return "UNKNOWN SUBSYSTEM (Add it *now*)";
}
......
......@@ -65,4 +65,28 @@ static const bool OVERFLOW_QUEUE =
;
auto getFullVersion() -> std::string;
static const bool BLOCKED_CONTEXT_SET =
#ifdef EMPER_BLOCKED_CONTEXT_SET
DEBUG
#else
false
#endif
;
static const bool IO =
#ifdef EMPER_IO
true
#else
false
#endif
;
static const bool IO_URING_SQPOLL =
#ifdef EMPER_IO_URING_SQPOLL
true
#else
false
#endif
;
} // namespace emper
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#include "Runtime.hpp"
#include <pthread.h> // for pthread_t, pthread_attr_init
......@@ -24,6 +24,8 @@
#include "RuntimeStrategyFactory.hpp"
#include "RuntimeStrategyStats.hpp" // for RuntimeStrategyStats
#include "emper-config.h" // IWYU pragma: keep
#include "io/IoContext.hpp" // for IoContext
#include "io/Stats.hpp" // for emper::io::Stats
#include "lib/DebugUtil.hpp"
#ifdef EMPER_DEFAULT_SCHEDULING_STRATEGY_WORK_STEALING
......@@ -58,6 +60,8 @@ RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY =
#endif
;
using emper::io::IoContext;
Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed)
: workerCount(workerCount),
workerLatch(workerCount),
......@@ -80,6 +84,32 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
currentRuntime = this;
}
// initialize the global and all worker IoContexts
if constexpr (emper::IO) {
// The global io_uring needs at least workerCount entries in its SQ because
// for each worker's IoContext one eventfd read is prepared before the
// globalCompleter is started and submits all previously prepared sqes.
globalIo = new IoContext(workerCount);
ioContexts = new IoContext[workerCount];
if constexpr (emper::STATS) {
globalIo->stats.workerId = -1;
std::atexit(emper::io::Stats::printWorkerStats);
}
// submit the workers' CQ eventfds to the global IoContext
for (workerid_t i = 0; i < workerCount; ++i) {
ioContexts[i].submit_efd();
if constexpr (emper::STATS) {
ioContexts[i].stats.workerId = i;
}
}
// start the globalCompleter after all eventfd are submitted so we don't need to
// synchronize the globalIo's SQ
IoContext::startGlobalCompleter(*globalIo);
}
for (workerid_t i = 0; i < workerCount; ++i) {
pthread_attr_t attr;
errno = pthread_attr_init(&attr);
......@@ -141,6 +171,12 @@ Runtime::~Runtime() {
}
delete[] workers;
delete[] threads;
if constexpr (emper::IO) {
delete[] ioContexts;
delete globalIo;
}
{
std::lock_guard<std::mutex> lock(currentRuntimeMutex);
currentRuntime = nullptr;
......@@ -151,6 +187,10 @@ Runtime::~Runtime() {
auto Runtime::workerLoop(Worker* worker) -> void* {
worker->setWorker();
if constexpr (emper::IO) {
ioContexts[worker->workerId].setWorkerIo();
}
LOGD("Worker loop started by thread " << syscall(SYS_gettid));
int oldType;
......@@ -169,7 +209,14 @@ auto Runtime::workerLoop(Worker* worker) -> void* {
return nullptr;
}
auto Runtime::nextFiber() -> NextFiberResult { return scheduler.nextFiber(); }
auto Runtime::nextFiber() -> NextFiberResult {
if constexpr (emper::IO) {
// Schedule all fibers waiting on completed IO
IoContext::getWorkerIo()->reapCompletions();
}
return scheduler.nextFiber();
}
void Runtime::waitUntilFinished() {
for (workerid_t i = 0; i < workerCount; ++i) {
......
......@@ -30,6 +30,12 @@ class Fiber;
class RuntimeStrategy;
class RuntimeStrategyFactory;
namespace emper::io {
class IoContext;
}
using emper::io::IoContext;
enum WakeupMode {
IF_SLEEPING_OBSERVED,
ALWAYS,
......@@ -52,6 +58,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
ContextManager& contextManager;
pthread_t* threads;
Worker** workers;
IoContext* globalIo;
IoContext* ioContexts;
std::default_random_engine randomEngine;
std::uniform_int_distribution<unsigned int> uniformIntDistribution;
......@@ -115,6 +123,14 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
workerSleepConditionVariable.wait(lk);
}
inline auto getGlobalIo() -> IoContext* {
if constexpr (emper::IO) {
return globalIo;
}
return nullptr;
}
public:
Runtime() : Runtime(std::thread::hardware_concurrency()) {}
......@@ -164,6 +180,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
friend ContextManager;
friend Scheduler;
friend Dispatcher;
friend IoContext;
template <typename T, intptr_t WS_QUEUE_SIZE, size_t WORKER_EXCLUSIVE_QUEUE_SIZE>
friend class MemoryManager;
};
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020 Florian Schmaus, Florian Fischer
#include "BinaryPrivateSemaphore.hpp" // for BinaryPrivateSemaphore, BPS
#include "CountingPrivateSemaphore.hpp" // for CountingPrivateSemaphore, CPS
#include "Fiber.hpp" // for Fiber, Fiber::NOT_AFFINE
#include "Runtime.hpp" // for Runtime
#include "emper-common.h" // for workeraffinity_t
#include "emper-config.h" // IWYU pragma: keep
#include "emper.h" // for fiber, cps, bps, runtime
#ifdef EMPER_IO
#include <sys/socket.h> // for socklen_t
#include <sys/types.h> // for off_t
#include <cstddef> // for size_t
#include "io.hpp" // for emper::io::emper_*
#endif
auto init_runtime(void) -> runtime* {
auto* r = new Runtime();
return reinterpret_cast<runtime*>(r);
......@@ -99,3 +109,35 @@ void wait_cps(cps* sem) {
auto* countingPrivateSemaphore = reinterpret_cast<CountingPrivateSemaphore*>(sem);
countingPrivateSemaphore->wait();
}
#ifdef EMPER_IO
auto emper_recv(int socket, void* buffer, size_t length, int flags) -> ssize_t {
return emper::io::recvAndWait(socket, buffer, length, flags);
}
auto emper_send(int socket, const void* buffer, size_t length, int flags) -> ssize_t {
return emper::io::sendAndWait(socket, buffer, length, flags);
}
auto emper_connect(int socket, const struct sockaddr* address, socklen_t address_len) -> int {
return emper::io::connectAndWait(socket, address, address_len);
}