diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index ac159565bd5a6081f9ecdf554b389496eaae48f4..03737953bc6de506a25c754671c16d50de271814 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -427,9 +427,9 @@ static void printSettings() { } else { std::cout << iterations << "iterations"; } - std::cout << ",clients=" << nclients; - std::cout << ",size=" << size; - std::cout << ",sendquit= " << (sendQuit ? "true" : "false"); + std::cout << ", clients=" << nclients; + std::cout << ", size=" << size; + std::cout << ", sendquit=" << (sendQuit ? "true" : "false"); std::cout << "}" << std::endl; } diff --git a/apps/EchoServer.cpp b/apps/EchoServer.cpp index c64a59492e19f04e5ca8c77df012855b7eafc5cf..7b92494aef8287c659779e1774b268e0f479e290 100644 --- a/apps/EchoServer.cpp +++ b/apps/EchoServer.cpp @@ -9,11 +9,14 @@ #include <cstdlib> #include <cstring> #include <iostream> +#include <random> #include <string> #include "Common.hpp" #include "Debug.hpp" #include "Runtime.hpp" +#include "RuntimeBuilder.hpp" +#include "emper-common.h" #include "emper-config.h" #include "io.hpp" @@ -26,15 +29,36 @@ static const std::string PORT = "12345"; static const int BACKLOG = 1024; static unsigned int computations_us = 0; +static unsigned int max_computations_us = 0; +static float max_computations_probability = -1; static std::atomic<bool> quit = false; +static thread_local std::mt19937 randGenerator; +static auto getComputation() -> unsigned { + // fixed computation is computations_us + if (!max_computations_us) return computations_us; + + // computation is in range [computations_us, max_computations_us] + if (max_computations_probability == -1) { + std::uniform_int_distribution<unsigned int> distribution(computations_us, max_computations_us); + return computations_us += distribution(randGenerator); + } + + // computation is either computations_us or max_computations_us with probability + // max_computations_probability + std::uniform_real_distribution<float> distribution(0, 1); + float p = distribution(randGenerator); + return p >= max_computations_probability ? max_computations_us : computations_us; +} + auto main(int argc, char* argv[]) -> int { std::string host = HOST; std::string port = PORT; - if (argc > 3) { - std::cerr << "Usage: " << argv[0] << " [port] [computation_us]" << std::endl; + if (argc > 5) { + std::cerr << "Usage: " << argv[0] << " [port] [computations_us]" + << " [max_computations_us] [max_computations_probability]" << std::endl; exit(EXIT_FAILURE); } @@ -46,9 +70,33 @@ auto main(int argc, char* argv[]) -> int { computations_us = std::stoi(argv[2]); } - std::cout << "Echoserver listening on " << host << ":" << port << std::endl; + if (argc > 3) { + max_computations_us = std::stoi(argv[3]); + if (max_computations_us < computations_us) + DIE_MSG("max_computations_us must be bigger than computations_us"); + } + + if (argc > 4) { + max_computations_probability = std::stof(argv[4]); + if (max_computations_probability < 0 || max_computations_probability > 1) + DIE_MSG("max_computations_probability must be in [0,1]"); + } + + std::cout << "Echoserver listening on " << host << ":" << port; + if (computations_us) { + std::cout << " with " << computations_us; + if (max_computations_us) std::cout << " - " << max_computations_us; + std::cout << " us computations"; + } + std::cout << std::endl; + + RuntimeBuilder runtimeBuilder; + if (max_computations_us) { + runtimeBuilder.newWorkerHook([](workerid_t id) { randGenerator.seed(id); }); + } + + auto runtime = runtimeBuilder.build(); - Runtime runtime; auto serverFunc = [](int socket) { // NOLINTNEXTLINE(modernize-avoid-c-arrays) char buf[1024]; @@ -68,12 +116,15 @@ auto main(int argc, char* argv[]) -> int { break; } - const auto start = std::chrono::steady_clock::now(); - const auto deadline = start + std::chrono::microseconds(computations_us); - // TODO: The suppressed linter error below may be a false positive - // reported by clang-tidy. - // NOLINTNEXTLINE(modernize-use-nullptr) - while (std::chrono::steady_clock::now() < deadline) { + if (computations_us) { + unsigned int computation = getComputation(); + const auto start = std::chrono::steady_clock::now(); + const auto deadline = start + std::chrono::microseconds(computation); + // TODO: The suppressed linter error below may be a false positive + // reported by clang-tidy. + // NOLINTNEXTLINE(modernize-use-nullptr) + while (std::chrono::steady_clock::now() < deadline) { + } } ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, MSG_NOSIGNAL, true); diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index bb5e1ef835d075194ed832e2ef663bac30103b7e..c246bbc6e0fffd1bd43b7208b8180c249117aacd 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -72,7 +72,9 @@ RuntimeStrategyFactory& Runtime::DEFAULT_STRATEGY = using emper::io::GlobalIoContext; using emper::io::IoContext; -Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, unsigned int seed) +Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWorkerHooks, + bool pinWorkers, workerid_t pinningOffset, RuntimeStrategyFactory& strategyFactory, + unsigned int seed) : workerCount(workerCount), workerLatch(workerCount), firstWorkerThreadExitLatch(workerCount), @@ -116,6 +118,9 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory fromAnywhereStats = nullptr; } + // transfere newWorkerHooks + for (const auto& f : newWorkerHooks) this->newWorkerHooks.push_back(f); + // initialize the global IoContext if a completer is used if constexpr (emper::IO && emper::IO_COMPLETER_BEHAVIOR != emper::IoCompleterBehavior::none) { // The global io_uring needs at least workerCount entries in its SQ because @@ -130,21 +135,9 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory } } - bool pinWorkers = shouldPinWorkers(); - - // Core id we start the worker pinning - workerid_t pinningOffset = 0; - char* pinningOffsetEnv = std::getenv("EMPER_PINNING_OFFSET"); - if (pinningOffsetEnv) { - if (!pinWorkers) { - DIE_MSG("EMPER_PIN_WORKERS=false and EMPER_PINNING_OFFSET are mutual exclusive"); - } - - int pinningOffsetInt = std::stoi(pinningOffsetEnv); - if (pinningOffsetInt > UINT8_MAX) { - DIE_MSG("Pinning offset " << pinningOffsetInt << " to big for its datatype"); - } - pinningOffset = static_cast<workerid_t>(pinningOffsetInt); + // Check if the pinning settings are sound + if (pinningOffset && !pinWorkers) { + DIE_MSG("pinningOffset and not pinning workers are mutually exclusive"); } for (workerid_t i = 0; i < workerCount; ++i) { @@ -298,13 +291,9 @@ auto Runtime::workerLoop(Worker* worker) -> void* { } auto Runtime::getDefaultWorkerCount() -> workerid_t { - char* workerCountEnv = std::getenv("EMPER_WORKER_COUNT"); + auto workerCountEnv = emper::lib::env::getUnsignedFromEnv<workerid_t>("EMPER_WORKER_COUNT"); if (workerCountEnv) { - int workerCountInt = std::stoi(workerCountEnv); - if (workerCountInt > UINT8_MAX) { - DIE_MSG("Worker count " << workerCountInt << " to big for its datatype"); - } - return static_cast<workerid_t>(workerCountInt); + return workerCountEnv.value(); } // The CPU count reported by sysconf(_SC_NPROCESSORS_ONLN), sysconf(_SC_NPROCESSORS_CONF) diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index c3467559ac55ea41dcc3cfa3166c0b394a1078d3..14ae319cee699fb8e0b2d58260f8dc33a91df07c 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -31,6 +31,7 @@ #include "sleep_strategy/WorkerSleepStrategy.hpp" enum class LogSubsystem; +class RuntimeBuilder; class ContextManager; class Dispatcher; class Fiber; @@ -59,13 +60,16 @@ using emper::io::IoContext; using emper::sleep_strategy::WorkerSleepStrategy; class Runtime : public Logger<LogSubsystem::RUNTI> { + public: + using NewWorkerHook = std::function<void(workerid_t)>; + private: static std::mutex currentRuntimeMutex; static Runtime* currentRuntime; const workerid_t workerCount; - std::vector<std::function<void(workerid_t)>> newWorkerHooks; + std::vector<NewWorkerHook> newWorkerHooks; Latch workerLatch; Latch firstWorkerThreadExitLatch; @@ -110,6 +114,10 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { return emper::lib::env::getBoolFromEnv("EMPER_PIN_WORKERS").value_or(true); } + static auto getDefaultPinningOffset() -> workerid_t { + return emper::lib::env::getUnsignedFromEnv<workerid_t>("EMPER_PINNING_OFFSET").value_or(0); + } + protected: void addNewWorkerHook(const std::function<void(workerid_t)>& hook) { newWorkerHooks.push_back(hook); @@ -157,7 +165,12 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { : Runtime(getDefaultWorkerCount(), strategyFactory) {} Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory, - unsigned int seed = std::random_device()()); + unsigned int seed = std::random_device()()) + : Runtime(workerCount, std::vector<NewWorkerHook>(), shouldPinWorkers(), + getDefaultPinningOffset(), strategyFactory, seed) {} + + Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWorkerHooks, bool pinWorkers, + workerid_t pinningOffset, RuntimeStrategyFactory& strategyFactory, unsigned int seed); ~Runtime(); @@ -216,6 +229,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend class AbstractWorkStealingScheduler; template <LogSubsystem> friend class Blockable; + friend RuntimeBuilder; friend ContextManager; friend Scheduler; friend Dispatcher; diff --git a/emper/RuntimeBuilder.hpp b/emper/RuntimeBuilder.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5e3e8b2e7c535f50140319b0e5ef24ce8a7c467a --- /dev/null +++ b/emper/RuntimeBuilder.hpp @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#pragma once + +#include <optional> +#include <vector> + +#include "Runtime.hpp" +#include "emper-common.h" + +class RuntimeBuilder { + private: + workerid_t workerCount = 0; + + std::vector<Runtime::NewWorkerHook> newWorkerHooks; + + RuntimeStrategyFactory* strategyFactory = nullptr; + + std::optional<bool> pinWorkers = std::nullopt; + std::optional<workerid_t> pinningOffset = std::nullopt; + + std::optional<unsigned> seed = std::nullopt; + + public: + inline auto withWorkerCount(workerid_t workerCount) -> RuntimeBuilder& { + this->workerCount = workerCount; + return *this; + }; + + inline auto newWorkerHook(const Runtime::NewWorkerHook& hook) -> RuntimeBuilder& { + newWorkerHooks.push_back(hook); + return *this; + }; + + inline auto withPinWorkers(bool pinWorkers) -> RuntimeBuilder& { + this->pinWorkers = pinWorkers; + return *this; + }; + + inline auto withPinningOffset(workerid_t pinningOffset) -> RuntimeBuilder& { + this->pinningOffset = pinningOffset; + return *this; + }; + + inline auto withStrategyFactory(RuntimeStrategyFactory* factory) -> RuntimeBuilder& { + this->strategyFactory = factory; + return *this; + }; + + inline auto withSeed(unsigned seed) -> RuntimeBuilder& { + this->seed = seed; + return *this; + }; + + inline auto build() -> Runtime { + auto workerCount = + this->workerCount != 0 ? this->workerCount : Runtime::getDefaultWorkerCount(); + + auto* strategyFactory = + this->strategyFactory != nullptr ? this->strategyFactory : &Runtime::DEFAULT_STRATEGY; + + auto pinWorkers = this->pinWorkers.value_or(Runtime::shouldPinWorkers()); + + auto pinningOffset = this->pinningOffset.value_or(Runtime::getDefaultPinningOffset()); + + auto seed = this->seed.value_or(std::random_device()()); + + return Runtime(workerCount, newWorkerHooks, pinWorkers, pinningOffset, *strategyFactory, seed); + } +}; diff --git a/emper/lib/env.hpp b/emper/lib/env.hpp index 65b6c4fc2dcefe3421286f64ff27ea1114de4709..4d97aaecba060ff8227b19d3930a336a50495e02 100644 --- a/emper/lib/env.hpp +++ b/emper/lib/env.hpp @@ -2,6 +2,7 @@ // Copyright © 2021 Florian Fischer #pragma once +#include <cinttypes> #include <string> #include "Debug.hpp" @@ -27,4 +28,28 @@ static auto getBoolFromEnv(const std::string&& key) -> std::optional<bool> { DIE_MSG(key << " has invalid value: " << envStr << " (expected true or false)"); } +template <typename unsigned_type> +static auto getUnsignedFromEnv(const std::string&& key) -> std::optional<unsigned_type> { + DBG("parse " << key << " environment variable"); + char* envVar = std::getenv(key.c_str()); + if (!envVar) { + return std::nullopt; + } + + std::string envStr(envVar); + char* last; + + uintmax_t num = std::strtoumax(envStr.c_str(), &last, 10); + if (last != &envStr[0] + envStr.size()) { + DIE_MSG(key << " has invalid value: " << envStr << " (expected base-10 number)"); + } + + const unsigned_type t_max = std::numeric_limits<unsigned_type>::max(); + if (num > static_cast<uintmax_t>(t_max)) { + DIE_MSG(key << " is to big: " << envStr << " (type '" << typeid(t_max).name() + << "' max: " << t_max << ")"); + } + return static_cast<unsigned_type>(num); +} + } // namespace emper::lib::env