diff --git a/emper/Emper.hpp b/emper/Emper.hpp index 8293acb3974449e67c9c3dc4d04572d3086e3bae..9f1a177ab1a4a2e866df6cf890f968c50f783dfe 100644 --- a/emper/Emper.hpp +++ b/emper/Emper.hpp @@ -159,14 +159,6 @@ static const bool IO_TRY_SYSCALL = static const bool IO_WORKER_URING = IO && !IO_SINGLE_URING; -static const bool IO_URING_SQPOLL = -#ifdef EMPER_IO_URING_SQPOLL - true -#else - false -#endif - ; - static const bool WAITFREE_IO_STEALING = #ifdef EMPER_IO_WAITFREE_STEALING true @@ -174,7 +166,6 @@ static const bool WAITFREE_IO_STEALING = false #endif ; - // Initialize this bool in Emper.cpp because it needs code evaluation // (LinuxVersion::compare) during runtime. // Using a static variable here means EACH object file including this header has to @@ -185,13 +176,14 @@ static const bool WAITFREE_IO_STEALING = // warnings during the comparison use not yet initialized components is reduced. extern const bool IO_MUST_INVALIDATE_BROKEN_CHAIN; -static const bool IO_URING_SHARED_POLLER = -#ifdef EMPER_IO_URING_SHARED_POLLER - true -#else - false -#endif - ; +enum class IoSqPoller { + off, + one, + each, + numa, +}; + +static const enum IoSqPoller IO_SQ_POLLER = IoSqPoller::EMPER_IO_SQ_POLLER; enum class IoCompleterBehavior { schedule, diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index 2c4db2992fd6da40b4a101fbda62dcff913466cf..df4e973afbbfcc95a0b202d0977cb58206c36bff 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -2,6 +2,7 @@ // Copyright © 2020-2021 Florian Schmaus, Florian Fischer #include "Runtime.hpp" +#include <numa.h> #include <pthread.h> // for pthread_t, pthread_attr_init #include <cerrno> // for errno @@ -140,6 +141,40 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo DIE_MSG("pinningOffset and not pinning workers are mutually exclusive"); } + // When sharing SQ poll threads the IoContext which the other attach to + // must be initialized first + if constexpr (emper::IO_SQ_POLLER == emper::IoSqPoller::numa) { 
+ if (numa_available() < 0) { + DIE_MSG("numa support not available for numa based sq poller sharing"); + } + + struct bitmask* nodeCpus = numa_allocate_cpumask(); + if (!nodeCpus) { + DIE_MSG_ERRNO("numa_allocate_cpumask failed"); + } + + for (unsigned node = 0; node < numa_all_nodes_ptr->size; ++node) { + if (!numa_bitmask_isbitset(numa_all_nodes_ptr, node)) { + continue; + } + + int err = numa_node_to_cpus(static_cast<int>(node), nodeCpus); + if (err) { + DIE_MSG_ERRNO("numa_node_to_cpus failed"); + } + + for (unsigned cpu = 0; cpu < nodeCpus->size; ++cpu) { + if (!numa_bitmask_isbitset(nodeCpus, cpu)) { + continue; + } + + workerid_t workerId = cpuToWorkerId(cpu); + ioContexts[workerId] = new IoContext(*this); + break; + } + } + } + for (workerid_t i = 0; i < workerCount; ++i) { pthread_attr_t attr; errno = pthread_attr_init(&attr); @@ -242,12 +277,13 @@ auto Runtime::workerLoop(Worker* worker) -> void* { worker->setWorker(); if constexpr (emper::IO_WORKER_URING) { - auto* workerIo = new IoContext(*this); - // notify the completer that we have created our IoContext - ioReadySem.notify(); + if (!ioContexts[worker->workerId]) { + ioContexts[worker->workerId] = new IoContext(*this); + } - ioContexts[worker->workerId] = workerIo; - workerIo->setWorkerIo(worker); + // notify the globalCompleter that we have initialized our IoContext (registered our eventfd) + ioContexts[worker->workerId]->setWorkerIo(worker); + ioReadySem.notify(); } LOGD("Worker loop started by thread " << gettid()); diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 14ae319cee699fb8e0b2d58260f8dc33a91df07c..96931f39828ca04c639d00dbf2c76d5170184dd9 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -99,6 +99,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { emper::log::LogBuffer* logBuffer = nullptr; + workerid_t pinningOffset = 0; + auto workerLoop(Worker* worker) -> void*; WorkerSleepStrategy workerSleepStrategy; @@ -204,6 +206,10 @@ class Runtime : public 
Logger<LogSubsystem::RUNTI> { static inline auto getWorkerId() -> workerid_t { return Worker::getCurrentWorkerId(); } + [[nodiscard]] inline auto cpuToWorkerId(unsigned cpu) const -> workerid_t { + return cpu - pinningOffset; + } + [[nodiscard]] inline auto getWorkerCount() const -> workerid_t { return workerCount; } static inline auto getRuntime() -> Runtime* { return currentRuntime; } diff --git a/emper/io/GlobalIoContext.cpp b/emper/io/GlobalIoContext.cpp index 422cc93508d2afc16e5d7ba1a9e26f1c94184584..40965dced10f475c9f4c46909c868b6597fc63b2 100644 --- a/emper/io/GlobalIoContext.cpp +++ b/emper/io/GlobalIoContext.cpp @@ -31,6 +31,15 @@ using emper::lib::TaggedPtr; namespace emper::io { +static void assert_submitted(ATTR_UNUSED unsigned submitted, ATTR_UNUSED unsigned expected) { + // Only assert how many sqe were submitted when doing the io_uring_enter syscall. + // Reporting how many sqes were submitted uses khead which + // is written asynchronously by the poller kthread + if constexpr (emper::IO_SQ_POLLER == IoSqPoller::off) { + assert(submitted == expected); + } +} + void GlobalIoContext::completerLoop() { int submitted; @@ -70,7 +79,7 @@ void GlobalIoContext::completerLoop() { } // We have submitted all eventfds - assert(submitted == runtime.getWorkerCount()); + assert_submitted(submitted, runtime.getWorkerCount()); } // Submit the read for our termination eventfd @@ -88,7 +97,7 @@ void GlobalIoContext::completerLoop() { if (unlikely(submitted < 0)) { DIE_MSG_ERRNO("submitting termination eventfd read failed"); } - assert(submitted == 1); + assert_submitted(submitted, 1); LOGD("start completer loop"); while (true) { @@ -134,11 +143,7 @@ void GlobalIoContext::completerLoop() { DIE_MSG_ERRNO("re-submitting eventfd read to global_ring failed"); } - // Reporting how many sqes were submitted uses khead which - // is written asynchronously by the poller kthread - if constexpr (!emper::IO_URING_SQPOLL) { - assert(submitted == 1); - } + assert_submitted(submitted, 
1); if constexpr (emper::IO_COMPLETER_BEHAVIOR == emper::IoCompleterBehavior::wakeup) { // Notify the worker which's CQ has a new cqe diff --git a/emper/io/IoContext.cpp b/emper/io/IoContext.cpp index 741b7bb29925b02fe7430850bd59cda599a3aed1..7b59dd2b0069727b9f9ce8d88016f00d776a04ac 100644 --- a/emper/io/IoContext.cpp +++ b/emper/io/IoContext.cpp @@ -4,8 +4,10 @@ #include <liburing.h> // for io_uring_submit, io_uring_get_sqe #include <liburing/io_uring.h> // for io_uring_cqe, io_uring_params, IORI... -#include <sys/eventfd.h> // for eventfd -#include <unistd.h> // for close +#include <numa.h> +#include <sched.h> +#include <sys/eventfd.h> +#include <unistd.h> #include <algorithm> #include <atomic> // for atomic, __atomic_base @@ -528,21 +530,67 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) struct io_uring_params params; memset(¶ms, 0, sizeof(params)); - if constexpr (emper::IO_URING_SQPOLL) { + // Setup sq polling + if constexpr (emper::IO_SQ_POLLER != IoSqPoller::off) { // Using a poller needs at least linux 5.15 to safely handle // broken chains see: 43b8dc163f985341d2a2a24d1bfe0e6bb939f880. if (emper::IO_MUST_INVALIDATE_BROKEN_CHAIN) { DIE_MSG("Invalidating broken chains is not possible when using sqpoll"); } - params.flags |= IORING_SETUP_SQPOLL; - } - if constexpr (emper::IO_URING_SHARED_POLLER) { - auto *gio = runtime.globalIo; - if (gio) { - params.flags |= IORING_SETUP_ATTACH_WQ; - params.wq_fd = gio->ring.ring_fd; + // We share poller threads and may need to attach to someone + if constexpr (emper::IO_SQ_POLLER != IoSqPoller::each) { + int pollerRingFd = -1; + + // We use a single poller -> attach to the global io_uring + if constexpr (emper::IO_SQ_POLLER == IoSqPoller::one) { + auto *gio = runtime.globalIo; + if (gio) { + pollerRingFd = gio->ring.ring_fd; + } + // One sq poller thread per numa node + // The CPU with the lowest id in a node is responsible to create a poller. 
+ // All other CPUs in the same node will attach to this io_uring + } else if constexpr (emper::IO_SQ_POLLER == IoSqPoller::numa) { + // get our numa node + unsigned cpu, node; + int err = getcpu(&cpu, &node); + if (unlikely(err)) { + DIE_MSG_ERRNO("getcpu failed"); + } + + // get all CPUs in our node + struct bitmask *mask = numa_allocate_cpumask(); + if (unlikely(!mask)) { + DIE_MSG_ERRNO("numa_allocate_cpumask failed"); + } + + err = numa_node_to_cpus(static_cast<int>(node), mask); + if (unlikely(err)) { + DIE_MSG_ERRNO("numa_node_to_cpus failed"); + } + + // find the lowest cpu in the current node + for (unsigned i = 0; i < mask->size; ++i) { + if (!numa_bitmask_isbitset(mask, i)) { + continue; + } + + LOGD("Found first CPU " << i << " in node " << node << " from CPU " << cpu); + IoContext *pollerIo = runtime.ioContexts[runtime.cpuToWorkerId(i)]; + if (pollerIo) { + pollerRingFd = pollerIo->ring.ring_fd; + } + break; + } + } + + // attach to the used poller's ring fd + if (pollerRingFd != -1) { + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = pollerRingFd; + } + } + } @@ -556,7 +604,7 @@ IoContext::IoContext(Runtime &runtime, size_t uring_entries) : runtime(runtime) LOGW("kernel das not support IORING_FEAT_NODROP. 
We may loose IO requests"); } - if constexpr (emper::IO_URING_SQPOLL) { + if constexpr (emper::IO_SQ_POLLER != IoSqPoller::off) { if (!(params.features & IORING_FEAT_SQPOLL_NONFIXED)) { DIE_MSG("io_uring_sqpoll defined but kernel does not support IORING_FEAT_SQPOLL_NONFIXED"); } diff --git a/emper/io/IoContext.hpp b/emper/io/IoContext.hpp index fd0db5a8356332143c437e25753ecb0f81658e66..0744715ac258bedc952085e154a7dc99fc6bffc8 100644 --- a/emper/io/IoContext.hpp +++ b/emper/io/IoContext.hpp @@ -279,7 +279,7 @@ class IoContext : public Logger<LogSubsystem::IO> { inline auto getSqe() -> struct io_uring_sqe * { struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if constexpr (emper::IO_URING_SQPOLL) { + if constexpr (emper::IO_SQ_POLLER != IoSqPoller::off) { // apparently using io_uring_get_sqe after waiting for one getting available // with io_uring_sqring_wait may not return a usable sqe. // Therefore we loop here diff --git a/meson.build b/meson.build index b5040f04839454f010938d9411e49b282aa5158f..80a95be067e61728b5c3a9d6413f04f205d89cd5 100644 --- a/meson.build +++ b/meson.build @@ -17,7 +17,9 @@ if not uring_dep.found() liburing_sp = subproject('liburing') uring_dep = liburing_sp.get_variable('uring').as_system() endif -emper_dependencies = [thread_dep, uring_dep] + +numa_dep = dependency('numa') +emper_dependencies = [thread_dep, uring_dep, numa_dep] boost_thread_dep = dependency('boost', modules : ['thread'], required: false) @@ -92,6 +94,7 @@ endif conf_data.set('EMPER_LOG_LEVEL', log_level) conf_data.set('EMPER_LOG_TIMESTAMP', get_option('log_timestamp')) +# IO configuration option_io = get_option('io') if option_io conf_data.set('EMPER_IO', true) @@ -103,7 +106,6 @@ io_bool_options = [ {'option': 'single_uring', 'dependencies': {'io_completer_behavior': 'schedule'}}, {'option': 'try_syscall'}, - {'option': 'uring_sqpoll'}, {'option': 'waitfree_stealing', 'dependencies': {'io_stealing': true, 'io_lockless_cq': true}}, ] @@ -139,12 +141,13 @@ foreach option 
: io_raw_options conf_data.set('EMPER_IO_' + option.to_upper(), get_option('io_' + option)) endforeach -if get_option('io_uring_sqpoll') and get_option('io_uring_shared_poller') == 'automatic' - conf_data.set('EMPER_IO_SQPOLL', true) -else - conf_data.set('EMPER_IO_SQPOLL', false) +io_sqpoll_option = get_option('io_uring_sq_poller') +if io_sqpoll_option != 'off' and not option_io + error('io_uring_sq_poller set without io') endif +conf_data.set('EMPER_IO_SQ_POLLER', io_sqpoll_option) + io_completer_behavior = get_option('io_completer_behavior') if io_completer_behavior == 'maybe_wakeup' if get_option('worker_sleep') @@ -167,6 +170,13 @@ else check_anywhere_queue_while_stealing == 'true') endif +# check io meson options consistency +if get_option('io_single_uring') + if io_sqpoll_option == 'each' or io_sqpoll_option == 'numa' + warning('sqpoller: ' + io_sqpoll_option + ' is useless when using a single io_uring') + endif +endif + subdir('emper') subdir('tests') subdir('apps') diff --git a/meson_options.txt b/meson_options.txt index ec601da5e57d378986d613201499c505b1639911..3cced4d588766dbbefb40d9c2b94e7d68bd81f44 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -149,17 +149,11 @@ option( description: 'Number of entries in each worker io_uring' ) option( - 'io_uring_sqpoll', - type: 'boolean', - value: false, - description: 'Enable io_urings SQPOLL feature (start a separate kernel thread which polls the sq reducing the amount of syscalls to submit new requests. This is a privileged operation.).' -) -option( - 'io_uring_shared_poller', + 'io_uring_sq_poller', type: 'combo', - value: 'automatic', - choices: ['automatic', 'off'], - description: 'Share a common poller thread backend between all io_urings' + value: 'off', + choices: ['each', 'numa', 'one', 'off'], + description: 'Use one or multiple kernel threads to poll the SQs' ) option( 'io_completer_behavior',