Skip to content
Snippets Groups Projects
Commit 9b949e49 authored by Florian Fischer's avatar Florian Fischer
Browse files

Make LockedUnboundedQueue implementation configurable

Available implementations configurations through the meson option
'locked_unbounded_queue_implementation' are:

mutex - our current LockedUnboundedQueue implementation using std::mutex

rwlock - An implementation with pthread_rwlock. The implementations tries
         to upgrade its rdlock and drops and acquires a wrlock on failure

shared_mutex - An implementation using std::shared_mutex.
         dequeue() acquires a shared lock at first, drops it and
         acquires a unique lock

boost_shared_mutex - An implementation using boost::shared_mutex.
         dequeue() acquires an upgradable lock and upgrade it
         to a unique lock if necessary
parent 2d0b5f6b
No related branches found
No related tags found
No related merge requests found
Pipeline #58726 passed
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <boost/thread/shared_mutex.hpp>
#include <queue>
namespace lib::adt {
template <typename I>
class BoostSharedMutexUnboundedQueue {
private:
boost::shared_mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock);
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <mutex>
#include <queue>
#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/MutexUnboundedQueue.hpp"
namespace lib::adt {
#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/SharedMutexUnboundedQueue.hpp"
#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/RwLockUnboundedQueue.hpp"
#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/BoostSharedMutexUnboundedQueue.hpp"
#else
#error Unknown LockedUnboundedQueue implementation
#endif
namespace lib::adt {
template <typename I>
class LockedUnboundedQueue {
private:
std::mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = MutexUnboundedQueue<I>;
#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = SharedMutexUnboundedQueue<I>;
#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = RwLockUnboundedQueue<I>;
#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = BoostSharedMutexUnboundedQueue<I>;
#endif
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <mutex>
#include <queue>
namespace lib::adt {
template <typename I>
class MutexUnboundedQueue {
private:
std::mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <pthread.h>
#include <cstring>
#include <queue>
namespace lib::adt {
static void aquire_wrlock(pthread_rwlock_t& lock) {
int err = pthread_rwlock_wrlock(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err));
}
}
template <typename I>
class RwLockUnboundedQueue {
private:
pthread_rwlock_t lock;
std::queue<I*> queue;
public:
RwLockUnboundedQueue() {
int err = pthread_rwlock_init(&lock, nullptr);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_init failed: " << strerror(err));
}
}
~RwLockUnboundedQueue() {
int err = pthread_rwlock_destroy(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_destroy failed: " << strerror(err));
}
}
void enqueue(I* item) {
aquire_wrlock(lock);
queue.push(item);
pthread_rwlock_unlock(&lock);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
aquire_wrlock(lock);
for (; begin != end; ++begin) {
queue.push(*begin);
}
pthread_rwlock_unlock(&lock);
}
auto dequeue() -> I* {
I* res = nullptr;
int err = pthread_rwlock_rdlock(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err));
}
if (queue.empty()) {
goto unlock_and_return;
}
// try to upgrade to wrlock
err = pthread_rwlock_trywrlock(&lock);
if (err) {
if (unlikely(err != EBUSY)) {
DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err));
}
// drop the read lock and aquire a write lock
aquire_wrlock(lock);
if (queue.empty()) {
goto unlock_and_return;
}
}
// we certainly hold the wrlock here
res = queue.front();
queue.pop();
unlock_and_return:
pthread_rwlock_unlock(&lock);
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <queue>
#include <shared_mutex>
namespace lib::adt {
template <typename I>
class SharedMutexUnboundedQueue {
private:
std::shared_mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::unique_lock lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::unique_lock lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
{
std::shared_lock lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
}
std::unique_lock lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
......@@ -43,6 +43,13 @@ conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set'))
semaphore_impl = get_option('wakeup_semaphore_implementation')
conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true)
locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation')
if locked_unbounded_queue_impl == 'boost_shared_mutex'
boost_thread_dep = dependency('boost', modules : ['thread'])
emper_dependencies += [boost_thread_dep]
endif
conf_data.set('EMPER_' + locked_unbounded_queue_impl.to_upper() + '_LOCKED_UNBOUNDED_QUEUE', true)
default_scheduling_strategy = get_option('default_scheduling_strategy')
conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true)
......
......@@ -64,6 +64,18 @@ option(
value: false,
description: 'Use the locked variant for the MPSC queue'
)
option(
'locked_unbounded_queue_implementation',
type: 'combo',
choices: [
'mutex',
'rwlock',
'shared_mutex',
'boost_shared_mutex',
],
value: 'mutex',
description: 'LockedUnboundedQueue implementation',
)
option(
'stats',
type: 'boolean',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment