Something went wrong on our end
Select Git revision
file_contexts
-
Calin Juravle authored
Since it was introduced it caused quite a few issues and it spams the SElinux logs unnecessary. The end goal of the audit was to whitelist the access to the interpreter. However that's unfeasible for now given the complexity. Test: devices boots and everything works as expected no more auditallow logs Bug: 29795519 Bug: 32871170 Change-Id: I9a7a65835e1e1d3f81be635bed2a3acf75a264f6
Calin Juravle authoredSince it was introduced it caused quite a few issues and it spams the SElinux logs unnecessary. The end goal of the audit was to whitelist the access to the interpreter. However that's unfeasible for now given the complexity. Test: devices boots and everything works as expected no more auditallow logs Bug: 29795519 Bug: 32871170 Change-Id: I9a7a65835e1e1d3f81be635bed2a3acf75a264f6
EchoClient.cpp 17.40 KiB
// SPDX-License-Identifier: LGPL-3.0-or-later
// Copyright © 2021 Florian Fischer
#include <fcntl.h>
#include <netdb.h> //for getaddrinfo
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h> // for shutdown, socket, AF_INET
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm> // for find
#include <atomic>
#include <cerrno> // for errno, ECANCELED
#include <chrono> // for nanoseconds, duration, durat...
#include <cstdint> // for uint64_t, int32_t
#include <cstdio>
#include <cstdlib> // for size_t, strtol, exit, EXIT_F...
#include <cstring> // for memcmp
#include <iomanip>
#include <iostream> // for operator<<, basic_ostream, endl
#include <ratio>
#include <string>
#include <thread>
#include <utility>
#include "Common.hpp" // for DIE_MSG_ERRNO, DIE_MSG, unli...
#include "CountingPrivateSemaphore.hpp" // for CPS
#include "Debug.hpp" // for LOGE
#include "Fiber.hpp" // for Fiber
#include "Runtime.hpp" // for Runtime
#include "Semaphore.hpp" // for Semaphore
#include "emper.hpp" // for spawn
#include "io.hpp" // for connectAndWait
#include "io/Future.hpp" // for CloseFuture, RecvFuture, Sen...
#include "lib/math.hpp"
using emper::Semaphore;
using emper::io::CloseFuture;
using emper::io::RecvFuture;
using emper::io::SendFuture;
using emper::lib::math::RunningAverage;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::nanoseconds;
using std::chrono::seconds;
// Defaults
const int DECIMAL = 10;
const std::string HOST = "0.0.0.0";
const std::string PORT = "12345";
const size_t ITERATIONS = 10000;
const size_t CLIENTS = 10000;
const size_t SIZE = 32;
const size_t SERVER_BACKLOG = 1024;
const seconds ONERROR_SLEEP(10);
const int OPENFLAGS = O_CREAT | O_WRONLY;
const int OPENMODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
// Globals
std::string host = HOST;
std::string port = PORT;
size_t iterations = ITERATIONS;
size_t execution_seconds = 0;
size_t nclients = CLIENTS;
size_t size = SIZE;
size_t server_backlog = SERVER_BACKLOG;
bool linked_futures;
bool histogram = false;
static ssize_t maxLowLoadClients = -1;
static std::chrono::milliseconds loadSwitchPeriod(1000);
std::atomic<bool> terminate = false;
struct addrinfo* server;
struct TimeStamps {
high_resolution_clock::time_point start;
high_resolution_clock::time_point afterSend;
high_resolution_clock::time_point afterSendDispatch;
high_resolution_clock::time_point afterRecv;
};
class Client {
using IterationResult = std::pair<int, std::string>;
size_t id;
Semaphore& readySem;
Semaphore& startSem;
CPS& cps;
int sock;
bool initialized = false;
char* inBuf;
char* outBuf;
public:
static std::atomic<size_t> client_ids;
// results
size_t iteration = 0;
RunningAverage<double> avg_ns;
size_t unexpectedEchos = 0;
size_t reconnects = 0;
struct TimeStamps* timeStamps = nullptr;
Client(size_t id, Semaphore& readySem, Semaphore& startSem, CPS& cps, bool collectTimeStamps)
: id(id), readySem(readySem), startSem(startSem), cps(cps) {
outBuf = new char[size];
inBuf = new char[size];
if (collectTimeStamps) {
timeStamps = new TimeStamps[iterations];
}
}
~Client() {
delete[] outBuf;
delete[] inBuf;
delete[] timeStamps;
}
static auto startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool;
private:
template <bool collectTimeStamps = false>
auto echo(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
if constexpr (collectTimeStamps) {
timeStamps[iteration].start = high_resolution_clock::now();
}
sendFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t bytes_send = sendFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = high_resolution_clock::now();
}
if (unlikely(bytes_send < 0)) {
return {bytes_send, "send failed"};
}
int32_t bytes_recv = recvFuture.submitAndWait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterRecv = high_resolution_clock::now();
}
if (unlikely(bytes_recv < 0)) {
return {bytes_recv, "recv failed"};
}
return {bytes_recv, ""};
}
template <bool collectTimeStamps = false>
auto linkedEcho(SendFuture& sendFuture, RecvFuture& recvFuture) -> IterationResult {
recvFuture.setDependency(sendFuture);
recvFuture.submit();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSend = high_resolution_clock::now();
}
int32_t res = recvFuture.wait();
if constexpr (collectTimeStamps) {
timeStamps[iteration].afterSendDispatch = timeStamps[iteration].afterRecv =
high_resolution_clock::now();
}
if (unlikely(res <= 0)) {
// recv failed
if (res != -ECANCELED) {
if (res == 0) {
return {0, "server closed the connection"};
}
return {res, "recv failed"};
}
// send failed
int32_t send_res = sendFuture.wait();
if (send_res < 0) {
return {send_res, "send failed"};
}
// send must have sent less than expected because recvFuture returned -ECANCELED
return {0, EMPER_BUILD_STR("short send: " << send_res)};
}
return {res, ""};
}
void connect() {
sock = socket(server->ai_family, server->ai_socktype, server->ai_protocol);
if (sock < 0) {
DIE_MSG_ERRNO("creating new client socket failed");
}
int enable = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) == -1) {
DIE_MSG_ERRNO("setsockopt failed");
}
int err = emper::io::connectAndWait(sock, server->ai_addr, server->ai_addrlen);
if (err) {
DIE_MSG_ERRNO("connecting new client socket failed");
}
if (!initialized) {
// We are connected -> start next client
Client::startNew(readySem, startSem, cps);
initialized = true;
} else {
reconnects++;
}
}
[[nodiscard]] auto shouldYield() const -> bool {
if (maxLowLoadClients < 0) {
return false;
}
if (id < static_cast<size_t>(maxLowLoadClients)) {
return false;
}
auto now = std::chrono::steady_clock::now();
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto ms_since_epoch = now_ms.time_since_epoch();
auto ms_since_epoch_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(ms_since_epoch);
uint64_t periodNum = ms_since_epoch_duration / loadSwitchPeriod;
return periodNum % 2;
}
public:
template <bool collectTimeStamps = false>
void run() {
connect();
// signal that this client is initialized and connected
readySem.release();
// wait for all clients to be ready
startSem.acquire();
while (iteration < iterations && !terminate.load(std::memory_order_relaxed)) {
if (shouldYield()) {
emper::yield();
continue;
}
SendFuture sendFuture(sock, outBuf, size, MSG_NOSIGNAL);
RecvFuture recvFuture(sock, inBuf, size, MSG_WAITALL);
// prepare output buf
sprintf(outBuf, "%lu:%lu", id, iteration);
high_resolution_clock::time_point start;
if constexpr (!collectTimeStamps) {
start = high_resolution_clock::now();
}
IterationResult iterationResult;
if (linked_futures) {
iterationResult = linkedEcho<collectTimeStamps>(sendFuture, recvFuture);
} else {
iterationResult = echo<collectTimeStamps>(sendFuture, recvFuture);
}
high_resolution_clock::time_point end;
if constexpr (!collectTimeStamps) {
end = high_resolution_clock::now();
}
// less than 1 byte was received -> an error occurred
// (connection closed by server is considered an error)
if (unlikely(iterationResult.first <= 0)) {
int err = -iterationResult.first;
auto& msg = iterationResult.second;
if (err == ECONNRESET) {
LOGW("Client " << id << " reconnecting");
close(sock);
// reconnect and try again
connect();
continue;
}
LOGE("Client failed at iteration " << iteration << " with " << msg << ": "
<< strerror(err));
std::this_thread::sleep_for(ONERROR_SLEEP);
DIE;
}
if constexpr (!collectTimeStamps) {
nanoseconds duration = duration_cast<nanoseconds>(end - start);
avg_ns.update(duration.count());
}
if (memcmp(outBuf, inBuf, size) != 0) {
std::stringstream sst;
sst << "got unexpected echo from server" << std::endl;
sst << "expected: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)outBuf[i]);
}
sst << std::endl;
sst << "received: ";
for (unsigned i = 0; i < size; ++i) {
sst << std::setfill('0') << std::setw(2) << std::hex << (0xff & (unsigned)inBuf[i]);
sst << (unsigned)inBuf[i] << " ";
}
sst << std::endl;
LOGE(sst.str());
unexpectedEchos++;
}
iteration++;
}
shutdown(sock, SHUT_RDWR);
// ShutdownFuture shut_f(sock, SHUT_RDWR);
CloseFuture cf(sock);
// cf.setDependency(shut_f);
cf.submit();
cf.wait();
}
};
Client** clients;
std::atomic<size_t> Client::client_ids = 0;
auto Client::startNew(Semaphore& readySem, Semaphore& startSem, CPS& cps) -> bool {
size_t next_client_id = client_ids.fetch_add(1);
if (next_client_id >= nclients) {
return false;
}
spawn(
[&, id = next_client_id] {
clients[id] = new Client(id, readySem, startSem, cps, histogram);
if (histogram) {
clients[id]->run<true>();
} else {
clients[id]->run();
}
},
cps);
return true;
}
auto getOption(int argc, char** argv, const std::string& option) -> char* {
char** end = argv + argc;
char** itr = std::find(argv, end, option);
if (itr != end && ++itr != end) {
return *itr;
}
return nullptr;
}
static auto existsOption(int argc, char** argv, const std::string& option) -> bool {
char** end = argv + argc;
return std::find(argv, end, option) != end;
}
static void printUsage(char* name) {
std::cerr
<< "Usage: " << name
<< "[-h] [-p <port>] [-c <clients>] [-a <address>] [-s <size>] [-b <server backlog>]"
<< std::endl
<< "[-f <output-file>] [-i <iterations> | -t <execution time in sec>] [--linked-futures]"
<< std::endl
<< "[--max-low-load-clients <num>] [--load-switch-period-ms <num>]" << std::endl;
}
auto main(int argc, char* argv[]) -> int {
if (existsOption(argc, argv, "-h")) {
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
char* port_s = getOption(argc, argv, "-p");
if (port_s) {
port = std::string(port_s);
}
char* addr_s = getOption(argc, argv, "-a");
if (addr_s) {
host = std::string(addr_s);
}
char* client_s = getOption(argc, argv, "-c");
if (client_s) {
nclients = strtol(client_s, nullptr, DECIMAL);
}
char* iterations_s = getOption(argc, argv, "-i");
if (iterations_s) {
iterations = strtol(iterations_s, nullptr, DECIMAL);
}
char* time_s = getOption(argc, argv, "-t");
if (time_s) {
if (iterations_s) {
std::cerr << "-t and -i are mutual exclusive" << std::endl;
printUsage(argv[0]);
exit(EXIT_FAILURE);
}
execution_seconds = strtol(time_s, nullptr, DECIMAL);
}
char* size_s = getOption(argc, argv, "-s");
if (size_s) {
size = strtol(size_s, nullptr, DECIMAL);
}
char* server_backlog_s = getOption(argc, argv, "-b");
if (server_backlog_s) {
server_backlog = strtol(server_backlog_s, nullptr, DECIMAL);
}
linked_futures = getOption(argc, argv, "--linked-futures");
int out_fd = STDOUT_FILENO;
char* output_file = getOption(argc, argv, "-f");
if (output_file) {
if (access(output_file, W_OK) == 0) {
DIE_MSG("Output file: " << output_file << " already exists");
}
out_fd = open(output_file, OPENFLAGS, OPENMODE);
if (out_fd < 0) {
DIE_MSG_ERRNO("opening output file failed");
}
}
int histogram_fd;
char* histogram_file = getOption(argc, argv, "--histogram");
if (histogram_file) {
if (!iterations_s) {
DIE_MSG("histograms are currently only possible with fixed iterations");
}
histogram = true;
if (access(histogram_file, W_OK) == 0) {
DIE_MSG("Histogram file: " << histogram_file << " already exists");
}
histogram_fd = open(histogram_file, OPENFLAGS, OPENMODE);
if (histogram_fd < 0) {
DIE_MSG_ERRNO("opening histogram file failed");
}
}
char* max_low_load_clients = getOption(argc, argv, "--max-low-load-clients");
if (max_low_load_clients) {
maxLowLoadClients = std::stoi(max_low_load_clients);
if (static_cast<size_t>(maxLowLoadClients) >= nclients) {
DIE_MSG("--max-low-load-clients " << maxLowLoadClients
<< " must be smaller than the total number of clients "
<< nclients);
}
}
char* load_switch_period_ms_string = getOption(argc, argv, "--load-switch-period-ms");
if (load_switch_period_ms_string) {
if (!max_low_load_clients) {
DIE_MSG("Can't use --load-switch-period-ms without --max-low-load-clients");
}
int load_switch_period_ms = std::stoi(load_switch_period_ms_string);
loadSwitchPeriod = std::chrono::milliseconds(load_switch_period_ms);
}
int err = getaddrinfo(host.c_str(), port.c_str(), nullptr, &server);
if (err) {
if (err == EAI_SYSTEM) {
DIE_MSG_ERRNO("getaddrinfo failed");
} else {
LOGE("error in getaddrinfo: " << gai_strerror(err));
exit(EXIT_FAILURE);
}
}
Runtime runtime;
Fiber* alphaFiber = Fiber::from([&] {
clients = new Client*[nclients];
CPS cps;
Semaphore readySemaphore;
Semaphore startSemaphore;
std::thread terminator;
auto connect_start = high_resolution_clock::now();
// start first client batch
for (size_t i = 0; i < server_backlog; ++i) {
// Start new clients until we have started all or the amount
// of parallel connecting clients is reached
if (!Client::startNew(readySemaphore, startSemaphore, cps)) {
break;
}
}
// await the clients
for (size_t i = 0; i < nclients; ++i) {
readySemaphore.acquire();
}
if (execution_seconds) {
iterations = SIZE_MAX;
terminator = std::thread([] {
std::this_thread::sleep_for(seconds(execution_seconds));
terminate.store(true);
});
}
auto echo_start = high_resolution_clock::now();
// start the clients
for (size_t i = 0; i < nclients; ++i) {
startSemaphore.release();
}
// await client termination
cps.wait();
if (execution_seconds) {
terminator.join();
}
auto echo_end = high_resolution_clock::now();
auto connect_duration = duration_cast<nanoseconds>(echo_start - connect_start).count();
auto echo_duration = duration_cast<nanoseconds>(echo_end - echo_start).count();
auto total_duration = duration_cast<nanoseconds>(echo_end - connect_start).count();
uint64_t total_iterations = 0;
uint64_t total_reconnects = 0;
uint64_t total_unexpected_echos = 0;
RunningAverage avg_ns;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
total_iterations += client->iteration;
total_reconnects += client->reconnects;
total_unexpected_echos += client->unexpectedEchos;
if (!histogram) {
avg_ns.update(client->avg_ns.getAverage());
}
}
std::stringstream sst;
sst << "[global]" << std::endl;
sst << "clients = " << nclients << std::endl;
sst << "size = " << size << std::endl;
sst << "total_iterations = " << total_iterations << std::endl;
sst << "total_reconnects = " << total_reconnects << std::endl;
sst << "total_unexpected_echos = " << total_unexpected_echos << std::endl;
sst << "avg_ns = " << avg_ns.getAverage() << std::endl;
sst << "connect_duration = " << connect_duration << std::endl;
sst << "echo_duration = " << echo_duration << std::endl;
sst << "total_duration = " << total_duration << std::endl;
sst << std::endl;
sst << "[clients]" << std::endl;
sst << "csv =" << std::endl;
sst << " iterations,avg_ns,reconnects,unexpected_echos" << std::endl;
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
sst << " " << client->iteration << ",";
sst << client->avg_ns.getAverage() << ",";
sst << client->reconnects << ",";
sst << client->unexpectedEchos << std::endl;
}
auto output = sst.str();
if (emper::io::writeFileAndWait(out_fd, output.c_str(), output.size()) < 0) {
DIE_MSG_ERRNO("writing results failed");
}
if (output_file) {
emper::io::closeAndForget(out_fd);
}
if (histogram) {
std::string histogramDataHeader =
"total_latency,after_send_latency,after_send_dispatch_latency\n";
if (emper::io::writeFileAndWait(histogram_fd, histogramDataHeader.c_str(),
histogramDataHeader.size()) < 0) {
DIE_MSG_ERRNO("writing histogram data header failed");
}
for (size_t i = 0; i < nclients; ++i) {
auto* client = clients[i];
std::stringstream sst;
for (size_t iteration = 0; iteration < iterations; ++iteration) {
auto timeStamps = client->timeStamps[iteration];
nanoseconds totalLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.start);
nanoseconds afterSendLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSend);
nanoseconds afterSendDispatchLatency =
duration_cast<nanoseconds>(timeStamps.afterRecv - timeStamps.afterSendDispatch);
sst << totalLatency.count() << ",";
sst << afterSendLatency.count() << ",";
sst << afterSendDispatchLatency.count() << std::endl;
}
auto histoOutput = sst.str();
if (emper::io::writeFileAndWait(histogram_fd, histoOutput.c_str(), histoOutput.size()) <
0) {
DIE_MSG_ERRNO("writing histogram data failed");
}
}
emper::io::closeAndForget(histogram_fd);
}
for (size_t i = 0; i < nclients; ++i) {
delete clients[i];
}
delete[] clients;
exit(EXIT_SUCCESS);
});
runtime.scheduleFromAnywhere(*alphaFiber);
runtime.waitUntilFinished();
return EXIT_FAILURE;
}