diff --git a/apps/fsearch/fsearch.cpp b/apps/fsearch/fsearch.cpp index 59a2217082c43e9f818f7c5e4a1b93f6abe613b1..acd56733ccfba0894b1316955bf9b9e99cb6ea17 100644 --- a/apps/fsearch/fsearch.cpp +++ b/apps/fsearch/fsearch.cpp @@ -19,6 +19,7 @@ #include "Semaphore.hpp" #include "StealingMode.hpp" #include "io.hpp" +#include "io/Future.hpp" #include "lib/ShardedFileBuffer.hpp" namespace fs = std::filesystem; @@ -29,44 +30,47 @@ namespace po = boost::program_options; static const char* needle; static size_t needle_len; -static emper::Semaphore* max_running; - static emper::lib::ShardedFileBuffer* outBuf; static enum emper::StealingMode stealingMode; -void search(const std::string& path) { - if (max_running) { - max_running->acquire(); - } +void onRead(const int32_t& res, char* buf, int fd, const std::string path) { + if (unlikely(res < 0)) DIE_MSG_ERRNO("reading " << path << " failed"); - int fd = emper::io::openAndWait(path.c_str(), O_RDONLY); - if (fd < 0) { - DIE_MSG_ERRNO("open of " << path << " failed"); - } + if (res == 0) goto done; - std::array<char, EMPER_RIPGREP_BUFSIZE> buf; - - ssize_t bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), 0); - while (bytes_read > 0) { - if (memmem(&buf[0], bytes_read, needle, needle_len)) { - outBuf->getStream() << path << std::endl; - goto out; - } - - bytes_read = emper::io::readFileAndWait(fd, buf.data(), buf.size(), -1); + if (memmem(buf, res, needle, needle_len)) { + outBuf->getStream() << path << std::endl; + goto done; } - if (bytes_read < 0) { - DIE_MSG_ERRNO("read of " << path << " failed"); + { + emper::io::ReadFuture nextRead(fd, buf, EMPER_RIPGREP_BUFSIZE, -1); + nextRead.setCallback([buf, fd, path](const int32_t& res) { onRead(res, buf, fd, path); }); + nextRead.submit(); + return; } -out: - if (max_running) { - max_running->release(); +done: + delete[] buf; + emper::io::closeAndForget(fd); +} + +void onOpen(const int32_t& fd, const std::string path) { + if (fd < 0) { + DIE_MSG_ERRNO("opening " << path << " failed"); } - emper::io::closeAndForget(fd); + auto buf = new char[EMPER_RIPGREP_BUFSIZE]; + emper::io::ReadFuture firstRead(fd, buf, EMPER_RIPGREP_BUFSIZE, -1); + firstRead.setCallback([buf, fd, path](const int32_t& res) { onRead(res, buf, fd, path); }); + firstRead.submit(); +}; + +void search(const std::string& path) { + emper::io::OpenatFuture openFuture(AT_FDCWD, path.c_str(), O_RDONLY); + openFuture.setCallback([path](const int32_t& res) { onOpen(res, path); }); + openFuture.submit(); } void walk_dir() { @@ -100,15 +104,9 @@ static auto fssearch(const po::variables_map& vm) -> int { needle = needleStr.c_str(); needle_len = needleStr.length(); - if (vm.count("max-fibers")) { - unsigned int maxFibersCount = vm["max-fibers"].as<unsigned int>(); - max_running = new emper::Semaphore(maxFibersCount); - } - Runtime runtime; std::cerr << "Starting fsearch with stealingMode=" << stealingMode; - if (max_running) std::cerr << " and " << max_running->getCount() << " file fibers"; std::cerr << std::endl; auto* dirWalker = Fiber::from(walk_dir); @@ -116,8 +114,6 @@ static auto fssearch(const po::variables_map& vm) -> int { runtime.waitUntilFinished(); - delete max_running; - return EXIT_SUCCESS; }