From ad10eb3a00493a12045dd251fa75dc32205ae80b Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fischer@muhq.space>
Date: Wed, 25 Aug 2021 14:26:51 +0200
Subject: [PATCH] [Debug] implement logging to a memory-mapped log file

When the environment variable EMPER_LOG_FILE=<logfile> is set, EMPER
writes its log messages to <logfile> instead of stderr.
This removes the need for the mutex protecting std::cerr as well as
the multiple write calls needed to flush std::cerr.

To efficiently write log messages to the log file, the algorithm uses
three 1 MiB memory-mapped views into <logfile> to store the log messages.
One buffer is active, one is next, and one is old.
The next buffer ensures that threads can write log messages even if the
active buffer would overflow.
The old buffer allows slower threads to still write safely while everyone
else uses the active buffer.
When a thread moves from the active buffer to the next buffer, it
is responsible for renewing the current old buffer and for rotating the
roles of the buffers:

* active -> old
* next -> active
* old -> next

This buffer scheme allows wait-free logging.
But the approach is NOT sound, because a delayed thread may still use the
old buffer, which gets renewed by the first thread touching the next buffer.
However, the likelihood of this situation decreases with larger buffer
sizes.

ATTENTION: Using SCHED_IDLE for the completer may break this likelihood
assumption.

Add a new CI test job that uses an mmapped log file.

This contains code cleanups
Suggested-By: Florian Schmaus <flow@cs.fau.de>
---
 .gitlab-ci.yml                   |   5 ++
 emper/Debug.hpp                  |  21 +++---
 emper/Runtime.cpp                |  11 ++-
 emper/Runtime.hpp                |   7 +-
 emper/io/GlobalIoContext.hpp     |   2 +-
 emper/log/LogBuffer.cpp          | 124 +++++++++++++++++++++++++++++++
 emper/log/LogBuffer.hpp          |  31 ++++++++
 emper/{Debug.cpp => log/log.cpp} |  31 ++++----
 emper/log/log.hpp                |  11 +++
 emper/log/meson.build            |   4 +
 emper/meson.build                |   2 +-
 11 files changed, 219 insertions(+), 30 deletions(-)
 create mode 100644 emper/log/LogBuffer.cpp
 create mode 100644 emper/log/LogBuffer.hpp
 rename emper/{Debug.cpp => log/log.cpp} (76%)
 create mode 100644 emper/log/log.hpp
 create mode 100644 emper/log/meson.build

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index ba27f6da..cc612eb9 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -267,3 +267,8 @@ test-set-affinity-on-block:
   extends:
     - .test
     - .set-affinity-on-block
+
+test-mmapped-log:
+  extends:
+    - .meson-test
+  script: make && EMPER_LOG_FILE=emper.log make test
diff --git a/emper/Debug.hpp b/emper/Debug.hpp
index 507399fc..6520cf6c 100644
--- a/emper/Debug.hpp
+++ b/emper/Debug.hpp
@@ -2,11 +2,14 @@
 // Copyright © 2020-2021 Florian Schmaus, Florian Fischer
 #pragma once
 
+#include <atomic>
+#include <cstddef>
 #include <sstream>	// IWYU pragma: keep
 #include <string>
 #include <string_view>
 
 #include "emper-config.h"
+#include "log/log.hpp"
 
 // If we apply clang-format to the following region, then clang-format
 // will create multi-line macros. However clang-tidy's NOLINTNEXTLINE
@@ -24,7 +27,7 @@
 #define LOG(level, x, log_func) do { if constexpr (level <= EMPER_LOG_LEVEL) { log_func(EMPER_BUILD_STR(x)); } } while (false)
 
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
-#define DBG(x) LOG(Debug, x, emper_log_no_prefix);
+#define DBG(x) LOG(Debug, x, emper::log::log_no_prefix);
 
 // To avoid "error: there are no arguments to ‘logD’ that depend on a
 // template parameter, so a declaration of ‘logD’ must be available"
@@ -35,19 +38,19 @@
 #define LOGDD(x) LOG(FineDebug, x, this->logDD);
 
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
-#define LOGI(x) LOG(Info, "Info: " << x, emper_log_no_prefix);
+#define LOGI(x) LOG(Info, "Info: " << x, emper::log::log_no_prefix);
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
 #define LOGGER_LOGI(x) LOG(Info, "Info: " << x, this->logI);
 
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
-#define LOGW(x) LOG(Warning, "Warning: " << x, emper_log_no_prefix);
+#define LOGW(x) LOG(Warning, "Warning: " << x, emper::log::log_no_prefix);
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
 #define LOGGER_LOGW(x) LOG(Warning, "Warning: " << x, this->logW);
 
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
-#define LOGE(x) LOG(Error, "Error: " << x, emper_log_no_prefix);
+#define LOGE(x) LOG(Error, "Error: " << x, emper::log::log_no_prefix);
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
-#define LOGGER_LOGE(x) LOG(Error, "Error: " << x, emper_log_no_prefix);
+#define LOGGER_LOGE(x) LOG(Error, "Error: " << x, emper::log::log_no_prefix);
 
 // NOLINTNEXTLINE(bugprone-macro-parentheses)
 #define ABORT(x) { LOGE(x); abort(); }
@@ -92,12 +95,6 @@ enum LogLevel {
 	ALL,
 };
 
-void emper_add_timestamp_to(std::ostringstream& logMessage);
-
-void emper_log(const std::string& prefix, const std::string& message);
-
-static inline void emper_log_no_prefix(const std::string& message) { emper_log("", message); }
-
 template <LogSubsystem logSubsystem>
 class Logger {
  private:
@@ -171,7 +168,7 @@ class Logger {
 			sst << " " << this;
 		}
 
-		emper_log(sst.str(), string);
+		emper::log::log(sst.str(), string);
 	}
 
 	inline void logE(const std::string& string) const { log<Error>(string); }
diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp
index 0ca865bd..4ea9687c 100644
--- a/emper/Runtime.cpp
+++ b/emper/Runtime.cpp
@@ -33,6 +33,7 @@
 #include "io/IoContext.hpp"
 #include "io/Stats.hpp"
 #include "lib/DebugUtil.hpp"
+#include "log/LogBuffer.hpp"
 #include "stats/FromAnywhere.hpp"
 #include "stats/Worker.hpp"
 #include "strategies/AbstractWorkStealingScheduler.hpp"
@@ -95,6 +96,11 @@ Runtime::Runtime(workerid_t workerCount, RuntimeStrategyFactory& strategyFactory
 		currentRuntime = this;
 	}
 
+	const char* envValueLogFile = std::getenv("EMPER_LOG_FILE");
+	if (envValueLogFile) {
+		logBuffer = new emper::log::LogBuffer(envValueLogFile);
+	}
+
 	if constexpr (emper::STATS) {
 		workerStats = new WorkerLocalData<emper::stats::Worker>(
 				[workerCount] {
@@ -227,12 +233,15 @@ Runtime::~Runtime() {
 		delete fromAnywhereStats;
 	}
 
+	DBG("Runtime " << this << " terminated");
+
 	// TODO: Move this before the delete invocations?
 	{
 		std::lock_guard<std::mutex> lock(currentRuntimeMutex);
 		currentRuntime = nullptr;
 	}
-	DBG("Runtime " << this << " terminated");
+
+	delete logBuffer;
 }
 
 auto Runtime::workerLoop(Worker* worker) -> void* {
diff --git a/emper/Runtime.hpp b/emper/Runtime.hpp
index efe1cbe8..c9f71733 100644
--- a/emper/Runtime.hpp
+++ b/emper/Runtime.hpp
@@ -44,6 +44,9 @@ namespace io {
 class GlobalIoContext;
 class IoContext;
 }	 // namespace io
+namespace log {
+class LogBuffer;
+}
 namespace stats {
 class Worker;
 class FromAnywhere;
@@ -89,6 +92,8 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 	WorkerLocalData<emper::stats::Worker>* workerStats;
 	emper::stats::FromAnywhere* fromAnywhereStats;
 
+	emper::log::LogBuffer* logBuffer = nullptr;
+
 	auto workerLoop(Worker* worker) -> void*;
 
 	WorkerSleepStrategy workerSleepStrategy;
@@ -210,5 +215,5 @@ class Runtime : public Logger<LogSubsystem::RUNTI> {
 	template <typename>
 	friend class WorkerLocalData;
 
-	friend void emper_log(const std::string& prefix, const std::string& message);
+	friend void emper::log::log(const std::string& prefix, const std::string& message);
 };
diff --git a/emper/io/GlobalIoContext.hpp b/emper/io/GlobalIoContext.hpp
index 862c944a..611793f2 100644
--- a/emper/io/GlobalIoContext.hpp
+++ b/emper/io/GlobalIoContext.hpp
@@ -15,7 +15,7 @@ class Runtime;
 namespace emper::io {
 class GlobalIoContext : public IoContext {
 	friend Runtime;
-	friend void ::emper_log(const std::string& prefix, const std::string& message);
+	friend void emper::log::log(const std::string& prefix, const std::string& message);
 	friend class Future;
 	friend class SendFuture;
 	friend class RecvFuture;
diff --git a/emper/log/LogBuffer.cpp b/emper/log/LogBuffer.cpp
new file mode 100644
index 00000000..790abb9b
--- /dev/null
+++ b/emper/log/LogBuffer.cpp
@@ -0,0 +1,124 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020 Florian Schmaus
+#include "LogBuffer.hpp"
+
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <cstring>
+#include <ostream>
+
+#include "Common.hpp"
+
+namespace emper::log {
+// Opens (creating or truncating) logFile and maps its first BUFFER_COUNT buffers.
+LogBuffer::LogBuffer(const std::string& logFile) : logFile(logFile) {
+	logFd = open(logFile.c_str(), O_RDWR | O_TRUNC | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+	if (logFd == -1) {
+		DIE_MSG_ERRNO("opening log file " << logFile << " failed");
+	}
+	// Map the file range [i * BUFFER_SIZE, (i + 1) * BUFFER_SIZE) as buffer i
+	for (size_t i = 0; i < BUFFER_COUNT; ++i) {
+		const auto off = static_cast<off_t>(i * BUFFER_SIZE);
+		void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, logFd, off);
+		// NOLINTNEXTLINE(performance-no-int-to-ptr)
+		if (res == MAP_FAILED) {
+			DIE_MSG_ERRNO("mmap failed");
+		}
+		bufs[i] = static_cast<char*>(res);
+	}
+	// Grow the just truncated file so that it covers all mapped buffers
+	if (ftruncate(logFd, BUFFER_COUNT * BUFFER_SIZE)) {
+		DIE_MSG_ERRNO("initial log file " << logFile << " truncation failed");
+	}
+}
+// Trims the log file to the position stored in bufPos, unmaps all buffers and closes the file.
+LogBuffer::~LogBuffer() {
+	auto finalPos = static_cast<off_t>(bufPos.load(std::memory_order_relaxed));
+	if (ftruncate(logFd, finalPos)) {
+		DIE_MSG_ERRNO("trimming log file " << logFile << " failed");
+	}
+	// Release every mapped buffer of the log file
+	for (auto* buf : bufs) {
+		if (munmap(buf, BUFFER_SIZE)) {
+			DIE_MSG_ERRNO("munmap failed");
+		}
+	}
+	// The return value of close() is not checked
+	close(logFd);
+}
+// Copies message into the memory-mapped log file at a position reserved through bufPos.
+// The first writer reaching a buffer maps the buffer following it over the oldest one.
+void LogBuffer::log(const std::string& message) {
+	const size_t messageLen = message.size();
+	// An empty message has nothing to write and would make endPos underflow
+	if (messageLen == 0) {
+		return;
+	}
+
+	const size_t startPos = bufPos.fetch_add(messageLen, std::memory_order_relaxed);
+	const size_t endPos = startPos + messageLen - 1;
+	char* buf = getBuf(startPos);
+	const size_t bufOffset = startPos % BUFFER_SIZE;
+	char* msgEndBuf = getBuf(endPos);
+
+	// The message fits completely into the current buffer
+	if (buf == msgEndBuf) {
+		// NOLINTNEXTLINE(bugprone-not-null-terminated-result)
+		memcpy(&buf[bufOffset], message.c_str(), messageLen);
+
+		// Only the message starting at offset 0 is the first one in its buffer
+		if (bufOffset > 0) {
+			return;
+		}
+	} else {
+		// The message spans the active and the next buffer
+		const size_t leftInActive = BUFFER_SIZE - bufOffset;
+		// NOLINTNEXTLINE(bugprone-not-null-terminated-result)
+		memcpy(&buf[bufOffset], message.c_str(), leftInActive);
+		// NOLINTNEXTLINE(bugprone-not-null-terminated-result)
+		memcpy(&msgEndBuf[0], &message.c_str()[leftInActive], messageLen - leftInActive);
+	}
+
+	// We are the first writer in the buffer holding endPos. The constructor mapped
+	// buffers 0-2, so nothing is remapped before buffer 2 is entered. Check our own
+	// position: other threads may already have advanced the shared bufPos further.
+	if (endPos < BUFFER_SIZE * 2) {
+		return;
+	}
+
+	// NOTE: This is NOT sound it is possible that other threads are still using the old
+	// buffer. But with big enough BUFFER_SIZE it is likely that all threads are using
+	// the current active buffer
+
+	// 1-based number of the buffer we just entered == 0-based index of the buffer after it
+	const size_t nthActive = (endPos / BUFFER_SIZE) + 1;
+
+	// getBuf() resolves that following buffer to slot nthActive % BUFFER_COUNT. This slot
+	// still holds the old buffer, no matter whether the message spanned two buffers or
+	// started at offset 0 of the buffer we entered.
+	const size_t oldBufIdx = nthActive % BUFFER_COUNT;
+	char* oldBufPtr = bufs[oldBufIdx];
+
+	// Our log file has the size of all ever active buffers plus the new fresh one
+	const auto logFileSize = static_cast<off_t>((nthActive + 1) * BUFFER_SIZE);
+	if (ftruncate(logFd, logFileSize)) {
+		DIE_MSG_ERRNO("growing log file " << logFile << " failed");
+	}
+
+	const auto nextOffset = static_cast<off_t>(nthActive * BUFFER_SIZE);
+	void* res = mmap(nullptr, BUFFER_SIZE, PROT_WRITE, MAP_FILE | MAP_SHARED, logFd, nextOffset);
+	// NOLINTNEXTLINE(performance-no-int-to-ptr)
+	if (res == MAP_FAILED) {
+		DIE_MSG_ERRNO("mmap of fresh buffer failed");
+	}
+	bufs[oldBufIdx] = static_cast<char*>(res);
+
+	if (munmap(oldBufPtr, BUFFER_SIZE)) {
+		DIE_MSG_ERRNO("munmap of replaced buffer failed");
+	}
+}
+
+}	 // namespace emper::log
diff --git a/emper/log/LogBuffer.hpp b/emper/log/LogBuffer.hpp
new file mode 100644
index 00000000..8518b2c8
--- /dev/null
+++ b/emper/log/LogBuffer.hpp
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2021 Florian Fischer
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <string>
+
+namespace emper::log {
+class LogBuffer {
+	// We use a triple buffer scheme, where one buffer is active, one is old and one is fresh
+	// The old buffer gets remapped by the first thread accessing the fresh buffer
+	static const int BUFFER_COUNT = 3;
+	static const size_t BUFFER_SIZE = 1 << 20;
+	// Base addresses of the BUFFER_SIZE sized mmapped views into the log file
+	char* bufs[BUFFER_COUNT];
+	// Position in the log file behind the last reserved byte, advanced by log()
+	std::atomic<size_t> bufPos = 0;
+	// Path and file descriptor of the log file
+	std::string logFile;
+	int logFd;
+	// Buffer holding log file position pos; consecutive buffers cycle through the slots of bufs
+	auto getBuf(size_t pos) -> char* { return bufs[(pos / BUFFER_SIZE) % BUFFER_COUNT]; }
+
+ public:
+	LogBuffer(const std::string& logFile);
+	~LogBuffer();
+
+	void log(const std::string& message);
+};
+}	 // namespace emper::log
diff --git a/emper/Debug.cpp b/emper/log/log.cpp
similarity index 76%
rename from emper/Debug.cpp
rename to emper/log/log.cpp
index 953fef6f..97898272 100644
--- a/emper/Debug.cpp
+++ b/emper/log/log.cpp
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: LGPL-3.0-or-later
 // Copyright © 2020 Florian Schmaus
-#include "Debug.hpp"
+// Copyright © 2021 Florian Fischer
+#include "log.hpp"
 
 #include <pthread.h>
 
@@ -15,14 +16,14 @@
 #include "Runtime.hpp"
 #include "Worker.hpp"
 #include "emper-common.h"	 // for workerid_t
-#include "emper-config.h"	 // for EMPER_LOG_LEVEL
 #include "io/GlobalIoContext.hpp"
+#include "log/LogBuffer.hpp"
 
 using emper::io::GlobalIoContext;
 
-static std::mutex emper_log_mutex;
+namespace emper::log {
 
-void emper_add_timestamp_to(std::ostringstream& logMessage) {
+static void add_timestamp_to(std::ostringstream& logMessage) {
 	auto now = std::chrono::high_resolution_clock::now();
 
 	auto now_time_t = std::chrono::high_resolution_clock::to_time_t(now);
@@ -37,7 +38,10 @@ void emper_add_timestamp_to(std::ostringstream& logMessage) {
 	logMessage << remaining_nanos;
 }
 
-void emper_log(const std::string& prefix, const std::string& message) {
+static std::mutex log_mutex;
+
+void log(const std::string& prefix, const std::string& message) {
+	Runtime* runtime = Runtime::getRuntime();
 	Worker* worker = Worker::getCurrentWorker();
 	std::ostringstream logMessage;
 	if (likely(worker)) {
@@ -46,7 +50,6 @@ void emper_log(const std::string& prefix, const std::string& message) {
 		logMessage << std::setfill('0') << std::setw(3) << workerIdAsString << " ";
 	} else {
 		// Are we the global IO completer
-		Runtime* runtime = Runtime::getRuntime();
 		GlobalIoContext* gio = runtime ? runtime->globalIo : nullptr;
 		if (gio && pthread_self() == gio->completer) {
 			logMessage << "IOC ";
@@ -56,7 +59,7 @@ void emper_log(const std::string& prefix, const std::string& message) {
 	}
 
 	if constexpr (emper::LOG_TIMESTAMP) {
-		emper_add_timestamp_to(logMessage);
+		add_timestamp_to(logMessage);
 	}
 
 	if (!prefix.empty()) {
@@ -67,11 +70,11 @@ void emper_log(const std::string& prefix, const std::string& message) {
 
 	logMessage << message << std::endl;
 
-	std::unique_lock<std::mutex> lock(emper_log_mutex);
-	std::cerr << logMessage.str();
-}
-
-// global log_level which can be changed to control the log output
-namespace emper {
-enum LogLevel log_level = EMPER_LOG_LEVEL;
+	if (likely(runtime) && runtime->logBuffer) {
+		runtime->logBuffer->log(logMessage.str());
+	} else {
+		std::unique_lock<std::mutex> lock(log_mutex);
+		std::cerr << logMessage.str();
+	}
 }
+}	 // namespace emper::log
diff --git a/emper/log/log.hpp b/emper/log/log.hpp
new file mode 100644
index 00000000..9bab9ca2
--- /dev/null
+++ b/emper/log/log.hpp
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
+#pragma once
+
+#include <string>
+
+namespace emper::log {
+void log(const std::string& prefix, const std::string& message);
+// Convenience wrapper around log() that passes an empty prefix
+static inline void log_no_prefix(const std::string& message) { log("", message); }
+}	 // namespace emper::log
diff --git a/emper/log/meson.build b/emper/log/meson.build
new file mode 100644
index 00000000..6de1e134
--- /dev/null
+++ b/emper/log/meson.build
@@ -0,0 +1,4 @@
+emper_cpp_sources += files(
+  'log.cpp',
+  'LogBuffer.cpp',
+)
diff --git a/emper/meson.build b/emper/meson.build
index 5e827d84..e5b942b9 100644
--- a/emper/meson.build
+++ b/emper/meson.build
@@ -21,7 +21,6 @@ emper_cpp_sources = [
   'Scheduler.cpp',
   'Dispatcher.cpp',
   'Common.cpp',
-  'Debug.cpp',
   'ContextManager.cpp',
   'BinaryPrivateSemaphore.cpp',
   'CountingPrivateSemaphore.cpp',
@@ -39,6 +38,7 @@ emper_library_include += include_directories('.')
 
 subdir('io')
 subdir('lib')
+subdir('log')
 subdir('stats')
 subdir('strategies')
 subdir('sleep_strategy')
-- 
GitLab