Skip to content
Snippets Groups Projects
Select Git revision
  • remote-put-get-free-context-cycle
  • master default
  • ci-bump-test
  • abp-queue
  • new-delete-leaks
  • fused-continuation-and-completion-stealing
  • emper-fix-invalid-conv
  • linux-version-construct-on-first-use
  • libstdc++-asserts
  • msan
  • libc++
  • completer-strategies
  • cactus_stack_devel_one_commit
  • client-load-change
  • cppcheck
  • flow
  • cast-if-future
  • async_network2
  • thread_safe_log_config
  • burak
  • attic/clang-release-tls-optimization-debug-and-fix
  • attic/continuation-stealing-dev
22 results

Coordinator.cpp

Blame
  • user avatar
    Florian Fischer authored
    053cc4bf
    History
    Coordinator.cpp 2.03 KiB
    // SPDX-License-Identifier: LGPL-3.0-or-later
    // Copyright © 2021 Florian Fischer
    #include <sys/types.h>
    
    #include <array>
    #include <atomic>
    #include <cstdlib>
    #include <iostream>
    #include <mutex>
    #include <string>
    #include <vector>
    
    #include "Common.hpp"
    #include "Runtime.hpp"
    #include "io.hpp"
    #include "io/Future.hpp"
    #include "io/IoContext.hpp"
    
    std::string HOST = "::";
    std::string PORT = "12346";
    
    std::atomic<unsigned> ready;
    std::mutex lock;
    std::vector<int> conns;
    
    void notify() {
    	std::vector<emper::io::SendFuture*> futures;
    	std::string msg = "Go";
    	for (int conn : conns) {
    		auto* sf = new emper::io::SendFuture(conn, msg.c_str(), msg.size(), 0);
    		futures.push_back(sf);
    	}
    
    	IoContext::getIo()->submit(futures.begin(), futures.end());
    	for (auto* f : futures) {
    		int res = f->wait();
    		if (res < 0) DIE_MSG_ERRNO("send failed");
    		delete f;
    	}
    
    	for (int conn : conns) emper::io::closeAndForget(conn);
    }
    
    // NOLINTNEXTLINE(bugprone-exception-escape)
    auto main(int argc, char* argv[]) -> int {
    	if (argc != 2) {
    		std::cerr << "Usage: " << argv[0] << " <count>" << std::endl;
    		exit(EXIT_FAILURE);
    	}
    
    	const int scount = std::stoi(argv[1]);
    	if (scount < 0) DIE_MSG("count must be positiv");
    	const auto count = static_cast<unsigned>(scount);
    
    	std::cout << "Network barrier listening on " << HOST << ":" << PORT << " for " << count
    						<< " connections" << std::endl;
    
    	Runtime runtime;
    	auto coordinator_func = [&](int socket) {
    		{
    			std::lock_guard<std::mutex> l(lock);
    			conns.push_back(socket);
    		}
    
    		std::array<char, 16> buf;
    		ssize_t recv = emper::io::recvAndWait(socket, buf.data(), buf.size(), 0);
    		if (recv < 0) DIE_MSG_ERRNO("recv failed");
    
    		unsigned r = ready.fetch_add(1) + 1;
    		if (r == count) {
    			notify();
    			runtime.initiateTermination();
    		}
    	};
    
    	auto* listener = emper::io::tcp_listener(HOST, PORT, coordinator_func, scount,
    																					 {emper::io::SockOpt::ReusePort});
    
    	if (!listener) {
    		exit(EXIT_FAILURE);
    	}
    
    	runtime.scheduleFromAnywhere(*listener);
    
    	runtime.waitUntilFinished();
    
    	return EXIT_SUCCESS;
    }