Skip to content
Snippets Groups Projects
Commit a1bb4f7a authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'unbounded_queue_implementation' into 'master'

Make LockedUnboundedQueue implementation configurable

See merge request !113
parents b22579c0 9b949e49
No related branches found
No related tags found
1 merge request!113Make LockedUnboundedQueue implementation configurable
Pipeline #58734 passed
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <boost/thread/shared_mutex.hpp>
#include <queue>
namespace lib::adt {
template <typename I>
class BoostSharedMutexUnboundedQueue {
private:
boost::shared_mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
boost::unique_lock<boost::shared_mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
boost::upgrade_lock<boost::shared_mutex> rlock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
boost::upgrade_to_unique_lock<boost::shared_mutex> wlock(rlock);
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020 Florian Schmaus
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <mutex>
#include <queue>
#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/MutexUnboundedQueue.hpp"
namespace lib::adt {
#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/SharedMutexUnboundedQueue.hpp"
#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/RwLockUnboundedQueue.hpp"
#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
#include "lib/adt/BoostSharedMutexUnboundedQueue.hpp"
#else
#error Unknown LockedUnboundedQueue implementation
#endif
namespace lib::adt {
template <typename I>
class LockedUnboundedQueue {
private:
std::mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
#ifdef EMPER_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = MutexUnboundedQueue<I>;
#elif defined EMPER_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = SharedMutexUnboundedQueue<I>;
#elif defined EMPER_RWLOCK_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = RwLockUnboundedQueue<I>;
#elif defined EMPER_BOOST_SHARED_MUTEX_LOCKED_UNBOUNDED_QUEUE
using LockedUnboundedQueue = BoostSharedMutexUnboundedQueue<I>;
#endif
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <mutex>
#include <queue>
namespace lib::adt {
template <typename I>
class MutexUnboundedQueue {
private:
std::mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::lock_guard<std::mutex> lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::lock_guard<std::mutex> lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <pthread.h>
#include <cstring>
#include <queue>
namespace lib::adt {
static void aquire_wrlock(pthread_rwlock_t& lock) {
int err = pthread_rwlock_wrlock(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err));
}
}
template <typename I>
class RwLockUnboundedQueue {
private:
pthread_rwlock_t lock;
std::queue<I*> queue;
public:
RwLockUnboundedQueue() {
int err = pthread_rwlock_init(&lock, nullptr);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_init failed: " << strerror(err));
}
}
~RwLockUnboundedQueue() {
int err = pthread_rwlock_destroy(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_destroy failed: " << strerror(err));
}
}
void enqueue(I* item) {
aquire_wrlock(lock);
queue.push(item);
pthread_rwlock_unlock(&lock);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
aquire_wrlock(lock);
for (; begin != end; ++begin) {
queue.push(*begin);
}
pthread_rwlock_unlock(&lock);
}
auto dequeue() -> I* {
I* res = nullptr;
int err = pthread_rwlock_rdlock(&lock);
if (unlikely(err)) {
DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err));
}
if (queue.empty()) {
goto unlock_and_return;
}
// try to upgrade to wrlock
err = pthread_rwlock_trywrlock(&lock);
if (err) {
if (unlikely(err != EBUSY)) {
DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err));
}
// drop the read lock and aquire a write lock
aquire_wrlock(lock);
if (queue.empty()) {
goto unlock_and_return;
}
}
// we certainly hold the wrlock here
res = queue.front();
queue.pop();
unlock_and_return:
pthread_rwlock_unlock(&lock);
return res;
}
};
} // namespace lib::adt
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2020-2021 Florian Schmaus, Florian Fischer
#pragma once
#include <queue>
#include <shared_mutex>
namespace lib::adt {
template <typename I>
class SharedMutexUnboundedQueue {
private:
std::shared_mutex queue_mutex;
std::queue<I*> queue;
public:
void enqueue(I* item) {
std::unique_lock lock(queue_mutex);
queue.push(item);
}
template <class InputIt>
void insert(InputIt begin, InputIt end) {
std::unique_lock lock(queue_mutex);
for (; begin != end; ++begin) {
queue.push(*begin);
}
}
auto dequeue() -> I* {
{
std::shared_lock lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
}
std::unique_lock lock(queue_mutex);
if (queue.empty()) {
return nullptr;
}
I* res = queue.front();
queue.pop();
return res;
}
};
} // namespace lib::adt
......@@ -54,7 +54,8 @@ emper = library(
emper_dep = declare_dependency(
include_directories : emper_all_include,
link_with : emper
link_with : emper,
dependencies : emper_dependencies,
)
......@@ -65,8 +66,7 @@ emper_c_sources = [
emper_c = library(
'emper-c',
emper_c_sources,
include_directories: emper_all_include,
link_with: emper,
dependencies: emper_dep,
install: true,
)
......
executable(
'time_to_spawn',
'TimeToSpawn.cpp',
include_directories: emper_all_include,
dependencies: thread_dep,
link_with: emper,
dependencies: emper_dep,
)
executable(
'spawn_a_lot',
'SpawnALot.cpp',
include_directories: emper_all_include,
dependencies: thread_dep,
link_with: emper,
dependencies: emper_dep,
)
executable(
'locality',
'Locality.cpp',
include_directories: emper_all_include,
link_with: emper,
dependencies: emper_dep,
)
......@@ -43,6 +43,13 @@ conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set'))
semaphore_impl = get_option('wakeup_semaphore_implementation')
conf_data.set('EMPER_' + semaphore_impl.to_upper() + '_WAKEUP_SEMAPHORE', true)
locked_unbounded_queue_impl = get_option('locked_unbounded_queue_implementation')
if locked_unbounded_queue_impl == 'boost_shared_mutex'
boost_thread_dep = dependency('boost', modules : ['thread'])
emper_dependencies += [boost_thread_dep]
endif
conf_data.set('EMPER_' + locked_unbounded_queue_impl.to_upper() + '_LOCKED_UNBOUNDED_QUEUE', true)
default_scheduling_strategy = get_option('default_scheduling_strategy')
conf_data.set('EMPER_DEFAULT_SCHEDULING_STRATEGY_' + default_scheduling_strategy.to_upper(), true)
......
......@@ -64,6 +64,18 @@ option(
value: false,
description: 'Use the locked variant for the MPSC queue'
)
option(
'locked_unbounded_queue_implementation',
type: 'combo',
choices: [
'mutex',
'rwlock',
'shared_mutex',
'boost_shared_mutex',
],
value: 'mutex',
description: 'LockedUnboundedQueue implementation',
)
option(
'stats',
type: 'boolean',
......
......@@ -5,8 +5,7 @@ test_fixtures_sources = ['network.cpp']
test_fixtures_lib = library(
'test-fixtures',
test_fixtures_sources,
include_directories: emper_all_include,
link_with: emper,
dependencies: emper_dep,
)
test_fixtures = declare_dependency(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment