Skip to content
Snippets Groups Projects
Commit 174c2326 authored by Florian Fischer's avatar Florian Fischer
Browse files

[EchoClient] add network barrier support to synchronize multiple clients

parent 16ed71a6
No related branches found
No related tags found
1 merge request!228Support distributing multiple echoclient over the network
......@@ -36,9 +36,7 @@
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using namespace emper::io;
using emper::lib::math::RunningAverage;
......@@ -76,6 +74,22 @@ bool sendQuit = true;
std::atomic<bool> terminate = false;
struct addrinfo* server;
struct addrinfo* coordinator;
std::string COORDINATOR_PORT = "12346";
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
struct addrinfo* addrinfo;
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &addrinfo);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
return addrinfo;
}
struct TimeStamps {
high_resolution_clock::time_point start;
......@@ -168,7 +182,7 @@ class Client {
void onCONNRESET() {
LOGW("Client " << id << " reconnecting");
close(sock);
closeAndForget(sock);
// reconnect to try again
connect();
}
......@@ -387,6 +401,7 @@ static void printUsage(char* name) {
<< "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
" [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
" [--coordinator <coordinator-host>"
<< std::endl;
}
......@@ -476,7 +491,7 @@ auto main(int argc, char* argv[]) -> int {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
out_fd = ::open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
......@@ -497,22 +512,19 @@ auto main(int argc, char* argv[]) -> int {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
char* coordinator_s = getOption(argc, argv, "--coordinator");
if (coordinator_s) {
coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT);
}
server = getFirstAddrInfo(host, port);
printIp(server->ai_addr);
printSettings();
......@@ -541,6 +553,29 @@ auto main(int argc, char* argv[]) -> int {
readySemaphore.acquire();
}
// synchronize with other peers using the coordinator
if (coordinator) {
int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol);
if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed");
int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen);
if (err) DIE_MSG_ERRNO("coordinator socket connect failed");
std::string m = "OK";
SendFuture sf(conn, m.c_str(), m.size(), 0);
std::array<char, 16> buf;
RecvFuture rf(conn, buf.data(), buf.size(), 0);
rf.setDependency(sf);
err = rf.submitAndWait();
if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err));
closeAndForget(conn);
LOGI("Coordinator go received");
}
high_resolution_clock::time_point echoTerminationTime;
if (execution_seconds) {
iterations = SIZE_MAX;
......
......@@ -93,5 +93,5 @@ auto main(int argc, char* argv[]) -> int {
runtime.waitUntilFinished();
return EXIT_FAILURE;
return EXIT_SUCCESS;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment