Commit d07a81c4 authored by Florian Fischer's avatar Florian Fischer
Browse files

io: improve recursive_directory_walk

* Add optional throttle Semaphore pointer to limit the number
  of spawned fn as well as directory walk fibers
* Use const references to the passed functions instead of values
* fsearch: Use max_running as fn and recursion throttle
parent 819a0246
Pipeline #79969 passed with stages
in 12 minutes and 53 seconds
......@@ -36,10 +36,6 @@ static emper::lib::ShardedFileBuffer* outBuf;
static enum emper::StealingMode stealingMode;
void search(const std::string& path) {
if (max_running) {
max_running->acquire();
}
int fd = emper::io::openAndWait(path.c_str(), O_RDONLY);
if (fd < 0) {
DIE_MSG_ERRNO("open of " << path << " failed");
......@@ -62,10 +58,6 @@ void search(const std::string& path) {
}
out:
if (max_running) {
max_running->release();
}
emper::io::closeAndForget(fd);
}
......@@ -79,7 +71,7 @@ void walk_dir() {
switch (stealingMode) {
case emper::StealingMode::child:
emper::io::recursive_directory_walk(".", filter, fn);
emper::io::recursive_directory_walk(".", filter, fn, max_running, max_running);
break;
case emper::StealingMode::continuation:
emper::io::fibril_recursive_directory_walk(".", filter, fn);
......
......@@ -26,6 +26,9 @@ namespace fs = std::filesystem;
*/
class Fiber;
namespace emper {
class Semaphore;
}
namespace emper::io {
/**
......@@ -632,10 +635,13 @@ inline void closeAndForget(int fd) {
* @param filter Function to check if a new Fiber should be spawned executing fn
* for a directory entry
* @param fn Function to execute in a new Fiber with the current directory entry
* @param fn_throttle A semaphore limiting the number of concurrent fibers executing fn
* @param recursion_throttle A semaphore limiting the number of recursive directory walkers
*/
void recursive_directory_walk(const fs::path &dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter,
std::function<void(fs::directory_entry dirent)> fn);
const std::function<bool(const fs::directory_entry &dirent)> &filter,
const std::function<void(fs::directory_entry dirent)> &fn,
emper::Semaphore *fn_throttle, emper::Semaphore *recursion_throttle);
void fibril_recursive_directory_walk(const fs::path dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter,
......
......@@ -18,6 +18,7 @@
#include "Fiber.hpp"
#include "Fibril.hpp"
#include "Runtime.hpp"
#include "Semaphore.hpp"
#include "SynchronizedFiber.hpp"
namespace fs = std::filesystem;
......@@ -30,8 +31,9 @@ const SockOpt SockOpt::ReusePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE,
static void recursive_directory_walk_helper(
CPS& cps, const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)>& filter,
std::function<void(fs::directory_entry dirent)>& fn) {
const std::function<bool(const fs::directory_entry& dirent)>& filter,
const std::function<void(fs::directory_entry dirent)>& fn, emper::Semaphore* fn_throttle,
emper::Semaphore* recursion_throttle) {
if constexpr (!emper::HAS_FS_PATH) {
throw std::logic_error("recursive_directory_walk need c++17 std::filesystem::path");
}
......@@ -39,23 +41,35 @@ static void recursive_directory_walk_helper(
Runtime* runtime = Runtime::getRuntime();
for (const auto& p : fs::directory_iterator(dirpath)) {
if (filter(p)) {
auto* fnFiber = SynchronizedFiber::from([=] { fn(p); }, cps);
auto* fnFiber = SynchronizedFiber::from(
[=] {
if (fn_throttle) fn_throttle->acquire();
fn(p);
if (fn_throttle) fn_throttle->release();
},
cps);
runtime->schedule(*fnFiber);
}
if (p.is_directory()) {
auto* subDirFiber = SynchronizedFiber::from(
[&, p = p] { recursive_directory_walk_helper(cps, p, filter, fn); }, cps);
[=, &cps, &filter] {
if (recursion_throttle) recursion_throttle->acquire();
recursive_directory_walk_helper(cps, p, filter, fn, fn_throttle, recursion_throttle);
if (recursion_throttle) recursion_throttle->release();
},
cps);
runtime->schedule(*subDirFiber);
}
}
}
void recursive_directory_walk(const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)> filter,
std::function<void(fs::directory_entry dirent)> fn) {
const std::function<bool(const fs::directory_entry& dirent)>& filter,
const std::function<void(fs::directory_entry dirent)>& fn,
emper::Semaphore* fn_throttle, emper::Semaphore* recursion_throttle) {
CPS cps;
recursive_directory_walk_helper(cps, dirpath, filter, fn);
recursive_directory_walk_helper(cps, dirpath, filter, fn, fn_throttle, recursion_throttle);
cps.wait();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment