diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp new file mode 100644 index 0000000000000000000000000000000000000000..238d26ec1b0b5668aa6891239399203e078a0e4a --- /dev/null +++ b/apps/fsearch/fsearch.cpp @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include <fcntl.h> +#include <sys/types.h> + +#include <array> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <filesystem> +#include <iostream> +#include <string> + +#include "Common.hpp" +#include "CountingPrivateSemaphore.hpp" +#include "Fiber.hpp" +#include "Runtime.hpp" +#include "emper.hpp" +#include "io.hpp" + +namespace fs = std::filesystem; + +#define EMPER_RIPGREP_BUFSIZE 4096 + +const char* needle; +size_t needle_len; + +void search(const std::string& path) { + int fd = emper::io::openAndWait(path.c_str(), O_RDONLY); + if (fd < 0) { + DIE_MSG_ERRNO("open failed"); + } + + std::array<char, EMPER_RIPGREP_BUFSIZE> buf; + size_t bytes_searched = 0; + + ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), bytes_searched); + while (bytes_read > 0) { + if (memmem(&buf[0], bytes_read, needle, needle_len)) { + printf("%s\n", path.c_str()); + return; + } + + bytes_searched += static_cast<size_t>(bytes_read); + bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), -1); + } + + if (bytes_read < 0) { + DIE_MSG_ERRNO("read failed"); + } +} + +void walk_dir() { + CPS cps; + for (const auto& p : fs::recursive_directory_iterator(".")) { + if (p.is_regular_file()) { + spawn([=] { search(p.path()); }, cps); + } + } + + cps.wait(); + exit(EXIT_SUCCESS); +} + +auto main(int argc, char* argv[]) -> int { + if (argc < 2) { + std::cerr << "Usage: " << argv[0] << " <needle>" << std::endl; + return EXIT_FAILURE; + } + + needle = argv[1]; + needle_len = strlen(needle); + + Runtime runtime; + + auto* dirWalker = Fiber::from(walk_dir); + runtime.scheduleFromAnywhere(*dirWalker); + + runtime.waitUntilFinished(); +} diff --git a/apps/fsearch/meson.build b/apps/fsearch/meson.build new file mode 100644 index 0000000000000000000000000000000000000000..b5b548e034a13e215ea5dc24bd25bcb384ecc27c --- /dev/null +++ b/apps/fsearch/meson.build @@ -0,0 +1,5 @@ +fsearch_exe = executable( + 'fsearch', + 'fsearch.cpp', + dependencies: emper_dep, +) diff --git a/apps/meson.build b/apps/meson.build index 5075128a54a7118400b8d1d1965615331c7dd69d..015a15df3ff1713f0c0f818ad41c2c33a71f0af0 100644 --- a/apps/meson.build +++ b/apps/meson.build @@ -21,3 +21,5 @@ echoclient_exe = executable( 'EchoClient.cpp', dependencies: emper_dep, ) + +subdir('fsearch') diff --git a/emper/c_emper.cpp b/emper/c_emper.cpp index 85a9bf9605939c90edc2bd9c3f49c0db4aa61586..4a648a33b40ae464e694b49ae8eb2e834c904839 100644 --- a/emper/c_emper.cpp +++ b/emper/c_emper.cpp @@ -9,9 +9,11 @@ #include "emper.h" // for fiber, cps, bps, runtime #ifdef EMPER_IO +#include <fcntl.h> #include <sys/socket.h> // for socklen_t #include <sys/types.h> // for off_t +#include <cstdarg> #include <cstddef> // for size_t #include "io.hpp" // for emper::io::emper_* @@ -139,5 +141,33 @@ auto emper_writev(int fildes, const struct iovec* iov, int iovcnt) -> ssize_t { return emper::io::writevAndWait(fildes, iov, iovcnt); } +// C function overloading inspired by +// http://locklessinc.com/articles/overloading +auto emper_open(const char* pathname, int flags, ...) -> int { + if (flags | O_CREAT || flags | O_TMPFILE) { + va_list v; + va_start(v, flags); + + mode_t mode = va_arg(v, mode_t); + va_end(v); + return emper::io::openAndWait(pathname, flags, mode); + } + + return emper::io::openAndWait(pathname, flags); +} + +auto emper_openat(int dirfd, const char* pathname, int flags, ...) -> int { + if (flags | O_CREAT || flags | O_TMPFILE) { + va_list v; + va_start(v, flags); + + mode_t mode = va_arg(v, mode_t); + va_end(v); + return emper::io::openatAndWait(dirfd, pathname, flags, mode); + } + + return emper::io::openatAndWait(dirfd, pathname, flags); +} + auto emper_close(int fd) -> int { return emper::io::closeAndWait(fd); } #endif diff --git a/emper/include/emper.h b/emper/include/emper.h index 4a1402e7013f856c6808d1c3db84f23d9a48e04e..e21ad5a94aa3d24737e4ecc64338f094f7375cc3 100644 --- a/emper/include/emper.h +++ b/emper/include/emper.h @@ -2,12 +2,14 @@ // Copyright © 2020 Florian Schmaus, Florian Fischer #pragma once +#include <stdarg.h> // NOLINT(modernize-deprecated-headers) +#include <stddef.h> // NOLINT(modernize-deprecated-headers) + #include "emper-common.h" #include "emper-config.h" -#include "stddef.h" // NOLINT(modernize-deprecated-headers) #ifdef EMPER_IO -#include "sys/socket.h" +#include <sys/socket.h> #endif typedef struct runtime runtime; // NOLINT(modernize-use-using) @@ -88,6 +90,12 @@ ssize_t emper_write_file(int fildes, const void* buf, size_t nbyte, off_t offset // NOLINTNEXTLINE(modernize-use-trailing-return-type) ssize_t emper_writev(int fildes, const struct iovec* iov, int iovcnt); +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_open(const char* pathname, int flags, ...); + +// NOLINTNEXTLINE(modernize-use-trailing-return-type) +int emper_openat(int dirfd, const char* pathname, int flags, ...); + // NOLINTNEXTLINE(modernize-use-trailing-return-type) int emper_close(int fd); #endif diff --git a/emper/io.hpp b/emper/io.hpp index fc154f36359f0e5689162c16a3099ca7db72e26c..4addc610e9c80b0b7132736ae78a1471ef85cd61 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -2,6 +2,7 @@ // Copyright © 2020-2021 Florian Fischer #pragma once +#include <fcntl.h> #include <sys/socket.h> // for socklen_t #include <sys/types.h> // for ssize_t, off_t @@ -334,6 +335,81 @@ inline auto writevAndWait(int fildes, const struct iovec *iov, int iovcnt) -> ss return future.waitAndSetErrno(); } +/** + * @brief asynchronous openat mimicking POSIX openat(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param dirfd directory the pathname is interpreted relative to + * @param pathname path to the file to open + * @param flags specify access mode + * @param mode in which the file should be created + * + * @return Future object which signals the completion of the openat request + */ +inline auto openat(int dirfd, const char *pathname, int flags, mode_t mode = 0) + -> std::unique_ptr<Future> { + auto future = std::make_unique<OpenatFuture>(dirfd, pathname, flags, mode); + future->submit(); + return future; +} + +/** + * @brief synchronous openat mimicking POSIX openat(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param dirfd directory the pathname is interpreted relative to + * @param pathname path to the file to open + * @param flags specify access mode + * @param mode in which the file should be created + * + * @return fd on success, -1 on error + */ +inline auto openatAndWait(int dirfd, const char *pathname, int flags, mode_t mode = 0) -> size_t { + OpenatFuture future(dirfd, pathname, flags, mode); + future.submit(); + return future.waitAndSetErrno(); +} + +/** + * @brief asynchronous open mimicking POSIX open(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param pathname path to the file to open + * @param flags specify access mode + * @param mode in which the file should be created + * + * @return Future object which signals the completion of the open request + */ +inline auto open(const char *pathname, int flags, mode_t mode = 0) -> std::unique_ptr<Future> { + auto future = std::make_unique<OpenatFuture>(AT_FDCWD, pathname, flags, mode); + future->submit(); + return future; +} + +/** + * @brief synchronous open mimicking POSIX open(3) + * + * This method must be called from inside the emper runtime because it uses + * the worker-local IoContext + * + * @param pathname path to the file to open + * @param flags specify access mode + * @param mode in which the file should be created + * + * @return fd on success, -1 on error + */ +inline auto openAndWait(const char *pathname, int flags, mode_t mode = 0) -> size_t { + OpenatFuture future(AT_FDCWD, pathname, flags, mode); + future.submit(); + return future.waitAndSetErrno(); +} + /** * @brief Non-blocking close mimicking POSIX close(3) * diff --git a/emper/io/Future.hpp b/emper/io/Future.hpp index 772cd74ea707f402b2631f95d97b37d3c296b7f0..bbf29d5d481cc0add4b88f195a9889ca7cedffa9 100644 --- a/emper/io/Future.hpp +++ b/emper/io/Future.hpp @@ -421,6 +421,16 @@ class ReadFuture : public PartialCompletableFuture { read_all ? ENABLE_PARTIAL_COMPLETION : DISABLE_PARTIAL_COMPLETION){}; }; +class OpenatFuture : public Future { + void prepareSqe(io_uring_sqe* sqe) override { + io_uring_prep_openat(sqe, fd, reinterpret_cast<const char*>(buf), len, offsetOrFlags); + } + + public: + OpenatFuture(int dirfd, const void* pathname, int flags, mode_t mode = 0) + : Future(Operation::OPENAT, dirfd, const_cast<void*>(pathname), flags, mode){}; +}; + class WriteFuture : public PartialCompletableFuture { void prepareSqe(io_uring_sqe* sqe) override { if (partialCompletion == DISABLE_PARTIAL_COMPLETION) { diff --git a/emper/io/Operation.cpp b/emper/io/Operation.cpp index 83a67abef68c6c3648f05008b120dbb8734ad087..e881f1b1ab7cb5a39d03317d5b0f5e028e85f876 100644 --- a/emper/io/Operation.cpp +++ b/emper/io/Operation.cpp @@ -20,6 +20,9 @@ auto operator<<(std::ostream& os, const Operation& op) -> std::ostream& { case Operation::ACCEPT: os << "accept"; break; + case Operation::OPENAT: + os << "openat"; + break; case Operation::READ: os << "read"; break; diff --git a/emper/io/Operation.hpp b/emper/io/Operation.hpp index 498919293214983e1174e86aedb315e63361dbe4..d193b1cf4c5b0a357e8716eaaaa5d511c692e6b3 100644 --- a/emper/io/Operation.hpp +++ b/emper/io/Operation.hpp @@ -11,6 +11,7 @@ enum class Operation { RECV, CONNECT, ACCEPT, + OPENAT, READ, WRITE, WRITEV, diff --git a/iwyu-mappings.imp b/iwyu-mappings.imp index 834dff5a2aa9852fe93e269215c318a7f8a0e9ee..9f1e941b74a6e151c1b6dbb4feeecc115167c746 100644 --- a/iwyu-mappings.imp +++ b/iwyu-mappings.imp @@ -3,5 +3,7 @@ { include: ["@<gtest/.*>", "private", "<gtest/gtest.h>", "public"] }, { include: ["<urcu/map/urcu-memb.h>", "private", "<urcu.h>", "public"] }, { include: ["<bits/cxxabi_forced.h>", "private", "<ctime>", "public" ] }, + { symbol: ["__kernel_timespec", "private", "<liburing.h>", "public" ] }, + { symbol: ["std::filesystem", "private", "<filesystem>", "public" ] }, ] diff --git a/tests/SimpleDiskAndNetworkTest.cpp b/tests/SimpleDiskAndNetworkTest.cpp index cf5ec5036c7a698769eac274d98abc5c2cf396e1..60a2e569580ac658e8ad878fe4d4393f2039e102 100644 --- a/tests/SimpleDiskAndNetworkTest.cpp +++ b/tests/SimpleDiskAndNetworkTest.cpp @@ -63,7 +63,7 @@ static void server_func(int sockfd) { } close(file_fd); - file_fd = open(file_name, O_RDONLY); + file_fd = emper::io::openAndWait(file_name, O_RDONLY); if (file_fd == -1) { DIE_MSG_ERRNO("open failed"); } @@ -113,7 +113,7 @@ static void server_func(int sockfd) { } close(file_fd); - file_fd = open(file2_name, O_RDONLY); + file_fd = emper::io::openAndWait(file2_name, O_RDONLY); if (file_fd == -1) { DIE_MSG_ERRNO("open failed"); }