Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source: 9
#include "AsyncLibrary.hpp"
#include "BinaryPrivateSemaphore.hpp"
#include "ContextManager.hpp"
#ifdef EMPER_ASYNC_LIB
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#ifdef EMPER_ASYNC_DISK_IO
#include <liburing.h>
#endif
class AsyncNetworkLogger : public Logger<LogSubsystem::AsyncTcp> {};
static AsyncNetworkLogger log;
enum class aio_op { send, recv, connect, accept, uring_read, uring_write };
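// Per-request state shared between the submitting fiber and the poll loop: the
// semaphore the fiber blocks on, the operation type, buffer/offset bookkeeping for
// partial sends/receives, io_uring parameters for file IO, and the result/errno
// handed back once the operation completes.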
struct aio_handle {
std::unique_ptr<BPS> sem;
aio_op op;
int aio_errno = 0;
int fd;
unsigned char *buff;
int buff_len;
int curr_offset;
unsigned int epoll_events;
bool serialized;
struct iovec *iov;
size_t liburing_offset;
sockaddr *sockaddress;
socklen_t socklen;
bool read_once;
int return_value;
};
static int epoll_fd;
static std::unique_ptr<std::thread> epoll_thread;
static bool done;
static int done_fd;
static aio_handle done_handle;
#ifdef EMPER_ASYNC_DISK_IO
#define URING_ENTRIES 512
static struct io_uring ring;
static aio_handle ring_handle;
static std::vector<aio_handle *> wait_queue;
static std::mutex wait_mutex;
static std::condition_variable wait_cv;
#endif
#ifdef EMPER_ASYNC_NETWORK
static std::map<int, std::vector<aio_handle *>> serialization_list;
static std::mutex serialization_list_mutex;
static void network_add_epoll(aio_handle *handle) {
epoll_event event;
event.events = handle->epoll_events;
event.data.ptr = handle;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handle->fd, &event) != 0) {
perror("epoll_ctl");
}
}
static void network_remove_epoll(int fd) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
perror("epoll_ctl");
}
}
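// Serialization of concurrent operations on the same fd: all waiters for an fd are
// queued in serialization_list, and only the head of that queue is armed in epoll at
// any time. network_serialized_add enqueues the handle; if the caller is the first
// waiter it either arms epoll directly (use_epoll == true) or returns true so the
// caller can retry the operation itself before registering with epoll.
// network_serialzed_remove_first pops the head and arms the next waiter, if any.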
static bool network_serialized_add(aio_handle *handle, bool use_epoll = true) {
bool first = false;
{
std::unique_lock<std::mutex> lock(serialization_list_mutex);
auto &list = serialization_list[handle->fd];
if (list.size() == 0) // I'm currently the only one interested in this fd
first = true;
list.push_back(handle);
}
if (first) {
if (!use_epoll)
return true;
else
network_add_epoll(handle);
}
return false;
}
static void network_serialzed_remove_first(int fd, bool use_epoll = true) {
std::unique_lock<std::mutex> lock(serialization_list_mutex);
aio_handle *next_notify = nullptr;
auto &list = serialization_list[fd];
list.erase(list.begin());
if (!list.empty())
next_notify = list[0];
if (next_notify != nullptr) {
epoll_event event;
event.events = next_notify->epoll_events;
event.data.ptr = next_notify;
if (use_epoll) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) != 0) {
perror("epoll_ctl");
}
} else {
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) {
perror("epoll_ctl");
}
}
} else if (use_epoll)
network_remove_epoll(fd);
}
static bool do_async_connect(int fd, sockaddr *sockaddress, socklen_t len, int *return_value) {
*return_value = connect(fd, sockaddress, len);
if (*return_value == 0) {
return true;
} else if (*return_value < 0 && (errno != EINPROGRESS && errno != EAGAIN)) {
*return_value = -1;
return true;
}
return false;
}
static bool do_async_accept(int fd, sockaddr *sockaddress, socklen_t len, int *return_value) {
*return_value = accept(fd, sockaddress, &len);
if (*return_value >= 0) {
return true;
} else if (*return_value < 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) {
*return_value = -1;
return true;
}
return false;
}
static bool do_async_recv(int fd, unsigned char *buff, int buff_len, int *return_value) {
*return_value = recv(fd, buff, buff_len, MSG_DONTWAIT | MSG_NOSIGNAL);
if (*return_value == 0) {
return true;
} else if (*return_value == -1 && (errno != EWOULDBLOCK && errno != EAGAIN)) {
return true;
}
return false;
}
static bool do_async_send(int fd, unsigned char *buff, int buff_len, int *return_value) {
*return_value = send(fd, buff, buff_len, MSG_DONTWAIT | MSG_NOSIGNAL);
if (*return_value == 0) {
return true;
} else if (*return_value == -1 && (errno != EWOULDBLOCK && errno != EAGAIN)) {
return true;
}
return false;
}
int io::async_connect(int fd, sockaddr *sockaddress, socklen_t len, bool fd_serialized) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
log.logE("fcntl F_GETFL failed for async_connect");
flags = (flags | O_NONBLOCK);
if (fcntl(fd, F_SETFL, flags) != 0) {
log.logE("fcntl F_SETFL failed for async_connect");
}
if (!fd_serialized) {
int connect_result;
if (do_async_connect(fd, sockaddress, len, &connect_result)) {
return connect_result;
}
}
aio_handle notify;
notify.fd = fd;
notify.sockaddress = sockaddress;
notify.socklen = len;
notify.op = aio_op::connect;
notify.epoll_events = EPOLLOUT;
notify.serialized = fd_serialized;
notify.sem.reset(new BinaryPrivateSemaphore());
if (fd_serialized) {
// network_serialized_add returns true if we are the first waiter for this fd;
// otherwise a predecessor arms epoll for us and we simply wait below.
if (network_serialized_add(&notify, false)) {
int connect_result;
if (do_async_connect(fd, sockaddress, len, &connect_result)) {
network_serialzed_remove_first(fd, false);
return connect_result;
}
network_add_epoll(&notify); // Did not finish yet, add to epoll; we are first
}
} else {
network_add_epoll(&notify); // Not serialized and it wasn't successful before
}
notify.sem->wait();
if (notify.aio_errno) {
errno = notify.aio_errno;
return -1;
}
return 0;
}
int io::async_accept(int fd, sockaddr *sockaddress, socklen_t len, bool fd_serialized) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
log.logE("fcntl F_GETFL failed for async_accept");
flags = (flags | O_NONBLOCK);
if (fcntl(fd, F_SETFL, flags) != 0)
log.logE("fcntl F_SETFL failed for async_accept");
if (!fd_serialized) {
int accept_result;
if (do_async_accept(fd, sockaddress, len, &accept_result)) {
return accept_result;
}
}
aio_handle notify;
notify.fd = fd;
notify.sockaddress = sockaddress;
notify.socklen = len;
notify.op = aio_op::accept;
notify.epoll_events = EPOLLIN;
notify.serialized = fd_serialized;
notify.sem.reset(new BinaryPrivateSemaphore());
if (fd_serialized) {
// network_serialized_add returns true if we are the first waiter for this fd;
// otherwise a predecessor arms epoll for us and we simply wait below.
if (network_serialized_add(&notify, false)) {
int accept_result;
if (do_async_accept(fd, sockaddress, len, &accept_result)) {
network_serialzed_remove_first(fd, false);
return accept_result;
}
network_add_epoll(&notify); // Did not finish yet, add to epoll; we are first
}
} else {
network_add_epoll(&notify); // Not serialized and it wasn't successful before
}
notify.sem->wait();
if (notify.aio_errno) {
errno = notify.aio_errno;
return -1;
}
return notify.return_value;
}
int io::async_recv(int fd, unsigned char *buff, int buff_len, bool read_once, bool fd_serialized) {
size_t offset = 0;
if (!fd_serialized) {
int actually_read;
if (do_async_recv(fd, buff, buff_len, &actually_read)) {
return actually_read;
}
if (actually_read > 0 && (read_once || actually_read == buff_len)) {
return actually_read;
}
if (actually_read != -1)
offset = actually_read;
}
aio_handle notify;
notify.fd = fd;
notify.buff = buff;
notify.buff_len = buff_len;
notify.read_once = read_once;
notify.curr_offset = offset;
notify.op = aio_op::recv;
notify.epoll_events = EPOLLIN;
notify.serialized = fd_serialized;
notify.sem.reset(new BinaryPrivateSemaphore());
if (fd_serialized) {
// network_serialized_add returns true if we are the first waiter for this fd;
// otherwise a predecessor arms epoll for us and we simply wait below.
if (network_serialized_add(&notify, false)) {
int actually_read;
if (do_async_recv(fd, buff, buff_len, &actually_read)) {
network_serialzed_remove_first(fd, false);
return actually_read;
}
if (actually_read > 0 && (read_once || actually_read == buff_len)) {
network_serialzed_remove_first(fd, false);
return actually_read;
}
if (actually_read != -1) {
notify.curr_offset = actually_read;
}
network_add_epoll(&notify); // Did not finish yet, add to epoll; we are first
}
} else {
network_add_epoll(&notify); // Not serialized and it wasn't successful before
}
notify.sem->wait();
if (notify.aio_errno) {
errno = notify.aio_errno;
return -1;
}
return notify.curr_offset;
}
int io::async_send(int fd, unsigned char *buff, int buff_len, bool fd_serialized) {
size_t offset = 0;
if (!fd_serialized) {
int actually_send;
if (do_async_send(fd, buff, buff_len, &actually_send)) {
return actually_send;
}
if (actually_send == buff_len) {
return actually_send;
}
if (actually_send != -1)
offset = actually_send;
}
aio_handle notify;
notify.fd = fd;
notify.buff = buff;
notify.buff_len = buff_len;
notify.curr_offset = offset;
notify.op = aio_op::send;
notify.epoll_events = EPOLLOUT;
notify.serialized = fd_serialized;
notify.sem.reset(new BinaryPrivateSemaphore());
if (fd_serialized) {
// network_serialized_add returns true if we are the first waiter for this fd;
// otherwise a predecessor arms epoll for us and we simply wait below.
if (network_serialized_add(&notify, false)) {
int actually_send;
if (do_async_send(fd, buff, buff_len, &actually_send)) {
network_serialzed_remove_first(fd, false);
return actually_send;
}
if (actually_send == buff_len) {
network_serialzed_remove_first(fd, false);
return actually_send;
}
if (actually_send != -1) {
notify.curr_offset = actually_send;
}
network_add_epoll(&notify); // Did not finish yet, add to epoll; we are first
}
} else {
network_add_epoll(&notify); // Not serialized and it wasn't successful before
}
notify.sem->wait();
if (notify.aio_errno) {
errno = notify.aio_errno;
return -1;
}
return notify.curr_offset;
}
#endif
#ifdef EMPER_ASYNC_DISK_IO
static std::mutex io_uring_mutex;
static std::vector<aio_handle *> overflow_queue;
static std::mutex overflow_queue_mutex;
int io::async_read_file(int fd, unsigned char *buff, size_t count, off_t offset) {
aio_handle notify;
notify.op = aio_op::uring_read;
notify.sem.reset(new BinaryPrivateSemaphore());
struct iovec iov;
iov.iov_base = buff;
iov.iov_len = count;
bool successfull = true;
{
std::unique_lock<std::mutex> lock(io_uring_mutex);
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(&ring);
if (!sqe)
successfull = false;
else {
io_uring_prep_readv(sqe, fd, &iov, 1, offset);
io_uring_sqe_set_data(sqe, &notify);
io_uring_submit(&ring);
}
}
if (!successfull) {
notify.fd = fd;
notify.iov = &iov;
notify.liburing_offset = offset;
std::unique_lock<std::mutex> lock(overflow_queue_mutex);
overflow_queue.emplace_back(&notify);
}
log.logD("Waiting for file read...");
notify.sem->wait();
return notify.return_value;
}
int io::async_write_file(int fd, const unsigned char *buff, size_t count, off_t offset) {
aio_handle notify;
notify.op = aio_op::uring_write;
notify.sem.reset(new BinaryPrivateSemaphore());
struct iovec iov;
iov.iov_base = const_cast<unsigned char *>(buff);
iov.iov_len = count;
bool successfull = true;
{
std::unique_lock<std::mutex> lock(io_uring_mutex);
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(&ring);
if (!sqe)
successfull = false;
else {
io_uring_prep_writev(sqe, fd, &iov, 1, offset);
io_uring_sqe_set_data(sqe, &notify);
io_uring_submit(&ring);
}
}
if (!successfull) {
notify.fd = fd;
notify.iov = &iov;
notify.liburing_offset = offset;
std::unique_lock<std::mutex> lock(overflow_queue_mutex);
overflow_queue.emplace_back(&notify);
}
log.logD("Waiting for file write...");
notify.sem->wait();
return notify.return_value;
}
#endif
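// Lazily creates the shared epoll instance plus an eventfd; the eventfd is only used
// to wake the poll loop so it can shut down (see the commented-out stop_epoll_thread
// below).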
void setup_epoll() {
if (epoll_thread != nullptr)
return;
epoll_fd = epoll_create1(0);
done_fd = eventfd(0, 0);
done_handle.fd = done_fd; // required so the poll loop can recognize the shutdown event
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = &done_handle;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, done_fd, &event) != 0)
perror("epoll_ctl");
done = false;
// epoll_thread.reset(new std::thread(run_epoll_thread));
}
void setup_io_uring() {
#ifdef EMPER_ASYNC_DISK_IO
auto ret = io_uring_queue_init(URING_ENTRIES, &ring, 0);
if (ret < 0) {
perror("io_uring_queue_init");
}
ring_handle.fd = ring.ring_fd;
log.logD("Ring fd is " + std::to_string(ring.ring_fd));
epoll_event event;
event.events = EPOLLIN;
event.data.ptr = &ring_handle;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ring.ring_fd, &event) != 0)
perror("io_uring epoll_ctl");
#endif
}
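// Completion path, called by the poll loop: for network operations it retries the
// pending send/recv/accept (or fetches the connect() result via SO_ERROR); for
// io_uring completions the result is passed in directly. Once an operation is
// finished, it stores result and errno in the handle, removes the fd from epoll (or
// hands it to the next serialized waiter) and signals the semaphore the submitting
// fiber blocks on.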
void handle_aio_event(aio_handle *notify, int result = -1) {
bool wakeup = false;
int error = 0;
int return_value = 0;
if (result != -1)
return_value = result;
switch (notify->op) {
case aio_op::recv: {
int actually_read =
recv(notify->fd, notify->buff + notify->curr_offset,
notify->buff_len - notify->curr_offset, MSG_DONTWAIT | MSG_NOSIGNAL);
if (actually_read != -1)
notify->curr_offset += actually_read;
// Wake up if only one read was requested or the buffer is full or there was an error or EOF
if (notify->read_once || notify->curr_offset == notify->buff_len || actually_read == -1 ||
actually_read == 0)
wakeup = true;
if (actually_read == -1)
error = errno;
} break;
case aio_op::send: {
int actually_written =
send(notify->fd, notify->buff + notify->curr_offset,
notify->buff_len - notify->curr_offset, MSG_DONTWAIT | MSG_NOSIGNAL);
if (actually_written != -1)
notify->curr_offset += actually_written;
// Wake up if the buffer is full or there was an error
if (notify->curr_offset == notify->buff_len || actually_written == -1)
wakeup = true;
if (actually_written == -1)
error = errno;
} break;
case aio_op::connect: {
int result;
socklen_t result_len = sizeof(result);
int getsockopt_result = getsockopt(notify->fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (getsockopt_result < 0) {
error = getsockopt_result;
} else if (result != 0) {
error = result;
}
wakeup = true;
} break;
case aio_op::accept: {
auto fd = accept(notify->fd, notify->sockaddress, &notify->socklen);
if (fd < 0) { // accept() signals failure with -1
error = errno;
return_value = -1;
} else {
return_value = fd;
}
wakeup = true;
} break;
case aio_op::uring_read:
case aio_op::uring_write: {
wakeup = true;
} break;
}
if (wakeup) {
notify->aio_errno = error;
notify->return_value = return_value;
if (notify->op != aio_op::uring_read && notify->op != aio_op::uring_write) {
if (notify->serialized) {
network_serialzed_remove_first(notify->fd, true);
} else {
network_remove_epoll(notify->fd);
}
}
notify->sem->signal();
}
}
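// The central poller loop, run by the caller of waitUntilFinished(): it blocks in
// epoll_wait and distinguishes three event sources -- the shutdown eventfd, the
// io_uring fd (drain completions and re-submit overflowed requests), and ordinary
// network readiness, which is forwarded to handle_aio_event.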
void io::run_epoll_thread() {
setup_epoll();
setup_io_uring();
while (!done) {
const int max_events = 1024;
epoll_event events[max_events];
log.logD("Polling for events...");
int event_count = epoll_wait(epoll_fd, events, max_events, -1);
if (event_count == -1)
perror("epoll_wait");
log.logD(std::to_string(event_count) + " ready events");
for (int i = 0; i < event_count; i++) {
epoll_event *my_event = &events[i];
aio_handle *notify = reinterpret_cast<aio_handle *>(my_event->data.ptr);
if (notify == nullptr) {
printf("\n????????? Panic\n");
continue;
} else if (notify->sem == nullptr) {
if (notify->fd == done_fd) // Our signal to end
break;
#ifdef EMPER_ASYNC_DISK_IO
else if (notify->fd == ring.ring_fd) // File IO Completion
{
io_uring_cqe *cqe;
while (io_uring_peek_cqe(&ring, &cqe) == 0 && cqe != nullptr) {
auto notify_ptr =
reinterpret_cast<aio_handle *>(io_uring_cqe_get_data(cqe));
handle_aio_event(notify_ptr, cqe->res);
io_uring_cqe_seen(&ring, cqe);
}
std::unique_lock<std::mutex> lock(overflow_queue_mutex);
while (!overflow_queue.empty()) {
auto handle = overflow_queue.back();
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe)
break;
if (handle->op == aio_op::uring_read)
io_uring_prep_readv(sqe, handle->fd, handle->iov, 1,
handle->liburing_offset);
else // handle->op == aio_op::uring_write
io_uring_prep_writev(sqe, handle->fd, handle->iov, 1,
handle->liburing_offset);
// Attach the waiting handle so the completion can be matched to it again.
io_uring_sqe_set_data(sqe, handle);
overflow_queue.pop_back();
}
// Submit the requests moved out of the overflow queue (a no-op if none were).
io_uring_submit(&ring);
}
#endif
}
#ifdef EMPER_ASYNC_NETWORK
else { // "Normal" epoll network event
handle_aio_event(notify);
}
#endif
}
}
}
/*
void stop_epoll_thread()
{
if (epoll_thread == nullptr)
return;
done = true;
uint64_t something = 1;
write(done_fd, &something, sizeof(something));
epoll_thread->join();
epoll_thread.reset();
}*/
#endif
#pragma once
#include "emper-config.h"
#include <sys/socket.h>
#include <sys/types.h>  // off_t
namespace io {
/**
* @param fd filedescriptor for this operation
* @param buff destination buffer
* @param buff_len length of that buffer
* @param read_once default is true. If true it will read only once. Else it will return only when the buff is filled
*/
int async_recv(int fd, unsigned char* buff, int buff_len, bool read_once = true, bool fd_serialized = true);
/**
* @param fd filedescriptor for this operation
* @param buff source buffer
* @param buff_len length of that buffer
*/
int async_send(int fd, unsigned char* buff, int buff_len, bool fd_serialized = true);
/**
* @brief Same as normal connect
*/
int async_connect(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized = false);
/**
* @brief Same as normal accept
*/
int async_accept(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized = false);
/**
* @param fd filedescriptor to the regular file to be read from
* @param buff destination buffer
* @param count amount of bytes to read
* @param offset offset in the file
*/
int async_read_file(int fd, unsigned char* buff, size_t count, off_t offset = 0);
/**
* @param fd filedescriptor to the regular file to be written to
* @param buff source buffer
* @param count amount of bytes to write
* @param offset offset in the file
*/
int async_write_file(int fd, const unsigned char* buff, size_t count, off_t offset = 0);
#ifdef EMPER_ASYNC_LIB
// Internal, move to somewhere else
void run_epoll_thread();
#endif
}
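For orientation, a minimal usage sketch of the interface above, assuming both async_network and async_disk_io are enabled. The function name, the socket argument and the file path are illustrative placeholders, not part of the patch; the full, buildable examples are the tests further below.

// Hypothetical sketch: echo one message back and persist it, from inside a fiber.
#include <fcntl.h>
#include <unistd.h>
#include "AsyncLibrary.hpp"

static void echo_and_persist(int connected_fd) {
	unsigned char buf[1024];
	// Suspends only this fiber; the worker thread keeps running other fibers.
	int n = io::async_recv(connected_fd, buf, static_cast<int>(sizeof(buf)), /*read_once=*/true);
	if (n <= 0)
		return;
	int file_fd = open("/tmp/echo.log", O_WRONLY | O_CREAT | O_TRUNC, 0644);
	if (file_fd != -1) {
		io::async_write_file(file_fd, buf, static_cast<size_t>(n), 0);
		close(file_fd);
	}
	io::async_send(connected_fd, buf, n);
}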
@@ -45,6 +45,7 @@ enum class LogSubsystem {
SCHED,
RUNTI,
U_B_MPSC_Q,
AsyncTcp,
};
enum LogLevel {
@@ -69,6 +70,7 @@ static const std::map<LogSubsystem, LogLevel> LOG_CONFIG = {
{ LogSubsystem::SCHED, ALL },
{ LogSubsystem::RUNTI, ALL },
{ LogSubsystem::U_B_MPSC_Q, ALL },
{ LogSubsystem::AsyncTcp, ALL },
};
template <LogSubsystem logSubsystem>
@@ -93,6 +95,8 @@ private:
return "RUNTI";
case LogSubsystem::U_B_MPSC_Q:
return "UBSCQ";
case LogSubsystem::AsyncTcp:
return "AIO ";
default:
return "UNKNOWN SUBSYSTEM (Add it *now*)";
}
@@ -108,7 +112,7 @@
}
}
protected:
public:
inline void log(LogLevel level, const std::string& string) const {
#ifdef NDEBUG
......
@@ -7,7 +7,7 @@
#include "Fiber.hpp"
class PrivateSemaphore : protected Logger<LogSubsystem::PS> {
protected:
public:
Runtime& runtime;
ContextManager& contextManager;
......
@@ -15,6 +15,10 @@
#include "Common.hpp"
#include "Debug.hpp"
#ifdef EMPER_ASYNC_LIB
#include "AsyncLibrary.hpp"
#endif
std::mutex Runtime::currentRuntimeMutex;
Runtime* Runtime::currentRuntime;
thread_local unsigned int Runtime::seed;
@@ -117,6 +121,9 @@ Fiber* Runtime::nextFiber() {
}
void Runtime::waitUntilFinished() {
#ifdef EMPER_ASYNC_LIB
io::run_epoll_thread();
#endif
for (workerid_t i = 0; i < workerCount; ++i) {
pthread_join(threads[i], nullptr);
}
......
#include "emper.h"
#include "emper-config.h"
#include "Runtime.hpp"
#include "Fiber.hpp"
#include "BinaryPrivateSemaphore.hpp"
#include "CountingPrivateSemaphore.hpp"
#ifdef EMPER_ASYNC_LIB
#include "AsyncLibrary.hpp"
#endif
runtime* init_runtime(void) {
Runtime* r = new Runtime();
return reinterpret_cast<runtime*>(r);
@@ -96,3 +101,39 @@ void wait_cps(cps* sem) {
countingPrivateSemaphore->wait();
}
#ifdef EMPER_ASYNC_NETWORK
int async_recv(int fd, void* buff, int buff_len, int flags) {
bool read_once = (flags & READ_FULL_BUFFER) == 0;
bool fd_serialized = (flags & UNSERIALIZED) == 0;
return io::async_recv(fd, (unsigned char*)buff, buff_len, read_once, fd_serialized);
}
int async_send(int fd, const void* buff, int buff_len, int flags) {
bool fd_serialized = (flags & UNSERIALIZED) == 0;
return io::async_send(fd, (unsigned char*)buff, buff_len, fd_serialized);
}
int async_connect(int fd, struct sockaddr* sockaddress, size_t len, int flags) {
bool fd_serialized = (flags & UNSERIALIZED) == 0;
return io::async_connect(fd, sockaddress, len, fd_serialized);
}
int async_accept(int fd, struct sockaddr* sockaddress, size_t len, int flags) {
bool fd_serialized = (flags & UNSERIALIZED) == 0;
return io::async_accept(fd, sockaddress, len, fd_serialized);
}
#endif
#ifdef EMPER_ASYNC_DISK_IO
int async_read_file(int fd, void* buff, size_t count, off_t offset) {
return io::async_read_file(fd, (unsigned char*)buff, count, offset);
}
int async_write_file(int fd, const void* buff, size_t count, off_t offset) {
return io::async_write_file(fd, (const unsigned char*)buff, count, offset);
}
#endif
#pragma once
#include "emper-common.h"
#include "emper-config.h"
#include "stddef.h"
#ifdef EMPER_ASYNC_NETWORK
#include "sys/socket.h"
#endif
typedef struct runtime runtime;
@@ -56,6 +62,27 @@ extern "C" {
void wait_cps(cps* sem);
#ifdef EMPER_ASYNC_NETWORK
enum EMPER_ASYNC_NETWORK_FLAGS {
READ_FULL_BUFFER = 1,
UNSERIALIZED = 2,
};
int async_recv(int fd, void* buff, int buff_len, int flags);
int async_send(int fd, const void* buff, int buff_len, int flags);
int async_connect(int fd, struct sockaddr* sockaddress, size_t len, int flags);
int async_accept(int fd, struct sockaddr* sockaddress, size_t len, int flags);
#endif
#ifdef EMPER_ASYNC_DISK_IO
int async_read_file(int fd, void* buff, size_t count, off_t offset);
int async_write_file(int fd, const void* buff, size_t count, off_t offset);
#endif
#ifdef __cplusplus
}
#endif
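And a hedged sketch of the C-level wrappers declared above, assuming async_network is enabled; client_fd stands for an already connected socket and is not part of the patch.

// Hypothetical sketch against the C API; client_fd is assumed to be a connected socket.
#include "emper.h"

static void c_api_echo(int client_fd) {
	char buf[256];
	// READ_FULL_BUFFER keeps the fiber blocked until the whole buffer is filled
	// (or an error/EOF occurs) instead of returning after the first recv().
	int n = async_recv(client_fd, buf, (int)sizeof(buf), READ_FULL_BUFFER);
	if (n > 0) {
		// UNSERIALIZED skips the per-fd serialization; safe if this fiber is the
		// only one writing to client_fd.
		async_send(client_fd, buf, n, UNSERIALIZED);
	}
}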
@@ -25,6 +25,7 @@ emper_cpp_sources = [
'BinaryPrivateSemaphore.cpp',
'CountingPrivateSemaphore.cpp',
'Semaphore.cpp',
'AsyncLibrary.cpp',
]
emper_generated_files = []
......
@@ -20,6 +20,9 @@ conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue'))
conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue'))
conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue'))
conf_data.set('EMPER_STATS', get_option('stats'))
conf_data.set('EMPER_ASYNC_NETWORK', get_option('async_network'))
conf_data.set('EMPER_ASYNC_DISK_IO', get_option('async_disk_io'))
conf_data.set('EMPER_ASYNC_LIB', get_option('async_network') or get_option('async_disk_io'))
subdir('emper')
subdir('tests')
......
@@ -28,3 +28,15 @@ option(
value: false,
description: 'Collect stats and print them at the end of the execution'
)
option(
'async_network',
type: 'boolean',
value: false,
description: 'Add support for asynchronous network IO'
)
option(
'async_disk_io',
type: 'boolean',
value: false,
description: 'Add support for asynchronous disk IO'
)
#include <cstdlib>
#include <iostream>
#include <list>
#include <string>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <unistd.h>
#include <fstream>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <linux/sockios.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "BinaryPrivateSemaphore.hpp"
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "PrivateSemaphore.hpp"
#include "Runtime.hpp"
#include "emper.hpp"
#include "AsyncLibrary.hpp"
using namespace io;
#define MAX 1024
void func()
{
int sockfd;
struct sockaddr_in servaddr;
// socket creation and verification
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
printf("socket creation failed...\n");
exit(0);
}
bzero(&servaddr, sizeof(servaddr));
// assign IP, PORT
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("0.0.0.0");
servaddr.sin_port = htons(4242);
int reuseaddr=1;
if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(reuseaddr))==-1)
{
die("%s",strerror(errno));
}
if (bind(sockfd, (sockaddr*) &servaddr , sizeof(servaddr))==-1)
{
die("%s",strerror(errno));
}
if (listen(sockfd, 1) != 0)
perror("listen");
auto client_fd = async_accept(sockfd, (sockaddr*)&servaddr, sizeof(servaddr));
if (client_fd < 0) {
printf("accept failed with code %d...\n", errno);
exit(EXIT_FAILURE);
}
char read_buff[MAX];
char file_write_buff[MAX];
char file_read_buff[MAX];
char write_buff[MAX];
for (;;) {
int read = async_recv(client_fd, (unsigned char*) read_buff, sizeof(read_buff));
//int read = async_read(sockfd, (unsigned char*) read_buff, 10, false);
if (read == -1)
{
perror("wait_read");
break;
}
read_buff[read] = '\0';
snprintf(file_write_buff, MAX, "Echo: %s", read_buff);
int file_fd = open("/home/burak/test.txt", O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (file_fd == -1)
perror("open for write");
async_write_file(file_fd, (unsigned char*) file_write_buff, strlen(file_write_buff), 0);
close(file_fd);
file_fd = open("/home/burak/test.txt", O_RDONLY);
if (file_fd == -1)
perror("open for write");
async_read_file(file_fd, (unsigned char*) file_read_buff, strlen(file_write_buff), 0);
close(file_fd);
file_read_buff[strlen(file_write_buff)] = '\0';
snprintf(write_buff, MAX, "File: %s", file_read_buff);
int write = async_send(client_fd, (unsigned char*) write_buff, strlen(write_buff));
if (write == -1)
{
perror("wait_write");
break;
}
if ((strncmp(read_buff, "exit", 4)) == 0) {
break;
}
}
exit(EXIT_SUCCESS);
}
int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) {
Runtime runtime(1);
Fiber* fibFiber = Fiber::from([=]{func();});
runtime.schedule(*fibFiber);
runtime.waitUntilFinished();
return 0;
}
#include <arpa/inet.h>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <linux/sockios.h>
#include <list>
#include <mutex>
#include <netdb.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
#include <vector>
#include "AsyncLibrary.hpp"
#include "BinaryPrivateSemaphore.hpp"
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "PrivateSemaphore.hpp"
#include "Runtime.hpp"
#include "emper.hpp"
using namespace io;
#define PORT 4242
#define MAX 1024
static void server_func() {
int sockfd;
struct sockaddr_in servaddr;
// socket creation and verification
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
printf("server socket creation failed...\n");
exit(0);
}
bzero(&servaddr, sizeof(servaddr));
// assign IP, PORT
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
servaddr.sin_port = htons(PORT);
int reuseaddr = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1) {
die("%s", strerror(errno));
}
if (bind(sockfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
die("%s", strerror(errno));
}
if (listen(sockfd, 1) != 0) {
perror("listen");
}
auto client_fd = async_accept(sockfd, (sockaddr *)&servaddr, sizeof(servaddr));
if (client_fd < 0) {
printf("accept failed with code %d...\n", errno);
exit(EXIT_FAILURE);
}
// For testing purposes
sockfd = client_fd;
char read_buff[MAX];
char write_buff[MAX];
for (;;) {
int read = async_recv(sockfd, (unsigned char *)read_buff, sizeof(read_buff));
if (read == -1) {
perror("wait_read");
break;
}
read_buff[read] = '\0';
snprintf(write_buff, MAX, "%s", read_buff);
int write = async_send(sockfd, (unsigned char *)write_buff, strlen(write_buff));
if (write == -1) {
perror("wait_write");
break;
}
if ((strncmp(read_buff, "exit", 4)) == 0) {
break;
}
}
exit(EXIT_SUCCESS);
}
static void client_func() {
int sockfd;
struct sockaddr_in servaddr;
// socket creation and verification
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
printf("server socket creation failed...\n");
exit(0);
}
// assign IP, PORT
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
servaddr.sin_port = htons(PORT);
if (async_connect(sockfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
printf("connect failed with code %d...\n", errno);
exit(EXIT_FAILURE);
}
std::vector<std::string> strings {"foo", "bar"};
char read_buff[MAX];
char write_buff[MAX];
for (auto s: strings) {
snprintf(write_buff, MAX, "%s", s.c_str());
int bytes_send = async_send(sockfd, (unsigned char*)write_buff, strlen(write_buff));
if (bytes_send == -1) {
perror("client write failed");
exit(EXIT_FAILURE);
}
int bytes_read = async_recv(sockfd, (unsigned char*)read_buff, sizeof(read_buff));
if (bytes_read == -1) {
perror("client read failed");
exit(EXIT_FAILURE);
}
if (strncmp(write_buff, read_buff, s.length()) != 0) {
fprintf(stderr, "unexpected string received from server\n");
exit(EXIT_FAILURE);
}
}
snprintf(write_buff, MAX, "exit");
int bytes_send = async_send(sockfd, (unsigned char*) write_buff, strlen(write_buff));
if (bytes_send == -1) {
perror("client write failed");
exit(EXIT_FAILURE);
}
shutdown(sockfd, SHUT_RDWR);
}
int main(UNUSED_ARG int argc, UNUSED_ARG char *argv[]) {
Runtime runtime(1);
Fiber* server = Fiber::from([=]{server_func();});
runtime.schedule(*server);
Fiber* client = Fiber::from([=]{client_func();});
runtime.schedule(*client);
runtime.waitUntilFinished();
return EXIT_SUCCESS;
}
@@ -29,12 +29,36 @@ tests = {
{
'description': 'Simple LAWS scheduling strategy test',
},
'SimpleNetworkTest.cpp':
{
'feature_flags': ['async_network'],
'description': 'Simple async network test',
},
'SimpleDiskAndNetworkTest.cpp':
{
'feature_flags': ['async_network', 'async_disk_io'],
'description': 'Simple async disk and network IO test',
},
}
undef_ndebug = '-UNDEBUG'
test_dep = [thread_dep]
foreach source, test_dict : tests
# check feature flags
if test_dict.has_key('feature_flags')
include_test = true
foreach flag : test_dict.get('feature_flags')
if not get_option(flag)
include_test = false
break
endif
endforeach
if not include_test
continue
endif
endif
# TODO: Use meson fs (filesystem) module once meson >= 0.53 is in
# buster-backports, instead of split('.')[0]
# test_name = fs.replace_suffix(source, '')
......