diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp
index 2093ca6dd485c0f81ec4da34e5db7d0c259846b6..59a2217082c43e9f818f7c5e4a1b93f6abe613b1 100644
--- a/apps/fsearch/fsearch.cpp
+++ b/apps/fsearch/fsearch.cpp
@@ -5,7 +5,6 @@
 
 #include <array>
 #include <boost/program_options.hpp>
-#include <climits>
 #include <cstdlib>
 #include <cstring>
 #include <exception>
@@ -20,16 +19,19 @@
 #include "Semaphore.hpp"
 #include "StealingMode.hpp"
 #include "io.hpp"
+#include "lib/ShardedFileBuffer.hpp"
 
 namespace fs = std::filesystem;
 namespace po = boost::program_options;
 
 #define EMPER_RIPGREP_BUFSIZE 4096
 
-const char* needle;
-size_t needle_len;
+static const char* needle;
+static size_t needle_len;
 
-emper::Semaphore* max_running;
+static emper::Semaphore* max_running;
+
+static emper::lib::ShardedFileBuffer* outBuf;
 
 static enum emper::StealingMode stealingMode;
 
@@ -48,10 +50,8 @@ void search(const std::string& path) {
 	ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), 0);
 	while (bytes_read > 0) {
 		if (memmem(&buf[0], bytes_read, needle, needle_len)) {
-			std::array<char, PATH_MAX + 1> outBuf;
-			memcpy(outBuf.data(), path.c_str(), path.length());
-			outBuf[path.length()] = '\n';
-			emper::io::writeFileAndWait(STDOUT_FILENO, outBuf.data(), path.length() + 1, -1);
+			// No std::endl: flushing the in-memory shard stream is pointless.
+			outBuf->getStream() << path << '\n';
 			goto out;
 		}
 
@@ -71,6 +71,8 @@ out:
 }
 
 void walk_dir() {
+	outBuf = new emper::lib::ShardedFileBuffer(*Runtime::getRuntime(), STDOUT_FILENO);
+
 	// Only search in regular files
 	auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); };
 	// Search the file
@@ -85,6 +87,7 @@ void walk_dir() {
 		break;
 	}
 
+	delete outBuf;
 	Runtime::getRuntime()->initiateTermination();
 }
 
diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index f6f2596b3725d9e943b66e42ba889de224e20450..a897ff116a73c2c15dd36b64e146c26649cc9e23 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -36,6 +36,7 @@
 #include "io/IoContext.hpp"
 #include "io/Stats.hpp"
 #include "lib/DebugUtil.hpp"
+#include "lib/ShardedFileBuffer.hpp"
 #include "lib/env.hpp"
 #include "log/LogBuffer.hpp"
 #include "stats/FromAnywhere.hpp"
@@ -105,6 +106,8 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo
 		currentRuntime = this;
 	}
 
+	emper::lib::ShardedFileBuffer::registerRuntime(*this);
+
 	const char* envValueLogFile = std::getenv("EMPER_LOG_FILE");
 	if (envValueLogFile) {
 		logBuffer = new emper::log::LogBuffer(envValueLogFile);
@@ -261,6 +264,8 @@ Runtime::~Runtime() {
 		initiateAndWaitUntilTermination();
 	}
 
+	emper::lib::ShardedFileBuffer::unregisterRuntime(*this);
+
 	// Print the stats after all worker threads have terminated and before we delete
 	// objects bound to the runtime's lifetime
 	if constexpr (emper::STATS) {
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 5daa0e973e45ed35e36122cd996ce5fab297721e..40681e9da0abc6eca5f19c6688b4dfc30ab372f0 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -45,6 +45,9 @@ namespace io {
 class GlobalIoContext;
 class IoContext;
 }  // namespace io
+namespace lib {
+class ShardedFileBuffer;
+}
 namespace log {
 class LogBuffer;
 }
@@ -266,6 +269,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 	friend class WorkerLocalData;
 	friend class Fibril;
 	friend class emper::sleep_strategy::PipeSleepStrategy;
+	friend class emper::lib::ShardedFileBuffer;
 	friend void emper::log::log(const std::string& prefix, const std::string& message);
 };
 
diff --git a/emper/lib/ShardedFileBuffer.cpp b/emper/lib/ShardedFileBuffer.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b6d3f741fc7da27a74e0f52a7c2c2672696fd090
--- /dev/null
+++ b/emper/lib/ShardedFileBuffer.cpp
@@ -0,0 +1,115 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#include "ShardedFileBuffer.hpp"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cassert>
+
+#include "Common.hpp"
+#include "Runtime.hpp"
+#include "emper-common.h"
+#include "io.hpp"
+
+namespace emper::lib {
+
+std::atomic<size_t> ShardedFileBuffer::ids = 0;
+thread_local ShardedFileBuffer::WorkerBuffers* ShardedFileBuffer::workerBuffers = nullptr;
+
+std::mutex ShardedFileBuffer::registerLock;
+ShardedFileBuffer::RuntimeBuffers ShardedFileBuffer::buffersPerRuntime;
+
+// Wrap a descriptor owned by the caller (e.g. STDOUT_FILENO); it is written to but never closed.
+ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, int fd)
+		: runtime(runtime), fd(fd), ownsFd(false) {
+	std::stringstream sst;
+	sst << "fd: " << fd;
+	file = sst.str();
+
+	id = ids.fetch_add(1);
+}
+
+// Create/truncate the file ourselves; the resulting descriptor is owned and closed on destruction.
+ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, const std::string& file)
+		: runtime(runtime), file(file), ownsFd(true) {
+	fd = open(file.c_str(), O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+	if (fd == -1) {
+		DIE_MSG_ERRNO("opening file " << file << " failed");
+	}
+
+	id = ids.fetch_add(1);
+}
+
+ShardedFileBuffer::~ShardedFileBuffer() {
+	flush();
+	// Only close descriptors opened by this buffer. Closing a borrowed one (e.g. stdout) would
+	// break every later user of that descriptor.
+	if (ownsFd) close(fd);
+}
+
+void ShardedFileBuffer::flush() {
+	Shards* shards = nullptr;
+	{
+		// buffersPerRuntime is shared by all runtimes and mutated under registerLock. Hold the
+		// lock for the lookup only, never across the write below which waits for the IO.
+		std::lock_guard<std::mutex> lock(registerLock);
+		const auto runtimeIt = buffersPerRuntime.find(&runtime);
+		assert(runtimeIt != buffersPerRuntime.end());
+		shards = runtimeIt->second;
+	}
+
+	std::string out;
+	for (auto* shard : *shards) {
+		// A slot stays nullptr until the new-worker hook for that worker id has run.
+		if (!shard) continue;
+
+		const auto streamIt = shard->find(id);
+		if (streamIt == shard->end()) continue;
+
+		out += streamIt->second.str();
+		// Drop what was collected so a later flush (e.g. the one in the destructor) does not
+		// write the same content again.
+		streamIt->second.str(std::string());
+		streamIt->second.clear();
+	}
+
+	if (out.empty()) return;
+
+	if (emper::io::writeFileAndWait(fd, out.c_str(), out.length(), -1) == -1)
+		DIE_MSG_ERRNO("flushing sharded file buffer to " << file << " failed");
+}
+
+void ShardedFileBuffer::registerRuntime(Runtime& runtime) {
+	std::lock_guard<std::mutex> lock(registerLock);
+
+	assert(buffersPerRuntime.find(&runtime) == buffersPerRuntime.end());
+
+	auto* shards = new Shards(runtime.getWorkerCount());
+	buffersPerRuntime.emplace(&runtime, shards);
+
+	// Hand the shard vector to the hook directly: the hook body does not take registerLock and
+	// therefore must not search buffersPerRuntime itself.
+	runtime.addNewWorkerHook([shards](workerid_t workerId) {
+		workerBuffers = new WorkerBuffers();
+		(*shards)[workerId] = workerBuffers;
+	});
+}
+
+void ShardedFileBuffer::unregisterRuntime(const Runtime& runtime) {
+	std::lock_guard<std::mutex> lock(registerLock);
+
+	const auto it = buffersPerRuntime.find(&runtime);
+	assert(it != buffersPerRuntime.end());
+	auto* shards = it->second;
+	// Deleting a slot that is still nullptr is a no-op.
+	for (auto* shard : *shards) {
+		delete shard;
+	}
+	delete shards;
+
+	buffersPerRuntime.erase(it);
+}
+}  // namespace emper::lib
diff --git a/emper/lib/ShardedFileBuffer.hpp b/emper/lib/ShardedFileBuffer.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..096d80d3551b32350f2d677b758f3fa58e8bb71c
--- /dev/null
+++ b/emper/lib/ShardedFileBuffer.hpp
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <map>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Common.hpp"
+
+class Runtime;
+
+namespace emper::lib {
+/*
+ * @brief Runtime local ShardedFileBuffer
+ *
+ * Each worker appends to its own stringstream; flush() joins the per-worker parts and writes
+ * them to the backing file descriptor.
+ */
+class ShardedFileBuffer {
+	// Must be qualified: an unqualified `friend class Runtime;` would declare and befriend
+	// emper::lib::Runtime instead of the global ::Runtime.
+	friend class ::Runtime;
+
+	static std::atomic<size_t> ids;
+
+	/*
+	 * @brief Worker local part of ShardedFileBuffer infrastructure
+	 */
+	using WorkerBuffers = std::map<size_t, std::stringstream>;
+
+	static thread_local WorkerBuffers* workerBuffers;
+
+	using Shards = std::vector<WorkerBuffers*>;
+
+	static std::mutex registerLock;
+	using RuntimeBuffers = std::map<const Runtime*, Shards*>;
+	static RuntimeBuffers buffersPerRuntime;
+
+	const Runtime& runtime;
+	size_t id;
+
+	std::string file;
+	int fd;
+	// True if fd was opened by this buffer and must be closed by it.
+	bool ownsFd;
+
+ public:
+	// These two are only meant to be called by ::Runtime. They stay public so existing callers
+	// keep compiling, but can be moved to the private section now that ::Runtime is a friend.
+	/*
+	 * @brief register the current runtime to use ShardedFileBuffers
+	 */
+	static void registerRuntime(Runtime& runtime);
+	/*
+	 * @brief Cleanup all runtime specific ShardedFileBuffer data
+	 */
+	static void unregisterRuntime(const Runtime& runtime);
+
+	/*
+	 * @brief Buffer output for an already open file descriptor
+	 *
+	 * The descriptor stays owned by the caller and is not closed on destruction.
+	 */
+	ShardedFileBuffer(const Runtime& runtime, int fd);
+	/*
+	 * @brief Buffer output for the file at the given path
+	 *
+	 * The file is created or truncated and closed again on destruction.
+	 */
+	ShardedFileBuffer(const Runtime& runtime, const std::string& file);
+	~ShardedFileBuffer();
+
+	// A copy would share the id and close an owned descriptor a second time.
+	ShardedFileBuffer(const ShardedFileBuffer&) = delete;
+	auto operator=(const ShardedFileBuffer&) -> ShardedFileBuffer& = delete;
+
+	/*
+	 * @brief Get the calling thread's stream of this buffer, creating it on first use
+	 */
+	auto getStream() -> std::stringstream& {
+		// workerBuffers is only set by the new-worker hook installed in registerRuntime.
+		if (!workerBuffers) DIE_MSG("ShardedFileBuffer used on a thread without worker buffers");
+
+		// operator[] default-constructs the stream the first time this id is seen.
+		return (*workerBuffers)[id];
+	}
+
+	void put(const std::string& message) { getStream() << message; }
+
+	/*
+	 * @brief Collect the buffer shards, join them and write them to the file
+	 *
+	 * The collected shards are emptied, so flushing twice does not write the content twice.
+	 *
+	 * Attention: It is not safe to call flush during buffer modification!
+	 * Flush the buffer only if you are sure it is not concurrently modified.
+	 */
+	void flush();
+};
+}  // namespace emper::lib
diff --git a/emper/lib/meson.build b/emper/lib/meson.build
index 16a4c6a512b6f70dbe9558dc5654ec98b7b8fae7..4ae16ffbfcf10f80126f02c174568d572c97d598 100644
--- a/emper/lib/meson.build
+++ b/emper/lib/meson.build
@@ -3,6 +3,7 @@ emper_cpp_sources += files(
 	'env.cpp',
 	'LinuxVersion.cpp',
 	'MappedFileBuffer.cpp',
+	'ShardedFileBuffer.cpp',
 	'util.cpp',
 )
 