Commit 51789dc4 authored by Camille Fournier's avatar Camille Fournier
Browse files

ZOOKEEPER-1382. Zookeeper server holds onto dead/expired session ids in the watch data structures

  (Germán Blanco and Michael Morello via camille)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1550220 13f79535-47bb-0310-9956-ffa450edef68
parent e8aef14c
......@@ -179,6 +179,9 @@ BUGFIXES:
(Raul Gutierrez Segales via michim)
ZOOKEEPER-1834. Catch IOException in FileTxnLog (fpj via michim)
ZOOKEEPER-1382. Zookeeper server holds onto dead/expired session ids in the watch data structures
(Germán Blanco and Michael Morello via camille)
IMPROVEMENTS:
......
......@@ -204,9 +204,19 @@ public class NIOServerCnxn extends ServerCnxn {
}
}
/**
* Only used in order to allow testing
*/
protected boolean isSocketOpen() {
return sock.isOpen();
}
/**
* Handles read/write IO on connection.
*/
void doIO(SelectionKey k) throws InterruptedException {
try {
if (sock.isOpen() == false) {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
......
......@@ -619,9 +619,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
+ " with negotiated timeout " + cnxn.getSessionTimeout()
+ " for client "
+ cnxn.getRemoteSocketAddress());
cnxn.enableRecv();
}
cnxn.enableRecv();
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.jute.Record;
import org.apache.zookeeper.ZooKeeper.WatchRegistration;
import java.nio.ByteBuffer;
public class MockPacket extends ClientCnxn.Packet {
public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration) {
super(requestHeader, replyHeader, request, response, watchRegistration);
}
public MockPacket(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration, boolean readOnly) {
super(requestHeader, replyHeader, request, response, watchRegistration, readOnly);
}
public ByteBuffer createAndReturnBB() {
createBB();
return this.bb;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.io.IOException;
public class MockNIOServerCnxn extends NIOServerCnxn {
public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
SelectionKey sk, NIOServerCnxnFactory factory)
throws IOException {
super(zk, sock, sk, factory);
}
/**
* Handles read/write IO on connection.
*/
public void doIO(SelectionKey k) throws InterruptedException {
super.doIO(k);
}
@Override
protected boolean isSocketOpen() {
return true;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.zookeeper.MockPacket;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.server.MockNIOServerCnxn;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Demonstrate ZOOKEEPER-1382 : Watches leak on expired session
*/
public class WatchLeakTest {
protected static final Logger LOG = LoggerFactory
.getLogger(WatchLeakTest.class);
final long SESSION_ID = 0xBABEL;
/**
* ZOOKEEPR-1382 test class
*/
@Test
public void testWatchesWithClientSessionTimeout() throws Exception {
NIOServerCnxnFactory serverCnxnFactory = new NIOServerCnxnFactory();
ZKDatabase database = new ZKDatabase(null);
database.setlastProcessedZxid(2L);
QuorumPeer quorumPeer = mock(QuorumPeer.class);
FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class);
// Directories are not used but we need it to avoid NPE
when(logfactory.getDataDir()).thenReturn(new File("/tmp"));
when(logfactory.getSnapDir()).thenReturn(new File("/tmp"));
FollowerZooKeeperServer fzks = null;
try {
fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, null,
database);
fzks.startup();
fzks.setServerCnxnFactory(serverCnxnFactory);
quorumPeer.follower = new MyFollower(quorumPeer, fzks);
final SelectionKey sk = new FakeSK();
// Simulate a socket channel between a client and a follower
final SocketChannel socketChannel = createClientSocketChannel();
// Create the NIOServerCnxn that will handle the client requests
final MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks,
socketChannel, sk, serverCnxnFactory);
// Send the connection request as a client do
nioCnxn.doIO(sk);
// Send the invalid session packet to the follower
QuorumPacket qp = createInvalidSessionPacket();
quorumPeer.follower.processPacket(qp);
// OK, now the follower knows that the session is invalid, let's try
// to
// send it the watches
nioCnxn.doIO(sk);
// wait for the the request processor to do his job
Thread.sleep(1000L);
// Session has not been re-validated !
// If session has not been validated, there must be NO watches
int watchCount = database.getDataTree().getWatchCount();
LOG.info("watches = " + watchCount);
assertEquals(0, watchCount);
} finally {
if (fzks != null) {
fzks.shutdown();
}
}
}
/**
* A follower with no real leader connection
*/
public static class MyFollower extends Follower {
/**
* Create a follower with a mocked leader connection
*
* @param self
* @param zk
*/
MyFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
super(self, zk);
leaderOs = mock(OutputArchive.class);
leaderIs = mock(InputArchive.class);
bufferedOutput = mock(BufferedOutputStream.class);
}
}
/**
* Simulate the behavior of a real selection key
*/
private static class FakeSK extends SelectionKey {
@Override
public SelectableChannel channel() {
return null;
}
@Override
public Selector selector() {
return mock(Selector.class);
}
@Override
public boolean isValid() {
return true;
}
@Override
public void cancel() {
}
@Override
public int interestOps() {
return ops;
}
private int ops = OP_WRITE + OP_READ;
@Override
public SelectionKey interestOps(int ops) {
this.ops = ops;
return this;
}
@Override
public int readyOps() {
return ops;
}
}
/**
* Create a watches message with a single watch on /
*
* @return
*/
private ByteBuffer createWatchesMessage() {
List<String> dataWatches = new ArrayList<String>(1);
dataWatches.add("/");
List<String> existWatches = Collections.emptyList();
List<String> childWatches = Collections.emptyList();
SetWatches sw = new SetWatches(1L, dataWatches, existWatches,
childWatches);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null);
return p.createAndReturnBB();
}
/**
* This is the secret that we use to generate passwords, for the moment it
* is more of a sanity check.
*/
static final private long superSecret = 0XB3415C00L;
/**
* Create a connection request
*
* @return
*/
private ByteBuffer createConnRequest() {
Random r = new Random(SESSION_ID ^ superSecret);
byte p[] = new byte[16];
r.nextBytes(p);
ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p);
MockPacket packet = new MockPacket(null, null, conReq, null, null, false);
return packet.createAndReturnBB();
}
/**
* Mock a client channel with a connection request and a watches message
* inside.
*
* @return a socket channel
* @throws IOException
*/
private SocketChannel createClientSocketChannel() throws IOException {
SocketChannel socketChannel = mock(SocketChannel.class);
Socket socket = mock(Socket.class);
InetSocketAddress socketAddress = new InetSocketAddress(1234);
when(socket.getRemoteSocketAddress()).thenReturn(socketAddress);
when(socketChannel.socket()).thenReturn(socket);
// Send watches packet to server connection
final ByteBuffer connRequest = createConnRequest();
final ByteBuffer watchesMessage = createWatchesMessage();
final ByteBuffer request = ByteBuffer.allocate(connRequest.limit()
+ watchesMessage.limit());
request.put(connRequest);
request.put(watchesMessage);
Answer<Integer> answer = new Answer<Integer>() {
int i = 0;
@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
ByteBuffer bb = (ByteBuffer) args[0];
for (int k = 0; k < bb.limit(); k++) {
bb.put(request.get(i));
i = i + 1;
}
return bb.limit();
}
};
when(socketChannel.read(any(ByteBuffer.class))).thenAnswer(answer);
return socketChannel;
}
/**
* Forge an invalid session packet as a LEADER do
*
* @throws Exception
*/
private QuorumPacket createInvalidSessionPacket() throws Exception {
QuorumPacket qp = createValidateSessionQuorumPacket();
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
long id = dis.readLong();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
// false means that the session has expired
dos.writeBoolean(false);
qp.setData(bos.toByteArray());
return qp;
}
/**
* Forge an validate session packet as a LEARNER do
*
* @return
* @throws Exception
*/
private QuorumPacket createValidateSessionQuorumPacket() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeLong(SESSION_ID);
dos.writeInt(3000);
dos.close();
QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1,
baos.toByteArray(), null);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"To validate session 0x" + Long.toHexString(2L));
}
return qp;
}
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment