Commit 0bb7c145 authored by Florian Fischer's avatar Florian Fischer
Browse files

[EchoServers] improve the callback based echoserver

* Pass the IO results on the stack instead of storing them in the
  client object.
* Terminate the runtime on quit to print stats.
* Free Client objects.
parent 3afd04de
Pipeline #79838 passed with stages
in 14 minutes and 35 seconds
...@@ -27,6 +27,8 @@ static const std::string HOST = "::"; ...@@ -27,6 +27,8 @@ static const std::string HOST = "::";
static const std::string PORT = "12345"; static const std::string PORT = "12345";
static const int BACKLOG = 1024; static const int BACKLOG = 1024;
static const size_t BUF_SIZE = 1024;
static unsigned int computations_us = 0; static unsigned int computations_us = 0;
static unsigned int max_computations_us = 0; static unsigned int max_computations_us = 0;
static float max_computations_probability = -1; static float max_computations_probability = -1;
...@@ -98,7 +100,7 @@ auto main(int argc, char* argv[]) -> int { ...@@ -98,7 +100,7 @@ auto main(int argc, char* argv[]) -> int {
auto serverFunc = [](int socket) { auto serverFunc = [](int socket) {
// NOLINTNEXTLINE(modernize-avoid-c-arrays) // NOLINTNEXTLINE(modernize-avoid-c-arrays)
char buf[1024]; char buf[BUF_SIZE];
while (!quit.load(std::memory_order_consume)) { while (!quit.load(std::memory_order_consume)) {
ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0); ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0);
if (unlikely(bytes_recv <= 0)) { if (unlikely(bytes_recv <= 0)) {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <array> #include <array>
#include <atomic>
#include <cstdint> #include <cstdint>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
...@@ -20,67 +21,70 @@ using RecvFuture = emper::io::RecvFuture; ...@@ -20,67 +21,70 @@ using RecvFuture = emper::io::RecvFuture;
const std::string HOST = "::"; const std::string HOST = "::";
const std::string PORT = "12345"; const std::string PORT = "12345";
static const int BACKLOG = 1024;
const size_t BUF_SIZE = 1024; const size_t BUF_SIZE = 1024;
static std::atomic<bool> quit = false;
class Client { class Client {
public:
int sockfd; int sockfd;
size_t bytes_recv;
size_t bytes_send;
std::array<char, BUF_SIZE> buf; std::array<char, BUF_SIZE> buf;
Client(int socket) : sockfd(socket), bytes_recv(0), bytes_send(0) {} void terminate() {
emper::io::closeAndForget(sockfd);
delete this;
}
void submitSend() { void submitSend(int32_t bytes_recv) {
SendFuture sf(sockfd, &buf[bytes_send], bytes_recv - bytes_send, MSG_NOSIGNAL); SendFuture sf(sockfd, buf.data(), bytes_recv, MSG_NOSIGNAL);
sf.setCallback([this](int32_t bytes_send) { this->onSend(bytes_send); }); sf.setCallback(
[this, bytes_recv](int32_t bytes_send) { this->onSend(bytes_send, bytes_recv); });
sf.submit(); sf.submit();
} }
void onSend(int32_t res) { void onSend(int32_t bytes_send, int32_t bytes_recv) {
if (unlikely(res < 0)) { if (unlikely(bytes_send != bytes_recv)) {
LOGE("server send failed: " << strerror(-res)); LOGE("server send failed: " << strerror(-bytes_send));
emper::io::closeAndForget(sockfd); this->terminate();
return; return;
} }
bytes_send += res; if (!quit.load(std::memory_order_consume)) {
submitRecv();
// Send again } else {
if (bytes_send < bytes_recv) { this->terminate();
submitSend();
return;
} }
submitRecv();
}
void submitRecv() {
bytes_send = 0;
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
} }
void onRecv(int32_t res) { void onRecv(int32_t bytes_recv) {
if (unlikely(res <= 0)) { if (unlikely(bytes_recv <= 0)) {
// socket was shutdown // socket was shutdown
if (res < 0) { if (bytes_recv < 0) {
LOGE("server read failed:" << strerror(-res)); LOGE("server read failed:" << strerror(-bytes_recv));
} }
emper::io::closeAndForget(sockfd); this->terminate();
return; return;
} }
bytes_recv = res;
if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf.data(), bytes_recv) == 0)) { if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf.data(), bytes_recv) == 0)) {
exit(EXIT_SUCCESS); quit = true;
Runtime::getRuntime()->initiateTermination();
this->terminate();
return;
} }
submitSend(); submitSend(bytes_recv);
}
public:
Client(int socket) : sockfd(socket) {}
void submitRecv() {
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
} }
}; };
...@@ -100,10 +104,12 @@ auto main(int argc, char* argv[]) -> int { ...@@ -100,10 +104,12 @@ auto main(int argc, char* argv[]) -> int {
std::cout << "Echoserver listening on " << host << ":" << port << std::endl; std::cout << "Echoserver listening on " << host << ":" << port << std::endl;
Runtime runtime; Runtime runtime;
auto* listener = emper::io::tcp_listener(host, port, [](int socket) { auto* listener = emper::io::tcp_listener(host, port,
auto* client = new Client(socket); [](int socket) {
client->submitRecv(); auto* client = new Client(socket);
}); client->submitRecv();
},
BACKLOG, {emper::io::SockOpt::ReusePort});
if (!listener) { if (!listener) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment