Skip to content
Snippets Groups Projects
Commit 16ed71a6 authored by Florian Fischer's avatar Florian Fischer
Browse files

implement a Coordinator

The Coordinator is used for our echo evaluation and implements
a barrier style synchronization mechanism for processes spread across the network.
parent ed1b0c8f
No related branches found
No related tags found
1 merge request!228Support distributing multiple echoclient over the network
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <sys/types.h>
#include <array>
#include <atomic>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>
#include "Common.hpp"
#include "Future.hpp"
#include "Runtime.hpp"
#include "io.hpp"
std::string HOST = "::";
std::string PORT = "12346";
std::atomic<unsigned> ready;
std::mutex lock;
std::vector<int> conns;
void notify() {
std::vector<emper::io::SendFuture*> futures;
std::string msg = "Go";
for (int conn : conns) {
auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
if (!futures.empty()) sf->setDependency(*futures.back());
futures.push_back(sf);
}
int res = futures.back()->submitAndWait();
if (res < 0) DIE_MSG_ERRNO("send failed");
for (int conn : conns) emper::io::closeAndForget(conn);
for (auto* sf : futures) delete sf;
}
auto main(int argc, char* argv[]) -> int {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
exit(EXIT_FAILURE);
}
const int scount = std::stoi(argv[1]);
if (scount < 0) DIE_MSG("count must be positiv");
const auto count = static_cast<unsigned>(scount);
std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count
<< " connections" << std::endl;
Runtime runtime;
auto* listener = emper::io::tcp_listener(HOST, PORT, [&](int socket) {
{
std::lock_guard<std::mutex> l(lock);
conns.push_back(socket);
}
std::array<char, 16> buf;
ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0);
if (recv < 0) DIE_MSG_ERRNO("recv failed");
unsigned r = ready.fetch_add(1) + 1;
if (r == count) {
notify();
runtime.initiateTermination();
}
});
if (!listener) {
exit(EXIT_FAILURE);
}
runtime.scheduleFromAnywhere(*listener);
runtime.waitUntilFinished();
return EXIT_FAILURE;
}
......@@ -16,6 +16,12 @@ echoserver_exe = executable(
dependencies: emper_dep,
)
netbarrier_exe = executable(
'coordinator',
'Coordinator.cpp',
dependencies: emper_dep,
)
echoserver_callback_exe = executable(
'echoserver_callback',
'EchoServerCallback.cpp',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment