diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index 18bcbf764d79651d0b4561da1a5e2b6f27178233..85a150c5b169cd3b86def38b39114338b7d8036e 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -36,9 +36,7 @@ #include "lib/math.hpp" using emper::Semaphore; -using emper::io::CloseFuture; -using emper::io::RecvFuture; -using emper::io::SendFuture; +using namespace emper::io; using emper::lib::math::RunningAverage; @@ -76,6 +74,22 @@ bool sendQuit = true; std::atomic<bool> terminate = false; struct addrinfo* server; +struct addrinfo* coordinator; +std::string COORDINATOR_PORT = "12346"; + +static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* { + struct addrinfo* addrinfo; + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &addrinfo); + if (err) { + if (err == EAI_SYSTEM) { + DIE_MSG_ERRNO("getaddrinfo failed"); + } else { + LOGE("error in getaddrinfo: " << gai_strerror(err)); + exit(EXIT_FAILURE); + } + } + return addrinfo; +} struct TimeStamps { high_resolution_clock::time_point start; @@ -168,7 +182,7 @@ class Client { void onCONNRESET() { LOGW("Client " << id << " reconnecting"); - close(sock); + closeAndForget(sock); // reconnect to try again connect(); } @@ -387,6 +401,7 @@ static void printUsage(char* name) { << "Usage: " << name << "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]" " [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]" + " [--coordinator <coordinator-host>" << std::endl; } @@ -476,7 +491,7 @@ auto main(int argc, char* argv[]) -> int { DIE_MSG("Output file: " << output_file << " already exists"); } - out_fd = open(output_file, OPENFLAGS, OPENMODE); + out_fd = ::open(output_file, OPENFLAGS, OPENMODE); if (out_fd < 0) { DIE_MSG_ERRNO("opening output file failed"); } @@ -497,22 +512,19 @@ auto main(int argc, char* argv[]) -> int { DIE_MSG("Histogram file: " << histogram_file << " already exists"); } - histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE); + histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE); if (histogram_fd < 0) { DIE_MSG_ERRNO("opening histogram file failed"); } } - int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); - if (err) { - if (err == EAI_SYSTEM) { - DIE_MSG_ERRNO("getaddrinfo failed"); - } else { - LOGE("error in getaddrinfo: " << gai_strerror(err)); - exit(EXIT_FAILURE); - } + char* coordinator_s = getOption(argc, argv, "--coordinator"); + if (coordinator_s) { + coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT); } + server = getFirstAddrInfo(host, port); + printIp(server->ai_addr); printSettings(); @@ -541,6 +553,29 @@ auto main(int argc, char* argv[]) -> int { readySemaphore.acquire(); } + // synchronize with other peers using the coordinator + if (coordinator) { + int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol); + if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed"); + + int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen); + if (err) DIE_MSG_ERRNO("coordinator socket connect failed"); + + std::string m = "OK"; + SendFuture sf(conn, m.c_str(), m.size(), 0); + + std::array<char, 16> buf; + RecvFuture rf(conn, buf.data(), buf.size(), 0); + + rf.setDependency(sf); + err = rf.submitAndWait(); + if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err)); + + closeAndForget(conn); + + LOGI("Coordinator go received"); + } + high_resolution_clock::time_point echoTerminationTime; if (execution_seconds) { iterations = SIZE_MAX; diff --git a/apps/EchoServer.cpp b/apps/EchoServer.cpp index 081f7c85e3dd1aa2d22b14b3b9256839839a98ef..b88560b6f699cab977b7ffbc78cbdf57b7d3008b 100644 --- a/apps/EchoServer.cpp +++ b/apps/EchoServer.cpp @@ -93,5 +93,5 @@ auto main(int argc, char* argv[]) -> int { runtime.waitUntilFinished(); - return EXIT_FAILURE; + return EXIT_SUCCESS; }