From 2d0b5f6b34941c5ccf629bdd0ed2e770448ecbdc Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Thu, 18 Feb 2021 14:43:46 +0100
Subject: [PATCH 1/2] [meson] better propagate dependencies

The emper header LockedUnboundedQueue.hpp could depend on different libraries
according to the implementation.

To link those dependencies with everything including LockedUnboundedQueue.hpp
we propagate all emper_dependencies through emper_dep.

And using emper_dep as a dependency seems anyway better than essentially
writing down emper_dep manually each time.

emper_dep essentially is:
(link_with:emper, include_directories: emper_all_include)
---
 emper/meson.build          |  6 +++---
 eval/meson.build           | 11 +++--------
 tests/fixtures/meson.build |  3 +--
 3 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/emper/meson.build b/emper/meson.build
index c3c85bb0..935099db 100644
--- a/emper/meson.build
+++ b/emper/meson.build
@@ -54,7 +54,8 @@ emper = library(
 
 emper_dep = declare_dependency(
   include_directories : emper_all_include,
-  link_with : emper
+  link_with : emper,
+  dependencies : emper_dependencies,
 )
 
 
@@ -65,8 +66,7 @@ emper_c_sources = [
 emper_c = library(
   'emper-c',
   emper_c_sources,
-  include_directories: emper_all_include,
-  link_with: emper,
+  dependencies: emper_dep,
   install: true,
 )
 
diff --git a/eval/meson.build b/eval/meson.build
index e33ab63c..44606558 100644
--- a/eval/meson.build
+++ b/eval/meson.build
@@ -1,22 +1,17 @@
 executable(
   'time_to_spawn',
   'TimeToSpawn.cpp',
-  include_directories: emper_all_include,
-  dependencies: thread_dep,
-  link_with: emper,
+  dependencies: emper_dep,
 )
 
 executable(
   'spawn_a_lot',
   'SpawnALot.cpp',
-  include_directories: emper_all_include,
-  dependencies: thread_dep,
-  link_with: emper,
+  dependencies: emper_dep,
 )
 
 executable(
   'locality',
   'Locality.cpp',
-  include_directories: emper_all_include,
-  link_with: emper,
+  dependencies: emper_dep,
 )
diff --git a/tests/fixtures/meson.build b/tests/fixtures/meson.build
index 37deaeff..0ba7b33d 100644
--- a/tests/fixtures/meson.build
+++ b/tests/fixtures/meson.build
@@ -5,8 +5,7 @@ test_fixtures_sources = ['network.cpp']
 test_fixtures_lib = library(
 	'test-fixtures',
 	test_fixtures_sources,
-	include_directories: emper_all_include,
-	link_with: emper,
+	dependencies: emper_dep,
 )
 
 test_fixtures = declare_dependency(
-- 
GitLab


From 9b949e497b002503352d24ee5fe3e700b9a6e40f Mon Sep 17 00:00:00 2001
From: Florian Fischer <florian.fl.fischer@fau.de>
Date: Thu, 18 Feb 2021 14:36:50 +0100
Subject: [PATCH 2/2] Make LockedUnboundedQueue implementation configurable

Available implementations configurations through the meson option
'locked_unbounded_queue_implementation' are:

mutex - our current LockedUnboundedQueue implementation using std::mutex

rwlock - An implementation with pthread_rwlock. The implementations tries
         to upgrade its rdlock and drops and acquires a wrlock on failure

shared_mutex - An implementation using std::shared_mutex.
         dequeue() acquires a shared lock at first, drops it and
         acquires a unique lock

boost_shared_mutex - An implementation using boost::shared_mutex.
         dequeue() acquires an upgradable lock and upgrade it
         to a unique lock if necessary
---
 .../adt/BoostSharedMutexUnboundedQueue.hpp    | 43 +++++++++
 emper/lib/adt/LockedUnboundedQueue.hpp        | 62 ++++++-------
 emper/lib/adt/MutexUnboundedQueue.hpp         | 41 +++++++++
 emper/lib/adt/RwLockUnboundedQueue.hpp        | 92 +++++++++++++++++++
 emper/lib/adt/SharedMutexUnboundedQueue.hpp   | 48 ++++++++++
 meson.build                                   |  7 ++
 meson_options.txt                             | 12 +++
 7 files changed, 271 insertions(+), 34 deletions(-)
 create mode 100644 emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
 create mode 100644 emper/lib/adt/MutexUnboundedQueue.hpp
 create mode 100644 emper/lib/adt/RwLockUnboundedQueue.hpp
 create mode 100644 emper/lib/adt/SharedMutexUnboundedQueue.hpp

diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
new file mode 100644
index 00000000..bd8465b4
--- /dev/null
+++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp
@@ -0,0 +1,43 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
+#pragma once
+
+#include <boost/thread/shared_mutex.hpp>
+#include <queue>
+
+namespace lib::adt {
+
+template <typename I>
+class BoostSharedMutexUnboundedQueue {
+ private:
+	boost::shared_mutex queue_mutex;
+
+	std::queue<I*> queue;
+
+ public:
+	void enqueue(I* item) {
+		boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
+		queue.push(item);
+	}
+
+	template <class InputIt>
+	void insert(InputIt begin, InputIt end) {
+		boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
+		for (; begin != end; ++begin) {
+			queue.push(*begin);
+		}
+	}
+
+	auto dequeue() -> I* {
+		boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
+		if (queue.empty()) {
+			return nullptr;
+		}
+
+		boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock);
+		I* res = queue.front();
+		queue.pop();
+		return res;
+	}
+};
+}	 // namespace lib::adt
diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp
index f6ccda48..ff506f7c 100644
--- a/emper/lib/adt/LockedUnboundedQueue.hpp
+++ b/emper/lib/adt/LockedUnboundedQueue.hpp
@@ -1,41 +1,35 @@
 // SPDX-License-Identifier: LGPL-3.0-or-later
-// Copyright © 2020 Florian Schmaus
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
 #pragma once
 
-#include <mutex>
-#include <queue>
+#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
+#include "lib/adt/MutexUnboundedQueue.hpp"
 
-namespace lib::adt {
+#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
+#include "lib/adt/SharedMutexUnboundedQueue.hpp"
+
+#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
+#include "lib/adt/RwLockUnboundedQueue.hpp"
+
+#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
+#include "lib/adt/BoostSharedMutexUnboundedQueue.hpp"
+
+#else
+#error Unknown LockedUnboundedQueue implementation
+#endif
 
+namespace lib::adt {
 template <typename I>
-class LockedUnboundedQueue {
- private:
-	std::mutex queue_mutex;
-
-	std::queue<I*> queue;
-
- public:
-	void enqueue(I* item) {
-		std::lock_guard<std::mutex> lock(queue_mutex);
-		queue.push(item);
-	}
-
-	template <class InputIt>
-	void insert(InputIt begin, InputIt end) {
-		std::lock_guard<std::mutex> lock(queue_mutex);
-		for (; begin != end; ++begin) {
-			queue.push(*begin);
-		}
-	}
-
-	auto dequeue() -> I* {
-		std::lock_guard<std::mutex> lock(queue_mutex);
-		if (queue.empty()) {
-			return nullptr;
-		}
-		I* res = queue.front();
-		queue.pop();
-		return res;
-	}
-};
+#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
+using LockedUnboundedQueue = MutexUnboundedQueue<I>;
+
+#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
+using LockedUnboundedQueue = SharedMutexUnboundedQueue<I>;
+
+#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
+using LockedUnboundedQueue = RwLockUnboundedQueue<I>;
+
+#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
+using LockedUnboundedQueue = BoostSharedMutexUnboundedQueue<I>;
+#endif
 }	 // namespace lib::adt
diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp
new file mode 100644
index 00000000..5aa84ecb
--- /dev/null
+++ b/emper/lib/adt/MutexUnboundedQueue.hpp
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
+#pragma once
+
+#include <mutex>
+#include <queue>
+
+namespace lib::adt {
+
+template <typename I>
+class MutexUnboundedQueue {
+ private:
+	std::mutex queue_mutex;
+
+	std::queue<I*> queue;
+
+ public:
+	void enqueue(I* item) {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		queue.push(item);
+	}
+
+	template <class InputIt>
+	void insert(InputIt begin, InputIt end) {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		for (; begin != end; ++begin) {
+			queue.push(*begin);
+		}
+	}
+
+	auto dequeue() -> I* {
+		std::lock_guard<std::mutex> lock(queue_mutex);
+		if (queue.empty()) {
+			return nullptr;
+		}
+		I* res = queue.front();
+		queue.pop();
+		return res;
+	}
+};
+}	 // namespace lib::adt
diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp
new file mode 100644
index 00000000..0d60254e
--- /dev/null
+++ b/emper/lib/adt/RwLockUnboundedQueue.hpp
@@ -0,0 +1,92 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
+#pragma once
+
+#include <pthread.h>
+
+#include <cstring>
+#include <queue>
+
+namespace lib::adt {
+
+static void aquire_wrlock(pthread_rwlock_t& lock) {
+	int err = pthread_rwlock_wrlock(&lock);
+	if (unlikely(err)) {
+		DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err));
+	}
+}
+
+template <typename I>
+class RwLockUnboundedQueue {
+ private:
+	pthread_rwlock_t lock;
+
+	std::queue<I*> queue;
+
+ public:
+	RwLockUnboundedQueue() {
+		int err = pthread_rwlock_init(&lock, nullptr);
+		if (unlikely(err)) {
+			DIE_MSG("pthread_rwlock_init failed: " << strerror(err));
+		}
+	}
+
+	~RwLockUnboundedQueue() {
+		int err = pthread_rwlock_destroy(&lock);
+		if (unlikely(err)) {
+			DIE_MSG("pthread_rwlock_destroy failed: " << strerror(err));
+		}
+	}
+
+	void enqueue(I* item) {
+		aquire_wrlock(lock);
+		queue.push(item);
+		pthread_rwlock_unlock(&lock);
+	}
+
+	template <class InputIt>
+	void insert(InputIt begin, InputIt end) {
+		aquire_wrlock(lock);
+		for (; begin != end; ++begin) {
+			queue.push(*begin);
+		}
+		pthread_rwlock_unlock(&lock);
+	}
+
+	auto dequeue() -> I* {
+		I* res = nullptr;
+
+		int err = pthread_rwlock_rdlock(&lock);
+		if (unlikely(err)) {
+			DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err));
+		}
+
+		if (queue.empty()) {
+			goto unlock_and_return;
+		}
+
+		// try to upgrade to wrlock
+		err = pthread_rwlock_trywrlock(&lock);
+		if (err) {
+			if (unlikely(err != EBUSY)) {
+				DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err));
+			}
+
+			// drop the read lock and aquire a write lock
+			aquire_wrlock(lock);
+
+			if (queue.empty()) {
+				goto unlock_and_return;
+			}
+		}
+
+		// we certainly hold the wrlock here
+		res = queue.front();
+		queue.pop();
+
+	unlock_and_return:
+		pthread_rwlock_unlock(&lock);
+		return res;
+	}
+};
+}	 // namespace lib::adt
diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
new file mode 100644
index 00000000..5a478da6
--- /dev/null
+++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp
@@ -0,0 +1,48 @@
+// SPDX-License-Identifier: LGPL-3.0-or-later
+// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
+#pragma once
+
+#include <queue>
+#include <shared_mutex>
+
+namespace lib::adt {
+
+template <typename I>
+class SharedMutexUnboundedQueue {
+ private:
+	std::shared_mutex queue_mutex;
+
+	std::queue<I*> queue;
+
+ public:
+	void enqueue(I* item) {
+		std::unique_lock lock(queue_mutex);
+		queue.push(item);
+	}
+
+	template <class InputIt>
+	void insert(InputIt begin, InputIt end) {
+		std::unique_lock lock(queue_mutex);
+		for (; begin != end; ++begin) {
+			queue.push(*begin);
+		}
+	}
+
+	auto dequeue() -> I* {
+		{
+			std::shared_lock lock(queue_mutex);
+			if (queue.empty()) {
+				return nullptr;
+			}
+		}
+
+		std::unique_lock lock(queue_mutex);
+		if (queue.empty()) {
+			return nullptr;
+		}
+		I* res = queue.front();
+		queue.pop();
+		return res;
+	}
+};
+}	 // namespace lib::adt
diff --git a/meson.build b/meson.build
index d5493868..46065772 100644
--- a/meson.build
+++ b/meson.build
@@ -43,6 +43,13 @@ conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set'))
 semaphore_impl = get_option('wakeup_semaphore_implementation')
 conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true)
 
+locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation')
+if locked_unbounded_queue_impl == 'boost_shared_mutex'
+	boost_thread_dep = dependency('boost', modules : ['thread'])
+	emper_dependencies += [boost_thread_dep]
+endif
+conf_data.set('EMPER_' + locked_unbounded_queue_impl.to_upper() + '_LOCKED_UNBOUNDED_QUEUE', true)
+
 default_scheduling_strategy = get_option('default_scheduling_strategy')
 conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true)
 
diff --git a/meson_options.txt b/meson_options.txt
index 610cbdbb..2d63c48a 100644
--- a/meson_options.txt
+++ b/meson_options.txt
@@ -64,6 +64,18 @@ option(
   value: false,
   description: 'Use the locked variant for the MPSC queue'
 )
+option(
+  'locked_unbounded_queue_implementation',
+  type: 'combo',
+  choices: [
+	'mutex',
+	'rwlock',
+	'shared_mutex',
+	'boost_shared_mutex',
+  ],
+  value: 'mutex',
+  description: 'LockedUnboundedQueue implementation',
+)
 option(
   'stats',
   type: 'boolean',
-- 
GitLab