diff --git a/apps/EchoClient.cpp b/apps/EchoClient.cpp index 3240643f2953f02539ac8b39c6cf5f7513861158..ea4b354c2e34c7590a39f76a1f71bef325581816 100644 --- a/apps/EchoClient.cpp +++ b/apps/EchoClient.cpp @@ -4,13 +4,14 @@ #include <sys/socket.h> // for shutdown, socket, AF_INET #include <algorithm> // for find -#include <cerrno> // for errno, ECANCELED -#include <chrono> // for nanoseconds, duration, durat... -#include <cstdint> // for uint64_t, int32_t -#include <cstdlib> // for size_t, strtol, exit, EXIT_F... -#include <cstring> // for memcmp -#include <iostream> // for operator<<, basic_ostream, endl -#include <string> // for allocator, string, char_traits +#include <atomic> +#include <cerrno> // for errno, ECANCELED +#include <chrono> // for nanoseconds, duration, durat... +#include <cstdint> // for uint64_t, int32_t +#include <cstdlib> // for size_t, strtol, exit, EXIT_F... +#include <cstring> // for memcmp +#include <iostream> // for operator<<, basic_ostream, endl +#include <string> // for allocator, string, char_traits #include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli... #include "CountingPrivateSemaphore.hpp" // for CPS @@ -34,19 +35,38 @@ const std::string PORT = "12345"; const size_t ITERATIONS = 10000; const size_t CLIENTS = 10000; const size_t SIZE = 32; +const size_t SERVER_BACKLOG = 1024; -static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, Semaphore& readySem, - Semaphore& startSem) -> uint64_t { - int client_sock = socket(server.ai_family, server.ai_socktype, server.ai_protocol); +// Globals +std::string host = HOST; +std::string port = PORT; +size_t iterations = ITERATIONS; +size_t clients = CLIENTS; +size_t size = SIZE; +size_t server_backlog = SERVER_BACKLOG; + +std::atomic<size_t> client_ids = 0; +uint64_t* client_avgs; + +struct addrinfo* server; + +static void clientFunc(uint64_t client_id, Semaphore& readySem, Semaphore& startSem, CPS& cps) { + int client_sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (client_sock < 0) { DIE_MSG_ERRNO("creating new client socket failed"); } - int err = emper::io::connectAndWait(client_sock, server.ai_addr, server.ai_addrlen); + int err = emper::io::connectAndWait(client_sock, server->ai_addr, server->ai_addrlen); if (err) { DIE_MSG_ERRNO("connecting new client socket failed"); } + // We are connected -> start next client + size_t next_client_id = client_ids.fetch_add(1); + if (next_client_id < clients) { + spawn([&, next_client_id] { clientFunc(next_client_id, readySem, startSem, cps); }, cps); + } + // NOLINTNEXTLINE(modernize-avoid-c-arrays) char* buf[2]; buf[0] = new char[size]; @@ -116,7 +136,7 @@ static auto clientFunc(struct addrinfo& server, size_t iterations, size_t size, cf.wait(); - return avg_ns; + client_avgs[client_id] = avg_ns; } auto getOption(int argc, char** argv, const std::string& option) -> char* { @@ -137,17 +157,12 @@ static auto existsOption(int argc, char** argv, const std::string& option) -> bo static void printUsage(char* name) { std::cerr << "Usage: " << name - << "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>]" + << "[-h] [-p <port>] [-c <clients>] [-i <iterations>] [-a <address>] [-s <size>] [-b " + "<server backlog>]" << std::endl; } auto main(int argc, char* argv[]) -> int { - std::string host = HOST; - std::string port = PORT; - size_t iterations = ITERATIONS; - size_t clients = CLIENTS; - size_t size = SIZE; - if (existsOption(argc, argv, "-h")) { printUsage(argv[0]); exit(EXIT_FAILURE); @@ -178,7 +193,11 @@ auto main(int argc, char* argv[]) -> int { size = strtol(size_s, nullptr, DECIMAL); } - struct addrinfo* server; + char* server_backlog_s = getOption(argc, argv, "-b"); + if (server_backlog_s) { + server_backlog = strtol(server_backlog_s, nullptr, DECIMAL); + } + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); if (err) { if (err == EAI_SYSTEM) { @@ -192,17 +211,19 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; Fiber* alphaFiber = Fiber::from([&] { - auto* client_avgs = new uint64_t[clients]; + client_avgs = new uint64_t[clients]; CPS cps; Semaphore readySemaphore; Semaphore startSemaphore; - // establish client connections - for (size_t i = 0; i < clients; ++i) { + // start first client batch + for (size_t i = 0; i < server_backlog; ++i) { + size_t next_client_id = client_ids.fetch_add(1); + if (next_client_id >= clients) { + break; + } spawn( - [&, i] { - client_avgs[i] = clientFunc(*server, iterations, size, readySemaphore, startSemaphore); - }, + [&, next_client_id] { clientFunc(next_client_id, readySemaphore, startSemaphore, cps); }, cps); } diff --git a/apps/EchoServer.cpp b/apps/EchoServer.cpp index 4d21dac67043eee4caed1480cd48c65d17b5e931..e0c96ef13f18961e94533e9c0cd6db25fb3f4563 100644 --- a/apps/EchoServer.cpp +++ b/apps/EchoServer.cpp @@ -1,26 +1,25 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2020-2021 Florian Fischer +#include <sys/socket.h> #include <sys/types.h> -#include <cassert> -#include <climits> +#include <cerrno> #include <cstdlib> #include <cstring> #include <iostream> #include <string> #include "Common.hpp" +#include "Debug.hpp" #include "Runtime.hpp" -#include "emper-common.h" #include "io.hpp" -const int DECIMAL = 10; -const std::string HOST = "0.0.0.0"; -const int PORT = 12345; +const std::string HOST = "::"; +const std::string PORT = "12345"; auto main(int argc, char* argv[]) -> int { - int port = PORT; std::string host = HOST; + std::string port = PORT; if (argc > 2) { std::cerr << "Usage: " << argv[0] << " [port]" << std::endl; @@ -28,9 +27,7 @@ auto main(int argc, char* argv[]) -> int { } if (argc > 1) { - long aport = strtol(argv[1], nullptr, DECIMAL); - assert(aport <= INT_MAX && aport >= INT_MIN); - port = (int)aport; + port = std::string(argv[1]); } std::cout << "Echoserver listening on " << host << ":" << port << std::endl; @@ -44,9 +41,10 @@ auto main(int argc, char* argv[]) -> int { if (unlikely(bytes_recv <= 0)) { // socket was shutdown if (bytes_recv < 0) { - DIE_MSG_ERRNO("server read failed"); + LOGE("server read failed:" << strerror(errno)); } + finish: emper::io::closeAndForget(socket); return; } @@ -55,8 +53,11 @@ auto main(int argc, char* argv[]) -> int { exit(EXIT_SUCCESS); } - ATTR_UNUSED ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, 0); - assert(bytes_recv == bytes_send); + ssize_t bytes_send = emper::io::sendAndWait(socket, buf, bytes_recv, MSG_NOSIGNAL, true); + if (unlikely(bytes_recv != bytes_send)) { + LOGE("server send failed: " << strerror(errno)); + goto finish; + } } }); diff --git a/emper/io.hpp b/emper/io.hpp index 4addc610e9c80b0b7132736ae78a1471ef85cd61..10b228781b31067b7ed3b74ea391816372e8f01e 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -466,6 +466,6 @@ inline void closeAndForget(int fd) { * * @return nullptr on error, otherwise the TCP listener Fiber */ -auto tcp_listener(std::string &host, int &port, const std::function<void(int)> &handler, +auto tcp_listener(std::string &host, std::string &port, const std::function<void(int)> &handler, int backlog = 1024) -> Fiber *; } // namespace emper::io diff --git a/emper/io/io.cpp b/emper/io/io.cpp index 140f91c9513cb84e1e3695fbeaec3997d0b062b9..83c1abeb739c090179b8a6e8eeb1c79b8065e7b7 100644 --- a/emper/io/io.cpp +++ b/emper/io/io.cpp @@ -2,8 +2,7 @@ // Copyright © 2020-2021 Florian Fischer #include "io.hpp" -#include <arpa/inet.h> -#include <netinet/in.h> +#include <netdb.h> #include <sys/socket.h> #include <cerrno> @@ -19,23 +18,28 @@ #endif namespace emper::io { -auto tcp_listener(std::string& host, int& port, const std::function<void(int)>& handler, +auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler, int backlog) -> Fiber* { int listen_socket; - struct sockaddr_in servaddr; + struct addrinfo* servaddr; + + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &servaddr); + if (err) { + if (err == EAI_SYSTEM) { + LOGE("getaddrinfo failed: " << strerror(errno)); + return nullptr; + } + + LOGE("error in getaddrinfo: " << gai_strerror(err)); + return nullptr; + } // socket creation and verification - listen_socket = socket(AF_INET, SOCK_STREAM, 0); + listen_socket = socket(servaddr->ai_family, servaddr->ai_socktype, servaddr->ai_protocol); if (listen_socket == -1) { LOGE("listen socket creation failed: " << strerror(errno)); return nullptr; } - memset(&servaddr, 0, sizeof(servaddr)); - - // assign IP, PORT - servaddr.sin_family = AF_INET; - servaddr.sin_addr.s_addr = inet_addr(host.c_str()); - servaddr.sin_port = htons(port); int reuseaddr = 1; if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { @@ -43,7 +47,7 @@ auto tcp_listener(std::string& host, int& port, const std::function<void(int)>& return nullptr; } - if (bind(listen_socket, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { + if (bind(listen_socket, servaddr->ai_addr, servaddr->ai_addrlen) == -1) { LOGE("bind failed: " << strerror(errno)); return nullptr; } diff --git a/tests/ConcurrentNetworkEchoTest.cpp b/tests/ConcurrentNetworkEchoTest.cpp index 0ccfddabd741ac38be3ecad82dbaa08826f6f35a..3a2583c3cc1ac5294ace91244414ab4214962eb0 100644 --- a/tests/ConcurrentNetworkEchoTest.cpp +++ b/tests/ConcurrentNetworkEchoTest.cpp @@ -13,7 +13,6 @@ #include "fixtures/network.hpp" #include "io.hpp" -#define PORT 4241 #define BUF_SIZE 1024 auto main(int argc, char* argv[]) -> int { @@ -23,7 +22,7 @@ auto main(int argc, char* argv[]) -> int { unsigned int client_count = 10; unsigned int echos = 1000; - int port = PORT; + std::string port = "4241"; std::string host = "127.0.0.1"; if (argc > 3) { @@ -50,7 +49,7 @@ auto main(int argc, char* argv[]) -> int { spawn( [&] { const std::vector<std::string> msgs = {"foo"}; - echo_client("127.0.0.1", PORT, msgs, echos); + echo_client(host, port, msgs, echos); }, cps); } diff --git a/tests/SimpleDiskAndNetworkTest.cpp b/tests/SimpleDiskAndNetworkTest.cpp index 60a2e569580ac658e8ad878fe4d4393f2039e102..182bcdbfce52463de6d2850aaa299f87e51131e6 100644 --- a/tests/SimpleDiskAndNetworkTest.cpp +++ b/tests/SimpleDiskAndNetworkTest.cpp @@ -170,7 +170,9 @@ void emperTest() { spawn( [] { const std::vector<std::string> msgs{"foo", "bar"}; - echo_client("127.0.0.1", PORT, msgs); + std::string port = std::to_string(PORT); + std::string host("127.0.0.1"); + echo_client(host, port, msgs); }, cps); diff --git a/tests/SimpleNetworkTest.cpp b/tests/SimpleNetworkTest.cpp index 3515862506fe6ebafc4d78fe3785d3f502419dd1..d3161bc5166dd2a51febe2c06ba67a7e613ce380 100644 --- a/tests/SimpleNetworkTest.cpp +++ b/tests/SimpleNetworkTest.cpp @@ -9,17 +9,18 @@ #include "io.hpp" // for tcp_listener #define MAX 1024 -int PORT = 4242; -std::string ADDR = "127.0.0.1"; void emperTest() { + std::string port = "4242"; + std::string addr = "127.0.0.1"; + CPS cps; - async(emper::io::tcp_listener(ADDR, PORT, echo_serve)); + async(emper::io::tcp_listener(addr, port, echo_serve)); spawn( - [] { + [&] { const std::vector<std::string> strings = {"foo", "bar", std::string(MAX, 'a'), "quit\n"}; - echo_client(ADDR.c_str(), PORT, strings); + echo_client(addr, port, strings); }, cps); diff --git a/tests/fixtures/network.cpp b/tests/fixtures/network.cpp index 97858086292faeb42e314308209f90ef53b86e52..7ca1c6ce7b15585d6815c306b57816cd9d5e3835 100644 --- a/tests/fixtures/network.cpp +++ b/tests/fixtures/network.cpp @@ -2,17 +2,18 @@ // Copyright © 2020-2021 Florian Fischer #include "network.hpp" -#include <arpa/inet.h> -#include <netinet/in.h> +#include <netdb.h> #include <sys/socket.h> #include <sys/types.h> #include <cassert> #include <cstdlib> #include <cstring> +#include <ostream> #include <vector> #include "Common.hpp" +#include "Debug.hpp" #include "io.hpp" #define MAX 1024 @@ -47,22 +48,26 @@ void echo_serve(int client_socket) { } } -void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, +void echo_client(std::string& host, std::string& port, const std::vector<std::string>& msgs, size_t iterations) { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); + struct addrinfo* server; + + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server); + if (err) { + if (err == EAI_SYSTEM) { + DIE_MSG_ERRNO("getaddrinfo failed"); + } else { + LOGE("error in getaddrinfo: " << gai_strerror(err)); + exit(EXIT_FAILURE); + } + } + + int sockfd = socket(server->ai_family, server->ai_socktype, server->ai_protocol); if (sockfd == -1) { DIE_MSG_ERRNO("client socket creation failed"); } - struct sockaddr_in server_addr; - // assign IP, PORT - memset(&server_addr, 0, sizeof(server_addr)); - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = inet_addr(addr); - server_addr.sin_port = htons(port); - - if (emper::io::connectAndWait(sockfd, reinterpret_cast<struct sockaddr*>(&server_addr), - sizeof(server_addr)) == -1) { + if (emper::io::connectAndWait(sockfd, server->ai_addr, server->ai_addrlen) == -1) { DIE_MSG_ERRNO("connect failed"); } diff --git a/tests/fixtures/network.hpp b/tests/fixtures/network.hpp index 271cb2d37375e7417f012d47a5f3ff0ef78d89dc..8c50c1532c6d2bb5c72a384804d61af7f0146336 100644 --- a/tests/fixtures/network.hpp +++ b/tests/fixtures/network.hpp @@ -8,5 +8,5 @@ void echo_serve(int client_socket); -void echo_client(const char* addr, int port, const std::vector<std::string>& msgs, +void echo_client(std::string& host, std::string& port, const std::vector<std::string>& msgs, size_t iterations = 1);