Commit 819a0246 authored by Florian Fischer's avatar Florian Fischer
Browse files

Merge branch 'improve-echoservers' into 'master'

[EchoServers] improve the callback based echoserver

See merge request i4/manycore/emper!371
parents e45b2bc4 0bb7c145
Pipeline #79952 passed with stages
in 19 minutes and 44 seconds
......@@ -27,6 +27,8 @@ static const std::string HOST = "::";
static const std::string PORT = "12345";
static const int BACKLOG = 1024;
static const size_t BUF_SIZE = 1024;
static unsigned int computations_us = 0;
static unsigned int max_computations_us = 0;
static float max_computations_probability = -1;
......@@ -98,7 +100,7 @@ auto main(int argc, char* argv[]) -> int {
auto serverFunc = [](int socket) {
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
char buf[1024];
char buf[BUF_SIZE];
while (!quit.load(std::memory_order_consume)) {
ssize_t bytes_recv = emper::io::recvAndWait(socket, buf, sizeof(buf), 0);
if (unlikely(bytes_recv <= 0)) {
......
......@@ -3,6 +3,7 @@
#include <sys/socket.h>
#include <array>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
......@@ -20,67 +21,70 @@ using RecvFuture = emper::io::RecvFuture;
const std::string HOST = "::";
const std::string PORT = "12345";
static const int BACKLOG = 1024;
const size_t BUF_SIZE = 1024;
static std::atomic<bool> quit = false;
class Client {
public:
int sockfd;
size_t bytes_recv;
size_t bytes_send;
std::array<char, BUF_SIZE> buf;
Client(int socket) : sockfd(socket), bytes_recv(0), bytes_send(0) {}
void terminate() {
emper::io::closeAndForget(sockfd);
delete this;
}
void submitSend() {
SendFuture sf(sockfd, &buf[bytes_send], bytes_recv - bytes_send, MSG_NOSIGNAL);
sf.setCallback([this](int32_t bytes_send) { this->onSend(bytes_send); });
void submitSend(int32_t bytes_recv) {
SendFuture sf(sockfd, buf.data(), bytes_recv, MSG_NOSIGNAL);
sf.setCallback(
[this, bytes_recv](int32_t bytes_send) { this->onSend(bytes_send, bytes_recv); });
sf.submit();
}
void onSend(int32_t res) {
if (unlikely(res < 0)) {
LOGE("server send failed: " << strerror(-res));
emper::io::closeAndForget(sockfd);
void onSend(int32_t bytes_send, int32_t bytes_recv) {
if (unlikely(bytes_send != bytes_recv)) {
LOGE("server send failed: " << strerror(-bytes_send));
this->terminate();
return;
}
bytes_send += res;
// Send again
if (bytes_send < bytes_recv) {
submitSend();
return;
if (!quit.load(std::memory_order_consume)) {
submitRecv();
} else {
this->terminate();
}
submitRecv();
}
void submitRecv() {
bytes_send = 0;
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
}
void onRecv(int32_t res) {
if (unlikely(res <= 0)) {
void onRecv(int32_t bytes_recv) {
if (unlikely(bytes_recv <= 0)) {
// socket was shutdown
if (res < 0) {
LOGE("server read failed:" << strerror(-res));
if (bytes_recv < 0) {
LOGE("server read failed:" << strerror(-bytes_recv));
}
emper::io::closeAndForget(sockfd);
this->terminate();
return;
}
bytes_recv = res;
if (unlikely(bytes_recv == 5 && strncmp("quit\n", buf.data(), bytes_recv) == 0)) {
exit(EXIT_SUCCESS);
quit = true;
Runtime::getRuntime()->initiateTermination();
this->terminate();
return;
}
submitSend();
submitSend(bytes_recv);
}
public:
Client(int socket) : sockfd(socket) {}
void submitRecv() {
RecvFuture rf(sockfd, buf.data(), BUF_SIZE, 0);
rf.setCallback([this](int32_t bytes_recv) { this->onRecv(bytes_recv); });
rf.submit();
}
};
......@@ -100,10 +104,12 @@ auto main(int argc, char* argv[]) -> int {
std::cout << "Echoserver listening on " << host << ":" << port << std::endl;
Runtime runtime;
auto* listener = emper::io::tcp_listener(host, port, [](int socket) {
auto* client = new Client(socket);
client->submitRecv();
});
auto* listener = emper::io::tcp_listener(host, port,
[](int socket) {
auto* client = new Client(socket);
client->submitRecv();
},
BACKLOG, {emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment