diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp index 2093ca6dd485c0f81ec4da34e5db7d0c259846b6..59a2217082c43e9f818f7c5e4a1b93f6abe613b1 100644 --- a/apps/fsearch/fsearch.cpp +++ b/apps/fsearch/fsearch.cpp @@ -5,7 +5,6 @@ #include <array> #include <boost/program_options.hpp> -#include <climits> #include <cstdlib> #include <cstring> #include <exception> @@ -20,16 +19,19 @@ #include "Semaphore.hpp" #include "StealingMode.hpp" #include "io.hpp" +#include "lib/ShardedFileBuffer.hpp" namespace fs = std::filesystem; namespace po = boost::program_options; #define EMPER_RIPGREP_BUFSIZE 4096 -const char* needle; -size_t needle_len; +static const char* needle; +static size_t needle_len; -emper::Semaphore* max_running; +static emper::Semaphore* max_running; + +static emper::lib::ShardedFileBuffer* outBuf; static enum emper::StealingMode stealingMode; @@ -48,10 +50,7 @@ void search(const std::string& path) { ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), 0); while (bytes_read > 0) { if (memmem(&buf[0], bytes_read, needle, needle_len)) { - std::array<char, PATH_MAX + 1> outBuf; - memcpy(outBuf.data(), path.c_str(), path.length()); - outBuf[path.length()] = '\n'; - emper::io::writeFileAndWait(STDOUT_FILENO, outBuf.data(), path.length() + 1, -1); + outBuf->getStream() << path << std::endl; goto out; } @@ -71,6 +70,8 @@ out: } void walk_dir() { + outBuf = new emper::lib::ShardedFileBuffer(*Runtime::getRuntime(), STDOUT_FILENO); + // Only search in regular files auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); }; // Search the file @@ -85,6 +86,7 @@ void walk_dir() { break; } + delete outBuf; Runtime::getRuntime()->initiateTermination(); } diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index f6f2596b3725d9e943b66e42ba889de224e20450..a897ff116a73c2c15dd36b64e146c26649cc9e23 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -36,6 +36,7 @@ #include "io/IoContext.hpp" #include "io/Stats.hpp" #include "lib/DebugUtil.hpp" +#include "lib/ShardedFileBuffer.hpp" #include "lib/env.hpp" #include "log/LogBuffer.hpp" #include "stats/FromAnywhere.hpp" @@ -105,6 +106,8 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo currentRuntime = this; } + emper::lib::ShardedFileBuffer::registerRuntime(*this); + const char* envValueLogFile = std::getenv("EMPER_LOG_FILE"); if (envValueLogFile) { logBuffer = new emper::log::LogBuffer(envValueLogFile); @@ -261,6 +264,8 @@ Runtime::~Runtime() { initiateAndWaitUntilTermination(); } + emper::lib::ShardedFileBuffer::unregisterRuntime(*this); + // Print the stats after all worker threads have terminated and before we delete // objects bound to the runtime's lifetime if constexpr (emper::STATS) { diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp index 5daa0e973e45ed35e36122cd996ce5fab297721e..40681e9da0abc6eca5f19c6688b4dfc30ab372f0 100644 --- a/emper/Runtime.hpp +++ b/emper/Runtime.hpp @@ -45,6 +45,9 @@ namespace io { class GlobalIoContext; class IoContext; } // namespace io +namespace lib { +class ShardedFileBuffer; +} namespace log { class LogBuffer; } @@ -266,6 +269,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> { friend class WorkerLocalData; friend class Fibril; friend class emper::sleep_strategy::PipeSleepStrategy; + friend class emper::lib::ShardedFileBuffer; friend void emper::log::log(const std::string& prefix, const std::string& message); }; diff --git a/emper/log/LogBuffer.cpp b/emper/lib/MappedFileBuffer.cpp similarity index 73% rename from emper/log/LogBuffer.cpp rename to emper/lib/MappedFileBuffer.cpp index 94bd3f7b4c7940ba21876721a0ed5332ebecb2ee..320e12538d1b15cdc3a50f2d36649c2f864af234 100644 --- a/emper/log/LogBuffer.cpp +++ b/emper/lib/MappedFileBuffer.cpp @@ -1,6 +1,6 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus -#include "LogBuffer.hpp" +// Copyright © 2021-2022 Florian Fischer +#include "lib/MappedFileBuffer.hpp" #include <fcntl.h> #include <sys/mman.h> @@ -12,17 +12,17 @@ #include "Common.hpp" -namespace emper::log { +namespace emper::lib { -LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) { - logFd = open(logFile.c_str(), O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (logFd == -1) { - DIE_MSG_ERRNO("opening log file " << logFile << " failed"); +MappedFileBuffer::MappedFileBuffer(const std::string& file) : file(file) { + fd = open(file.c_str(), O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd == -1) { + DIE_MSG_ERRNO("opening file " << file << " failed"); } for (size_t i = 0; i < BUFFER_COUNT; ++i) { const auto off = static_cast<off_t>(i * BUFFER_SIZE); - void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, logFd, off); + void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, fd, off); // NOLINTNEXTLINE(performance-no-int-to-ptr) if (res == MAP_FAILED) { DIE_MSG_ERRNO("mmap failed"); @@ -30,19 +30,19 @@ LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) { bufs[i] = static_cast<char*>(res); } - if (ftruncate(logFd, BUFFER_COUNT * BUFFER_SIZE)) { - DIE_MSG_ERRNO("initial log file " << logFile << "truncation failed"); + if (ftruncate(fd, BUFFER_COUNT * BUFFER_SIZE)) { + DIE_MSG_ERRNO("initial mapped file " << file << "truncation failed"); } } -void LogBuffer::trim() { +void MappedFileBuffer::trim() { const auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed)); - if (ftruncate(logFd, finalPos)) { - DIE_MSG_ERRNO("trimming log file " << logFile << " failed"); + if (ftruncate(fd, finalPos)) { + DIE_MSG_ERRNO("trimming mapped file " << file << " failed"); } } -LogBuffer::~LogBuffer() { +MappedFileBuffer::~MappedFileBuffer() { trim(); for (auto* buf : bufs) { @@ -51,10 +51,10 @@ LogBuffer::~LogBuffer() { } } - close(logFd); + close(fd); } -void LogBuffer::log(const std::string& message) { +void MappedFileBuffer::put(const std::string& message) { const size_t messageLen = message.size(); const size_t startPos = bufPos.fetch_add(messageLen, std::memory_order_relaxed); const size_t endPos = startPos + messageLen - 1; @@ -104,16 +104,16 @@ void LogBuffer::log(const std::string& message) { // Therefore at bufPos = BUFFER_SIZE + 1 we are in the second active buffer const size_t nthActive = (endPos / BUFFER_SIZE) + 1; - // Our log file has the size of all ever active buffers plus the new fresh one - const auto logFileSize = static_cast<off_t>((nthActive + 1) * BUFFER_SIZE); + // Our mapped file has the size of all ever active buffers plus the new fresh one + const auto fileSize = static_cast<off_t>((nthActive + 1) * BUFFER_SIZE); - // Grow the log file - if (ftruncate(logFd, logFileSize)) { - DIE_MSG_ERRNO("growing log file " << logFile << " failed"); + // Grow the mapped file + if (ftruncate(fd, fileSize)) { + DIE_MSG_ERRNO("growing mapped file " << file << " failed"); } const auto nextOffset = static_cast<off_t>(nthActive * BUFFER_SIZE); - void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, logFd, nextOffset); + void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, fd, nextOffset); // NOLINTNEXTLINE(performance-no-int-to-ptr) if (res == MAP_FAILED) { DIE_MSG_ERRNO("mmap of fresh buffer failed"); @@ -125,4 +125,4 @@ void LogBuffer::log(const std::string& message) { } } -} // namespace emper::log +} // namespace emper::lib diff --git a/emper/lib/MappedFileBuffer.hpp b/emper/lib/MappedFileBuffer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f267fa8de36dc4a0d0d2a7051fdc528a5575d411 --- /dev/null +++ b/emper/lib/MappedFileBuffer.hpp @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021-2022 Florian Fischer +#pragma once + +#include <atomic> +#include <cstddef> +#include <string> + +namespace emper::lib { +class MappedFileBuffer { + // We use a triple buffer scheme, where one buffer is active, one is old and one is fresh + // The old buffer gets remapped by the first accessing the fresh buffer + static const int BUFFER_COUNT = 3; + // Use 1GB buffers to reduce the risk of using an unmapped one + static const size_t BUFFER_SIZE = 1 << 30; + + char* bufs[BUFFER_COUNT]; + + std::atomic<size_t> bufPos = 0; + + std::string file; + int fd; + + auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; } + + void trim(); + + public: + MappedFileBuffer(const std::string& file); + ~MappedFileBuffer(); + + void put(const std::string& message); +}; +} // namespace emper::lib diff --git a/emper/lib/ShardedFileBuffer.cpp b/emper/lib/ShardedFileBuffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b6d3f741fc7da27a74e0f52a7c2c2672696fd090 --- /dev/null +++ b/emper/lib/ShardedFileBuffer.cpp @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#include "ShardedFileBuffer.hpp" + +#include <fcntl.h> +#include <sys/stat.h> +#include <unistd.h> + +#include <atomic> +#include <cassert> + +#include "Common.hpp" +#include "Runtime.hpp" +#include "emper-common.h" +#include "io.hpp" + +namespace emper::lib { + +std::atomic<size_t> ShardedFileBuffer::ids = 0; +thread_local ShardedFileBuffer::WorkerBuffers* ShardedFileBuffer::workerBuffers = nullptr; + +std::mutex ShardedFileBuffer::registerLock; +ShardedFileBuffer::RuntimeBuffers ShardedFileBuffer::buffersPerRuntime; + +ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, int fd) : runtime(runtime), fd(fd) { + std::stringstream sst; + sst << "fd: " << fd; + file = sst.str(); + + id = ids.fetch_add(1); +} + +ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, const std::string& file) + : runtime(runtime), file(file) { + fd = open(file.c_str(), O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd == -1) { + DIE_MSG_ERRNO("opening file " << file << " failed"); + } + + id = ids.fetch_add(1); +} + +ShardedFileBuffer::~ShardedFileBuffer() { + flush(); + close(fd); +} + +void ShardedFileBuffer::flush() { + std::stringstream content; + const auto it = buffersPerRuntime.find(&runtime); + assert(it != buffersPerRuntime.end()); + const Shards* shards = it->second; + for (const auto* shard : *shards) { + const auto it = shard->find(id); + if (it != shard->end()) content << it->second.str(); + } + + const std::string& out = content.str(); + if (emper::io::writeFileAndWait(fd, out.c_str(), out.length(), -1) == -1) + DIE_MSG_ERRNO("flushing sharded file buffer to " << file << " failed"); +} + +void ShardedFileBuffer::registerRuntime(Runtime& runtime) { + std::lock_guard<std::mutex> lock(registerLock); + + assert(buffersPerRuntime.find(&runtime) == buffersPerRuntime.end()); + + auto* shards = new Shards(runtime.getWorkerCount()); + buffersPerRuntime.emplace(&runtime, shards); + + runtime.addNewWorkerHook([&runtime](workerid_t workerId) { + workerBuffers = new WorkerBuffers(); + const auto it = buffersPerRuntime.find(&runtime); + assert(it != buffersPerRuntime.end()); + + auto& shards = *(it->second); + shards[workerId] = workerBuffers; + }); +} + +void ShardedFileBuffer::unregisterRuntime(const Runtime& runtime) { + std::lock_guard<std::mutex> lock(registerLock); + + const auto it = buffersPerRuntime.find(&runtime); + assert(it != buffersPerRuntime.end()); + auto* shards = it->second; + for (auto* shard : *shards) { + delete shard; + } + delete shards; + + buffersPerRuntime.erase(it); +} +} // namespace emper::lib diff --git a/emper/lib/ShardedFileBuffer.hpp b/emper/lib/ShardedFileBuffer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..096d80d3551b32350f2d677b758f3fa58e8bb71c --- /dev/null +++ b/emper/lib/ShardedFileBuffer.hpp @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2022 Florian Fischer +#pragma once + +#include <atomic> +#include <cstddef> +#include <map> +#include <mutex> +#include <sstream> +#include <string> +#include <utility> +#include <vector> + +#include "Common.hpp" + +class Runtime; + +namespace emper::lib { +/* + * @brief Runtime local ShardedFileBuffer + */ +class ShardedFileBuffer { + friend class Runtime; + + static std::atomic<size_t> ids; + + /* + * @brief Worker local part of ShardedFileBuffer infrastructure + */ + using WorkerBuffers = std::map<size_t, std::stringstream>; + + static thread_local WorkerBuffers* workerBuffers; + + using Shards = std::vector<WorkerBuffers*>; + + static std::mutex registerLock; + using RuntimeBuffers = std::map<const Runtime*, Shards*>; + static RuntimeBuffers buffersPerRuntime; + + const Runtime& runtime; + size_t id; + + std::string file; + int fd; + + public: + // TODO: why is Runtime not allowed to use those functions if they are declared private? + /* + * @brief register the current runtime to use ShardedFileBuffers + */ + static void registerRuntime(Runtime& runtime); + /* + * @brief Cleanup all runtime specific ShardedFileBuffer data + */ + static void unregisterRuntime(const Runtime& runtime); + + ShardedFileBuffer(const Runtime& runtime, int fd); + ShardedFileBuffer(const Runtime& runtime, const std::string& file); + ~ShardedFileBuffer(); + + auto getStream() -> std::stringstream& { + auto it = workerBuffers->find(id); + if (it == workerBuffers->end()) { + auto ret = workerBuffers->emplace(id, std::stringstream()); + if (!ret.second) DIE_MSG("Failed to emplace new stringstream in WorkerBuffers"); + it = ret.first; + } + + return it->second; + } + + void put(const std::string& message) { getStream() << message; } + + /* + * @brief Collect the buffer shards, join them and writes them to the file + * + * Attention: It is not safe to call flush during buffer modification! + * Flush the buffer only if you are sure it is not concurrently modified. + */ + void flush(); +}; +} // namespace emper::lib diff --git a/emper/lib/meson.build b/emper/lib/meson.build index 3aa7f1952fceb1111e39cb3324c2a389ae6300e1..4ae16ffbfcf10f80126f02c174568d572c97d598 100644 --- a/emper/lib/meson.build +++ b/emper/lib/meson.build @@ -2,6 +2,8 @@ emper_cpp_sources += files( 'DebugUtil.cpp', 'env.cpp', 'LinuxVersion.cpp', + 'MappedFileBuffer.cpp', + 'ShardedFileBuffer.cpp', 'util.cpp', ) diff --git a/emper/log/LogBuffer.hpp b/emper/log/LogBuffer.hpp index f4149ee98b94d22e0b6edbef471944d3112b67bb..ea20674f89d78286ec6771415260ef07d331692f 100644 --- a/emper/log/LogBuffer.hpp +++ b/emper/log/LogBuffer.hpp @@ -1,34 +1,16 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2021 Florian Fischer +// Copyright © 2021-2022 Florian Fischer #pragma once -#include <atomic> -#include <cstddef> #include <string> -namespace emper::log { -class LogBuffer { - // We use a triple buffer scheme, where one buffer is active, one is old and one is fresh - // The old buffer gets remapped by the first accessing the fresh buffer - static const int BUFFER_COUNT = 3; - // Use 1GB buffers to reduce the risk of using an unmapped one - static const size_t BUFFER_SIZE = 1 << 30; - - char* bufs[BUFFER_COUNT]; - - std::atomic<size_t> bufPos = 0; - - std::string logFile; - int logFd; - - auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; } - - void trim(); +#include "lib/MappedFileBuffer.hpp" +namespace emper::log { +class LogBuffer : emper::lib::MappedFileBuffer { public: - LogBuffer(const std::string& logFile); - ~LogBuffer(); + LogBuffer(const std::string& logFile) : MappedFileBuffer(logFile) {} - void log(const std::string& message); + void log(const std::string& message) { put(message); }; }; } // namespace emper::log diff --git a/emper/log/meson.build b/emper/log/meson.build index 6de1e134d89b7135ad80df2700b20a05a368eac7..cef2f999160bf6c140b21d1634f5bf19f6ea3dce 100644 --- a/emper/log/meson.build +++ b/emper/log/meson.build @@ -1,4 +1,3 @@ emper_cpp_sources += files( 'log.cpp', - 'LogBuffer.cpp', )