Commit 20af5d26 authored by Florian Fischer's avatar Florian Fischer
Browse files

clients/io_uring: reconnect on ECONNRESET

parent ea0d65dd
Pipeline #72848 passed with stage
in 1 minute and 25 seconds
...@@ -127,8 +127,11 @@ class Connection { ...@@ -127,8 +127,11 @@ class Connection {
high_resolution_clock::time_point echoStart; high_resolution_clock::time_point echoStart;
high_resolution_clock::time_point echoLoopEnd; high_resolution_clock::time_point echoLoopEnd;
void newSocket();
void prepConnect();
void prepSend(); void prepSend();
void onConnect(int ret); void onConnect(int ret);
void onConnectionReset();
void onSend(int ret); void onSend(int ret);
auto onRecv(int ret) -> bool; auto onRecv(int ret) -> bool;
...@@ -298,16 +301,8 @@ Worker::~Worker() { ...@@ -298,16 +301,8 @@ Worker::~Worker() {
Connection::Connection(Worker& worker) : worker(worker) { Connection::Connection(Worker& worker) : worker(worker) {
buf = new char[worker.client.echoSize]; buf = new char[worker.client.echoSize];
struct addrinfo* server = worker.client.server; newSocket();
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol); prepConnect();
if (sock < 0) DIE_MSG_ERRNO("creating socket failed");
struct io_uring_sqe* sqe = io_uring_get_sqe(&worker.ring);
assert(sqe);
// All initial connect requests are submitted in teh Worker constructor
io_uring_prep_connect(sqe, sock, server->ai_addr, server->ai_addrlen);
io_uring_sqe_set_data(sqe, this);
} }
Connection::~Connection() { Connection::~Connection() {
...@@ -315,6 +310,12 @@ Connection::~Connection() { ...@@ -315,6 +310,12 @@ Connection::~Connection() {
delete[] buf; delete[] buf;
} }
void Connection::newSocket() {
struct addrinfo* server = worker.client.server;
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) DIE_MSG_ERRNO("creating socket failed");
}
void Connection::prepSend() { void Connection::prepSend() {
state = State::sending; state = State::sending;
...@@ -327,14 +328,46 @@ void Connection::prepSend() { ...@@ -327,14 +328,46 @@ void Connection::prepSend() {
echoStart = high_resolution_clock::now(); echoStart = high_resolution_clock::now();
} }
void Connection::prepConnect() {
state = State::connecting;
struct addrinfo* server = worker.client.server;
struct io_uring_sqe* sqe = io_uring_get_sqe(&worker.ring);
assert(sqe);
// All initial connect requests are submitted in teh Worker constructor
io_uring_prep_connect(sqe, sock, server->ai_addr, server->ai_addrlen);
io_uring_sqe_set_data(sqe, this);
}
void Connection::onConnect(int ret) { void Connection::onConnect(int ret) {
if (unlikely(ret < 0)) DIE_MSG("connect failed: " << strerror(-ret)); if (unlikely(ret < 0)) {
if (ret == -ECONNRESET) {
onConnectionReset();
return;
}
DIE_MSG("connect failed: " << strerror(-ret));
}
prepSend(); prepSend();
} }
void Connection::onConnectionReset() {
++reconnects;
close(sock);
newSocket();
prepConnect();
}
void Connection::onSend(int ret) { void Connection::onSend(int ret) {
if (unlikely(ret < 0)) DIE_MSG("send failed: " << strerror(-ret)); if (unlikely(ret < 0)) {
if (ret == -ECONNRESET) {
onConnectionReset();
return;
}
DIE_MSG("send failed: " << strerror(-ret));
}
state = State::receiving; state = State::receiving;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment