Skip to content
Snippets Groups Projects
Commit 1e495961 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'improve-fsearch' into 'master'

Further improve fsearch

See merge request !217
parents d3080cef 41d31c71
No related branches found
No related tags found
1 merge request!217Further improve fsearch
Pipeline #66039 passed
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <array>
#include <cstdio>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <filesystem>
......@@ -12,11 +12,9 @@
#include <string>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "Semaphore.hpp"
#include "emper.hpp"
#include "io.hpp"
namespace fs = std::filesystem;
......@@ -44,7 +42,10 @@ void search(const std::string& path) {
ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), bytes_searched);
while (bytes_read > 0) {
if (memmem(&buf[0], bytes_read, needle, needle_len)) {
printf("%s\n", path.c_str());
std::array<char, PATH_MAX + 1> outBuf;
memcpy(outBuf.data(), path.c_str(), path.length());
outBuf[path.length()] = '\n';
emper::io::writeFileAndWait(STDOUT_FILENO, outBuf.data(), path.length() + 1, 0);
goto out;
}
......@@ -65,14 +66,12 @@ out:
}
void walk_dir() {
CPS cps;
for (const auto& p : fs::recursive_directory_iterator(".")) {
if (p.is_regular_file()) {
spawn([=] { search(p.path()); }, cps);
}
}
// Only search in regular files
auto filter = [](const fs::directory_entry& dirent) { return dirent.is_regular_file(); };
// Search the file
auto fn = [](const fs::directory_entry& dirent) { search(dirent.path()); };
emper::io::recursive_directory_walk(".", filter, fn);
cps.wait();
exit(EXIT_SUCCESS);
}
......
......@@ -6,11 +6,14 @@
#include <sys/socket.h> // for socklen_t
#include <sys/types.h> // for ssize_t, off_t
#include <cstddef> // for size_t
#include <cstddef> // for size_t
#include <filesystem>
#include <functional> // for function
#include <memory> // for unique_ptr
#include <string> // for string
namespace fs = std::filesystem;
#include "Future.hpp" // for Futures
/*
......@@ -458,6 +461,45 @@ inline void closeAndForget(int fd) {
future.submitAndForget();
}
/**
* @brief recursively walk a file tree in parallel
*
* This method must be called from inside the emper runtime because it spawn
* new Fibers.
*
* We use c++17 std::filesystem::directory_iterator to read each directory encountered.
* For each entry in a directory we use @p filter to test if we should call @p fn on
* the directory entry in a new Fiber.
* If we encounter a subdirectory we spawn a new fiber recursively walking this subdirectory.
* The used algorithm is a parallel depth first search.
*
* Algorithm
* ---------
*
*```
* walk(dir, filter, fn)
* CPS cps;
* walk_helper(cps, dir, filter, fn);
* cps.wait();
*
* walk_helper(cps, dir, filter, fn)
* for(entry: dir)
* if (filter(entry))
* spawn(fn(entry), cps);
* if (entry.is_dir())
* spawn(walk_helper(cps, entry, filter, fn), cps);
*```
*
*
* @param dirpath The start directory where we start the walk
* @param filter Function to check if a enw Fiber should be spawned executing fn
* for a directory entry
* @param fn Function to execute in a new Fiber with the current directory entry
*/
void recursive_directory_walk(const fs::path &dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter,
std::function<void(fs::directory_entry dirent)> fn);
/**
* @brief Fiber accepting TCP connections and scheduling handler fibers
*
......
......@@ -7,14 +7,47 @@
#include <cerrno>
#include <cstring>
#include <filesystem>
#include <ostream>
#include "Common.hpp"
#include "CountingPrivateSemaphore.hpp"
#include "Debug.hpp"
#include "Fiber.hpp"
#include "Runtime.hpp"
#include "SynchronizedFiber.hpp"
namespace fs = std::filesystem;
namespace emper::io {
static void recursive_directory_walk_helper(
CPS& cps, const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)>& filter,
std::function<void(fs::directory_entry dirent)>& fn) {
Runtime* runtime = Runtime::getRuntime();
for (const auto& p : fs::directory_iterator(dirpath)) {
if (filter(p)) {
auto* fnFiber = SynchronizedFiber::from([=] { fn(p); }, cps);
runtime->schedule(*fnFiber);
}
if (p.is_directory()) {
auto* subDirFiber = SynchronizedFiber::from(
[&, p = p] { recursive_directory_walk_helper(cps, p, filter, fn); }, cps);
runtime->schedule(*subDirFiber);
}
}
}
void recursive_directory_walk(const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)> filter,
std::function<void(fs::directory_entry dirent)> fn) {
CPS cps;
recursive_directory_walk_helper(cps, dirpath, filter, fn);
cps.wait();
}
auto tcp_listener(std::string& host, std::string& port, const std::function<void(int)>& handler,
int backlog) -> Fiber* {
int listen_socket;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment