Skip to content
Snippets Groups Projects
Commit ae81b029 authored by Florian Fischer's avatar Florian Fischer
Browse files

[EchoClient] make linked futures optional

While we haven't figured out why the version using linked futures breaks
we can use a simpler implementation which calls submitAndWait on both futures
separately
parent d87acb25
No related branches found
No related tags found
No related merge requests found
Pipeline #59388 passed
......@@ -9,7 +9,6 @@
#include <unistd.h>
#include <algorithm> // for find
#include <array>
#include <atomic>
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
......@@ -57,6 +56,7 @@ size_t execution_seconds = 0;
size_t clients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
std::atomic<bool> terminate = false;
......@@ -72,6 +72,60 @@ struct ClientResult* clientResults;
struct addrinfo* server;
static auto clientIteration(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds {
auto send_start = high_resolution_clock::now();
int32_t bytes_send = sendFuture.submitAndWait();
auto send_end = high_resolution_clock::now();
if (bytes_send < 0) {
errno = -bytes_send;
DIE_MSG_ERRNO("send failed");
}
auto recv_start = high_resolution_clock::now();
int32_t bytes_recv = recvFuture.submitAndWait();
auto recv_end = high_resolution_clock::now();
if (bytes_recv < 0) {
errno = -bytes_recv;
DIE_MSG_ERRNO("recv failed");
}
return duration_cast<nanoseconds>((send_end - send_start) + (recv_end - recv_start));
}
static auto clientIterationLinked(SendFuture& sendFuture, RecvFuture& recvFuture) -> nanoseconds {
recvFuture.setDependency(sendFuture);
auto start = high_resolution_clock::now();
int32_t res = recvFuture.submitAndWait();
auto end = high_resolution_clock::now();
if (res <= 0) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
DIE_MSG("server closed the connection");
}
errno = -res;
DIE_MSG_ERRNO("receive failed");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
errno = -send_res;
DIE_MSG_ERRNO("send failed");
}
// send must send less than expected because recvFuture returned -ECANCELED
DIE_MSG("short send" << send_res);
}
return duration_cast<nanoseconds>(end - start);
}
static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) {
int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (client_sock < 0) {
......@@ -94,9 +148,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start
DIE_MSG_ERRNO("setsockopt failed");
}
std::array<char*, 2> buf;
buf[0] = new char[size];
buf[1] = new char[size];
auto* outBuf = new char[size];
auto* inBuf = new char[size];
uint64_t avg_ns = 0;
readySem.release();
......@@ -104,43 +157,17 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start
size_t i = 0;
while (i < iterations && !terminate.load(std::memory_order_relaxed)) {
char* outBuf = buf[i % 2];
char* inBuf = buf[(i + 1) % 2];
SendFuture sendFuture(client_sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(client_sock, inBuf, size, MSG_WAITALL);
recvFuture.setDependency(sendFuture);
auto start = high_resolution_clock::now();
int32_t res = recvFuture.submitAndWait();
auto end = high_resolution_clock::now();
if (res <= 0) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
DIE_MSG("server closed the connection");
}
errno = -res;
DIE_MSG_ERRNO("receive failed");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
errno = -send_res;
DIE_MSG_ERRNO("send failed");
}
DIE_MSG("short send" << send_res);
}
if (memcmp(outBuf, inBuf, size) != 0) {
DIE_MSG("got unexpected echo from server");
nanoseconds duration;
if (linked_futures) {
duration = clientIterationLinked(sendFuture, recvFuture);
} else {
duration = clientIteration(sendFuture, recvFuture);
}
auto duration = duration_cast<nanoseconds>(end - start);
// TODO: replace with emper::math::RunningAverage when !127 is merged
uint64_t ns = duration.count();
if (unlikely(avg_ns == 0)) {
avg_ns = ns;
......@@ -149,6 +176,10 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start
avg_ns /= 2;
}
if (memcmp(outBuf, inBuf, size) != 0) {
DIE_MSG("got unexpected echo from server");
}
i++;
}
......@@ -160,8 +191,8 @@ static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& start
// cf.setDependency(shut_f);
cf.submit();
delete[] buf[0];
delete[] buf[1];
delete[] inBuf;
delete[] outBuf;
cf.wait();
......@@ -186,10 +217,11 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo
}
static void printUsage(char* name) {
std::cerr << "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
" [-f <output-file>] [-i <iterations> | -t <execution time in sec>]"
<< std::endl;
std::cerr
<< "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
" [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
<< std::endl;
}
auto main(int argc, char* argv[]) -> int {
......@@ -238,6 +270,8 @@ auto main(int argc, char* argv[]) -> int {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
......@@ -293,7 +327,7 @@ auto main(int argc, char* argv[]) -> int {
startSemaphore.release();
}
// await the clients
// await client termination
cps.wait();
if (execution_seconds) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment