Commit ed2cbd91 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'fix-repeated-ConcurrentNetworkEchoTest' into 'master'

Fix repeated ConcurrentNetworkEchoTest and SimpleNetworkTest

See merge request !255
parents 5d38e301 a0204b14
Pipeline #68336 passed with stages
in 12 minutes and 44 seconds
......@@ -11,6 +11,7 @@
#include <functional> // for function
#include <memory> // for unique_ptr
#include <string> // for string
#include <vector>
namespace fs = std::filesystem;
......@@ -637,6 +638,16 @@ void recursive_directory_walk(const fs::path &dirpath,
std::function<void(fs::directory_entry dirent)> fn);
#endif
struct SockOpt {
int level;
int name;
const void *val;
socklen_t val_len;
static const int ENABLE;
static const SockOpt ReUsePort;
};
/**
* @brief Fiber accepting TCP connections and scheduling handler fibers
*
......@@ -645,9 +656,11 @@ void recursive_directory_walk(const fs::path &dirpath,
* @param handler A function or lambda the started handler Fibers should execute.
* It is called with the file descriptor returned by accept.
* @param backlog The TCP connection backlog.
* @param sockopts The socket options set on the listen socket before binding it
*
* @return nullptr on error, otherwise the TCP listener Fiber
*/
auto tcp_listener(std::string &host, std::string &port, const std::function<void(int)> &handler,
int backlog = 1024) -> Fiber *;
int backlog = 1024, const std::vector<SockOpt> &sockopts = std::vector<SockOpt>())
-> Fiber *;
} // namespace emper::io
......@@ -23,6 +23,10 @@ namespace fs = std::filesystem;
namespace emper::io {
const int SockOpt::ENABLE = 1;
const SockOpt SockOpt::ReUsePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE,
sizeof(SockOpt::ENABLE)};
static void recursive_directory_walk_helper(
CPS& cps, const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)>& filter,
......@@ -55,10 +59,12 @@ void recursive_directory_walk(const fs::path& dirpath,
}
auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler,
int backlog) -> Fiber* {
int backlog, const std::vector<SockOpt>& sockopts) -> Fiber* {
int listen_socket;
struct addrinfo* servaddr;
Runtime* runtime = Runtime::getRuntime();
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &servaddr);
if (err) {
if (err == EAI_SYSTEM) {
......@@ -77,24 +83,24 @@ auto tcp_listener(std::string& host, std::string& port, const std::function<void
return nullptr;
}
int reuseaddr = 1;
if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
LOGE("setsockopt failed: " << strerror(errno));
return nullptr;
for (const auto& sockopt : sockopts) {
if (setsockopt(listen_socket, sockopt.level, sockopt.name, sockopt.val, sockopt.val_len) ==
-1) {
LOGE("setsockopt " << sockopt.name << " failed: " << strerror(errno));
goto close_sock;
}
}
if (bind(listen_socket, servaddr->ai_addr, servaddr->ai_addrlen) == -1) {
LOGE("bind failed: " << strerror(errno));
return nullptr;
goto close_sock;
}
if (listen(listen_socket, backlog) != 0) {
LOGE("listen failed: " << strerror(errno));
return nullptr;
goto close_sock;
}
Runtime* runtime = Runtime::getRuntime();
return Fiber::from([=] {
for (;;) {
int client_fd = acceptAndWait(listen_socket, nullptr, nullptr);
......@@ -107,5 +113,9 @@ auto tcp_listener(std::string& host, std::string& port, const std::function<void
runtime->schedule(*handler_fiber);
}
});
close_sock:
closeAndForget(listen_socket);
return nullptr;
}
} // namespace emper::io
......@@ -5,6 +5,7 @@
#include <string>
#include <vector>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Emper.hpp"
#include "Fiber.hpp"
......@@ -41,7 +42,13 @@ auto main(int argc, char* argv[]) -> int {
Runtime runtime;
runtime.scheduleFromAnywhere(*emper::io::tcp_listener(host, port, echo_serve));
Fiber* listener =
emper::io::tcp_listener(host, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort});
if (!listener) {
DIE_MSG("failed to create listener");
}
runtime.scheduleFromAnywhere(*listener);
Fiber* client_producer = Fiber::from([&] {
CPS cps;
......
......@@ -154,9 +154,9 @@ void emperTest() {
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
servaddr.sin_port = htons(PORT);
int reuseaddr = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
int enable = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt SO_REUSEPORT failed");
}
if (bind(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
DIE_MSG_ERRNO("bind failed");
......
......@@ -3,6 +3,7 @@
#include <string> // for string, allocator
#include <vector> // for vector
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "emper.hpp" // for async, spawn
#include "fixtures/network.hpp" // for echo_client, echo_serve
......@@ -10,12 +11,19 @@
#define MAX 1024
class Fiber;
void emperTest() {
std::string port = "4242";
std::string addr = "127.0.0.1";
CPS cps;
async(emper::io::tcp_listener(addr, port, echo_serve));
Fiber* listener =
emper::io::tcp_listener(addr, port, echo_serve, 1024, {emper::io::SockOpt::ReUsePort});
if (!listener) {
DIE_MSG("failed to create listener");
}
async(listener);
spawn(
[&] {
......
......@@ -33,9 +33,11 @@ static void setupSockPair(int& sock1, int& sock2) {
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock == -1) DIE_MSG_ERRNO("creating listen socket failed");
int reuseaddr = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1)
DIE_MSG_ERRNO("setsockopt failed");
int enable = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1)
DIE_MSG_ERRNO("setsockopt SO_REUSEADDR failed");
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)) == -1)
DIE_MSG_ERRNO("setsockopt SO_REUSEPORT failed");
if (bind(listen_sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1)
DIE_MSG_ERRNO("bind failed");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment