diff --git a/emper/io.hpp b/emper/io.hpp index a820fe0bc2ac9d12b10beecbd644a05c850e29e6..b36b287bdb30570fe6dd88831957866200007101 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -11,6 +11,7 @@ #include <functional> // for function #include <memory> // for unique_ptr #include <string> // for string +#include <vector> namespace fs = std::filesystem; @@ -637,6 +638,16 @@ void recursive_directory_walk(const fs::path &dirpath, std::function<void(fs::directory_entry dirent)> fn); #endif +struct SockOpt { + int level; + int name; + const void *val; + socklen_t val_len; + + static const int ENABLE; + static const SockOpt ReUsePort; +}; + /** * @brief Fiber accepting TCP connections and scheduling handler fibers * @@ -645,9 +656,11 @@ void recursive_directory_walk(const fs::path &dirpath, * @param handler A function or lambda the started handler Fibers should execute. * It is called with the file descriptor returned by accept. * @param backlog The TCP connection backlog. + * @param sockopts The socket options set on the listen socket before binding it * * @return nullptr on error, otherwise the TCP listener Fiber */ auto tcp_listener(std::string &host, std::string &port, const std::function<void(int)> &handler, - int backlog = 1024) -> Fiber *; + int backlog = 1024, const std::vector<SockOpt> &sockopts = std::vector<SockOpt>()) + -> Fiber *; } // namespace emper::io diff --git a/emper/io/io.cpp b/emper/io/io.cpp index e858aff6027cb961c0f67c1b4a32f56842e63fce..bba906cd09f79cb0a91d75d2eab07817a6e2e9a7 100644 --- a/emper/io/io.cpp +++ b/emper/io/io.cpp @@ -23,6 +23,10 @@ namespace fs = std::filesystem; namespace emper::io { +const int SockOpt::ENABLE = 1; +const SockOpt SockOpt::ReUsePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE, + sizeof(SockOpt::ENABLE)}; + static void recursive_directory_walk_helper( CPS& cps, const fs::path& dirpath, std::function<bool(const fs::directory_entry& dirent)>& filter, @@ -55,10 +59,12 @@ void recursive_directory_walk(const fs::path& dirpath, } auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler, - int backlog) -> Fiber* { + int backlog, const std::vector<SockOpt>& sockopts) -> Fiber* { int listen_socket; struct addrinfo* servaddr; + Runtime* runtime = Runtime::getRuntime(); + int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &servaddr); if (err) { if (err == EAI_SYSTEM) { @@ -77,24 +83,24 @@ auto tcp_listener(std::string& host, std::string& port, const std::function<void return nullptr; } - int reuseaddr = 1; - if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { - LOGE("setsockopt failed: " << strerror(errno)); - return nullptr; + for (const auto& sockopt : sockopts) { + if (setsockopt(listen_socket, sockopt.level, sockopt.name, sockopt.val, sockopt.val_len) == + -1) { + LOGE("setsockopt " << sockopt.name << " failed: " << strerror(errno)); + goto close_sock; + } } if (bind(listen_socket, servaddr->ai_addr, servaddr->ai_addrlen) == -1) { LOGE("bind failed: " << strerror(errno)); - return nullptr; + goto close_sock; } if (listen(listen_socket, backlog) != 0) { LOGE("listen failed: " << strerror(errno)); - return nullptr; + goto close_sock; } - Runtime* runtime = Runtime::getRuntime(); - return Fiber::from([=] { for (;;) { int client_fd = acceptAndWait(listen_socket, nullptr, nullptr); @@ -107,5 +113,9 @@ auto tcp_listener(std::string& host, std::string& port, const std::function<void runtime->schedule(*handler_fiber); } }); + +close_sock: + closeAndForget(listen_socket); + return nullptr; } } // namespace emper::io diff --git a/tests/io/ConcurrentNetworkEchoTest.cpp b/tests/io/ConcurrentNetworkEchoTest.cpp index 11c39b9f3cf2b914f129161bf53fa8d31bf4d3de..4f3b8d23c22557b605a5309cd71cb498c86c15e0 100644 --- a/tests/io/ConcurrentNetworkEchoTest.cpp +++ b/tests/io/ConcurrentNetworkEchoTest.cpp @@ -5,6 +5,7 @@ #include <string> #include <vector> +#include "Common.hpp" #include "CountingPrivateSemaphore.hpp" #include "Emper.hpp" #include "Fiber.hpp" @@ -41,7 +42,13 @@ auto main(int argc, char* argv[]) -> int { Runtime runtime; - runtime.scheduleFromAnywhere(*emper::io::tcp_listener(host, port, echo_serve)); + Fiber* listener = + emper::io::tcp_listener(host, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort}); + if (!listener) { + DIE_MSG("failed to create listener"); + } + + runtime.scheduleFromAnywhere(*listener); Fiber* client_producer = Fiber::from([&] { CPS cps; diff --git a/tests/io/SimpleDiskAndNetworkTest.cpp b/tests/io/SimpleDiskAndNetworkTest.cpp index 145773fb1457c23bfe8ada191b173ffbb42a0076..99bf37e22293c8c2ed6f7078b1dace3bca325d24 100644 --- a/tests/io/SimpleDiskAndNetworkTest.cpp +++ b/tests/io/SimpleDiskAndNetworkTest.cpp @@ -154,9 +154,9 @@ void emperTest() { servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); servaddr.sin_port = htons(PORT); - int reuseaddr = 1; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) { - DIE_MSG_ERRNO("setsockopt failed"); + int enable = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) == -1) { + DIE_MSG_ERRNO("setsockopt SO_REUSEPORT failed"); } if (bind(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) { DIE_MSG_ERRNO("bind failed"); diff --git a/tests/io/SimpleNetworkTest.cpp b/tests/io/SimpleNetworkTest.cpp index d3161bc5166dd2a51febe2c06ba67a7e613ce380..3040f69ba711bb9ad36bb19410658421b9e4a94c 100644 --- a/tests/io/SimpleNetworkTest.cpp +++ b/tests/io/SimpleNetworkTest.cpp @@ -3,6 +3,7 @@ #include <string> // for string, allocator #include <vector> // for vector +#include "Common.hpp" #include "CountingPrivateSemaphore.hpp" // for CPS #include "emper.hpp" // for async, spawn #include "fixtures/network.hpp" // for echo_client, echo_serve @@ -10,12 +11,19 @@ #define MAX 1024 +class Fiber; + void emperTest() { std::string port = "4242"; std::string addr = "127.0.0.1"; CPS cps; - async(emper::io::tcp_listener(addr, port, echo_serve)); + Fiber* listener = + emper::io::tcp_listener(addr, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort}); + if (!listener) { + DIE_MSG("failed to create listener"); + } + async(listener); spawn( [&] { diff --git a/tests/io/TimeoutTest.cpp b/tests/io/TimeoutTest.cpp index f551f7d1d75b65c7db5a316c9fb38d7ac1c556b9..dfd7283e6ef4877086d38ed78eae3bf2cd8ace0b 100644 --- a/tests/io/TimeoutTest.cpp +++ b/tests/io/TimeoutTest.cpp @@ -33,9 +33,11 @@ static void setupSockPair(int& sock1, int& sock2) { int listen_sock = socket(AF_INET, SOCK_STREAM, 0); if (listen_sock == -1) DIE_MSG_ERRNO("creating listen socket failed"); - int reuseaddr = 1; - if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) - DIE_MSG_ERRNO("setsockopt failed"); + int enable = 1; + if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) + DIE_MSG_ERRNO("setsockopt SO_REUSEADDR failed"); + if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) == -1) + DIE_MSG_ERRNO("setsockopt SO_REUSEPORT failed"); if (bind(listen_sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) DIE_MSG_ERRNO("bind failed");