Commit ffb5e3f4 authored by Florian Fischer's avatar Florian Fischer
Browse files

EchoClient: simplify echo implementation and add timeout

* Always link futures
* Add a timeout if the termination condition is time
parent 8da0974b
Pipeline #81908 passed with stages
in 17 minutes and 6 seconds
......@@ -65,10 +65,10 @@ std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t execution_seconds = 0;
TimeoutWrapper::Timespec* timeout = nullptr;
size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
bool histogram = false;
bool sendQuit = true;
......@@ -130,6 +130,7 @@ class Client {
RunningAverage<double> avg_ns;
size_t unexpectedEchos = 0;
size_t reconnects = 0;
bool timedOut = false;
struct TimeStamps* timeStamps = nullptr;
high_resolution_clock::time_point echoStart;
......@@ -229,103 +230,97 @@ class Client {
}
}
enum class CollectTimeStamps { yes, no };
void prepeareOutBuf() {
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
}
enum class LinkedFutures { yes, no };
enum class CollectTimeStamps { yes, no };
enum class UseTimeout { yes, no };
template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
void _run() {
template <CollectTimeStamps collectTimeStampsSwitch>
void storeTimestamp(high_resolution_clock::time_point* mem,
high_resolution_clock::time_point& time) {
constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;
connect();
if constexpr (collectTimeStamps) {
*mem = time;
}
}
// signal that this client is initialized and connected
readySem.release();
template <CollectTimeStamps collectTimeStampsSwitch>
auto echo() -> bool {
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// wait for all clients to be ready
startSem.acquire();
setState(ClientState::unknown);
recvFuture.setDependency(sendFuture);
while (!shouldTerminate()) {
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
prepeareOutBuf();
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
echoStart = high_resolution_clock::now();
storeTimestamp<collectTimeStampsSwitch>(&timeStamps[iteration].start, echoStart);
echoStart = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = echoStart;
int32_t bytes_recv = 0;
if (timeout) {
TimeoutWrapper t(recvFuture, *timeout);
t.submit();
int32_t res = t.wait();
if (unlikely(res == -ETIME)) {
timedOut = true;
return false;
}
if constexpr (!linkedFutures) {
setState(ClientState::sending);
sendFuture.submit();
bytes_recv = recvFuture.wait();
} else {
recvFuture.submit();
if (shouldTerminate()) return false;
bytes_recv = recvFuture.wait();
}
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
echoEnd = high_resolution_clock::now();
storeTimestamp<collectTimeStampsSwitch>(&timeStamps[iteration].afterRecv, echoEnd);
if (shouldTerminate()) {
break;
}
int32_t bytes_send = sendFuture.wait();
setState(ClientState::receiving);
if (unlikely(bytes_recv < 0)) {
int err = -bytes_recv;
if (err == ECONNRESET) {
onCONNRESET();
return false;
}
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
// recv failed
if (err != ECANCELED) {
echoError(err, err < 0 ? "recv failed" : "server closed the connection");
}
if (unlikely(bytes_send < 0)) {
int err = -bytes_send;
if (err == ECONNRESET) {
onCONNRESET();
continue;
}
echoError(err, "send failed");
}
} else {
setState(ClientState::unknown);
recvFuture.setDependency(sendFuture);
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
echoError(-send_res, "send failed");
}
recvFuture.submit();
// send must have sent less than expected because recvFuture returned -ECANCELED
echoError(0, EMPER_BUILD_STR("short send: " << send_res));
}
if (shouldTerminate()) {
break;
}
int32_t bytes_recv = recvFuture.wait();
return true;
}
echoEnd = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = echoEnd;
}
template <CollectTimeStamps collectTimeStampsSwitch>
void _run() {
constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
if (unlikely(bytes_recv < 0)) {
int err = -bytes_recv;
if (err == ECONNRESET) {
onCONNRESET();
continue;
}
connect();
if constexpr (!linkedFutures) {
echoError(err, "recv failed");
} else {
// recv failed
if (err != ECANCELED) {
echoError(err, err < 0 ? "recv failed" : "server closed the connection");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
echoError(-send_res, "send failed");
}
// send must have sent less than expected because recvFuture returned -ECANCELED
echoError(0, EMPER_BUILD_STR("short send: " << send_res));
}
}
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();
while (!shouldTerminate()) {
if (!echo<collectTimeStampsSwitch>()) continue;
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
......@@ -336,7 +331,6 @@ class Client {
print_unexpected_echo_msg();
unexpectedEchos++;
}
iteration++;
}
......@@ -363,19 +357,10 @@ class Client {
}
void run() {
if (histogram) {
if (linked_futures) {
_run<CollectTimeStamps::yes, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::yes, LinkedFutures::no>();
}
} else {
if (linked_futures) {
_run<CollectTimeStamps::no, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::no, LinkedFutures::no>();
}
}
if (histogram)
_run<CollectTimeStamps::yes>();
else
_run<CollectTimeStamps::no>();
}
};
......@@ -425,7 +410,6 @@ static void printUsage(char* name) {
<< " [-f <output-file>]\tfile to write the results to" << std::endl
<< " [-i <iterations> | -t <execution time in sec>]\ttermination conditions"
<< std::endl
<< " [--linked-futures]\tuse linked futures" << std::endl
<< " [--coordinator <coordinator-host>]\tcoordinator address to connect to"
<< std::endl;
}
......@@ -494,6 +478,7 @@ auto main(int argc, char* argv[]) -> int {
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
timeout = new TimeoutWrapper::Timespec{static_cast<int64_t>(execution_seconds), 0};
}
char* size_s = getOption(argc, argv, "-s");
......@@ -506,8 +491,6 @@ auto main(int argc, char* argv[]) -> int {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
......@@ -641,12 +624,14 @@ auto main(int argc, char* argv[]) -> int {
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
uint64_t total_timeouts = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
if (client->timedOut) ++total_timeouts;
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
......@@ -663,6 +648,7 @@ auto main(int argc, char* argv[]) -> int {
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "total_timeouts = " << total_timeouts << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;
if (execution_seconds) {
......@@ -743,6 +729,7 @@ auto main(int argc, char* argv[]) -> int {
delete clients[i];
}
delete[] clients;
delete timeout;
return EXIT_SUCCESS;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment