From 2531492e67afd59dca2a1738273bcf3171f0fd21 Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Tue, 22 Feb 2022 11:18:19 +0100
Subject: [PATCH] implement a sharded file buffer

Each worker thread has its own stringstream per ShardedFileBuffer.
All streams are joined and flushed to the file when a ShardedFileBuffer
is destructed.

Flushing a ShardedFileBuffer is not thread-safe and must not happen
concurrently with any modification of the ShardedFileBuffer.

Use a ShardedFileBuffer to accumulate and print all paths during fsearch.
---
 apps/fsearch/fsearch.cpp        | 18 ++++---
 emper/Runtime.cpp               |  5 ++
 emper/Runtime.hpp               |  4 ++
 emper/lib/ShardedFileBuffer.cpp | 94 +++++++++++++++++++++++++++++++++
 emper/lib/ShardedFileBuffer.hpp | 82 ++++++++++++++++++++++++++++
 emper/lib/meson.build           |  1 +
 6 files changed, 196 insertions(+), 8 deletions(-)
 create mode 100644 emper/lib/ShardedFileBuffer.cpp
 create mode 100644 emper/lib/ShardedFileBuffer.hpp

diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp
index 2093ca6d..59a22170 100644
--- a/apps/fsearch/fsearch.cpp
+++ b/apps/fsearch/fsearch.cpp
@@ -5,7 +5,6 @@
 
 #include <array>
 #include <boost/program_options.hpp>
-#include <climits>
 #include <cstdlib>
 #include <cstring>
 #include <exception>
@@ -20,16 +19,19 @@
 #include "Semaphore.hpp"
 #include "StealingMode.hpp"
 #include "io.hpp"
+#include "lib/ShardedFileBuffer.hpp"
 
 namespace fs = std::filesystem;
 namespace po = boost::program_options;
 
 #define EMPER_RIPGREP_BUFSIZE 4096
 
-const char* needle;
-size_t needle_len;
+static const char* needle;
+static size_t needle_len;
 
-emper::Semaphore* max_running;
+static emper::Semaphore* max_running;
+
+static emper::lib::ShardedFileBuffer* outBuf;
 
 static enum emper::StealingMode stealingMode;
 
@@ -48,10 +50,7 @@ void search(const std::string& path) {
 	ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), 0);
 	while (bytes_read > 0) {
 		if (memmem(&buf[0], bytes_read, needle, needle_len)) {
-			std::array<char, PATH_MAX + 1> outBuf;
-			memcpy(outBuf.data(), path.c_str(), path.length());
-			outBuf[path.length()] = '\n';
-			emper::io::writeFileAndWait(STDOUT_FILENO, outBuf.data(), path.length() + 1, -1);
+			outBuf->getStream() << path << std::endl;
 			goto out;
 		}
 
@@ -71,6 +70,8 @@ out:
 }
 
 void walk_dir() {
+	outBuf = new emper::lib::ShardedFileBuffer(*Runtime::getRuntime(), STDOUT_FILENO);
+
 	// Only search in regular files
 	auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); };
 	// Search the file
@@ -85,6 +86,7 @@ void walk_dir() {
 			break;
 	}
 
+	delete outBuf;
 	Runtime::getRuntime()->initiateTermination();
 }
 
diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index f6f2596b..a897ff11 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -36,6 +36,7 @@
 #include "io/IoContext.hpp"
 #include "io/Stats.hpp"
 #include "lib/DebugUtil.hpp"
+#include "lib/ShardedFileBuffer.hpp"
 #include "lib/env.hpp"
 #include "log/LogBuffer.hpp"
 #include "stats/FromAnywhere.hpp"
@@ -105,6 +106,8 @@ Runtime::Runtime(workerid_t workerCount, const std::vector<NewWorkerHook>& newWo
 		currentRuntime = this;
 	}
 
