Commit 05235b7c authored by Florian Fischer's avatar Florian Fischer
Browse files

clients/io_uring: replace cpp latch with pthread_barrier_t

parent 932d61ae
Pipeline #72814 failed with stage
in 1 minute and 15 seconds
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Copyright © 2021 Florian Fischer // Copyright © 2021 Florian Fischer
#include <liburing.h> #include <liburing.h>
#include <netdb.h> #include <netdb.h>
#include <pthread.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -13,14 +14,6 @@ ...@@ -13,14 +14,6 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "config.h"
#ifdef CPP_HAS_LATCH_H
#include <latch>
#else
#include <experimental/latch>
#endif
#define DIE_MSG(...) \ #define DIE_MSG(...) \
do { \ do { \
std::cout << __VA_ARGS__ << std::endl; \ std::cout << __VA_ARGS__ << std::endl; \
...@@ -66,7 +59,7 @@ class Client { ...@@ -66,7 +59,7 @@ class Client {
std::atomic<bool> quit = false; std::atomic<bool> quit = false;
std::latch connectBarrier; pthread_barrier_t connectBarrier;
public: public:
Client(const std::string& host, const std::string& port, unsigned connections, unsigned time, Client(const std::string& host, const std::string& port, unsigned connections, unsigned time,
...@@ -125,10 +118,7 @@ class Connection { ...@@ -125,10 +118,7 @@ class Connection {
Client::Client(const std::string& host, const std::string& port, unsigned connections, Client::Client(const std::string& host, const std::string& port, unsigned connections,
unsigned time, unsigned iterations, unsigned _workerCount, unsigned echoSize) unsigned time, unsigned iterations, unsigned _workerCount, unsigned echoSize)
: iterations(iterations), : iterations(iterations), workerCount(std::min(_workerCount, connections)), echoSize(echoSize) {
workerCount(std::min(_workerCount, connections)),
echoSize(echoSize),
connectBarrier(workerCount + 1) {
std::cout << "[global]" << std::endl; std::cout << "[global]" << std::endl;
std::cout << "clients = " << connections << std::endl; std::cout << "clients = " << connections << std::endl;
std::cout << "size = " << echoSize << std::endl; std::cout << "size = " << echoSize << std::endl;
...@@ -136,6 +126,9 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec ...@@ -136,6 +126,9 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec
std::cout << "iterations = " << iterations << std::endl; std::cout << "iterations = " << iterations << std::endl;
server = getFirstAddrInfo(host, port); server = getFirstAddrInfo(host, port);
int res = pthread_barrier_init(&connectBarrier, nullptr, workerCount + 1);
if (res) DIE_MSG_ERRNO("pthread_barrier_init failed");
workers = new Worker*[workerCount]; workers = new Worker*[workerCount];
for (unsigned i = 0; i < workerCount; ++i) { for (unsigned i = 0; i < workerCount; ++i) {
unsigned workerConnections = connections / workerCount; unsigned workerConnections = connections / workerCount;
...@@ -148,7 +141,7 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec ...@@ -148,7 +141,7 @@ Client::Client(const std::string& host, const std::string& port, unsigned connec
workers[i] = new Worker(*this, i, workerConnections); workers[i] = new Worker(*this, i, workerConnections);
} }
connectBarrier.arrive_and_wait(); pthread_barrier_wait(&connectBarrier);
if (time) { if (time) {
std::jthread([this, time] { std::jthread([this, time] {
...@@ -168,6 +161,8 @@ Client::~Client() { ...@@ -168,6 +161,8 @@ Client::~Client() {
} }
std::cout << "total_iterations = " << echos << std::endl; std::cout << "total_iterations = " << echos << std::endl;
delete[] workers; delete[] workers;
pthread_barrier_destroy(&connectBarrier);
} }
Worker::Worker(Client& client, unsigned id, unsigned connectionCount) Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
...@@ -198,7 +193,7 @@ Worker::Worker(Client& client, unsigned id, unsigned connectionCount) ...@@ -198,7 +193,7 @@ Worker::Worker(Client& client, unsigned id, unsigned connectionCount)
connected += count; connected += count;
} }
this->client.connectBarrier.arrive_and_wait(); pthread_barrier_wait(&this->client.connectBarrier);
for (unsigned i = 0; i < this->connectionCount; ++i) this->connections[i]->prepSend(); for (unsigned i = 0; i < this->connectionCount; ++i) this->connections[i]->prepSend();
......
...@@ -19,15 +19,6 @@ if not uring_dep.found() ...@@ -19,15 +19,6 @@ if not uring_dep.found()
endif endif
dependencies = [thread_dep, uring_dep] dependencies = [thread_dep, uring_dep]
conf_data = configuration_data()
cpp_compiler = meson.get_compiler('cpp')
if cpp_compiler.has_header('latch')
conf_data.set('CPP_HAS_LATCH_H', true)
endif
config_header = configure_file(output: 'config.h', configuration: conf_data)
client = executable( client = executable(
'client', 'client',
'client.cpp', 'client.cpp',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment