Newer
Older
package refit.communication;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import refit.config.REFITLogger;
import refit.scheduler.REFITSchedulerTask;
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
public class REFITConnection {
private final REFITUnmarshaller unmarshaller;
// use volatile to make sure that isOpen() can return a somewhat reasonable result
private volatile SocketChannel socketChannel;
private SocketAddress socketAddress;
public SelectionKey selectionKey;
private boolean isReadable;
private boolean isWriteable;
private boolean isConnectable;
private final short id;
public REFITConnection(int receiveBufferSize, int sendBufferSize, REFITUnmarshaller unmarshaller, short nodeID) {
this.unmarshaller = unmarshaller;
this.receiveBuffer = ByteBuffer.allocateDirect(receiveBufferSize);
this.boundedSendQueue = new ConcurrentLinkedQueue<>();
this.boundedSendQueueBytes = new AtomicInteger(0);
this.waitingForSend = new HashSet<>();
// FIXME: eventually get rid of sendQueue. However, this requires potentially large modifications to every single stage
this.sendQueue = new ConcurrentLinkedQueue<>();
this.useStaticSendBuffer = (sendBufferSize > 0);
this.staticSendBuffer = useStaticSendBuffer ? ByteBuffer.allocateDirect(sendBufferSize) : null;
this.boundedSendQueueLimit = (sendBufferSize > 0) ? sendBufferSize / 2 : 10 * 1024;
this.sendBuffer = null;
this.id = nodeID;
}
@Override
public String toString() {
Object socket = (socketChannel != null) ? socketChannel.socket() : null;
return "CNCTN[" + socket + "]";
}
public void init(SocketChannel socketChannel) throws IOException {
if (socketChannel == null) throw new IllegalArgumentException();
// Reset helper data structures
this.socketChannel = socketChannel;
selectionKey = null;
socketAddress = null;
// wakeup every waiting outbox
sendWakeups();
waitingForSend.clear();
hasWaitingForSend = false;
// reset send and receive queues
boundedSendQueue.clear();
boundedSendQueueBytes.set(0);
sendQueue.clear();
sendPending = false;
sendBuffer = null;
receiveBuffer.clear();
receiveProcessedMark = 0;
isReadable = true;
isWriteable = true;
isConnectable = true;
try {
// Configure channel
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
} catch (IOException e) {
closeConnection();
throw e;
}
}
public void connect(SocketAddress address) throws IOException {
init(SocketChannel.open());
socketAddress = address;
try {
socketChannel.connect(address);
} catch (IOException e) {
closeConnection();
throw e;
}
isReadable = false;
isWriteable = false;
// test immediately whether the connection can be completed
// without this, the connection setup seems to hang at least on localhost
isConnectable = true;
}
public void register(REFITSchedulerTask task) {
try {
// Register channel for I/O operations
selectionKey = task.registerIO(socketChannel, (socketAddress == null) ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
} catch (IOException ioe) {
closeConnection();
}
}
public void select() {
isReadable = selectionKey.isReadable();
isWriteable = (selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0 || selectionKey.isWritable();
isConnectable = selectionKey.isConnectable();
}
public boolean isOpen() {
return (socketChannel != null);
}
public void closeConnection() {
if (selectionKey != null) selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException ignored) {
}
socketChannel = null;
selectionKey = null;
}
public SocketChannel takeChannel() {
SocketChannel channel = socketChannel;
selectionKey = null;
socketChannel = null;
socketAddress = null;
return channel;
}
// ###########
// # SENDING #
// ###########
private final boolean useStaticSendBuffer;
private final ByteBuffer staticSendBuffer;
private final Queue<ByteBuffer> boundedSendQueue;
private final AtomicInteger boundedSendQueueBytes;
private final int boundedSendQueueLimit;
private final Set<REFITOutbox> waitingForSend;
private volatile boolean hasWaitingForSend;
private final Queue<ByteBuffer> sendQueue;
private ByteBuffer sendBuffer;
private boolean sendPending;
public void enqueue(ByteBuffer message) {
if (!isOpen()) return;
sendQueue.add(message);
}
public boolean connect() throws IOException {
if (!isConnectable) return false;
boolean isConnected = socketChannel.finishConnect();
if (!isConnected) {
// connection not yet complete
return false;
}
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
return true;
}
private boolean isBoundedQueueFull() {
return boundedSendQueueBytes.get() > boundedSendQueueLimit;
}
private boolean isBoundedQueueBusy() {
return boundedSendQueueBytes.get() > boundedSendQueueLimit / 2;
}
public boolean boundedEnqueue(ByteBuffer message, REFITOutbox outbox) {
if (!isOpen() || isBoundedQueueFull() || hasWaitingForSend) {
synchronized (waitingForSend) {
hasWaitingForSend = true;
waitingForSend.add(outbox);
}
return false;
}
boundedSendQueue.add(message);
boundedSendQueueBytes.addAndGet(message.remaining());
return true;
}
private void sendWakeups() {
synchronized (waitingForSend) {
for (REFITOutbox outbox : waitingForSend) {
outbox.getStage().tell(new REFITNetworkEvents.REFITConnectionWriteableNotification(outbox, id));
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
}
waitingForSend.clear();
hasWaitingForSend = false;
}
}
// TODO: Single send operation for retry and new data when using a static send buffer
public void send() {
// Check whether connection is open
if (!isOpen()) return;
// Try to complete pending send
if (sendPending) {
if (!isWriteable) return;
boolean success = trySend();
if (!success) return;
}
// Prepare send buffer
while (!boundedSendQueue.isEmpty() || !sendQueue.isEmpty()) {
ByteBuffer queueHead = boundedSendQueue.poll();
if (queueHead == null) {
queueHead = sendQueue.poll();
} else {
boundedSendQueueBytes.addAndGet(-queueHead.remaining());
}
// Only fallback to message buffer, if message is too large
if (useStaticSendBuffer && staticSendBuffer.remaining() >= queueHead.remaining()) {
sendBuffer = staticSendBuffer;
sendBuffer.put(queueHead);
while (!boundedSendQueue.isEmpty() || !sendQueue.isEmpty()) {
boolean fromBoundedQueue = true;
ByteBuffer message = boundedSendQueue.peek();
if (message == null) {
message = sendQueue.peek();
fromBoundedQueue = false;
}
if (sendBuffer.remaining() < message.remaining()) break;
if (fromBoundedQueue) {
boundedSendQueue.poll();
boundedSendQueueBytes.addAndGet(-message.remaining());
} else {
sendQueue.poll();
}
sendBuffer.put(message);
}
sendBuffer.flip();
} else {
sendBuffer = queueHead;
}
// Try to send data
boolean success = trySend();
if (!success) break;
}
if (!isBoundedQueueBusy()) {
sendWakeups();
}
}
private boolean trySend() {
try {
// Try to send data
int bytesToSend = sendBuffer.remaining();
int bytesSent = socketChannel.write(sendBuffer);
sendPending = (bytesSent < bytesToSend);
if (sendPending) {
//REFITLogger.logWarning(this, "write stalled (" + socketChannel.socket().getRemoteSocketAddress() + "): only " + bytesSent + " of " + bytesToSend + " bytes sent");
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
isWriteable = false;
return false;
}
// Sending has been successful
if (sendBuffer == staticSendBuffer) sendBuffer.clear();
else sendBuffer = null;
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
return true;
} catch (IOException ioe) {
REFITLogger.logWarning(this, "Broken connection");
closeConnection();
return false;
}
}
// #############
// # RECEIVING #
// #############
private final ByteBuffer receiveBuffer;
private int receiveProcessedMark;
public List<ByteBuffer> receive() {
// Check whether connection is open and data is available
if (!isOpen()) return Collections.emptyList();
if (!isReadable) return Collections.emptyList();
// Compact buffer if it is almost full
if (receiveBuffer.position() > (receiveBuffer.capacity() * 3 / 4)) {
receiveBuffer.limit(receiveBuffer.position());
receiveBuffer.position(receiveProcessedMark);
receiveProcessedMark = 0;
receiveBuffer.compact();
}
// Get data
int bytesRead;
try {
// Receive new data
bytesRead = socketChannel.read(receiveBuffer);
} catch (IOException ioe) {
bytesRead = -1;
}
// Close connection in case of an exception or EOF
if (bytesRead < 0) {
closeConnection();
return Collections.emptyList();
}
// Prepare buffer for unmarshalling
int dataEndPosition = receiveBuffer.position();
receiveBuffer.limit(dataEndPosition);
receiveBuffer.position(receiveProcessedMark);
// Unmarshal messages
List<ByteBuffer> receiveQueue = new LinkedList<>();
while (receiveBuffer.hasRemaining()) {
ByteBuffer messageBuffer = unmarshaller.unmarshalMessage(receiveBuffer);
if (messageBuffer == null) break;
final ByteBuffer messageCopy = ByteBuffer.allocate(messageBuffer.remaining());
messageCopy.put(messageBuffer);
messageCopy.flip();
receiveQueue.add(messageCopy.asReadOnlyBuffer());
}
// Update processed mark and reset buffer
receiveProcessedMark = receiveBuffer.position();
receiveBuffer.limit(receiveBuffer.capacity());
receiveBuffer.position(dataEndPosition);
isReadable = false;
// Message is too large for the receive buffer
if (receiveProcessedMark == 0 && receiveBuffer.remaining() == 0 && receiveQueue.size() == 0) {
REFITLogger.logWarning(this, "Message too large to receive, buffer size " + receiveBuffer.capacity());
closeConnection();
return Collections.emptyList();
}
return receiveQueue;
}
}