Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package refit.replica.order;
import java.util.*;
import refit.config.REFITConfig;
import refit.config.REFITLogger;
import refit.message.REFITInstruction;
import refit.message.REFITInstruction.REFITFlushRequestNotification;
import refit.message.REFITMessage;
import refit.replica.REFITBaseReplica;
import refit.replica.REFITObserver;
import refit.replica.REFITStage;
import refit.scheduler.REFITSchedulerTaskType;
import refit.util.REFITAssert;
import refit.util.REFITTime;
/**
 * Order-stage component that paces the proposals of this replica's order group relative to the
 * other order groups.
 *
 * <p>For every group it records the highest PREPARE instance seen and when it arrived. For every
 * other group it also measures the time between one of our own PREPAREs and the matching COMMIT
 * sent by that group's current leader. Half of the lowest such sample is used as the one-way
 * delay estimate. While this replica is the leader of its own group in a stable view,
 * {@link #stageComplete()} periodically broadcasts a {@link REFITFlushRequestNotification}; the
 * interval until the next broadcast is chosen by {@link #getNextProposalAction()}.
 */
public class REFITTimingTracker extends REFITStage {
    /**
     * How the interval until the next proposal is chosen in {@link #stageComplete()}: STOP pauses
     * proposing, SLOW / NORMAL / FAST use PROPOSALS_INTERVAL_MS + 1 / + 0 / - 1, and VERY_FAST
     * uses half the interval.
     */
    enum ProposalAction {
        STOP, SLOW, NORMAL, FAST, VERY_FAST
    }

    /** Receives the flush-request notifications broadcast by {@link #stageComplete()}. */
    public final REFITObserver<REFITFlushRequestNotification> flushSender;
    private final short ownGroup;
    private final short replicaID;
    /** Per-group bookkeeping, keyed by group id {@code 0 .. REFITOrderGroups.COUNT - 1}. */
    private final Map<Short, GroupData> groupDataMap;
    /** Estimated one-way delays in ms, indexed {@code [fromGroup][toGroup]}. */
    private final int[][] oneWayDelays;
    private final int[] currentView;
    private final boolean[] isViewStable;
    /** Absolute time (ms) of the next proposal; {@code Long.MAX_VALUE} while not scheduled. */
    private long nextProposalTime;

    public REFITTimingTracker(REFITBaseReplica replica, short ownGroup) {
        super(REFITSchedulerTaskType.ORDER_STAGE, replica);
        flushSender = new REFITObserver<>();
        this.ownGroup = ownGroup;
        this.replicaID = replica.id;
        groupDataMap = new HashMap<>();
        for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
            final GroupData groupData = new GroupData();
            // Instances at or below this value are ignored by receivedPrepare().
            // NOTE(review): presumably the id just before each group's first instance —
            // confirm against REFITOrderGroups.getGroupID().
            groupData.highestPrepare = i == 0 ? 0 : i - REFITOrderGroups.COUNT;
            groupData.highestPrepareArrivalTime = 0L;
            groupDataMap.put(i, groupData);
        }
        // Start from the configured expectations (deep copy, so updates never touch the config).
        final int[][] delays = REFITConfig.EXPECTED_GROUP_NETWORK_DELAYS_MS;
        oneWayDelays = new int[delays.length][];
        for (int i = 0; i < delays.length; i++) {
            oneWayDelays[i] = Arrays.copyOf(delays[i], delays[i].length);
        }
        currentView = new int[REFITOrderGroups.COUNT];
        isViewStable = new boolean[REFITOrderGroups.COUNT];
        nextProposalTime = Long.MAX_VALUE;
    }

    @Override
    public String toString() {
        return "TT";
    }

    /**
     * Records the current view of a group and whether that view is stable.
     *
     * @param groupID  the group whose view changed
     * @param newView  the group's new view number
     * @param isStable whether the new view is stable
     */
    public void updateView(short groupID, int newView, boolean isStable) {
        currentView[groupID] = newView;
        isViewStable[groupID] = isStable;
    }

    /**
     * Records the arrival of a PREPARE for {@code instanceId}.
     *
     * <p>Old or duplicate prepares, i.e. those not above the group's highest known prepare, are
     * ignored. If the prepare was sent by this replica, its arrival time is queued for every other
     * group so that a later COMMIT from that group's leader can be turned into a ping sample.
     *
     * @param instanceId the prepared instance
     * @param from       the replica that sent the prepare
     */
    public void receivedPrepare(long instanceId, short from) {
        final short group = REFITOrderGroups.getGroupID(instanceId);
        final GroupData groupData = groupDataMap.get(group);
        // ignore old or duplicate prepare
        if (instanceId <= groupData.highestPrepare) return;
        final long ct = REFITTime.currentTimeMillis.getAsLong();
        groupData.highestPrepare = instanceId;
        groupData.highestPrepareArrivalTime = ct;
        if (from == replicaID) {
            // Track own prepares for ping measurements towards every other group.
            // NOTE(review): these queues are only drained by receivedCommit(), so they grow
            // without bound for a group whose leader never sends matching commits.
            for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
                if (i == ownGroup) continue;
                final GroupData gd = groupDataMap.get(i);
                gd.prepareNum.add(instanceId);
                gd.prepareTime.add(ct);
            }
        }
    }

    /**
     * Records a COMMIT for one of our own group's instances, received from {@code otherReplica}.
     *
     * <p>Only commits from the current leader of the sender's group are used. If the commit
     * matches one of our queued prepares, the elapsed time since that prepare is stored as a ping
     * sample. Once NETWORK_PING_ITERATIONS samples are collected, half of the lowest sample becomes
     * the one-way delay between our group and the sender's group, and the samples are reset.
     *
     * @param instanceId   the committed instance; must belong to our own group
     * @param otherReplica the replica that sent the commit
     */
    public void receivedCommit(long instanceId, short otherReplica) {
        final short otherGroup = REFITOrderGroups.getPrimaryGroup(otherReplica);
        final short groupLeader = REFITOrderGroups.getGroupPrimary(otherGroup, currentView[otherGroup]);
        // only measure ping times to the other group's current leader
        if (groupLeader != otherReplica) return;
        final short group = REFITOrderGroups.getGroupID(instanceId);
        REFITAssert.assertTrue(group == ownGroup);
        final GroupData groupData = groupDataMap.get(otherGroup);
        // Discard own prepares older than this commit: their commits were missed or are late.
        while (!groupData.prepareNum.isEmpty() && groupData.prepareNum.peek() < instanceId) {
            groupData.prepareNum.removeFirst();
            groupData.prepareTime.removeFirst();
        }
        // only use commit if we have a matching prepare
        if (groupData.prepareNum.isEmpty() || groupData.prepareNum.peek() != instanceId) return;
        // Consume the matched prepare so duplicate commits cannot add extra samples.
        groupData.prepareNum.removeFirst();
        final long prepareArrivalTime = groupData.prepareTime.removeFirst();
        final long ct = REFITTime.currentTimeMillis.getAsLong();
        // FIXME: ignore pings during view-changes
        groupData.pingTimes.add((int) (ct - prepareArrivalTime));
        if (groupData.pingTimes.size() >= REFITConfig.NETWORK_PING_ITERATIONS) {
            final int lowestOneWayDelay = Collections.min(groupData.pingTimes) / 2;
            // The measurement is for the (ownGroup, otherGroup) pair. Indexing with `group`
            // (== ownGroup, asserted above) would only overwrite the unused diagonal entry.
            oneWayDelays[ownGroup][otherGroup] = lowestOneWayDelay;
            oneWayDelays[otherGroup][ownGroup] = lowestOneWayDelay;
            REFITLogger.logWarning(this, "Update delay between " + ownGroup + " and " + otherGroup + " to " + lowestOneWayDelay + " ms");
            groupData.pingTimes.clear();
        }
    }

    @Override
    protected void handleMessage(REFITMessage message) {
        switch (message.type) {
        case INSTRUCTION:
            handleInstruction(((REFITInstruction) message));
            break;
        default:
            REFITLogger.logError(this, "drop message of unexpected type " + message.type);
        }
    }

    private void handleInstruction(REFITInstruction instruction) {
        switch (instruction.instructionType) {
        case INITIAL_CONNECTIONS_COMPLETED:
            // just needed to trigger stageComplete()
            break;
        default:
            REFITLogger.logError(this, "drop instruction of unexpected type " + instruction.instructionType);
        }
    }

    /**
     * Issues a proposal when this replica leads its group in a stable view and the scheduled
     * proposal time has been reached.
     *
     * <p>A proposal is a broadcast flush request whose limit is COUNT + 1 above our group's highest
     * known prepare. The time of the following proposal is then advanced according to
     * {@link #getNextProposalAction()}, and a timeout is set for it, capped at two proposal
     * intervals from now.
     */
    @Override
    protected void stageComplete() {
        // Nothing to do if not leader or view not yet stable
        if (REFITOrderGroups.getGroupPrimary(ownGroup, currentView[ownGroup]) != replica.id || !isViewStable[ownGroup])
            return;
        // At start or after a previous proposal stop has ended, propose immediately
        if (nextProposalTime == Long.MAX_VALUE && getNextProposalAction() != ProposalAction.STOP) {
            nextProposalTime = REFITTime.currentTimeMillis.getAsLong();
        }
        // wait for turn
        if (REFITTime.currentTimeMillis.getAsLong() < nextProposalTime) return;
        final long currentSeq = groupDataMap.get(ownGroup).highestPrepare;
        final long newLimit = currentSeq + REFITOrderGroups.COUNT + 1;
        flushSender.broadcast(new REFITFlushRequestNotification(newLimit, newLimit, newLimit));
        // Re-evaluated after the broadcast on purpose: the broadcast may update the tracked state.
        final ProposalAction nextProposalAction = getNextProposalAction();
        switch (nextProposalAction) {
        case STOP:
            nextProposalTime = Long.MAX_VALUE;
            break;
        case SLOW:
            nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS + 1;
            break;
        case NORMAL:
            nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS;
            break;
        case FAST:
            nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS - 1;
            break;
        case VERY_FAST:
            nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS / 2;
            break;
        }
        final long currentTime = REFITTime.currentTimeMillis.getAsLong();
        // Wake up at the next proposal, but at least every two intervals to re-check the state.
        setTimeoutAbsolute(Math.min(nextProposalTime, currentTime + 2 * REFITConfig.PROPOSALS_INTERVAL_MS));
    }

    /**
     * Estimates, in ms, how far our group's proposals are ahead of {@code group}'s.
     *
     * <p>The estimate is the difference in proposal rounds times the proposal interval, minus
     * the one-way delay from {@code group} to us, minus the time since {@code group}'s latest
     * prepare arrived. That last term is capped at two proposal intervals. A positive result
     * means our group is estimated to be ahead.
     *
     * @param group    the group to compare against
     * @param ownGroup our own group
     * @return the estimated offset in ms; 0 if {@code group == ownGroup}
     */
    private long getEstimatedCurrentState(final short group, final short ownGroup) {
        if (group == ownGroup) {
            return 0;
        }
        final GroupData groupData = groupDataMap.get(group);
        final long lastKnownState = (groupData.highestPrepare + 1) / REFITOrderGroups.COUNT;
        final GroupData ownGroupData = groupDataMap.get(ownGroup);
        final long ownPrepare = (ownGroupData.highestPrepare + 1) / REFITOrderGroups.COUNT;
        // expected time difference based on the known proposals
        final long seqTimeOffset = (ownPrepare - lastKnownState) * REFITConfig.PROPOSALS_INTERVAL_MS;
        final long delay = oneWayDelays[group][ownGroup];
        // time since latest proposal arrived, limited to one missing proposal
        final long currentTime = REFITTime.currentTimeMillis.getAsLong();
        final long timePassed = Math.min(2 * REFITConfig.PROPOSALS_INTERVAL_MS, currentTime - groupData.highestPrepareArrivalTime);
        return seqTimeOffset - delay - timePassed;
    }

    /**
     * Chooses the pacing of the next proposal from the largest estimated offset to any other
     * group, i.e. relative to the slowest other group.
     *
     * @return the pacing action for the next proposal
     */
    public ProposalAction getNextProposalAction() {
        long slowestOtherGroupOffset = -Long.MAX_VALUE;
        for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
            if (i == ownGroup) continue;
            final long groupOffset = getEstimatedCurrentState(i, ownGroup);
            slowestOtherGroupOffset = Math.max(slowestOtherGroupOffset, groupOffset);
        }
        final int pi = REFITConfig.PROPOSALS_INTERVAL_MS;
        if (slowestOtherGroupOffset < -10 * pi) {
            // If we are very far behind the next slowest (e.g. after view-change), catch up fast
            return ProposalAction.VERY_FAST;
        } else if (slowestOtherGroupOffset < -pi / 2) {
            // If we are a bit behind, speed up
            return ProposalAction.FAST;
        } else if (slowestOtherGroupOffset < pi / 2) {
            // If we share the last place with at least one other group, continue normally
            return ProposalAction.NORMAL;
        } else if (slowestOtherGroupOffset < 20 * pi) {
            // Slow down if we are at least one prepare ahead
            return ProposalAction.SLOW;
        } else {
            // Stop if we are more than 20 prepares ahead
            return ProposalAction.STOP;
        }
    }

    /** Per-group timing state. */
    private static final class GroupData {
        /** Instance ids of our own prepares still awaiting a commit from this group's leader. */
        final ArrayDeque<Long> prepareNum = new ArrayDeque<>();
        /** Arrival times (ms) of the prepares in {@code prepareNum}, in the same order. */
        final ArrayDeque<Long> prepareTime = new ArrayDeque<>();
        /** Highest prepared instance seen for this group. */
        long highestPrepare;
        /** Arrival time (ms) of {@code highestPrepare}. */
        long highestPrepareArrivalTime;
        /** Collected ping samples (ms) towards this group's leader. */
        final ArrayList<Integer> pingTimes = new ArrayList<>();
    }
}