Skip to content
Snippets Groups Projects
Commit ee54369f authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'echo_client_dont_measure_shutdown' into 'master'

[EchoClient] don't include shutdown duration in echo duration

See merge request !181
parents 668a57c4 321c9e2d
No related branches found
No related tags found
1 merge request!181[EchoClient] don't include shutdown duration in echo duration
Pipeline #61753 passed
......@@ -69,6 +69,7 @@ size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
bool histogram = false;
bool sendQuit = true;
std::atomic<bool> terminate = false;
......@@ -105,6 +106,9 @@ class Client {
size_t reconnects = 0;
struct TimeStamps* timeStamps = nullptr;
high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point echoEnd;
high_resolution_clock::time_point echoLoopEnd;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
......@@ -125,79 +129,46 @@ class Client {
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
private:
template <bool collectTimeStamps = false>
auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"};
}
int32_t bytes_recv = recvFuture.submitAndWait();
[[nodiscard]] auto shouldTerminate() const -> bool {
return iteration >= iterations || terminate.load(std::memory_order_relaxed);
}
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
void print_unexpected_echo_msg() const {
std::stringstream sst;
sst << "Client " << id << " got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
}
sst << std::endl;
if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"};
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";
}
return {bytes_recv, ""};
sst << std::endl;
LOGE(sst.str());
}
template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
recvFuture.setDependency(sendFuture);
recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
void echoError(int err, std::string&& msg) const {
std::stringstream sst;
sst << "Client " << id << " failed at iteration " << iteration << " with " << msg;
if (err != 0) {
sst << ": " << strerror(err);
}
sst << std::endl;
LOGE(sst.str());
if (unlikely(res <= 0)) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
return {0, "server closed the connection"};
}
return {res, "recv failed"};
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
return {send_res, "send failed"};
}
// send must have sent less than expected because recvFuture returned -ECANCELED
return {0, EMPER_BUILD_STR("short send: " << send_res)};
}
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
}
return {res, ""};
void onCONNRESET() {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect to try again
connect();
}
void connect() {
......@@ -225,9 +196,15 @@ class Client {
}
}
public:
template <bool collectTimeStamps = false>
void run() {
enum class CollectTimeStamps { yes, no };
enum class LinkedFutures { yes, no };
template <CollectTimeStamps collectTimeStampsSwitch, LinkedFutures linkedFuturesSwitch>
void _run() {
constexpr bool collectTimeStamps = collectTimeStampsSwitch == CollectTimeStamps::yes;
constexpr bool linkedFutures = linkedFuturesSwitch == LinkedFutures::yes;
connect();
// signal that this client is initialized and connected
......@@ -236,77 +213,112 @@ class Client {
// wait for all clients to be ready
startSem.acquire();
while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
while (!shouldTerminate()) {
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
echoStart = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = echoStart;
}
IterationResult iterationResult;
if (linked_futures) {
iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
if constexpr (!linkedFutures) {
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
if (shouldTerminate()) {
break;
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
int err = -bytes_send;
if (err == ECONNRESET) {
onCONNRESET();
continue;
}
echoError(err, "send failed");
}
} else {
iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
recvFuture.setDependency(sendFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
recvFuture.submit();
if (shouldTerminate()) {
break;
}
int32_t bytes_recv = recvFuture.wait();
echoEnd = high_resolution_clock::now();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = echoEnd;
}
// less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error)
if (unlikely(iterationResult.first <= 0)) {
int err = -iterationResult.first;
auto& msg = iterationResult.second;
if (unlikely(bytes_recv < 0)) {
int err = -bytes_recv;
if (err == ECONNRESET) {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect and try again
connect();
onCONNRESET();
continue;
}
LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
<< strerror(err));
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
if constexpr (!linkedFutures) {
echoError(err, "recv failed");
} else {
// recv failed
if (err != ECANCELED) {
echoError(err, err < 0 ? "recv failed" : "server closed the connection");
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
echoError(-send_res, "send failed");
}
// send must have sent less than expected because recvFuture returned -ECANCELED
echoError(0, EMPER_BUILD_STR("short send: " << send_res));
}
}
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
nanoseconds duration = duration_cast<nanoseconds>(echoEnd - echoStart);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst;
sst << "got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
}
sst << std::endl;
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";
}
sst << std::endl;
LOGE(sst.str());
print_unexpected_echo_msg();
unexpectedEchos++;
}
iteration++;
}
shutdown(sock, SHUT_RDWR);
echoLoopEnd = high_resolution_clock::now();
}
public:
void shutdown(bool isQuitClient) const {
if (sendQuit && isQuitClient) {
LOGI("Send quit message");
std::string msg = "quit\n";
ssize_t res = emper::io::sendAndWait(sock, msg.c_str(), msg.size(), MSG_NOSIGNAL);
if (res < 0) {
LOGW("Sending quit failed: " << strerror(errno));
}
}
::shutdown(sock, SHUT_RDWR);
// ShutdownFuture shut_f(sock, SHUT_RDWR);
CloseFuture cf(sock);
......@@ -315,6 +327,22 @@ class Client {
cf.wait();
}
void run() {
if (histogram) {
if (linked_futures) {
_run<CollectTimeStamps::yes, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::yes, LinkedFutures::no>();
}
} else {
if (linked_futures) {
_run<CollectTimeStamps::no, LinkedFutures::yes>();
} else {
_run<CollectTimeStamps::no, LinkedFutures::no>();
}
}
}
};
Client** clients;
......@@ -330,12 +358,7 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo
spawn(
[&, id = next_client_id] {
clients[id] = new Client(id, readySem, startSem, cps, histogram);
if (histogram) {
clients[id]->run<true>();
} else {
clients[id]->run();
}
clients[id]->run();
},
cps);
return true;
......@@ -427,6 +450,8 @@ auto main(int argc, char* argv[]) -> int {
}
}
sendQuit = !existsOption(argc, argv, "--no-quit");
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
......@@ -481,10 +506,12 @@ auto main(int argc, char* argv[]) -> int {
readySemaphore.acquire();
}
high_resolution_clock::time_point echoTerminationTime;
if (execution_seconds) {
iterations = SIZE_MAX;
terminator = std::thread([] {
terminator = std::thread([&] {
std::this_thread::sleep_for(seconds(execution_seconds));
echoTerminationTime = high_resolution_clock::now();
terminate.store(true);
});
}
......@@ -496,16 +523,21 @@ auto main(int argc, char* argv[]) -> int {
startSemaphore.release();
}
// await client termination
// await echo phase termination
cps.wait();
auto echo_end = high_resolution_clock::now();
if (execution_seconds) {
terminator.join();
}
auto echo_end = high_resolution_clock::now();
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
uint64_t echoDurationUntilTermination;
if (execution_seconds) {
echoDurationUntilTermination =
duration_cast<nanoseconds>(echoTerminationTime - echo_start).count();
}
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
......@@ -521,6 +553,10 @@ auto main(int argc, char* argv[]) -> int {
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
// terminate the connection and send 'quit' on the last connection
bool isQuitClient = i == (nclients - 1);
async([=]() { client->shutdown(isQuitClient); });
}
std::stringstream sst;
......@@ -532,19 +568,24 @@ auto main(int argc, char* argv[]) -> int {
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;
if (execution_seconds) {
sst << "execution_duration = " << echoDurationUntilTermination << std::endl;
}
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;
sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
sst << " iterations,avg_ns,reconnects,unexpected_echos,echoLoopEnd" << std::endl;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";
sst << client->unexpectedEchos << std::endl;
sst << client->unexpectedEchos << ",";
sst << duration_cast<nanoseconds>(client->echoLoopEnd.time_since_epoch()).count()
<< std::endl;
}
auto output = sst.str();
......@@ -590,17 +631,17 @@ auto main(int argc, char* argv[]) -> int {
emper::io::closeAndForget(histogram_fd);
}
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
exit(EXIT_SUCCESS);
Runtime::getRuntime()->initiateTermination();
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
return EXIT_FAILURE;
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
return EXIT_SUCCESS;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment