Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flow/emper
  • aj46ezos/emper
  • i4/manycore/emper
3 results
Show changes
Commits on Source (15)
Showing
with 510 additions and 31 deletions
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <sys/types.h>
#include <array>
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Runtime.hpp"
#include "io.hpp"
#include "io/Future.hpp"
#include "io/IoContext.hpp"
std::string HOST = "::";
std::string PORT = "12346";
std::atomic<unsigned> ready;
std::mutex lock;
std::vector<int> conns;
void notify() {
std::vector<emper::io::SendFuture*> futures;
std::string msg = "Go";
for (int conn : conns) {
auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
futures.push_back(sf);
}
IoContext::getIo()->submit(futures.begin(), futures.end());
for (auto* f : futures) {
int res = f->wait();
if (res < 0) DIE_MSG_ERRNO("send failed");
delete f;
}
for (int conn : conns) emper::io::closeAndForget(conn);
}
auto main(int argc, char* argv[]) -> int {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
exit(EXIT_FAILURE);
}
const int scount = std::stoi(argv[1]);
if (scount < 0) DIE_MSG("count must be positiv");
const auto count = static_cast<unsigned>(scount);
std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count
<< " connections" << std::endl;
Runtime runtime;
auto* listener = emper::io::tcp_listener(HOST, PORT, [&](int socket) {
{
std::lock_guard<std::mutex> l(lock);
conns.push_back(socket);
}
std::array<char, 16> buf;
ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0);
if (recv < 0) DIE_MSG_ERRNO("recv failed");
unsigned r = ready.fetch_add(1) + 1;
if (r == count) {
notify();
runtime.initiateTermination();
}
});
if (!listener) {
exit(EXIT_FAILURE);
}
runtime.scheduleFromAnywhere(*listener);
runtime.waitUntilFinished();
return EXIT_FAILURE;
}
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
......@@ -9,6 +10,7 @@
#include <unistd.h>
#include <algorithm> // for find
#include <array>
#include <atomic>
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
......@@ -34,9 +36,7 @@
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using namespace emper::io;
using emper::lib::math::RunningAverage;
......@@ -74,6 +74,22 @@ bool sendQuit = true;
std::atomic<bool> terminate = false;
struct addrinfo* server;
struct addrinfo* coordinator;
std::string COORDINATOR_PORT = "12346";
static auto getFirstAddrInfo(const std::string& host, const std::string& port) -> struct addrinfo* {
struct addrinfo* addrinfo;
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &addrinfo);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
return addrinfo;
}
struct TimeStamps {
high_resolution_clock::time_point start;
......@@ -166,7 +182,7 @@ class Client {
void onCONNRESET() {
LOGW("Client " << id << " reconnecting");
close(sock);
closeAndForget(sock);
// reconnect to try again
connect();
}
......@@ -364,7 +380,7 @@ auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> boo
return true;
}
auto getOption(int argc, char** argv, const std::string& option) -> char* {
static auto getOption(int argc, char** argv, const std::string& option) -> char* {
char** end = argv + argc;
char** itr = std::find(argv, end, option);
......@@ -385,9 +401,40 @@ static void printUsage(char* name) {
<< "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
" [-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
" [--coordinator <coordinator-host>"
<< std::endl;
}
static void printIp(struct sockaddr* addr) {
std::array<char, INET6_ADDRSTRLEN> serverAddrString;
void* serverAddr;
switch (addr->sa_family) {
case AF_INET:
serverAddr = &(((struct sockaddr_in*)addr)->sin_addr);
break;
case AF_INET6:
serverAddr = &(((struct sockaddr_in6*)addr)->sin6_addr);
break;
default:
DIE_MSG("uknown sa_family " << addr->sa_family);
}
inet_ntop(addr->sa_family, serverAddr, serverAddrString.data(), INET6_ADDRSTRLEN);
std::cout << "echo client connecting to " << serverAddrString.data() << ":" << port << std::endl;
}
static void printSettings() {
std::cout << "parameters: {termination=";
if (execution_seconds) {
std::cout << execution_seconds << "seconds";
} else {
std::cout << iterations << "iterations";
}
std::cout << ",clients=" << nclients;
std::cout << ",size=" << size;
std::cout << ",sendquit= " << (sendQuit ? "true" : "false");
std::cout << "}" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
......@@ -444,7 +491,7 @@ auto main(int argc, char* argv[]) -> int {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
out_fd = ::open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
......@@ -465,22 +512,22 @@ auto main(int argc, char* argv[]) -> int {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
histogram_fd = ::open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
char* coordinator_s = getOption(argc, argv, "--coordinator");
if (coordinator_s) {
coordinator = getFirstAddrInfo(coordinator_s, COORDINATOR_PORT);
}
server = getFirstAddrInfo(host, port);
printIp(server->ai_addr);
printSettings();
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
......@@ -501,11 +548,34 @@ auto main(int argc, char* argv[]) -> int {
}
}
// await the clients
// await the connection of all clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}
// synchronize with other peers using the coordinator
if (coordinator) {
int conn = socket(coordinator->ai_family, coordinator->ai_socktype, coordinator->ai_protocol);
if (conn < 0) DIE_MSG_ERRNO("coordinator socket creation failed");
int err = emper::io::connectAndWait(conn, coordinator->ai_addr, coordinator->ai_addrlen);
if (err) DIE_MSG_ERRNO("coordinator socket connect failed");
std::string m = "OK";
SendFuture sf(conn, m.c_str(), m.size(), 0);
std::array<char, 16> buf;
RecvFuture rf(conn, buf.data(), buf.size(), 0);
rf.setDependency(sf);
err = rf.submitAndWait();
if (err < 0) DIE_MSG("coordinator recv failed: " << strerror(-err));
closeAndForget(conn);
LOGI("Coordinator go received");
}
high_resolution_clock::time_point echoTerminationTime;
if (execution_seconds) {
iterations = SIZE_MAX;
......
......@@ -93,5 +93,5 @@ auto main(int argc, char* argv[]) -> int {
runtime.waitUntilFinished();
return EXIT_FAILURE;
return EXIT_SUCCESS;
}
walk_dir_code = '''#include <filesystem>
int main() { auto it = std::filesystem::recursive_directory_iterator("."); }
'''
does_link = cpp_compiler.links(walk_dir_code, args: '--std=c++2a', name : 'walk_dir')
if does_link
if cpp_has_fs_recursive_directory_iterator
fsearch_exe = executable(
'fsearch',
'fsearch.cpp',
......
......@@ -16,6 +16,12 @@ echoserver_exe = executable(
dependencies: emper_dep,
)
netbarrier_exe = executable(
'coordinator',
'Coordinator.cpp',
dependencies: emper_dep,
)
echoserver_callback_exe = executable(
'echoserver_callback',
'EchoServerCallback.cpp',
......
......@@ -78,6 +78,14 @@ static const bool OVERFLOW_QUEUE =
#endif
;
static const bool CHECK_ANYWHERE_QUEUE_WHILE_STEALING =
#ifdef EMPER_CHECK_ANYWHERE_QUEUE_WHILE_STEALING
true
#else
false
#endif
;
auto getFullVersion() -> std::string;
static const bool BLOCKED_CONTEXT_SET =
......@@ -129,4 +137,12 @@ static const bool SET_AFFINITY_ON_BLOCK =
#endif
;
static const bool HAS_FS_PATH =
#ifdef EMPER_HAS_FS_PATH
true
#else
false
#endif
;
} // namespace emper
......@@ -15,6 +15,7 @@
namespace fs = std::filesystem;
#include "Future.hpp" // for Futures
#include "emper-config.h"
/*
* Header defining the public POSIX-like IO interface of emper.
......@@ -65,6 +66,28 @@ inline auto recvAndWait(int socket, void *buffer, size_t length, int flags) -> s
return future.waitAndSetErrno();
}
/**
* @brief Blocking recv mimicking POSIX recv(3) with timeout
*
* This method must be called from inside the emper runtime because it uses
* the worker-local IoContext
*
* @param socket file descriptor for this operation
* @param buffer destination buffer
* @param length length of the message in bytes
* @param flags type of message reception
* @param ts Time to wait before aborting the request
* @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes
*/
inline auto recvAndTryWait(int socket, void *buffer, size_t length, int flags,
TimeoutWrapper::Timespec &ts) -> ssize_t {
RecvFuture future(socket, buffer, length, flags);
TimeoutWrapper wrapper(future, ts);
wrapper.submitAndWait();
return future.waitAndSetErrno();
}
/**
* @brief Non-blocking send mimicking POSIX send(3)
*
......@@ -110,6 +133,31 @@ inline auto sendAndWait(int socket, const void *buffer, size_t length, int flags
return future.waitAndSetErrno();
}
/**
* @brief Blocking send mimicking POSIX send(3) with timeout
*
* This method must be called from inside the emper runtime because it uses
* the worker-local IoContext
*
* @param socket file descriptor for this operation
* @param buffer source buffer
* @param length length of the message in bytes
* @param flags type of message transmission
* @param ts Time to wait before aborting the request
* @param send_all If true return to the user only if all bytes are sent or
* sending further bytes is not possible
*
* @return -1 on error, otherwise the number of sent bytes
*/
inline auto sendAndTryWait(int socket, const void *buffer, size_t length, int flags,
TimeoutWrapper::Timespec &ts, bool send_all = true) -> ssize_t {
void *mut_buf = const_cast<void *>(buffer);
SendFuture future(socket, mut_buf, length, flags, send_all);
TimeoutWrapper wrapper(future, ts);
wrapper.submitAndWait();
return future.waitAndSetErrno();
}
/**
* @brief Non-blocking connect mimicking POSIX connect(3)
*
......@@ -253,6 +301,30 @@ inline auto readFileAndWait(int fildes, void *buf, size_t nbyte, off_t offset =
return future.waitAndSetErrno();
}
/**
* @brief Blocking read for regular files mimicking POSIX recv(3) with timeout
*
* This method must be called from inside the emper runtime because it uses
* the worker-local IoContext
*
* @param fildes file descriptor to the regular file to be written to
* @param buf destination buffer
* @param nbyte amount of bytes to read
* @param offset offset in the file
* @param ts Time to wait before aborting the request
* @param read_all If true return only to the user if the buffer is completely
* filled or no further data can be read.
*
* @return -1 on error, 0 when receiving from a closed socket, otherwise the received bytes
*/
inline auto readFileAndTryWait(int fildes, void *buf, size_t nbyte, TimeoutWrapper::Timespec &ts,
off_t offset = -1, bool read_all = false) -> ssize_t {
ReadFuture future(fildes, buf, nbyte, offset, read_all);
TimeoutWrapper wrapper(future, ts);
wrapper.submitAndWait();
return future.waitAndSetErrno();
}
/**
* @brief Non-blocking write for regular files mimicking POSIX write(3)
*
......@@ -303,6 +375,34 @@ inline auto writeFileAndWait(int fildes, const void *buf, size_t nbyte, off_t of
return future.waitAndSetErrno();
}
/**
* @brief Blocking write for regular files mimicking POSIX write(3) with timeout
*
* Currently only writing to regular files is tested and supported.
* Writing to other file types could work but may result in undefined behavior.
*
* This method must be called from inside the emper runtime because it uses
* the worker-local IoContext
*
* @param fildes file descriptor to the regular file to be written to
* @param buf source buffer
* @param nbyte amount of bytes to write
* @param ts Time to wait before aborting the request
* @param offset offset in the file
* @param write_all If true return only to the user if all nbyte were written
* or no further data can be written.
*
* @return -1 on error, otherwise the number of bytes written
*/
inline auto writeFileAndTryWait(int fildes, const void *buf, size_t nbyte,
TimeoutWrapper::Timespec &ts, off_t offset = -1,
bool write_all = true) -> ssize_t {
WriteFuture future(fildes, buf, nbyte, offset, write_all);
TimeoutWrapper wrapper(future, ts);
wrapper.submitAndWait();
return future.waitAndSetErrno();
}
/**
* @brief Non-blocking writev mimicking POSIX writev(3)
*
......@@ -461,6 +561,7 @@ inline void closeAndForget(int fd) {
future.submitAndForget();
}
#ifdef EMPER_HAS_FS_PATH
/**
* @brief recursively walk a file tree in parallel
*
......@@ -499,6 +600,7 @@ inline void closeAndForget(int fd) {
void recursive_directory_walk(const fs::path &dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter,
std::function<void(fs::directory_entry dirent)> fn);
#endif
/**
* @brief Fiber accepting TCP connections and scheduling handler fibers
......
......@@ -126,7 +126,7 @@ auto IoContext::submitPreparedSqesAndWait(unsigned wait_nr) -> unsigned {
template <CallerEnvironment callerEnvironment>
void IoContext::submitAndWait(Future &future, unsigned wait_nr) {
LOGD("submitting " << future << (future.dependency ? " and it's dependencies" : ""));
unsigned prepared = prepareFutureChain(future, 1);
unsigned prepared = prepareFutureChain(future);
// submit the Future to the io_uring
int submitted = submitPreparedSqesAndWait<callerEnvironment>(wait_nr);
......
......@@ -95,7 +95,7 @@ class IoContext : public Logger<LogSubsystem::IO> {
// Members useful for debugging
std::atomic<long> reqs_in_uring = 0;
// We need to keep track of prepared to sqes to invalidate them short submission
// We need to keep track of prepared sqes to invalidate them on short submission
// If a user wants to submit a chain of 5 Futures and the third is invalid
// io_uring_submit will only submit 3 of 5 prepared sqes leaving the 2 unsubmitted in
// the SQ.
......@@ -119,9 +119,12 @@ class IoContext : public Logger<LogSubsystem::IO> {
* set.
* Abort on preparing a longer list then there a places in the io_uring.
*
* @param future The future starting the Future chain to prepare
* @param chain_length Recursion parameter tracking the resulting chain length
*
* @return the number of prepared Futures
*/
auto prepareFutureChain(Future &future, unsigned chain_length) -> unsigned;
auto prepareFutureChain(Future &future, unsigned chain_length = 1) -> unsigned;
enum class PointerTags : uint16_t { Future, Callback, NewWorkAq, NewWorkWsq };
......@@ -221,6 +224,24 @@ class IoContext : public Logger<LogSubsystem::IO> {
submitAndWait(future, 0);
}
/**
* @brief Submit a future for asynchronous completion in this IoContext
*
* @param begin Iterator pointing at the start of the Future collection
* @param end Iterator pointing at the end of the Future collection
*
* TODO: gracefully handle cases where there would be more sqes prepared than
* there is room in the SQ
*/
template <typename InputIt, CallerEnvironment callerEnvironment = CallerEnvironment::EMPER>
auto submit(InputIt begin, InputIt end) -> unsigned {
for (InputIt cur = begin; cur != end; ++cur) {
auto future = *cur;
prepareFutureChain(*future);
}
return submitPreparedSqes<callerEnvironment>();
}
/**
* @brief Submit a future for asynchronous completion in this IoContext and reap completions
*
......
......@@ -9,10 +9,12 @@
#include <cstring>
#include <filesystem>
#include <ostream>
#include <stdexcept>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "Emper.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "SynchronizedFiber.hpp"
......@@ -25,6 +27,10 @@ static void recursive_directory_walk_helper(
CPS& cps, const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)>& filter,
std::function<void(fs::directory_entry dirent)>& fn) {
if constexpr (!emper::HAS_FS_PATH) {
throw std::logic_error("recursive_directory_walk need c++17 std::filesystem::path");
}
Runtime* runtime = Runtime::getRuntime();
for (const auto& p : fs::directory_iterator(dirpath)) {
if (filter(p)) {
......
# check if std::filesystem::recursive_directory_iterator is available
walk_dir_code = '''#include <filesystem>
int main() { auto it = std::filesystem::recursive_directory_iterator("."); }
'''
cpp_has_fs_recursive_directory_iterator = cpp_compiler.links(walk_dir_code,
args: '--std=c++2a',
name: 'walk_dir')
# check if std::filesystem::path is available
fs_path_code = '''#include <filesystem>
int main() { std::filesystem p = "/"; }
'''
cpp_has_fs_path = cpp_compiler.links(walk_dir_code, args: '--std=c++2a', name: 'fs_path')
if cpp_has_fs_path
conf_data.set('EMPER_HAS_FS_PATH', true)
endif
emper_io_include = include_directories('.')
emper_library_include += [emper_io_include]
emper_cpp_sources += files(
......
......@@ -29,6 +29,11 @@ class LockedQueue {
return SIZE - deque.size();
}
auto usedSlots() -> size_t {
std::lock_guard<std::mutex> lock(queue_mutex);
return deque.size();
}
auto pushBottom(const I item) -> bool {
std::lock_guard<std::mutex> lock(queue_mutex);
......
......@@ -9,9 +9,6 @@ nasm_gen = generator(nasm,
)
emper_asm_objects = nasm_gen.process(emper_asm_sources)
emper_config_header = configure_file(output: 'emper-config.h',
configuration: conf_data)
emper_cpp_sources = [
'Runtime.cpp',
'Emper.cpp',
......@@ -42,6 +39,11 @@ subdir('lib')
subdir('strategies')
subdir('sleep_strategy')
# consume the configuration object after all dubdirectories are visited
# to allow meson code in those subdirectories to change the configuration
emper_config_header = configure_file(output: 'emper-config.h',
configuration: conf_data)
emper_all_include = emper_library_include + [emper_include]
emper = library(
......
......@@ -48,7 +48,8 @@ void AbstractWorkStealingScheduler::scheduleViaWorkStealing(Fiber& fiber) {
ABORT("Could not push fiber " << &fiber << " into queue");
}
} else if constexpr (emper::STATS) {
awss::stats.scheduledFibersToLocal++;
uint64_t queueLength = queue.usedSlots();
awss::stats.recordScheduledToLocalAndQueueLength(queueLength);
}
// Classes using this method are supposed to always invoke this
......@@ -66,7 +67,7 @@ auto AbstractWorkStealingScheduler::maybeRecycle(Fiber* fiber) -> bool {
auto AbstractWorkStealingScheduler::nextFiberViaAnywhereQueue()
-> std::optional<std::pair<Fiber*, FiberSource>> {
const size_t KEEP_FREE_SLOTS = 64;
const size_t DEQUEUE_FROM_ANYWHERE_MAX = 128;
const size_t DEQUEUE_FROM_ANYWHERE_MAX = 512;
const uintptr_t remainingCapacityLocalWsQueue = queue.freeSlots();
size_t maxFiberCount;
......@@ -205,6 +206,8 @@ popBottom:
auto stolenFiber = tryStealFiberFrom(victim);
if (stolenFiber) return *stolenFiber;
if constexpr (!emper::CHECK_ANYWHERE_QUEUE_WHILE_STEALING) continue;
// If we failed to steal from a certain number of victims, check
// the anywhere queue for new fibers.
if (i == checkAnywhereQueueAt) {
......
......@@ -21,6 +21,8 @@ void AbstractWorkStealingStats::print() {
<< std::to_string(comulatedWorkerStats.scheduledFibersToLocal) << std::endl
<< "total-scheduled-fibers-to-overflow-queue: "
<< std::to_string(comulatedWorkerStats.scheduledFibersToOverflowQueue) << std::endl
<< "global-max-queue-length: " << std::to_string(comulatedWorkerStats.maxQueueLength)
<< std::endl
<< "total-next-fiber-from-local: "
<< std::to_string(comulatedWorkerStats.nextFiberFromLocal) << std::endl
<< "total-next-fiber-hint-local: "
......
......@@ -8,6 +8,7 @@ auto AbstractWorkStealingWorkerStats::operator+=(const AbstractWorkStealingWorke
-> AbstractWorkStealingWorkerStats& {
scheduledFibersToLocal += other.scheduledFibersToLocal;
scheduledFibersToOverflowQueue += other.scheduledFibersToOverflowQueue;
maxQueueLength = std::max(maxQueueLength, other.maxQueueLength);
nextFiberFromLocal += other.nextFiberFromLocal;
nextFiberStolen += other.nextFiberStolen;
nextFiberFromAnywhereQueue += other.nextFiberFromAnywhereQueue;
......@@ -27,3 +28,8 @@ void AbstractWorkStealingWorkerStats::recordFibersLiftedFromAnywhereQueue(
std::max(maxFibersLiftedFromAnywhereQueue, fibersLiftedFromAnywhereQueue);
avgFibersLiftedFromAnywhereQueue.update(fibersLiftedFromAnywhereQueue);
}
void AbstractWorkStealingWorkerStats::recordScheduledToLocalAndQueueLength(uint64_t queueLength) {
scheduledFibersToLocal++;
maxQueueLength = std::max(maxQueueLength, queueLength);
}
......@@ -15,6 +15,7 @@ class AbstractWorkStealingWorkerStats {
public:
uint64_t scheduledFibersToLocal = 0;
uint64_t scheduledFibersToOverflowQueue = 0;
uint64_t maxQueueLength = 0;
uint64_t nextFiberFromLocal = 0;
uint64_t nextFiberFromHintLocal = 0;
uint64_t nextFiberFromHintAnywhere = 0;
......@@ -29,4 +30,6 @@ class AbstractWorkStealingWorkerStats {
private:
void recordFibersLiftedFromAnywhereQueue(size_t fibersLiftedFromAnywhere);
void recordScheduledToLocalAndQueueLength(uint64_t queueLength);
};
......@@ -37,6 +37,7 @@ conf_data.set('EMPER_WORKER_WAKEUP_STRATEGY', get_option('worker_wakeup_strategy
conf_data.set('EMPER_LOCKED_WS_QUEUE', get_option('locked_ws_queue'))
conf_data.set('EMPER_LOCKED_MPSC_QUEUE', get_option('locked_mpsc_queue'))
conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue'))
conf_data.set('EMPER_CHECK_ANYWHERE_QUEUE_WHILE_STEALING', get_option('check_anywhere_queue_while_stealing'))
conf_data.set('EMPER_STATS', get_option('stats'))
conf_data.set('EMPER_OVERFLOW_QUEUE', get_option('overflow_queue'))
conf_data.set('EMPER_BLOCKED_CONTEXT_SET', get_option('blocked_context_set'))
......
......@@ -98,6 +98,12 @@ option(
value: true,
description: 'Use an overflow queue in case scheduling queues become full',
)
option(
'check_anywhere_queue_while_stealing',
type: 'boolean',
value: true,
description: 'Check the anywhere queue after some failed stealed attempts',
)
option(
'io',
type: 'boolean',
......
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <netinet/in.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "Future.hpp"
#include "emper.hpp"
#include "io.hpp"
using emper::io::ReadFuture;
using emper::io::TimeoutWrapper;
static void setupSockPair(int& sock1, int& sock2) {
const int PORT = 4242;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock == -1) DIE_MSG_ERRNO("creating listen socket failed");
int reuseaddr = 1;
if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) == -1)
DIE_MSG_ERRNO("setsockopt failed");
if (bind(listen_sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1)
DIE_MSG_ERRNO("bind failed");
if (listen(listen_sock, 1) != 0) DIE_MSG_ERRNO("listen failed");
sock2 = socket(AF_INET, SOCK_STREAM, 0);
if (sock2 == -1) DIE_MSG_ERRNO("creating client socket failed");
CPS cps;
spawn(
[&]() {
if ((sock1 = emper::io::acceptAndWait(listen_sock, nullptr, nullptr)) == -1)
DIE_MSG_ERRNO("accept failed");
},
cps);
if (emper::io::connectAndWait(sock2, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) ==
-1)
DIE_MSG_ERRNO("connect failed");
cps.wait();
emper::io::closeAndForget(listen_sock);
}
void sockTest() {
int sock1, sock2;
DBG("setup sockets");
setupSockPair(sock1, sock2);
uint64_t recvBuf;
DBG("submit recv");
TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
ssize_t res = emper::io::recvAndTryWait(sock1, &recvBuf, sizeof(recvBuf), 0, ts);
assert(res == -1);
assert(errno == ECANCELED);
// TODO: find a way to test sendAndTryWait
// // allocate a huge buffer which is surely bigger then the sockets buffer and
// // thus causing the send to block triggering the timeout
// const ssize_t MEMB = 1 << 20;
// auto* sendBuf = new char[MEMB];
// DBG("submit send");
// res = emper::io::sendAndTryWait(sock1, &sendBuf, MEMB, 0, ts, true);
// assert(res == -1);
// assert(errno == ECANCELED);
// delete[] sendBuf;
emper::io::closeAndForget(sock1);
emper::io::closeAndForget(sock2);
}
void readTest() {
int efd = eventfd(0, EFD_SEMAPHORE);
if (efd == -1) {
DIE_MSG_ERRNO("eventfd failed");
}
uint64_t readBuf;
TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
ssize_t res = emper::io::readFileAndTryWait(efd, &readBuf, sizeof(readBuf), ts);
assert(res == -1);
assert(errno == ECANCELED);
emper::io::closeAndForget(efd);
}
void writeTest() {
int efd = eventfd(0, EFD_SEMAPHORE);
if (efd == -1) {
DIE_MSG_ERRNO("eventfd failed");
}
// fill up the eventfd
uint64_t writeBuf = 0xfffffffffffffffe;
if (emper::io::writeFileAndWait(efd, &writeBuf, sizeof(writeBuf)) == -1) {
DIE_MSG("eventfd prep write failed");
}
writeBuf = 1;
TimeoutWrapper::Timespec ts = {.tv_sec = 1, .tv_nsec = 0};
ssize_t res = emper::io::writeFileAndTryWait(efd, &writeBuf, sizeof(writeBuf), ts);
assert(res == -1);
// write requests can't be canceled when in execution so this
// will return as interupted
assert(errno == EINTR);
emper::io::closeAndForget(efd);
}
void emperTest() {
sockTest();
readTest();
writeTest();
}