Commit 7c341849 authored by Florian Schmaus's avatar Florian Schmaus
Browse files

Merge branch 'throttle-recursive-dir-walk' into 'master'

io: improve recursive_directory_walk

See merge request i4/manycore/emper!374
parents 819a0246 d07a81c4
...@@ -36,10 +36,6 @@ static emper::lib::ShardedFileBuffer* outBuf; ...@@ -36,10 +36,6 @@ static emper::lib::ShardedFileBuffer* outBuf;
static enum emper::StealingMode stealingMode; static enum emper::StealingMode stealingMode;
void search(const std::string& path) { void search(const std::string& path) {
if (max_running) {
max_running->acquire();
}
int fd = emper::io::openAndWait(path.c_str(), O_RDONLY); int fd = emper::io::openAndWait(path.c_str(), O_RDONLY);
if (fd < 0) { if (fd < 0) {
DIE_MSG_ERRNO("open of " << path << " failed"); DIE_MSG_ERRNO("open of " << path << " failed");
...@@ -62,10 +58,6 @@ void search(const std::string& path) { ...@@ -62,10 +58,6 @@ void search(const std::string& path) {
} }
out: out:
if (max_running) {
max_running->release();
}
emper::io::closeAndForget(fd); emper::io::closeAndForget(fd);
} }
...@@ -79,7 +71,7 @@ void walk_dir() { ...@@ -79,7 +71,7 @@ void walk_dir() {
switch (stealingMode) { switch (stealingMode) {
case emper::StealingMode::child: case emper::StealingMode::child:
emper::io::recursive_directory_walk(".", filter, fn); emper::io::recursive_directory_walk(".", filter, fn, max_running, max_running);
break; break;
case emper::StealingMode::continuation: case emper::StealingMode::continuation:
emper::io::fibril_recursive_directory_walk(".", filter, fn); emper::io::fibril_recursive_directory_walk(".", filter, fn);
......
...@@ -26,6 +26,9 @@ namespace fs = std::filesystem; ...@@ -26,6 +26,9 @@ namespace fs = std::filesystem;
*/ */
class Fiber; class Fiber;
namespace emper {
class Semaphore;
}
namespace emper::io { namespace emper::io {
/** /**
...@@ -632,10 +635,13 @@ inline void closeAndForget(int fd) { ...@@ -632,10 +635,13 @@ inline void closeAndForget(int fd) {
* @param filter Function to check if a enw Fiber should be spawned executing fn * @param filter Function to check if a enw Fiber should be spawned executing fn
* for a directory entry * for a directory entry
* @param fn Function to execute in a new Fiber with the current directory entry * @param fn Function to execute in a new Fiber with the current directory entry
* @param fn_throttle A semaphore limiting the number of concurrent fibers executing fn
* @param recursion_throttle A semaphore limiting the number of rucrsive directory walkers
*/ */
void recursive_directory_walk(const fs::path &dirpath, void recursive_directory_walk(const fs::path &dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter, const std::function<bool(const fs::directory_entry &dirent)> &filter,
std::function<void(fs::directory_entry dirent)> fn); const std::function<void(fs::directory_entry dirent)> &fn,
emper::Semaphore *fn_throttle, emper::Semaphore *recursion_throttle);
void fibril_recursive_directory_walk(const fs::path dirpath, void fibril_recursive_directory_walk(const fs::path dirpath,
std::function<bool(const fs::directory_entry &dirent)> filter, std::function<bool(const fs::directory_entry &dirent)> filter,
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "Fiber.hpp" #include "Fiber.hpp"
#include "Fibril.hpp" #include "Fibril.hpp"
#include "Runtime.hpp" #include "Runtime.hpp"
#include "Semaphore.hpp"
#include "SynchronizedFiber.hpp" #include "SynchronizedFiber.hpp"
namespace fs = std::filesystem; namespace fs = std::filesystem;
...@@ -30,8 +31,9 @@ const SockOpt SockOpt::ReusePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE, ...@@ -30,8 +31,9 @@ const SockOpt SockOpt::ReusePort = {SOL_SOCKET, SO_REUSEPORT, &SockOpt::ENABLE,
static void recursive_directory_walk_helper( static void recursive_directory_walk_helper(
CPS& cps, const fs::path& dirpath, CPS& cps, const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)>& filter, const std::function<bool(const fs::directory_entry& dirent)>& filter,
std::function<void(fs::directory_entry dirent)>& fn) { const std::function<void(fs::directory_entry dirent)>& fn, emper::Semaphore* fn_throttle,
emper::Semaphore* recursion_throttle) {
if constexpr (!emper::HAS_FS_PATH) { if constexpr (!emper::HAS_FS_PATH) {
throw std::logic_error("recursive_directory_walk need c++17 std::filesystem::path"); throw std::logic_error("recursive_directory_walk need c++17 std::filesystem::path");
} }
...@@ -39,23 +41,35 @@ static void recursive_directory_walk_helper( ...@@ -39,23 +41,35 @@ static void recursive_directory_walk_helper(
Runtime* runtime = Runtime::getRuntime(); Runtime* runtime = Runtime::getRuntime();
for (const auto& p : fs::directory_iterator(dirpath)) { for (const auto& p : fs::directory_iterator(dirpath)) {
if (filter(p)) { if (filter(p)) {
auto* fnFiber = SynchronizedFiber::from([=] { fn(p); }, cps); auto* fnFiber = SynchronizedFiber::from(
[=] {
if (fn_throttle) fn_throttle->acquire();
fn(p);
if (fn_throttle) fn_throttle->release();
},
cps);
runtime->schedule(*fnFiber); runtime->schedule(*fnFiber);
} }
if (p.is_directory()) { if (p.is_directory()) {
auto* subDirFiber = SynchronizedFiber::from( auto* subDirFiber = SynchronizedFiber::from(
[&, p = p] { recursive_directory_walk_helper(cps, p, filter, fn); }, cps); [=, &cps, &filter] {
if (recursion_throttle) recursion_throttle->acquire();
recursive_directory_walk_helper(cps, p, filter, fn, fn_throttle, recursion_throttle);
if (recursion_throttle) recursion_throttle->release();
},
cps);
runtime->schedule(*subDirFiber); runtime->schedule(*subDirFiber);
} }
} }
} }
void recursive_directory_walk(const fs::path& dirpath, void recursive_directory_walk(const fs::path& dirpath,
std::function<bool(const fs::directory_entry& dirent)> filter, const std::function<bool(const fs::directory_entry& dirent)>& filter,
std::function<void(fs::directory_entry dirent)> fn) { const std::function<void(fs::directory_entry dirent)>& fn,
emper::Semaphore* fn_throttle, emper::Semaphore* recursion_throttle) {
CPS cps; CPS cps;
recursive_directory_walk_helper(cps, dirpath, filter, fn); recursive_directory_walk_helper(cps, dirpath, filter, fn, fn_throttle, recursion_throttle);
cps.wait(); cps.wait();
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment