diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp index 8ef73c0adb1eca949897986737df94fce9acfdfe..31bb72321030391d6b62668b413280b0ce54f7d3 100644 --- a/apps/fsearch/fsearch.cpp +++ b/apps/fsearch/fsearch.cpp @@ -1,10 +1,10 @@ // SPDX-License-Identifier: LGPL-3.0-or-later // Copyright © 2021 Florian Fischer #include <fcntl.h> -#include <sys/types.h> +#include <unistd.h> #include <array> -#include <cstdio> +#include <climits> #include <cstdlib> #include <cstring> #include <filesystem> @@ -12,11 +12,9 @@ #include <string> #include "Common.hpp" -#include "CountingPrivateSemaphore.hpp" #include "Fiber.hpp" #include "Runtime.hpp" #include "Semaphore.hpp" -#include "emper.hpp" #include "io.hpp" namespace fs = std::filesystem; @@ -44,7 +42,10 @@ void search(const std::string& path) { ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), bytes_searched); while (bytes_read > 0) { if (memmem(&buf[0], bytes_read, needle, needle_len)) { - printf("%s\n", path.c_str()); + std::array<char, PATH_MAX + 1> outBuf; + memcpy(outBuf.data(), path.c_str(), path.length()); + outBuf[path.length()] = '\n'; + emper::io::writeFileAndWait(STDOUT_FILENO, outBuf.data(), path.length() + 1, 0); goto out; } @@ -65,14 +66,12 @@ out: } void walk_dir() { - CPS cps; - for (const auto& p : fs::recursive_directory_iterator(".")) { - if (p.is_regular_file()) { - spawn([=] { search(p.path()); }, cps); - } - } + // Only search in regular files + auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); }; + // Search the file + auto fn = [](const fs::directory_entry& dirent) { search(dirent.path()); }; + emper::io::recursive_directory_walk(".", filter, fn); - cps.wait(); exit(EXIT_SUCCESS); } diff --git a/emper/io.hpp b/emper/io.hpp index b75f31cb1c6fa0143de9622f13e2fe81316c294b..de7203bf20bc00c8e69262f0b2de6061ecbbc212 100644 --- a/emper/io.hpp +++ b/emper/io.hpp @@ -6,11 +6,14 @@ #include <sys/socket.h> // for socklen_t #include <sys/types.h> // for ssize_t, off_t -#include <cstddef> // for size_t +#include <cstddef> // for size_t +#include <filesystem> #include <functional> // for function #include <memory> // for unique_ptr #include <string> // for string +namespace fs = std::filesystem; + #include "Future.hpp" // for Futures /* @@ -458,6 +461,45 @@ inline void closeAndForget(int fd) { future.submitAndForget(); } +/** + * @brief recursively walk a file tree in parallel + * + * This method must be called from inside the emper runtime because it spawn + * new Fibers. + * + * We use c++17 std::filesystem::directory_iterator to read each directory encountered. + * For each entry in a directory we use @p filter to test if we should call @p fn on + * the directory entry in a new Fiber. + * If we encounter a subdirectory we spawn a new fiber recursively walking this subdirectory. + * The used algorithm is a parallel depth first search. + * + * Algorithm + * --------- + * + *``` + * walk(dir, filter, fn) + * CPS cps; + * walk_helper(cps, dir, filter, fn); + * cps.wait(); + * + * walk_helper(cps, dir, filter, fn) + * for(entry: dir) + * if (filter(entry)) + * spawn(fn(entry), cps); + * if (entry.is_dir()) + * spawn(walk_helper(cps, entry, filter, fn), cps); + *``` + * + * + * @param dirpath The start directory where we start the walk + * @param filter Function to check if a enw Fiber should be spawned executing fn + * for a directory entry + * @param fn Function to execute in a new Fiber with the current directory entry + */ +void recursive_directory_walk(const fs::path &dirpath, + std::function<bool(const fs::directory_entry &dirent)> filter, + std::function<void(fs::directory_entry dirent)> fn); + /** * @brief Fiber accepting TCP connections and scheduling handler fibers * diff --git a/emper/io/io.cpp b/emper/io/io.cpp index 48fb629386c1b52ec31ad8bf7b016bbb8cf03f85..c1286774fa7e4daa0089ff3bb3ea9762ab6c0a59 100644 --- a/emper/io/io.cpp +++ b/emper/io/io.cpp @@ -7,14 +7,47 @@ #include <cerrno> #include <cstring> +#include <filesystem> #include <ostream> #include "Common.hpp" +#include "CountingPrivateSemaphore.hpp" #include "Debug.hpp" #include "Fiber.hpp" #include "Runtime.hpp" +#include "SynchronizedFiber.hpp" + +namespace fs = std::filesystem; namespace emper::io { + +static void recursive_directory_walk_helper( + CPS& cps, const fs::path& dirpath, + std::function<bool(const fs::directory_entry& dirent)>& filter, + std::function<void(fs::directory_entry dirent)>& fn) { + Runtime* runtime = Runtime::getRuntime(); + for (const auto& p : fs::directory_iterator(dirpath)) { + if (filter(p)) { + auto* fnFiber = SynchronizedFiber::from([=] { fn(p); }, cps); + runtime->schedule(*fnFiber); + } + + if (p.is_directory()) { + auto* subDirFiber = SynchronizedFiber::from( + [&, p = p] { recursive_directory_walk_helper(cps, p, filter, fn); }, cps); + runtime->schedule(*subDirFiber); + } + } +} + +void recursive_directory_walk(const fs::path& dirpath, + std::function<bool(const fs::directory_entry& dirent)> filter, + std::function<void(fs::directory_entry dirent)> fn) { + CPS cps; + recursive_directory_walk_helper(cps, dirpath, filter, fn); + cps.wait(); +} + auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler, int backlog) -> Fiber* { int listen_socket;