diff --git a/emper/AsyncLibrary.cpp b/emper/AsyncLibrary.cpp new file mode 100644 index 0000000000000000000000000000000000000000..15f9ac32e0b1f939bada8f43d44519155ca20870 --- /dev/null +++ b/emper/AsyncLibrary.cpp @@ -0,0 +1,750 @@ +#include "AsyncLibrary.hpp" +#include "BinaryPrivateSemaphore.hpp" +#include "ContextManager.hpp" + +#ifdef EMPER_ASYNC_LIB +#include <memory> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <unistd.h> +#include <sys/epoll.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/eventfd.h> +#include <unistd.h> +#include <fcntl.h> + +#ifdef EMPER_ASYNC_DISK_IO +#include <liburing.h> +#endif + +class AsyncNetworkLogger : public Logger<LogSubsystem::AsyncTcp> +{ +}; + +static AsyncNetworkLogger log; + +enum class aio_op +{ + send, + recv, + connect, + accept, + uring_read, + uring_write +}; +struct aio_handle +{ + std::unique_ptr<BPS> sem; + + aio_op op; + int aio_errno = 0; + int fd; + unsigned char* buff; + int buff_len; + int curr_offset; + unsigned int epoll_events; + bool serialized; + + struct iovec *iov; + size_t liburing_offset; + + sockaddr* sockaddress; + socklen_t socklen; + + bool read_once; + + int return_value; +}; + + +static int epoll_fd; +static std::unique_ptr<std::thread> epoll_thread; +static bool done; +static int done_fd; +static aio_handle done_handle; + + +#ifdef EMPER_ASYNC_DISK_IO +#define URING_ENTRIES 512 +static struct io_uring ring; +static aio_handle ring_handle; + +static std::vector<aio_handle*> wait_queue; +static std::mutex wait_mutex; +static std::condition_variable wait_cv; +#endif + + +#ifdef EMPER_ASYNC_NETWORK + +static std::map<int, std::vector<aio_handle*>> serialization_list; +static std::mutex serialization_list_mutex; + +void network_add_epoll(aio_handle* handle) +{ + epoll_event event; + event.events = handle->epoll_events; + event.data.ptr = handle; + + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handle->fd, &event) != 0) + perror("epoll_ctl"); +} + +void network_remove_epoll(int fd) +{ + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) != 0) { + perror("epoll_ctl"); + } +} + +bool network_serialized_add(aio_handle *handle, bool use_epoll = true) +{ + bool first = false; + { + std::unique_lock<std::mutex> lock(serialization_list_mutex); + + auto& list = serialization_list[handle->fd]; + if (list.size() == 0) // I'm currently the only one interested in this fd + first = true; + list.push_back(handle); + } + + if (first) + { + if (!use_epoll) + return true; + else + network_add_epoll(handle); + } + return false; +} + +void network_serialzed_remove_first(int fd, bool use_epoll = true) +{ + std::unique_lock<std::mutex> lock(serialization_list_mutex); + + aio_handle* next_notify = nullptr; + auto& list = serialization_list[fd]; + list.erase(list.begin()); + if (!list.empty()) + next_notify = list[0]; + + if (next_notify != nullptr) + { + epoll_event event; + event.events = next_notify->epoll_events; + event.data.ptr = next_notify; + + if (use_epoll) + { + if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) != 0) { + perror("epoll_ctl"); + } + } + else + { + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0) { + perror("epoll_ctl"); + } + } + } + else if (use_epoll) + network_remove_epoll(fd); +} + +static bool do_async_connect(int fd, sockaddr* sockaddress, socklen_t len, int* return_value) +{ + *return_value = connect(fd, sockaddress, len); + if (*return_value == 0) + { + return true; + } + else if (*return_value < 0 && (errno != EINPROGRESS && errno != EAGAIN)) + { + *return_value = -1; + return true; + } + return false; +} +static bool do_async_accept(int fd, sockaddr* sockaddress, socklen_t len, int* return_value) +{ + *return_value = accept(fd, sockaddress, &len); + if (*return_value >= 0) + { + return true; + } + else if (*return_value < 0 && (errno != EWOULDBLOCK && errno != EAGAIN)) + { + *return_value = -1; + return true; + } + return false; +} +static bool do_async_recv(int fd, unsigned char* buff, int buff_len, int* return_value) +{ + *return_value = recv(fd, buff, buff_len, MSG_DONTWAIT | MSG_NOSIGNAL); + if(*return_value == 0) + { + return true; + } + else if (*return_value == -1 && (errno != EWOULDBLOCK && errno != EAGAIN)) + { + return true; + } + return false; +} +static bool do_async_send(int fd, unsigned char* buff, int buff_len, int* return_value) +{ + *return_value = send(fd, buff, buff_len, MSG_DONTWAIT | MSG_NOSIGNAL); + if(*return_value == 0) + { + return true; + } + else if (*return_value == -1 && (errno != EWOULDBLOCK && errno != EAGAIN)) + { + return true; + } + return false; +} + +int async_connect(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized) +{ + + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + log.logE("fcntl F_GETFL failed for async_connect"); + flags = (flags | O_NONBLOCK); + if (fcntl(fd, F_SETFL, flags) != 0) { + log.logE("fcntl F_SETFL failed for async_connect"); + } + + if (!fd_serialized) + { + int connect_result; + if (do_async_connect(fd, sockaddress, len, &connect_result)) + { + return connect_result; + } + } + + aio_handle notify; + notify.fd = fd; + notify.sockaddress = sockaddress; + notify.socklen = len; + notify.op = aio_op::connect; + notify.epoll_events = EPOLLOUT; + notify.serialized = fd_serialized; + notify.sem.reset(new BinaryPrivateSemaphore()); + + if (fd_serialized) + { + if (network_serialized_add(¬ify, false)) // True if we are first, false if not -> then we get added to epoll by the predecessor + { + int connect_result; + if (do_async_connect(fd, sockaddress, len, &connect_result)) + { + network_serialzed_remove_first(fd, false); + return connect_result; + } + network_add_epoll(¬ify); // Did not finish yet, add to epoll; we are first + } + } + else + { + network_add_epoll(¬ify); // Not serialized and it wasn't successful before + } + + notify.sem->wait(); + + if (notify.aio_errno) + { + errno = notify.aio_errno; + return -1; + } + return 0; +} + +int async_accept(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized) +{ + + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) + log.logE("fcntl F_GETFL failed for async_accept"); + flags = (flags | O_NONBLOCK); + if (fcntl(fd, F_SETFL, flags) != 0) + log.logE("fcntl F_SETFL failed for async_accept"); + + if (!fd_serialized) + { + int accept_result; + if (do_async_accept(fd, sockaddress, len, &accept_result)) + { + return accept_result; + } + } + + aio_handle notify; + notify.fd = fd; + notify.sockaddress = sockaddress; + notify.socklen = len; + notify.op = aio_op::accept; + notify.epoll_events = EPOLLIN; + notify.serialized = fd_serialized; + notify.sem.reset(new BinaryPrivateSemaphore()); + + if (fd_serialized) + { + if (network_serialized_add(¬ify, false)) // True if we are first, false if not -> then we get added to epoll by the predecessor + { + int accept_result; + if (do_async_accept(fd, sockaddress, len, &accept_result)) + { + network_serialzed_remove_first(fd, false); + return accept_result; + } + network_add_epoll(¬ify); // Did not finish yet, add to epoll; we are first + } + } + else + { + network_add_epoll(¬ify); // Not serialized and it wasn't successful before + } + + + notify.sem->wait(); + + return notify.return_value; +} + +int async_recv(int fd, unsigned char* buff, int buff_len, bool read_once, bool fd_serialized) +{ + + size_t offset = 0; + if (!fd_serialized) + { + int actually_read; + if (do_async_recv(fd, buff, buff_len, &actually_read)) + { + return actually_read; + } + if (actually_read > 0 && (read_once || actually_read == buff_len)) + { + return actually_read; + } + + if (actually_read != -1) + offset = actually_read; + } + + aio_handle notify; + notify.fd = fd; + notify.buff = buff; + notify.buff_len = buff_len; + notify.read_once = read_once; + notify.curr_offset = offset; + notify.op = aio_op::recv; + notify.epoll_events = EPOLLIN; + notify.serialized = fd_serialized; + notify.sem.reset(new BinaryPrivateSemaphore()); + + if (fd_serialized) + { + if (network_serialized_add(¬ify, false)) // True if we are first, false if not -> then we get added to epoll by the predecessor + { + int actually_read; + if (do_async_recv(fd, buff, buff_len, &actually_read)) + { + network_serialzed_remove_first(fd, false); + return actually_read; + } + if (actually_read > 0 && (read_once || actually_read == buff_len)) + { + network_serialzed_remove_first(fd, false); + return actually_read; + } + + if (actually_read != -1) + { + offset = actually_read; + notify.curr_offset = actually_read; + } + network_add_epoll(¬ify); // Did not finish yet, add to epoll; we are first + } + } + else + { + network_add_epoll(¬ify); // Not serialized and it wasn't successful before + } + + notify.sem->wait(); + + if (notify.aio_errno) + { + errno = notify.aio_errno; + return -1; + } + return notify.curr_offset; +} + +int async_send(int fd, unsigned char* buff, int buff_len, bool fd_serialized) +{ + + size_t offset = 0; + if (!fd_serialized) + { + int actually_send; + if (do_async_send(fd, buff, buff_len, &actually_send)) + { + return actually_send; + } + if (actually_send == buff_len) + { + return actually_send; + } + + if (actually_send != -1) + offset = actually_send; + } + + aio_handle notify; + notify.fd = fd; + notify.buff = buff; + notify.buff_len = buff_len; + notify.curr_offset = offset; + notify.op = aio_op::send; + notify.epoll_events = EPOLLOUT; + notify.serialized = fd_serialized; + notify.sem.reset(new BinaryPrivateSemaphore()); + + if (fd_serialized) + { + if (network_serialized_add(¬ify, false)) // True if we are first, false if not -> then we get added to epoll by the predecessor + { + int actually_send; + if (do_async_send(fd, buff, buff_len, &actually_send)) + { + network_serialzed_remove_first(fd, false); + return actually_send; + } + if (actually_send == buff_len) + { + network_serialzed_remove_first(fd, false); + return actually_send; + } + + if (actually_send != -1) + { + offset = actually_send; + notify.curr_offset = actually_send; + } + network_add_epoll(¬ify); // Did not finish yet, add to epoll; we are first + } + } + else + { + network_add_epoll(¬ify); // Not serialized and it wasn't successful before + } + + notify.sem->wait(); + + if (notify.aio_errno) + { + errno = notify.aio_errno; + return -1; + } + return notify.curr_offset; +} +#endif + +#ifdef EMPER_ASYNC_DISK_IO +static std::mutex io_uring_mutex; +static std::vector<aio_handle*> overflow_queue; +static std::mutex overflow_queue_mutex; + +int async_read_file(int fd, unsigned char* buff, size_t count, off_t offset) { + + aio_handle notify; + notify.op = aio_op::uring_read; + notify.sem.reset(new BinaryPrivateSemaphore()); + + struct iovec iov; + iov.iov_base = buff; + iov.iov_len = count; + + bool successfull = true; + { + std::unique_lock<std::mutex> lock(io_uring_mutex); + struct io_uring_sqe *sqe; + sqe = io_uring_get_sqe(&ring); + if (!sqe) + successfull = false; + else + { + io_uring_prep_readv(sqe, fd, &iov, 1, offset); + io_uring_sqe_set_data(sqe, ¬ify); + io_uring_submit(&ring); + } + } + + if (!successfull) + { + notify.fd = fd; + notify.iov = &iov; + notify.liburing_offset = offset; + std::unique_lock<std::mutex> lock(overflow_queue_mutex); + overflow_queue.emplace_back(¬ify); + } + + log.logD("Waiting for file read..."); + + notify.sem->wait(); + + return notify.return_value; +} + +int async_write_file(int fd, const unsigned char* buff, size_t count, off_t offset) { + + aio_handle notify; + notify.op = aio_op::uring_write; + notify.sem.reset(new BinaryPrivateSemaphore()); + + struct iovec iov; + iov.iov_base = const_cast<unsigned char*>(buff); + iov.iov_len = count; + + bool successfull = true; + { + std::unique_lock<std::mutex> lock(io_uring_mutex); + struct io_uring_sqe *sqe; + sqe = io_uring_get_sqe(&ring); + if (!sqe) + successfull = false; + else + { + io_uring_prep_writev(sqe, fd, &iov, 1, offset); + io_uring_sqe_set_data(sqe, ¬ify); + io_uring_submit(&ring); + } + } + + if (!successfull) + { + notify.fd = fd; + notify.iov = &iov; + notify.liburing_offset = offset; + std::unique_lock<std::mutex> lock(overflow_queue_mutex); + overflow_queue.emplace_back(¬ify); + } + + log.logD("Waiting for file write..."); + + notify.sem->wait(); + + return notify.return_value; +} +#endif + +void setup_epoll() +{ + if (epoll_thread != nullptr) + return; + epoll_fd = epoll_create1(0); + done_fd = eventfd(0, 0); + + epoll_event event; + event.events = EPOLLIN; + event.data.ptr = &done_handle; + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, done_fd, &event) != 0) + perror("epoll_ctl"); + + done = false; + //epoll_thread.reset(new std::thread(run_epoll_thread)); +} + +void setup_io_uring() +{ +#ifdef EMPER_ASYNC_DISK_IO + auto ret = io_uring_queue_init(URING_ENTRIES, &ring, 0); + if (ret < 0) { + perror("io_uring_queue_init"); + } + + ring_handle.fd = ring.ring_fd; + log.logD("Ring fd is " + std::to_string(ring.ring_fd)); + + epoll_event event; + event.events = EPOLLIN; + event.data.ptr = &ring_handle; + + if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ring.ring_fd, &event) != 0) + perror("io_uring epoll_ctl"); +#endif +} + +void handle_aio_event(aio_handle *notify, int result = -1) { + bool wakeup = false; + int error = 0; + int return_value = 0; + + if (result != -1) + return_value = result; + + switch (notify->op) { + case aio_op::recv: + { + int actually_read = recv(notify->fd, notify->buff + notify->curr_offset, notify->buff_len - notify->curr_offset, + MSG_DONTWAIT | MSG_NOSIGNAL); + if (actually_read != -1) + notify->curr_offset += actually_read; + // Wake up if only one read was requested or the buffer is full or there was an error or EOF + if (notify->read_once || notify->curr_offset == notify->buff_len || actually_read == -1 || actually_read == 0) + wakeup = true; + if (actually_read == -1) + error = errno; + }break; + case aio_op::send: + { + int actually_written = send(notify->fd, notify->buff + notify->curr_offset, notify->buff_len - notify->curr_offset, + MSG_DONTWAIT | MSG_NOSIGNAL); + if (actually_written != -1) + notify->curr_offset += actually_written; + + // Wake up if the buffer is full or there was an error + if (notify->curr_offset == notify->buff_len || actually_written == -1) + wakeup = true; + if (actually_written == -1) + error = errno; + }break; + case aio_op::connect: + { + int result; + socklen_t result_len = sizeof(result); + int getsockopt_result = getsockopt(notify->fd, SOL_SOCKET, SO_ERROR, &result, &result_len); + if (getsockopt_result < 0) { + error = getsockopt_result; + } + else if (result != 0) { + error = result; + } + wakeup = true; + }break; + case aio_op::accept: + { + auto fd = accept(notify->fd, notify->sockaddress, ¬ify->socklen); + + if (fd < -1) + return_value = error = errno; + else + return_value = fd; + wakeup = true; + }break; + case aio_op::uring_read: + case aio_op::uring_write: + { + wakeup = true; + }break; + } + + if (wakeup) + { + notify->aio_errno = error; + notify->return_value = return_value; + + if (notify->op != aio_op::uring_read && notify->op != aio_op::uring_write) + { + if (notify->serialized) + { + network_serialzed_remove_first(notify->fd, true); + } + else + { + network_remove_epoll(notify->fd); + } + } + notify->sem->signal(); + } +} + +void run_epoll_thread() +{ + setup_epoll(); + setup_io_uring(); + while(!done) + { + const int max_events = 1024; + epoll_event events[max_events]; + + log.logD("Polling for events..."); + int event_count = epoll_wait(epoll_fd, events, max_events, -1); + if (event_count == -1) + perror("epoll_wait"); + log.logD(std::to_string(event_count) + " ready events"); + for(int i = 0; i < event_count; i++) + { + epoll_event *my_event = &events[i]; + aio_handle* notify = reinterpret_cast<aio_handle*>(my_event->data.ptr); + if (notify == nullptr) + { + printf("\n????????? Panic\n"); + continue; + } + else if (notify->sem == nullptr) + { + if (notify->fd == done_fd) //Our signal to end + break; +#ifdef EMPER_ASYNC_DISK_IO + else if (notify->fd == ring.ring_fd) // File IO Completion + { + io_uring_cqe *cqe; + while (io_uring_peek_cqe(&ring, &cqe) == 0 && cqe != nullptr) + { + auto notify_ptr = reinterpret_cast<aio_handle*>(io_uring_cqe_get_data(cqe)); + handle_aio_event(notify_ptr, cqe->res); + io_uring_cqe_seen(&ring, cqe); + } + + std::unique_lock<std::mutex> lock(overflow_queue_mutex); + while(!overflow_queue.empty()) + { + auto handle = overflow_queue.back(); + + struct io_uring_sqe *sqe; + sqe = io_uring_get_sqe(&ring); + if (!sqe) + break; + else if (handle->op == aio_op::uring_read) + io_uring_prep_readv(sqe, handle->fd, handle->iov, 1, handle->liburing_offset); + else// if (handle->op == aio_op::uring_write) + io_uring_prep_writev(sqe, handle->fd, handle->iov, 1, handle->liburing_offset); + + overflow_queue.pop_back(); + } + } +#endif + } +#ifdef EMPER_ASYNC_NETWORK + else { // "Normal" epoll network event + handle_aio_event(notify); + } +#endif + } + } +} + +/* +void stop_epoll_thread() +{ + if (epoll_thread == nullptr) + return; + + done = true; + + uint64_t something = 1; + write(done_fd, &something, sizeof(something)); + + epoll_thread->join(); + epoll_thread.reset(); +}*/ +#endif diff --git a/emper/AsyncLibrary.hpp b/emper/AsyncLibrary.hpp new file mode 100644 index 0000000000000000000000000000000000000000..65152f10c1cf4eae32fbd4aafc85e8586841fb31 --- /dev/null +++ b/emper/AsyncLibrary.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include "emper-config.h" + +#include <sys/socket.h> +/** + * @param fd filedescriptor for this operation + * @param buff destination buffer + * @param buff_len length of that buffer + * @param read_once default is true. If true it will read only once. Else it will return only when the buff is filled + */ +int async_recv(int fd, unsigned char* buff, int buff_len, bool read_once = true, bool fd_serialized = true); + +/** + * @param fd filedescriptor for this operation + * @param buff source buffer + * @param buff_len length of that buffer + */ +int async_send(int fd, unsigned char* buff, int buff_len, bool fd_serialized = true); + +/** + * @brief Same as normal connect + */ +int async_connect(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized = false); +/** + * @brief Same as normal accept + */ +int async_accept(int fd, sockaddr* sockaddress, socklen_t len, bool fd_serialized = false); + +/** + * @param fd filedescriptor to the regular file to be read from + * @param buff destination buffer + * @param count amount of bytes to read + * @param offset offset in the file + */ +int async_read_file(int fd, unsigned char* buff, size_t count, off_t offset = 0); + +/** + * @param fd filedescriptor to the regular file to be written to + * @param buff source buffer + * @param count amount of bytes to write + * @param offset offset in the file + */ +int async_write_file(int fd, const unsigned char* buff, size_t count, off_t offset = 0); + + +#ifdef EMPER_ASYNC_LIB +// Internal, move to somewhere else +void run_epoll_thread(); +#endif diff --git a/emper/Debug.hpp b/emper/Debug.hpp index dca6fe1c946d2dbb33ad5c24e84ec42fe9d81752..e4c9f003cb03f67a03f2688a6d37de8a4874ab9b 100644 --- a/emper/Debug.hpp +++ b/emper/Debug.hpp @@ -45,6 +45,7 @@ enum class LogSubsystem { SCHED, RUNTI, U_B_MPSC_Q, + AsyncTcp, }; enum LogLevel { @@ -69,6 +70,7 @@ static const std::map<LogSubsystem, LogLevel> LOG_CONFIG = { { LogSubsystem::SCHED, ALL }, { LogSubsystem::RUNTI, ALL }, { LogSubsystem::U_B_MPSC_Q, ALL }, + { LogSubsystem::AsyncTcp, ALL }, }; template <LogSubsystem logSubsystem> @@ -93,6 +95,8 @@ private: return "RUNTI"; case LogSubsystem::U_B_MPSC_Q: return "UBSCQ"; + case LogSubsystem::AsyncTcp: + return "AIO "; default: return "UNKNOWN SUBSYSTEM (Add it *now*)"; } @@ -108,7 +112,7 @@ private: } } -protected: +public: inline void log(LogLevel level, const std::string& string) const { #ifdef NDEBUG diff --git a/emper/PrivateSemaphore.hpp b/emper/PrivateSemaphore.hpp index 6d9bcefe22c080b65393d9795fa13aff272a407e..5f8b4efb158a8166b35ac4fb823bdbf13b41dda0 100644 --- a/emper/PrivateSemaphore.hpp +++ b/emper/PrivateSemaphore.hpp @@ -7,7 +7,7 @@ #include "Fiber.hpp" class PrivateSemaphore : protected Logger<LogSubsystem::PS> { -protected: +public: Runtime& runtime; ContextManager& contextManager; diff --git a/emper/Runtime.cpp b/emper/Runtime.cpp index dda344bc65291845131aaac697591160830209bc..b6aab4350c6f69997b73777808b4d967faf40947 100644 --- a/emper/Runtime.cpp +++ b/emper/Runtime.cpp @@ -15,6 +15,10 @@ #include "Common.hpp" #include "Debug.hpp" +#ifdef EMPER_ASYNC_LIB +#include "AsyncLibrary.hpp" +#endif + std::mutex Runtime::currentRuntimeMutex; Runtime* Runtime::currentRuntime; thread_local unsigned int Runtime::seed; @@ -117,6 +121,9 @@ Fiber* Runtime::nextFiber() { } void Runtime::waitUntilFinished() { +#ifdef EMPER_ASYNC_LIB + run_epoll_thread(); +#endif for (workerid_t i = 0; i < workerCount; ++i) { pthread_join(threads[i], nullptr); } diff --git a/emper/meson.build b/emper/meson.build index ec79201a227800ebff0466ccb5f8e6e9dbbf9b58..2dc4c389e11289c4d0f4b324d4c46607e0869e5b 100644 --- a/emper/meson.build +++ b/emper/meson.build @@ -25,6 +25,7 @@ emper_cpp_sources = [ 'BinaryPrivateSemaphore.cpp', 'CountingPrivateSemaphore.cpp', 'Semaphore.cpp', + 'AsyncLibrary.cpp', ] emper_generated_files = [] diff --git a/meson.build b/meson.build index 0733d8e9a733df39e6f71ecc8575996b6f9a4458..2764386ca185738b7e2f14ac543b879bf76f42cc 100644 --- a/meson.build +++ b/meson.build @@ -20,6 +20,9 @@ conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue')) conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue')) conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue')) conf_data.set('EMPER_STATS', get_option('stats')) +conf_data.set('EMPER_ASYNC_NETWORK', get_option('async_network')) +conf_data.set('EMPER_ASYNC_DISK_IO', get_option('async_disk_io')) +conf_data.set('EMPER_ASYNC_LIB', get_option('async_network') or get_option('async_disk_io')) subdir('emper') subdir('tests') diff --git a/meson_options.txt b/meson_options.txt index 98a9ca1502ba1b7aa5fe3a03bd7d85a587771147..3ab92b0b0b1b3e0a1233f1d2d929fb31e01d27bb 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -28,3 +28,15 @@ option( value: false, description: 'Collect stats and print them at the end of the execution' ) +option( + 'async_network', + type: 'boolean', + value: false, + description: 'Add support for asynchronous network IO' +) +option( + 'async_disk_io', + type: 'boolean', + value: false, + description: 'Add support for asynchronous disk IO' +) diff --git a/tests/SimpleDiskAndNetworkTest.cpp b/tests/SimpleDiskAndNetworkTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..eff621b52fd73fadd9a8a93c9ac535b0a0f2abf9 --- /dev/null +++ b/tests/SimpleDiskAndNetworkTest.cpp @@ -0,0 +1,123 @@ +#include <cstdlib> +#include <iostream> +#include <list> +#include <string> +#include <atomic> +#include <mutex> +#include <condition_variable> +#include <thread> +#include <unistd.h> +#include <fstream> +#include <netdb.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <linux/sockios.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "BinaryPrivateSemaphore.hpp" +#include "Common.hpp" +#include "CountingPrivateSemaphore.hpp" +#include "Debug.hpp" +#include "PrivateSemaphore.hpp" +#include "Runtime.hpp" +#include "emper.hpp" +#include "AsyncLibrary.hpp" + + +#define MAX 1024 +void func() +{ + int sockfd; + struct sockaddr_in servaddr; + + // socket create and varification + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + printf("socket creation failed...\n"); + exit(0); + } + bzero(&servaddr, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr("0.0.0.0"); + servaddr.sin_port = htons(4242); + + int reuseaddr=1; + if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(reuseaddr))==-1) + { + die("%s",strerror(errno)); + } + if (bind(sockfd, (sockaddr*) &servaddr , sizeof(servaddr))==-1) + { + die("%s",strerror(errno)); + } + if (listen(sockfd, 1) != 0) + perror("listen"); + auto client_fd = async_accept(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)); + if (client_fd < 0) { + printf("accept failed with code %d...\n", errno); + exit(EXIT_FAILURE); + } + + char read_buff[MAX]; + char file_write_buff[MAX]; + char file_read_buff[MAX]; + char write_buff[MAX]; + + for (;;) { + int read = async_recv(client_fd, (unsigned char*) read_buff, sizeof(read_buff)); + //int read = async_read(sockfd, (unsigned char*) read_buff, 10, false); + if (read == -1) + { + perror("wait_read"); + break; + } + read_buff[read] = '\0'; + snprintf(file_write_buff, MAX, "Echo: %s", read_buff); + + int file_fd = open("/home/burak/test.txt", O_WRONLY, O_CREAT | O_TRUNC); + if (file_fd == -1) + perror("open for write"); + async_write_file(file_fd, (unsigned char*) file_write_buff, strlen(file_write_buff), 0); + close(file_fd); + + file_fd = open("/home/burak/test.txt", O_RDONLY); + if (file_fd == -1) + perror("open for write"); + async_read_file(file_fd, (unsigned char*) file_read_buff, strlen(file_write_buff), 0); + close(file_fd); + file_read_buff[strlen(file_write_buff)] = '\0'; + + snprintf(write_buff, MAX, "File: %s", file_read_buff); + + int write = async_send(client_fd, (unsigned char*) write_buff, strlen(write_buff)); + if (write == -1) + { + perror("wait_write"); + break; + } + if ((strncmp(read_buff, "exit", 4)) == 0) { + break; + } + } + exit(EXIT_SUCCESS); +} + +int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) { + Runtime runtime(1); + + Fiber* fibFiber = Fiber::from([=]{func();}); + + runtime.schedule(*fibFiber); + + runtime.waitUntilFinished(); + + return 0; +} diff --git a/tests/SimpleNetworkTest.cpp b/tests/SimpleNetworkTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b746c4b750ec389a09c71446e0597afd08cd7790 --- /dev/null +++ b/tests/SimpleNetworkTest.cpp @@ -0,0 +1,115 @@ +#include <cstdlib> +#include <iostream> +#include <list> +#include <string> +#include <atomic> +#include <mutex> +#include <condition_variable> +#include <thread> +#include <unistd.h> +#include <fstream> +#include <netdb.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <linux/sockios.h> + +#include "BinaryPrivateSemaphore.hpp" +#include "Common.hpp" +#include "CountingPrivateSemaphore.hpp" +#include "Debug.hpp" +#include "PrivateSemaphore.hpp" +#include "Runtime.hpp" +#include "emper.hpp" +#include "AsyncLibrary.hpp" + + +#define MAX 1024 +void func() +{ + // auto r = Runtime::getRuntime(); + int sockfd; + struct sockaddr_in servaddr; + + // socket create and varification + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) { + printf("socket creation failed...\n"); + exit(0); + } + bzero(&servaddr, sizeof(servaddr)); + + // assign IP, PORT + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + servaddr.sin_port = htons(4242); + + /*// connect the client socket to server socket + if (async_connect(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)) != 0) { + printf("connection with the server failed with code %d...\n", errno); + exit(EXIT_FAILURE); + }*/ + + int reuseaddr=1; + if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&reuseaddr,sizeof(reuseaddr))==-1) + { + die("%s",strerror(errno)); + } + if (bind(sockfd, (sockaddr*) &servaddr , sizeof(servaddr))==-1) + { + die("%s",strerror(errno)); + } + if (listen(sockfd, 1) != 0) + perror("listen"); + auto client_fd = async_accept(sockfd, (sockaddr*)&servaddr, sizeof(servaddr)); + if (client_fd < 0) { + printf("accept failed with code %d...\n", errno); + exit(EXIT_FAILURE); + } + // For testing purposes + sockfd = client_fd; + + + + + + char read_buff[MAX]; + char write_buff[MAX]; + for (;;) { + int read = async_recv(sockfd, (unsigned char*) read_buff, sizeof(read_buff)); + //int read = async_read(sockfd, (unsigned char*) read_buff, 10, false); + if (read == -1) + { + perror("wait_read"); + break; + } + read_buff[read] = '\0'; + snprintf(write_buff, MAX, "%s", read_buff); + int write = async_send(sockfd, (unsigned char*) write_buff, strlen(write_buff)); + if (write == -1) + { + perror("wait_write"); + break; + } + if ((strncmp(read_buff, "exit", 4)) == 0) { + break; + } + } + exit(EXIT_SUCCESS); +} + +int main(UNUSED_ARG int argc, UNUSED_ARG char* argv[]) { + Runtime runtime(1); + + Fiber* fibFiber = Fiber::from([=]{func();}); + + runtime.schedule(*fibFiber); + + runtime.waitUntilFinished(); + + return 0; +} diff --git a/tests/meson.build b/tests/meson.build index eca563f180751dea1384f3ccbf970a3ddec1abc1..fe7adbfa43c6c6ffbc369f1833c2d01a919a0858 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -29,12 +29,36 @@ tests = { { 'description': 'Simple LAWS scheduling strategy test', }, + 'SimpleNetworkTest.cpp': + { + 'feature_flags': ['async_network'], + 'description': 'Simple async network test', + }, + 'SimpleDiskAndNetworkTest.cpp': + { + 'feature_flags': ['async_network', 'async_disk_io'], + 'description': 'Simple LAWS scheduling strategy test', + }, } undef_ndebug = '-UNDEBUG' test_dep = [thread_dep] foreach source, test_dict : tests + # check feature flags + if test_dict.has_key('feature_flags') + include_test = true + foreach flag : test_dict.get('feature_flags') + if not get_option(flag) + include_test = false + break + endif + endforeach + + if not include_test + continue + endif + endif # TODO: Use meson fs (filesystem) module once meson >= 0.53 is in # buster-backports, instead of split('.')[0] # test_name = fs.replace_suffix(source, '')