diff --git a/apps/EchoServer.cpp b/apps/EchoServer.cpp index 0aa3eb4b2d51be69cf44d9904e28938f810305db..70a1cf5133930cf5918fb115455eeada3691d5e4 100644 --- a/apps/EchoServer.cpp +++ b/apps/EchoServer.cpp @@ -27,6 +27,8 @@ static const std::string HOST = "::"; static const std::string PORT = "12345"; static const int BACKLOG = 1024; +static const size_t BUF_SIZE = 1024; + static unsigned int computations_us = 0; static unsigned int max_computations_us = 0; static float max_computations_probability = -1; @@ -98,7 +100,7 @@ auto main(int argc, char* argv[]) -> int { auto serverFunc = [](int socket) { // NOLINTNEXTLINE(modernize-avoid-c-arrays) - char buf[1024]; + char buf[BUF_SIZE]; while (!quit.load(std::memory_order_consume)) { ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0); if (unlikely(bytes_recv <= 0)) { diff --git a/apps/EchoServerCallback.cpp b/apps/EchoServerCallback.cpp index c5b1eff0697826083ded540e602a95a3b83e74f9..eb63d28a6a0845ad5f9617c38723d7048a7f9142 100644 --- a/apps/EchoServerCallback.cpp +++ b/apps/EchoServerCallback.cpp @@ -3,6 +3,7 @@ #include <sys/socket.h> #include <array> +#include <atomic> #include <cstdint> #include <cstdlib> #include <cstring> @@ -20,67 +21,70 @@ using RecvFuture = emper::io::RecvFuture; const std::string HOST = "::"; const std::string PORT = "12345"; +static const int BACKLOG = 1024; const size_t BUF_SIZE = 1024; +static std::atomic<bool> quit = false; + class Client { - public: int sockfd; - size_t bytes_recv; - size_t bytes_send; std::array<char, BUF_SIZE> buf; - Client(int socket) : sockfd(socket), bytes_recv(0), bytes_send(0) {} + void terminate() { + emper::io::closeAndForget(sockfd); + delete this; + } - void submitSend() { - SendFuture sf(sockfd, &buf[bytes_send], bytes_recv - bytes_send, MSG_NOSIGNAL); - sf.setCallback([this](int32_t bytes_send) { this->onSend(bytes_send); }); + void submitSend(int32_t bytes_recv) { + SendFuture sf(sockfd, buf.data(), bytes_recv, MSG_NOSIGNAL); + sf.setCallback( + [this, bytes_recv](int32_t bytes_send) { this->onSend(bytes_send, bytes_recv); }); sf.submit(); } - void onSend(int32_t res) { - if (unlikely(res < 0)) { - LOGE("server send failed: " << strerror(-res)); - emper::io::closeAndForget(sockfd); + void onSend(int32_t bytes_send, int32_t bytes_recv) { + if (unlikely(bytes_send != bytes_recv)) { + LOGE("server send failed: " << strerror(-bytes_send)); + this->terminate(); return; } - bytes_send += res; - - // Send again - if (bytes_send < bytes_recv) { - submitSend(); - return; + if (!quit.load(std::memory_order_consume)) { + submitRecv(); + } else { + this->terminate(); } - - submitRecv(); - } - - void submitRecv() { - bytes_send = 0; - RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0); - rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); }); - rf.submit(); } - void onRecv(int32_t res) { - if (unlikely(res <= 0)) { + void onRecv(int32_t bytes_recv) { + if (unlikely(bytes_recv <= 0)) { // socket was shutdown - if (res < 0) { - LOGE("server read failed:" << strerror(-res)); + if (bytes_recv < 0) { + LOGE("server read failed:" << strerror(-bytes_recv)); } - emper::io::closeAndForget(sockfd); + this->terminate(); return; } - bytes_recv = res; - if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf.data(), bytes_recv) == 0)) { - exit(EXIT_SUCCESS); + quit = true; + Runtime::getRuntime()->initiateTermination(); + this->terminate(); + return; } - submitSend(); + submitSend(bytes_recv); + } + + public: + Client(int socket) : sockfd(socket) {} + + void submitRecv() { + RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0); + rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); }); + rf.submit(); } }; @@ -100,10 +104,12 @@ auto main(int argc, char* argv[]) -> int { std::cout << "Echoserver listening on " << host << ":" << port << std::endl; Runtime runtime; - auto* listener = emper::io::tcp_listener(host, port, [](int socket) { - auto* client = new Client(socket); - client->submitRecv(); - }); + auto* listener = emper::io::tcp_listener(host, port, + [](int socket) { + auto* client = new Client(socket); + client->submitRecv(); + }, + BACKLOG, {emper::io::SockOpt::ReusePort}); if (!listener) { exit(EXIT_FAILURE);