Commit f1997278 authored by Patrick D. Hunt's avatar Patrick D. Hunt
Browse files

ZOOKEEPER-1789. 3.4.x observer causes NPE on 3.5.0 (trunk) participants (Alex Shraer via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1612885 13f79535-47bb-0310-9956-ffa450edef68
parent d17bc5d6
......@@ -729,6 +729,9 @@ BUGFIXES:
ZOOKEEPER-1984. testLeaderTimesoutOnNewQuorum is a flakey test
(Alex Shraer via phunt)
ZOOKEEPER-1789. 3.4.x observer causes NPE on 3.5.0 (trunk)
participants (Alex Shraer via phunt)
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
......
......@@ -231,13 +231,12 @@ public class QuorumCnxManager {
*/
public boolean receiveConnection(Socket sock) {
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr;
InetSocketAddress electionAddr = null;
try {
DataInputStream din = new DataInputStream(sock.getInputStream());
protocolVersion = din.readLong();
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
electionAddr = self.getVotingView().get(sid).electionAddr;
sid = protocolVersion;
} else {
sid = din.readLong();
int num_remaining_bytes = din.readInt();
......@@ -250,11 +249,9 @@ public class QuorumCnxManager {
electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));
} else {
LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
electionAddr = null;
}
} else {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
electionAddr = null;
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
......@@ -289,7 +286,12 @@ public class QuorumCnxManager {
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid, electionAddr);
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
// Otherwise start worker threads to receive data.
} else {
......
......@@ -37,6 +37,7 @@ import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.junit.Assert;
......@@ -274,6 +275,60 @@ public class CnxManagerTest extends ZKTestCase {
Assert.assertFalse(cnxManager.listener.isAlive());
}
/**
* Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6
* observer connects to a 3.5.0 server.
* {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1789}
*
* @throws Exception
*/
@Test
public void testCnxManagerNPE() throws Exception {
// the connecting peer (id = 2) is a 3.4.6 observer
peers.get(2L).type = LearnerType.OBSERVER;
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
peerClientPort[1], 3, 1, 1000, 2, 2);
QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
QuorumCnxManager.Listener listener = cnxManager.listener;
if (listener != null) {
listener.start();
} else {
LOG.error("Null listener when initializing cnx manager");
}
int port = peers.get(peer.getId()).electionAddr.getPort();
LOG.info("Election port: " + port);
Thread.sleep(1000);
SocketChannel sc = SocketChannel.open();
sc.socket().connect(peers.get(1L).electionAddr, 5000);
/*
* Write id (3.4.6 protocol). This previously caused a NPE in
* QuorumCnxManager.
*/
byte[] msgBytes = new byte[8];
ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
msgBuffer.putLong(2L);
msgBuffer.position(0);
sc.write(msgBuffer);
msgBuffer = ByteBuffer.wrap(new byte[8]);
// write length of message
msgBuffer.putInt(4);
// write message
msgBuffer.putInt(5);
msgBuffer.position(0);
sc.write(msgBuffer);
Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS);
Assert.assertNotNull(m);
peer.shutdown();
cnxManager.halt();
Assert.assertFalse(cnxManager.listener.isAlive());
}
/*
* Test if a receiveConnection is able to timeout on socket errors
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment