Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package refit.client;
import java.io.File;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import refit.client.policy.REFITClientPolicy;
import refit.config.REFITConfig;
import refit.config.REFITLogger;
import refit.scheduler.REFITHangDetector;
import refit.scheduler.REFITScheduler;
import refit.scheduler.REFITSchedulerGroup;
import refit.scheduler.REFITSchedulerTaskType;
import refit.util.REFITCallTracer;
import refit.util.REFITIntervalStatistics;
import refit.util.REFITStatisticsListener;
import refit.util.REFITTime;
public class REFITBenchmark implements REFITStatisticsListener {
private REFITIntervalStatistics statistics;
public REFITBenchmark() {
File clientLatencies = REFITConfig.CLIENT_LATENCY_STATISTICS ? new File(REFITConfig.OUTPUT_DIRECTORY + "/" + REFITConfig.OUTPUT_ID + ".latency") : null;
statistics = new REFITIntervalStatistics(1000, this, clientLatencies);
}
@Override
public String toString() {
return "BENCH";
}
@SuppressWarnings("unchecked")
public void runBenchmark(long durationInMs, int clientProcessID,
Class<? extends REFITBenchmarkRunnable> benchmarkType,
Class<? extends REFITClientLibrary> libraryType,
Class<? extends REFITClientPolicy> policyType) throws InterruptedException {
int nrOfClients = REFITConfig.NR_OF_CLIENTS[clientProcessID];
int clientIDOffset = REFITConfig.TOTAL_NR_OF_REPLICAS;
for (int i = 0; i < clientProcessID; i++) {
clientIDOffset += REFITConfig.NR_OF_CLIENTS[i];
}
System.out.println("Number of clients: " + nrOfClients);
System.out.println("ClientID offset: " + clientIDOffset);
final boolean isClientTask = REFITClientTask.class.isAssignableFrom(benchmarkType);
// Create schedulers
REFITScheduler[] schedulers = new REFITScheduler[REFITConfig.NR_OF_CLIENT_SCHEDULERS];
REFITSchedulerGroup[] groups = new REFITSchedulerGroup[schedulers.length];
CountDownLatch startupSync = new CountDownLatch(REFITConfig.NR_OF_CLIENT_SCHEDULERS);
REFITHangDetector hangDetector = (REFITConfig.TRACK_SCHEDULER_HANGS) ? new REFITHangDetector() : null;
if (hangDetector != null) hangDetector.start();
for (int i = 0; i < schedulers.length; i++) {
schedulers[i] = new REFITScheduler(startupSync, hangDetector);
schedulers[i].setName("CLNT" + clientProcessID + "-" + i);
groups[i] = new REFITSchedulerGroup((REFITConfig.NR_OF_CLIENT_SCHEDULERS == 1) && isClientTask);
}
// Create client execution policy
REFITClientPolicy policy = null;
try {
Constructor<? extends REFITClientPolicy> policyConstructor = policyType.getConstructor(long.class, int.class, int.class, int.class);
policy = policyConstructor.newInstance(durationInMs, clientProcessID, nrOfClients, clientIDOffset);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
// Create and assign clients and client libraries
REFITClientThread[] clients = isClientTask ? null : new REFITClientThread[nrOfClients];
Thread[] clientThreads = isClientTask ? null : new Thread[nrOfClients];
REFITClientTask[] clientTasks = isClientTask ? new REFITClientTask[nrOfClients] : null;
REFITClientLibrary[] libraries = new REFITClientLibrary[nrOfClients];
REFITClientLibrary firstLibrary = null;
LinkedBlockingQueue<Object> sidechannel = new LinkedBlockingQueue<>();
for (short i = 0; i < nrOfClients; i++) {
try {
// Create client library
Constructor<? extends REFITClientLibrary> libraryConstructor = libraryType.getConstructor(short.class,
REFITSchedulerGroup.class, REFITIntervalStatistics.class, REFITClientPolicy.class, boolean.class,
LinkedBlockingQueue.class);
REFITClientLibrary library = libraryConstructor.newInstance((short) (clientIDOffset + i),
groups[i % schedulers.length], statistics, policy, i == 0, sidechannel);
libraries[i] = library;
if (firstLibrary == null) {
firstLibrary = library;
} else {
firstLibrary.addInitialCompleteListener(library);
}
// Create client
if (isClientTask) {
Constructor<? extends REFITClientTask> benchmarkConstructor =
(Constructor<? extends REFITClientTask>) benchmarkType.getConstructor(REFITClientLibrary.class);
REFITClientTask client = benchmarkConstructor.newInstance(library);
library.setClient(client);
clientTasks[i] = client;
} else {
Constructor<? extends REFITClientThread> benchmarkConstructor =
(Constructor<? extends REFITClientThread>) benchmarkType.getConstructor(REFITClientLibrary.class);
clients[i] = benchmarkConstructor.newInstance(library);
clientThreads[i] = new Thread(clients[i], "BENCH" + library.getNodeID());
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// assign tasks to their schedulers
for (int i = 0; i < schedulers.length; i++) {
schedulers[i].assignTasks(groups[i], REFITSchedulerTaskType.NETWORK_ENDPOINT);
schedulers[i].assignTasks(groups[i], REFITSchedulerTaskType.CLIENT_ENDPOINT_WORKER);
schedulers[i].assignTasks(groups[i], REFITSchedulerTaskType.CLIENT_LIBRARY);
schedulers[i].assignTasks(groups[i], REFITSchedulerTaskType.CLIENT_TASK);
groups[i].assertAllTaskAreAssigned();
}
REFITLogger.logEvent(this, "Startup delay: " + REFITTime.timeSinceStart() * 0.001f + " s");
// Start schedulers
for (REFITScheduler scheduler : schedulers) scheduler.start();
try {
if (!startupSync.await(10, TimeUnit.SECONDS)) {
System.err.println("Client scheduler startup timed out");
System.exit(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(1);
}
// Start client threads
if (!isClientTask) {
for (Thread client : clientThreads) client.start();
}
// Wait for benchmark end
statistics.start();
// Delay the policy start a bit to make it very likely that the policy completes a short time
// after a statistics interval result is printed
REFITTime.sleep(79);
policy.execute();
statistics.end();
REFITCallTracer.instance.dumpStacks();
analyzeLibraryProgress(libraries);
System.exit((lastEventCount > 50) ? 0 : 1);
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
}
private long analyzeLibraryProgress(REFITClientLibrary[] libraries) {
long totalProgress = 0;
long[] progresses = new long[libraries.length];
for (int i = 0; i < libraries.length; i++) {
long progress = libraries[i].getProgress();
totalProgress += progress;
progresses[i] = progress;
}
Arrays.sort(progresses);
long averageProgress = totalProgress / libraries.length;
long progressBlock = averageProgress / 20;
long blockStart = progresses[0];
int blockStartIdx = 0;
System.out.println("=== Client progress ===");
for (int i = 0; i < progresses.length; i++) {
if (progresses[i] > blockStart + progressBlock) {
int blockLen = i - blockStartIdx;
System.out.printf("%d - %d: %d\n", blockStart, progresses[i - 1], blockLen);
blockStart = progresses[i];
blockStartIdx = i;
}
}
int blockLen = progresses.length - blockStartIdx;
System.out.printf("%d - %d: %d\n", blockStart, progresses[progresses.length - 1], blockLen);
return totalProgress;
}
// ##############
// # STATISTICS #
// ##############
volatile long lastEventCount = 0;
@Override
public void statisticsStartedResult() {
REFITLogger.logEvent(this, "OK");
REFITLogger.logEvent(this, String.format("Start time: %.3fs", REFITTime.timeSinceStart() * 0.001f));
}
@Override
public void statisticsIntervalResult(final int resultIndex, final int eventCount, final float eventValueAverage,
final long eventValueMin, final long eventValueMax) {
System.out.println(String.format("%13d %4d %6d %6.0f (%6d/%6d)", REFITTime.timeMicroUTC.getAsLong(),
resultIndex, eventCount, eventValueAverage, eventValueMin, eventValueMax));
lastEventCount = eventCount;
if (REFITConfig.CLIENT_REPLY_STATISTICS) {
statistics.printReplyStats();
}
}
@Override
public void statisticsOverallResult(int nrOfIntervals, int eventCount, float eventCountAverage, float eventValueAverage) {
REFITLogger.logEvent(this, String.format("END: %6d %6.0f %6.0f (%3d)", eventCount, eventCountAverage, eventValueAverage, nrOfIntervals));
}
// ########
// # MAIN #
// ########
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("usage: java " + REFITBenchmark.class.getSimpleName() + " <duration> <client-group-id> [<start-delay>]");
System.exit(1);
}
long durationInMs = Integer.parseInt(args[0]) * 1000;
int clientProcessID = Integer.parseInt(args[1]);
Class<? extends REFITBenchmarkRunnable> benchmarkType = REFITConfig.APPLICATION_CLIENT;
Class<? extends REFITClientLibrary> libraryType = REFITConfig.APPLICATION_CLIENT_LIBRARY;
Class<? extends REFITClientPolicy> policyType = REFITConfig.APPLICATION_CLIENT_POLICY;
REFITBenchmark benchmark = new REFITBenchmark();
benchmark.runBenchmark(durationInMs, clientProcessID, benchmarkType, libraryType, policyType);
}
}