Select Git revision
Coordinator.cpp
Forked from
Lehrstuhl für Informatik 4 (Systemsoftware) / manycore / emper
118 commits behind, 4 commits ahead of the upstream repository.
Coordinator.cpp 2.03 KiB
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <sys/types.h>
#include <array>
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Runtime.hpp"
#include "io.hpp"
#include "io/Future.hpp"
#include "io/IoContext.hpp"
std::string HOST = "::";
std::string PORT = "12346";
std::atomic<unsigned> ready;
std::mutex lock;
std::vector<int> conns;
void notify() {
std::vector<emper::io::SendFuture*> futures;
std::string msg = "Go";
for (int conn : conns) {
auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
futures.push_back(sf);
}
IoContext::getIo()->submit(futures.begin(), futures.end());
for (auto* f : futures) {
int res = f->wait();
if (res < 0) DIE_MSG_ERRNO("send failed");
delete f;
}
for (int conn : conns) emper::io::closeAndForget(conn);
}
// NOLINTNEXTLINE(bugprone-exception-escape)
auto main(int argc, char* argv[]) -> int {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
exit(EXIT_FAILURE);
}
const int scount = std::stoi(argv[1]);
if (scount < 0) DIE_MSG("count must be positiv");
const auto count = static_cast<unsigned>(scount);
std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count
<< " connections" << std::endl;
Runtime runtime;
auto coordinator_func = [&](int socket) {
{
std::lock_guard<std::mutex> l(lock);
conns.push_back(socket);
}
std::array<char, 16> buf;
ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0);
if (recv < 0) DIE_MSG_ERRNO("recv failed");
unsigned r = ready.fetch_add(1) + 1;
if (r == count) {
notify();
runtime.initiateTermination();
}
};
auto* listener = emper::io::tcp_listener(HOST, PORT, coordinator_func, scount,
{emper::io::SockOpt::ReusePort});
if (!listener) {
exit(EXIT_FAILURE);
}
runtime.scheduleFromAnywhere(*listener);
runtime.waitUntilFinished();
return EXIT_SUCCESS;
}