+	emper::lib::ShardedFileBuffer::registerRuntime(*this);
+
 	const char* envValueLogFile = std::getenv("EMPER_LOG_FILE");
 	if (envValueLogFile) {
 		logBuffer = new emper::log::LogBuffer(envValueLogFile);
@@ -261,6 +264,8 @@ Runtime::~Runtime() {
 		initiateAndWaitUntilTermination();
 	}
 
+	emper::lib::ShardedFileBuffer::unregisterRuntime(*this);
+
 	// Print the stats after all worker threads have terminated and before we delete
 	// objects bound to the runtime's lifetime
 	if constexpr (emper::STATS) {
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index 5daa0e97..40681e9d 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -45,6 +45,9 @@ namespace io {
 class GlobalIoContext;
 class IoContext;
 }	 // namespace io
+namespace lib {
+class ShardedFileBuffer;
+}
 namespace log {
 class LogBuffer;
 }
@@ -266,6 +269,7 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 	friend class WorkerLocalData;
 	friend class Fibril;
 	friend class emper::sleep_strategy::PipeSleepStrategy;
+	friend class emper::lib::ShardedFileBuffer;
 
 	friend void emper::log::log(const std::string& prefix, const std::string& message);
 };
diff --git a/emper/lib/ShardedFileBuffer.cpp b/emper/lib/ShardedFileBuffer.cpp
new file mode 100644
index 00000000..b6d3f741
--- /dev/null
+++ b/emper/lib/ShardedFileBuffer.cpp
@@ -0,0 +1,94 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#include "ShardedFileBuffer.hpp"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cassert>
+
+#include "Common.hpp"
+#include "Runtime.hpp"
+#include "emper-common.h"
+#include "io.hpp"
+
+namespace emper::lib {
+
+std::atomic<size_t> ShardedFileBuffer::ids = 0;  // the constructors take their id from here
+thread_local ShardedFileBuffer::WorkerBuffers* ShardedFileBuffer::workerBuffers = nullptr;
+
+std::mutex ShardedFileBuffer::registerLock;  // held by registerRuntime() / unregisterRuntime()
+ShardedFileBuffer::RuntimeBuffers ShardedFileBuffer::buffersPerRuntime;
+
+ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, int fd)
+		: runtime(runtime), id(ids.fetch_add(1)), fd(fd) {
+	// There is no path for a bare descriptor, so name the target after the fd itself.
+	std::ostringstream description;
+	description << "fd: " << fd;
+	file = description.str();
+}
+
+ShardedFileBuffer::ShardedFileBuffer(const Runtime& runtime, const std::string& file)
+		: runtime(runtime), id(ids.fetch_add(1)), file(file) {
+	// Create the file if it is missing (rw-r--r--) and drop any content it already has.
+	constexpr mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+	fd = open(file.c_str(), O_RDWR | O_TRUNC | O_CREAT, mode);
+	if (fd == -1) {
+		DIE_MSG_ERRNO("opening file " << file << " failed");
+	}
+}
+
+ShardedFileBuffer::~ShardedFileBuffer() {
+	flush();    // write out the content still held in the shards
+	close(fd);  // NOTE(review): also closes an fd handed to the fd constructor (e.g. stdout)
+}
+
+void ShardedFileBuffer::flush() {
+	std::string out;  // joined content of this buffer's stream of every worker
+	const auto shardsIt = buffersPerRuntime.find(&runtime);
+	assert(shardsIt != buffersPerRuntime.end());
+	for (auto* shard : *shardsIt->second) {
+		if (!shard) continue;  // this worker has not installed its buffers (yet)
+		if (const auto it = shard->find(id); it != shard->end()) {
+			out += it->second.str();
+			it->second.str(std::string());  // drain: a later flush must not write this again
+		}
+	}
+	if (emper::io::writeFileAndWait(fd, out.c_str(), out.length(), -1) == -1)
+		DIE_MSG_ERRNO("flushing sharded file buffer to " << file << " failed");
+}
+
+void ShardedFileBuffer::registerRuntime(Runtime& runtime) {
+	std::lock_guard<std::mutex> lock(registerLock);
+
+	assert(buffersPerRuntime.find(&runtime) == buffersPerRuntime.end());
+
+	// One slot per worker, initially nullptr; every worker fills in its own slot in the hook.
+	auto* shards = new Shards(runtime.getWorkerCount());
+	buffersPerRuntime.emplace(&runtime, shards);
+
+	// The hook runs later and without registerLock. Hand it the shards directly instead of
+	// letting it search buffersPerRuntime, which may concurrently be modified by the
+	// (un)registration of another runtime.
+	runtime.addNewWorkerHook([shards](workerid_t workerId) {
+		workerBuffers = new WorkerBuffers();
+		(*shards)[workerId] = workerBuffers;
+	});
+}
+
+void ShardedFileBuffer::unregisterRuntime(const Runtime& runtime) {
+	std::lock_guard<std::mutex> lock(registerLock);
+
+	const auto entry = buffersPerRuntime.find(&runtime);
+	assert(entry != buffersPerRuntime.end());
+
+	// Release the buffers of every worker first, then the vector that referenced them.
+	Shards* const runtimeShards = entry->second;
+	for (WorkerBuffers* perWorker : *runtimeShards) delete perWorker;
+	delete runtimeShards;
+
+	buffersPerRuntime.erase(entry);
+}
+}	 // namespace emper::lib
diff --git a/emper/lib/ShardedFileBuffer.hpp b/emper/lib/ShardedFileBuffer.hpp
new file mode 100644
index 00000000..096d80d3
--- /dev/null
+++ b/emper/lib/ShardedFileBuffer.hpp
@@ -0,0 +1,82 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2022 Florian Fischer
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <map>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Common.hpp"
+
+class Runtime;
+
+namespace emper::lib {
+/*
+ * @brief Runtime local ShardedFileBuffer
+ */
+class ShardedFileBuffer {
+	// Qualified on purpose: `friend class Runtime;` would befriend emper::lib::Runtime
+	friend class ::Runtime;
+
+	static std::atomic<size_t> ids;  // hands out the id of each ShardedFileBuffer
+
+	/*
+	 * @brief Worker local part of ShardedFileBuffer infrastructure
+	 */
+	using WorkerBuffers = std::map<size_t, std::stringstream>;  // buffer id -> stream
+
+	static thread_local WorkerBuffers* workerBuffers;
+
+	using Shards = std::vector<WorkerBuffers*>;  // indexed by worker id
+
+	static std::mutex registerLock;
+	using RuntimeBuffers = std::map<const Runtime*, Shards*>;
+	static RuntimeBuffers buffersPerRuntime;
+
+	const Runtime& runtime;
+	size_t id;
+
+	std::string file;
+	int fd;
+
+ public:
+	// Kept public for compatibility; Runtime, being a friend, does not depend on that
+	/*
+	 * @brief register the current runtime to use ShardedFileBuffers
+	 */
+	static void registerRuntime(Runtime& runtime);
+	/*
+	 * @brief Cleanup all runtime specific ShardedFileBuffer data
+	 */
+	static void unregisterRuntime(const Runtime& runtime);
+
+	ShardedFileBuffer(const Runtime& runtime, int fd);
+	ShardedFileBuffer(const Runtime& runtime, const std::string& file);
+	~ShardedFileBuffer();
+
+	// Not copyable: a copy would share id and fd, so content would be flushed and fd closed twice
+	ShardedFileBuffer(const ShardedFileBuffer&) = delete;
+	auto operator=(const ShardedFileBuffer&) -> ShardedFileBuffer& = delete;
+
+	/* @brief Get the calling thread's stream of this buffer; it is created on first use */
+	auto getStream() -> std::stringstream& {
+		if (!workerBuffers) DIE_MSG("ShardedFileBuffer used by a thread without WorkerBuffers");
+		return workerBuffers->try_emplace(id).first->second;
+	}
+
+	void put(const std::string& message) { getStream() << message; }  // append to own stream
+
+	/*
+	 * @brief Collect the buffer shards, join them and write them to the file
+	 *
+	 * Attention: It is not safe to call flush during buffer modification!
+	 *            Flush the buffer only if you are sure it is not concurrently modified.
+	 */
+	void flush();
+};
+}	 // namespace emper::lib
diff --git a/emper/lib/meson.build b/emper/lib/meson.build
index 16a4c6a5..4ae16ffb 100644
--- a/emper/lib/meson.build
+++ b/emper/lib/meson.build
@@ -3,6 +3,7 @@ emper_cpp_sources += files(
   'env.cpp',
   'LinuxVersion.cpp',
   'MappedFileBuffer.cpp',
+  'ShardedFileBuffer.cpp',
   'util.cpp',
 )
 
-- 
GitLab