Skip to content
Snippets Groups Projects

Synchronize LockedUnboundedQueue with pthread_rwlock

Closed Florian Fischer requested to merge aj46ezos/emper:pthread_rwlock into master
Files
4
@@ -2,7 +2,9 @@
@@ -2,7 +2,9 @@
// Copyright © 2020 Florian Schmaus
// Copyright © 2020 Florian Schmaus
#pragma once
#pragma once
#include <mutex>
#include <pthread.h>
 
 
#include <cstring>
#include <queue>
#include <queue>
namespace lib::adt {
namespace lib::adt {
@@ -10,23 +12,72 @@ namespace lib::adt {
@@ -10,23 +12,72 @@ namespace lib::adt {
template <typename I>
template <typename I>
class LockedUnboundedQueue {
class LockedUnboundedQueue {
private:
private:
std::mutex queue_mutex;
pthread_rwlock_t lock;
std::queue<I*> queue;
std::queue<I*> queue;
public:
public:
 
LockedUnboundedQueue() {
 
int err = pthread_rwlock_init(&lock, nullptr);
 
if (unlikely(err)) {
 
DIE_MSG("pthread_rwlock_init failed: " << strerror(err));
 
}
 
}
 
 
~LockedUnboundedQueue() {
 
int err = pthread_rwlock_destroy(&lock);
 
if (unlikely(err)) {
 
DIE_MSG("pthread_rwlock_destroy failed: " << strerror(err));
 
}
 
}
 
void enqueue(I* item) {
void enqueue(I* item) {
std::lock_guard<std::mutex> lock(queue_mutex);
int err = pthread_rwlock_wrlock(&lock);
 
if (unlikely(err)) {
 
DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err));
 
}
 
queue.push(item);
queue.push(item);
 
pthread_rwlock_unlock(&lock);
}
}
auto dequeue() -> I* {
auto dequeue() -> I* {
std::lock_guard<std::mutex> lock(queue_mutex);
I* res = nullptr;
 
 
int err = pthread_rwlock_rdlock(&lock);
 
if (unlikely(err)) {
 
DIE_MSG("pthread_rwlock_rdlock failed: " << strerror(err));
 
}
 
if (queue.empty()) {
if (queue.empty()) {
return nullptr;
goto unlock_and_return;
}
}
I* res = queue.front();
 
// try to upgrade to wrlock
 
err = pthread_rwlock_trywrlock(&lock);
 
if (err) {
 
if (unlikely(err != EBUSY)) {
 
DIE_MSG("pthread_rwlock_trylock failed: " << strerror(err));
 
}
 
 
// drop the read lock and aquire a write lock
 
pthread_rwlock_unlock(&lock);
 
int err = pthread_rwlock_wrlock(&lock);
 
if (unlikely(err)) {
 
DIE_MSG("pthread_rwlock_wrlock failed: " << strerror(err));
 
}
 
 
if (queue.empty()) {
 
goto unlock_and_return;
 
}
 
}
 
 
// we certainly hold the wrlock here
 
res = queue.front();
queue.pop();
queue.pop();
 
 
unlock_and_return:
 
pthread_rwlock_unlock(&lock);
return res;
return res;
}
}
};
};
Loading