diff --git a/apps/Coordinator.cpp b/apps/Coordinator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dada5daa8f7910416553a7b906d7a781c860b60c --- /dev/null +++ b/apps/Coordinator.cpp @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright © 2021 Florian Fischer +#include <sys/types.h> + +#include <array> +#include <atomic> +#include <cstdlib> +#include <iostream> +#include <mutex> +#include <string> +#include <vector> + +#include "Common.hpp" +#include "Future.hpp" +#include "Runtime.hpp" +#include "io.hpp" + +std::string HOST = "::"; +std::string PORT = "12346"; + +std::atomic<unsigned> ready; +std::mutex lock; +std::vector<int> conns; + +void notify() { + std::vector<emper::io::SendFuture*> futures; + std::string msg = "Go"; + for (int conn : conns) { + auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0); + if (!futures.empty()) sf->setDependency(*futures.back()); + futures.push_back(sf); + } + + int res = futures.back()->submitAndWait(); + if (res < 0) DIE_MSG_ERRNO("send failed"); + + for (int conn : conns) emper::io::closeAndForget(conn); + + for (auto* sf : futures) delete sf; +} + +auto main(int argc, char* argv[]) -> int { + if (argc != 2) { + std::cerr << "Usage: " << argv[0] << " <count>" << std::endl; + exit(EXIT_FAILURE); + } + + const int scount = std::stoi(argv[1]); + if (scount < 0) DIE_MSG("count must be positiv"); + const auto count = static_cast<unsigned>(scount); + + std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count + << " connections" << std::endl; + + Runtime runtime; + auto* listener = emper::io::tcp_listener(HOST, PORT, [&](int socket) { + { + std::lock_guard<std::mutex> l(lock); + conns.push_back(socket); + } + + std::array<char, 16> buf; + ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0); + if (recv < 0) DIE_MSG_ERRNO("recv failed"); + + unsigned r = ready.fetch_add(1) + 1; + if (r == count) { + notify(); + runtime.initiateTermination(); + } + }); + + if (!listener) { + exit(EXIT_FAILURE); + } + + runtime.scheduleFromAnywhere(*listener); + + runtime.waitUntilFinished(); + + return EXIT_FAILURE; +} diff --git a/apps/meson.build b/apps/meson.build index 44ff1edbae77b3a586eaff3163834f694a449089..22b360affd5ae5edb962550ca86cf053ffa02a17 100644 --- a/apps/meson.build +++ b/apps/meson.build @@ -16,6 +16,12 @@ echoserver_exe = executable( dependencies: emper_dep, ) +netbarrier_exe = executable( + 'coordinator', + 'Coordinator.cpp', + dependencies: emper_dep, +) + echoserver_callback_exe = executable( 'echoserver_callback', 'EchoServerCallback.cpp',