From 2d0b5f6b34941c5ccf629bdd0ed2e770448ecbdc Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fl.fischer@fau.de> Date: Thu, 18 Feb 2021 14:43:46 +0100 Subject: [PATCH 1/2] [meson] better propagate dependencies The emper header LockedUnboundedQueue.hpp could depend on different libraries according to the implementation. To link those dependencies with everything including LockedUnboundedQueue.hpp we propagate all emper_dependencies through emper_dep. And using emper_dep as a dependency seems anyway better than essentially writing down emper_dep manually each time. emper_dep essentially is: (link_with:emper, include_directories: emper_all_include) --- emper/meson.build | 6 +++--- eval/meson.build | 11 +++-------- tests/fixtures/meson.build | 3 +-- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/emper/meson.build b/emper/meson.build index c3c85bb0..935099db 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -54,7 +54,8 @@ emper = library( emper_dep = declare_dependency( include_directories : emper_all_include, - link_with : emper + link_with : emper, + dependencies : emper_dependencies, ) @@ -65,8 +66,7 @@ emper_c_sources = [ emper_c = library( 'emper-c', emper_c_sources, - include_directories: emper_all_include, - link_with: emper, + dependencies: emper_dep, install: true, ) diff --git a/eval/meson.build b/eval/meson.build index e33ab63c..44606558 100644 --- a/eval/meson.build +++ b/eval/meson.build @@ -1,22 +1,17 @@ executable( 'time_to_spawn', 'TimeToSpawn.cpp', - include_directories: emper_all_include, - dependencies: thread_dep, - link_with: emper, + dependencies: emper_dep, ) executable( 'spawn_a_lot', 'SpawnALot.cpp', - include_directories: emper_all_include, - dependencies: thread_dep, - link_with: emper, + dependencies: emper_dep, ) executable( 'locality', 'Locality.cpp', - include_directories: emper_all_include, - link_with: emper, + dependencies: emper_dep, ) diff --git a/tests/fixtures/meson.build b/tests/fixtures/meson.build index 37deaeff..0ba7b33d 100644 --- a/tests/fixtures/meson.build +++ b/tests/fixtures/meson.build @@ -5,8 +5,7 @@ test_fixtures_sources = ['network.cpp'] test_fixtures_lib = library( 'test-fixtures', test_fixtures_sources, - include_directories: emper_all_include, - link_with: emper, + dependencies: emper_dep, ) test_fixtures = declare_dependency( -- GitLab From 9b949e497b002503352d24ee5fe3e700b9a6e40f Mon Sep 17 00:00:00 2001 From: Florian Fischer <florian.fl.fischer@fau.de> Date: Thu, 18 Feb 2021 14:36:50 +0100 Subject: [PATCH 2/2] Make LockedUnboundedQueue implementation configurable Available implementations configurations through the meson option 'locked_unbounded_queue_implementation' are: mutex - our current LockedUnboundedQueue implementation using std::mutex rwlock - An implementation with pthread_rwlock. The implementations tries to upgrade its rdlock and drops and acquires a wrlock on failure shared_mutex - An implementation using std::shared_mutex. dequeue() acquires a shared lock at first, drops it and acquires a unique lock boost_shared_mutex - An implementation using boost::shared_mutex. dequeue() acquires an upgradable lock and upgrade it to a unique lock if necessary --- .../adt/BoostSharedMutexUnboundedQueue.hpp | 43 +++++++++ emper/lib/adt/LockedUnboundedQueue.hpp | 62 ++++++------- emper/lib/adt/MutexUnboundedQueue.hpp | 41 +++++++++ emper/lib/adt/RwLockUnboundedQueue.hpp | 92 +++++++++++++++++++ emper/lib/adt/SharedMutexUnboundedQueue.hpp | 48 ++++++++++ meson.build | 7 ++ meson_options.txt | 12 +++ 7 files changed, 271 insertions(+), 34 deletions(-) create mode 100644 emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp create mode 100644 emper/lib/adt/MutexUnboundedQueue.hpp create mode 100644 emper/lib/adt/RwLockUnboundedQueue.hpp create mode 100644 emper/lib/adt/SharedMutexUnboundedQueue.hpp diff --git a/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp new file mode 100644 index 00000000..bd8465b4 --- /dev/null +++ b/emper/lib/adt/BoostSharedMutexUnboundedQueue.hpp @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <boost/thread/shared_mutex.hpp> +#include <queue> + +namespace lib::adt { + +template <typename I> +class BoostSharedMutexUnboundedQueue { + private: + boost::shared_mutex queue_mutex; + + std::queue<I*> queue; + + public: + void enqueue(I* item) { + boost::unique_lock<boost::shared_mutex> lock(queue_mutex); + queue.push(item); + } + + template <class InputIt> + void insert(InputIt begin, InputIt end) { + boost::unique_lock<boost::shared_mutex> lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + + auto dequeue() -> I* { + boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex); + if (queue.empty()) { + return nullptr; + } + + boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock); + I* res = queue.front(); + queue.pop(); + return res; + } +}; +} // namespace lib::adt diff --git a/emper/lib/adt/LockedUnboundedQueue.hpp b/emper/lib/adt/LockedUnboundedQueue.hpp index f6ccda48..ff506f7c 100644 --- a/emper/lib/adt/LockedUnboundedQueue.hpp +++ b/emper/lib/adt/LockedUnboundedQueue.hpp @@ -1,41 +1,35 @@ // SPDX-License-Identifier: LGPL-3.0-or-later -// Copyright © 2020 Florian Schmaus +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer #pragma once -#include <mutex> -#include <queue> +#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE +#include "lib/adt/MutexUnboundedQueue.hpp" -namespace lib::adt { +#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE +#include "lib/adt/SharedMutexUnboundedQueue.hpp" + +#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE +#include "lib/adt/RwLockUnboundedQueue.hpp" + +#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE +#include "lib/adt/BoostSharedMutexUnboundedQueue.hpp" + +#else +#error Unknown LockedUnboundedQueue implementation +#endif +namespace lib::adt { template <typename I> -class LockedUnboundedQueue { - private: - std::mutex queue_mutex; - - std::queue<I*> queue; - - public: - void enqueue(I* item) { - std::lock_guard<std::mutex> lock(queue_mutex); - queue.push(item); - } - - template <class InputIt> - void insert(InputIt begin, InputIt end) { - std::lock_guard<std::mutex> lock(queue_mutex); - for (; begin != end; ++begin) { - queue.push(*begin); - } - } - - auto dequeue() -> I* { - std::lock_guard<std::mutex> lock(queue_mutex); - if (queue.empty()) { - return nullptr; - } - I* res = queue.front(); - queue.pop(); - return res; - } -}; +#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE +using LockedUnboundedQueue = MutexUnboundedQueue<I>; + +#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE +using LockedUnboundedQueue = SharedMutexUnboundedQueue<I>; + +#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE +using LockedUnboundedQueue = RwLockUnboundedQueue<I>; + +#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE +using LockedUnboundedQueue = BoostSharedMutexUnboundedQueue<I>; +#endif } // namespace lib::adt diff --git a/emper/lib/adt/MutexUnboundedQueue.hpp b/emper/lib/adt/MutexUnboundedQueue.hpp new file mode 100644 index 00000000..5aa84ecb --- /dev/null +++ b/emper/lib/adt/MutexUnboundedQueue.hpp @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <mutex> +#include <queue> + +namespace lib::adt { + +template <typename I> +class MutexUnboundedQueue { + private: + std::mutex queue_mutex; + + std::queue<I*> queue; + + public: + void enqueue(I* item) { + std::lock_guard<std::mutex> lock(queue_mutex); + queue.push(item); + } + + template <class InputIt> + void insert(InputIt begin, InputIt end) { + std::lock_guard<std::mutex> lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + + auto dequeue() -> I* { + std::lock_guard<std::mutex> lock(queue_mutex); + if (queue.empty()) { + return nullptr; + } + I* res = queue.front(); + queue.pop(); + return res; + } +}; +} // namespace lib::adt diff --git a/emper/lib/adt/RwLockUnboundedQueue.hpp b/emper/lib/adt/RwLockUnboundedQueue.hpp new file mode 100644 index 00000000..0d60254e --- /dev/null +++ b/emper/lib/adt/RwLockUnboundedQueue.hpp @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <pthread.h> + +#include <cstring> +#include <queue> + +namespace lib::adt { + +static void aquire_wrlock(pthread_rwlock_t& lock) { + int err = pthread_rwlock_wrlock(&lock); + if (unlikely(err)) { + DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err)); + } +} + +template <typename I> +class RwLockUnboundedQueue { + private: + pthread_rwlock_t lock; + + std::queue<I*> queue; + + public: + RwLockUnboundedQueue() { + int err = pthread_rwlock_init(&lock, nullptr); + if (unlikely(err)) { + DIE_MSG("pthread_rwlock_init failed: " << strerror(err)); + } + } + + ~RwLockUnboundedQueue() { + int err = pthread_rwlock_destroy(&lock); + if (unlikely(err)) { + DIE_MSG("pthread_rwlock_destroy failed: " << strerror(err)); + } + } + + void enqueue(I* item) { + aquire_wrlock(lock); + queue.push(item); + pthread_rwlock_unlock(&lock); + } + + template <class InputIt> + void insert(InputIt begin, InputIt end) { + aquire_wrlock(lock); + for (; begin != end; ++begin) { + queue.push(*begin); + } + pthread_rwlock_unlock(&lock); + } + + auto dequeue() -> I* { + I* res = nullptr; + + int err = pthread_rwlock_rdlock(&lock); + if (unlikely(err)) { + DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err)); + } + + if (queue.empty()) { + goto unlock_and_return; + } + + // try to upgrade to wrlock + err = pthread_rwlock_trywrlock(&lock); + if (err) { + if (unlikely(err != EBUSY)) { + DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err)); + } + + // drop the read lock and aquire a write lock + aquire_wrlock(lock); + + if (queue.empty()) { + goto unlock_and_return; + } + } + + // we certainly hold the wrlock here + res = queue.front(); + queue.pop(); + + unlock_and_return: + pthread_rwlock_unlock(&lock); + return res; + } +}; +} // namespace lib::adt diff --git a/emper/lib/adt/SharedMutexUnboundedQueue.hpp b/emper/lib/adt/SharedMutexUnboundedQueue.hpp new file mode 100644 index 00000000..5a478da6 --- /dev/null +++ b/emper/lib/adt/SharedMutexUnboundedQueue.hpp @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2020-2021 Florian Schmaus, Florian Fischer +#pragma once + +#include <queue> +#include <shared_mutex> + +namespace lib::adt { + +template <typename I> +class SharedMutexUnboundedQueue { + private: + std::shared_mutex queue_mutex; + + std::queue<I*> queue; + + public: + void enqueue(I* item) { + std::unique_lock lock(queue_mutex); + queue.push(item); + } + + template <class InputIt> + void insert(InputIt begin, InputIt end) { + std::unique_lock lock(queue_mutex); + for (; begin != end; ++begin) { + queue.push(*begin); + } + } + + auto dequeue() -> I* { + { + std::shared_lock lock(queue_mutex); + if (queue.empty()) { + return nullptr; + } + } + + std::unique_lock lock(queue_mutex); + if (queue.empty()) { + return nullptr; + } + I* res = queue.front(); + queue.pop(); + return res; + } +}; +} // namespace lib::adt diff --git a/meson.build b/meson.build index d5493868..46065772 100644 --- a/meson.build +++ b/meson.build @@ -43,6 +43,13 @@ conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set')) semaphore_impl = get_option('wakeup_semaphore_implementation') conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true) +locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation') +if locked_unbounded_queue_impl == 'boost_shared_mutex' + boost_thread_dep = dependency('boost', modules : ['thread']) + emper_dependencies += [boost_thread_dep] +endif +conf_data.set('EMPER_' + locked_unbounded_queue_impl.to_upper() + '_LOCKED_UNBOUNDED_QUEUE', true) + default_scheduling_strategy = get_option('default_scheduling_strategy') conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true) diff --git a/meson_options.txt b/meson_options.txt index 610cbdbb..2d63c48a 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -64,6 +64,18 @@ option( value: false, description: 'Use the locked variant for the MPSC queue' ) +option( + 'locked_unbounded_queue_implementation', + type: 'combo', + choices: [ + 'mutex', + 'rwlock', + 'shared_mutex', + 'boost_shared_mutex', + ], + value: 'mutex', + description: 'LockedUnboundedQueue implementation', +) option( 'stats', type: 'boolean', -- GitLab