Skip to content
Snippets Groups Projects
Commit 354cdbce authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'echo_client_histo' into 'master'

[EchoClient] support latency histograms

See merge request !174
parents 7165f0c9 a71a6127
No related branches found
No related tags found
No related merge requests found
...@@ -56,6 +56,9 @@ const size_t SERVER_BACKLOG = 1024; ...@@ -56,6 +56,9 @@ const size_t SERVER_BACKLOG = 1024;
const seconds ONERROR_SLEEP(10); const seconds ONERROR_SLEEP(10);
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
// Globals // Globals
std::string host = HOST; std::string host = HOST;
std::string port = PORT; std::string port = PORT;
...@@ -65,11 +68,19 @@ size_t nclients = CLIENTS; ...@@ -65,11 +68,19 @@ size_t nclients = CLIENTS;
size_t size = SIZE; size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG; size_t server_backlog = SERVER_BACKLOG;
bool linked_futures; bool linked_futures;
bool histogram = false;
std::atomic<bool> terminate = false; std::atomic<bool> terminate = false;
struct addrinfo* server; struct addrinfo* server;
struct TimeStamps {
high_resolution_clock::time_point start;
high_resolution_clock::time_point afterSend;
high_resolution_clock::time_point afterSendDispatch;
high_resolution_clock::time_point afterRecv;
};
class Client { class Client {
using IterationResult = std::pair<int, std::string>; using IterationResult = std::pair<int, std::string>;
...@@ -93,17 +104,44 @@ class Client { ...@@ -93,17 +104,44 @@ class Client {
size_t unexpectedEchos = 0; size_t unexpectedEchos = 0;
size_t reconnects = 0; size_t reconnects = 0;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps) struct TimeStamps* timeStamps = nullptr;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) { : id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size]; outBuf = new char[size];
inBuf = new char[size]; inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
} }
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool; static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
private: private:
static auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { template <bool collectTimeStamps = false>
int32_t bytes_send = sendFuture.submitAndWait(); auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) { if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"}; return {bytes_send, "send failed"};
...@@ -111,6 +149,10 @@ class Client { ...@@ -111,6 +149,10 @@ class Client {
int32_t bytes_recv = recvFuture.submitAndWait(); int32_t bytes_recv = recvFuture.submitAndWait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
}
if (unlikely(bytes_recv < 0)) { if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"}; return {bytes_recv, "recv failed"};
} }
...@@ -118,10 +160,22 @@ class Client { ...@@ -118,10 +160,22 @@ class Client {
return {bytes_recv, ""}; return {bytes_recv, ""};
} }
static auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult { template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
recvFuture.setDependency(sendFuture); recvFuture.setDependency(sendFuture);
int32_t res = recvFuture.submitAndWait(); recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
}
if (unlikely(res <= 0)) { if (unlikely(res <= 0)) {
// recv failed // recv failed
...@@ -172,6 +226,7 @@ class Client { ...@@ -172,6 +226,7 @@ class Client {
} }
public: public:
template <bool collectTimeStamps = false>
void run() { void run() {
connect(); connect();
...@@ -188,14 +243,22 @@ class Client { ...@@ -188,14 +243,22 @@ class Client {
// prepare output buf // prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration); sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
}
IterationResult iterationResult; IterationResult iterationResult;
auto start = high_resolution_clock::now();
if (linked_futures) { if (linked_futures) {
iterationResult = linkedEcho(sendFuture, recvFuture); iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
} else { } else {
iterationResult = echo(sendFuture, recvFuture); iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
} }
auto end = high_resolution_clock::now();
// less than 1 byte was received -> an error occurred // less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error) // (connection closed by server is considered an error)
...@@ -216,9 +279,10 @@ class Client { ...@@ -216,9 +279,10 @@ class Client {
DIE; DIE;
} }
nanoseconds duration = duration_cast<nanoseconds>(end - start); if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count()); avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) { if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst; std::stringstream sst;
...@@ -249,9 +313,6 @@ class Client { ...@@ -249,9 +313,6 @@ class Client {
// cf.setDependency(shut_f); // cf.setDependency(shut_f);
cf.submit(); cf.submit();
delete[] inBuf;
delete[] outBuf;
cf.wait(); cf.wait();
} }
}; };
...@@ -268,8 +329,13 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo ...@@ -268,8 +329,13 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo
spawn( spawn(
[&, id = next_client_id] { [&, id = next_client_id] {
clients[id] = new Client(id, readySem, startSem, cps); clients[id] = new Client(id, readySem, startSem, cps, histogram);
clients[id]->run();
if (histogram) {
clients[id]->run<true>();
} else {
clients[id]->run();
}
}, },
cps); cps);
return true; return true;
...@@ -350,6 +416,36 @@ auto main(int argc, char* argv[]) -> int { ...@@ -350,6 +416,36 @@ auto main(int argc, char* argv[]) -> int {
int out_fd = STDOUT_FILENO; int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f"); char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) { if (err) {
if (err == EAI_SYSTEM) { if (err == EAI_SYSTEM) {
...@@ -413,20 +509,6 @@ auto main(int argc, char* argv[]) -> int { ...@@ -413,20 +509,6 @@ auto main(int argc, char* argv[]) -> int {
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count(); auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count(); auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
int openflags = O_CREAT | O_WRONLY;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
out_fd = emper::io::openAndWait(output_file, openflags, mode);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
uint64_t total_iterations = 0; uint64_t total_iterations = 0;
uint64_t total_reconnects = 0; uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0; uint64_t total_unexpected_echos = 0;
...@@ -436,7 +518,9 @@ auto main(int argc, char* argv[]) -> int { ...@@ -436,7 +518,9 @@ auto main(int argc, char* argv[]) -> int {
total_iterations += client->iteration; total_iterations += client->iteration;
total_reconnects += client->reconnects; total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos; total_unexpected_echos += client->unexpectedEchos;
avg_ns.update(client->avg_ns.getAverage()); if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
} }
std::stringstream sst; std::stringstream sst;
...@@ -472,6 +556,40 @@ auto main(int argc, char* argv[]) -> int { ...@@ -472,6 +556,40 @@ auto main(int argc, char* argv[]) -> int {
emper::io::closeAndForget(out_fd); emper::io::closeAndForget(out_fd);
} }
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
for (size_t i = 0; i < nclients; ++i) { for (size_t i = 0; i < nclients; ++i) {
delete clients[i]; delete clients[i];
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment