Commit ae2a87b0 authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1872. QuorumPeer is not shutdown in few cases (Rakesh R via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1566453 13f79535-47bb-0310-9956-ffa450edef68
parent 83b105f2
......@@ -231,6 +231,9 @@ BUGFIXES:
and starting server (Rakesh R via fpj)
ZOOKEEPER-1867. Bug in ZkDatabaseCorruptionTest (fpj)
ZOOKEEPER-1872. QuorumPeer is not shutdown in few cases
(Rakesh R via fpj)
IMPROVEMENTS:
......
......@@ -387,41 +387,43 @@ public class CnxManagerTest extends ZKTestCase {
@Test
public void testWorkerThreads() throws Exception {
ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
for (int sid = 0; sid < 3; sid++) {
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], peerTmpdir[sid],
peerClientPort[sid], 3, sid, 1000, 2, 2);
LOG.info("Starting peer " + peer.getId());
peer.start();
peerList.add(sid, peer);
}
String failure = verifyThreadCount(peerList, 4);
if (failure != null) {
Assert.fail(failure);
}
for (int myid = 0; myid < 3; myid++) {
for (int i = 0; i < 5; i++) {
// halt one of the listeners and verify count
QuorumPeer peer = peerList.get(myid);
LOG.info("Round " + i + ", halting peer " + peer.getId());
peer.shutdown();
peerList.remove(myid);
failure = verifyThreadCount(peerList, 2);
if (failure != null) {
Assert.fail(failure);
}
// Restart halted node and verify count
peer = new QuorumPeer(peers, peerTmpdir[myid], peerTmpdir[myid],
peerClientPort[myid], 3, myid, 1000, 2, 2);
LOG.info("Round " + i + ", restarting peer " + peer.getId());
try {
for (int sid = 0; sid < 3; sid++) {
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], peerTmpdir[sid],
peerClientPort[sid], 3, sid, 1000, 2, 2);
LOG.info("Starting peer {}", peer.getId());
peer.start();
peerList.add(myid, peer);
failure = verifyThreadCount(peerList, 4);
if (failure != null) {
Assert.fail(failure);
peerList.add(sid, peer);
}
String failure = verifyThreadCount(peerList, 4);
if (failure != null) {
Assert.fail(failure);
}
for (int myid = 0; myid < 3; myid++) {
for (int i = 0; i < 5; i++) {
// halt one of the listeners and verify count
QuorumPeer peer = peerList.get(myid);
LOG.info("Round {}, halting peer {}", new Object[] { i,
peer.getId() });
peer.shutdown();
peerList.remove(myid);
failure = verifyThreadCount(peerList, 2);
Assert.assertNull(failure, failure);
// Restart halted node and verify count
peer = new QuorumPeer(peers, peerTmpdir[myid], peerTmpdir[myid],
peerClientPort[myid], 3, myid, 1000, 2, 2);
LOG.info("Round {}, restarting peer {}"
+ new Object[] { i, peer.getId() });
peer.start();
peerList.add(myid, peer);
failure = verifyThreadCount(peerList, 4);
Assert.assertNull(failure, failure);
}
}
} finally {
for (QuorumPeer quorumPeer : peerList) {
quorumPeer.shutdown();
}
}
}
......
......@@ -33,7 +33,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
......@@ -44,17 +43,15 @@ import org.apache.log4j.WriterAppender;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
......@@ -67,6 +64,22 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
protected static final Logger LOG =
Logger.getLogger(QuorumPeerMainTest.class);
private Servers servers;
private int numServers = 0;
@After
public void tearDown() throws Exception {
if (servers == null || servers.mt == null) {
LOG.info("No servers to shutdown!");
return;
}
for (int i = 0; i < numServers; i++) {
if (i < servers.mt.length) {
servers.mt[i].shutdown();
}
}
}
/**
* Verify the ability to start a cluster.
*/
......@@ -229,14 +242,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
*/
@Test
public void testHighestZxidJoinLate() throws Exception {
int numServers = 3;
Servers svrs = LaunchServers(numServers);
numServers = 3;
servers = LaunchServers(numServers);
String path = "/hzxidtest";
int leader=-1;
// find the leader
for (int i=0; i < numServers; i++) {
if (svrs.mt[i].main.quorumPeer.leader != null) {
if (servers.mt[i].main.quorumPeer.leader != null) {
leader = i;
}
}
......@@ -251,47 +264,47 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
byte[] output;
// Create a couple of nodes
svrs.zk[leader].create(path+leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
svrs.zk[leader].create(path+nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
servers.zk[leader].create(path+leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
servers.zk[leader].create(path+nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// make sure the updates indeed committed. If it is not
// the following statement will throw.
output = svrs.zk[leader].getData(path+nonleader, false, null);
output = servers.zk[leader].getData(path+nonleader, false, null);
// Shutdown every one else but the leader
for (int i=0; i < numServers; i++) {
if (i != leader) {
svrs.mt[i].shutdown();
servers.mt[i].shutdown();
}
}
input[0] = 2;
// Update the node on the leader
svrs.zk[leader].setData(path+leader, input, -1, null, null);
servers.zk[leader].setData(path+leader, input, -1, null, null);
// wait some time to let this get written to disk
Thread.sleep(500);
// shut the leader down
svrs.mt[leader].shutdown();
servers.mt[leader].shutdown();
System.gc();
waitForAll(svrs.zk, States.CONNECTING);
waitForAll(servers.zk, States.CONNECTING);
// Start everyone but the leader
for (int i=0; i < numServers; i++) {
if (i != leader) {
svrs.mt[i].start();
servers.mt[i].start();
}
}
// wait to connect to one of these
waitForOne(svrs.zk[nonleader], States.CONNECTED);
waitForOne(servers.zk[nonleader], States.CONNECTED);
// validate that the old value is there and not the new one
output = svrs.zk[nonleader].getData(path+leader, false, null);
output = servers.zk[nonleader].getData(path+leader, false, null);
Assert.assertEquals(
"Expecting old value 1 since 2 isn't committed yet",
......@@ -299,22 +312,22 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
// Do some other update, so we bump the maxCommttedZxid
// by setting the value to 2
svrs.zk[nonleader].setData(path+nonleader, input, -1);
servers.zk[nonleader].setData(path+nonleader, input, -1);
// start the old leader
svrs.mt[leader].start();
servers.mt[leader].start();
// connect to it
waitForOne(svrs.zk[leader], States.CONNECTED);
waitForOne(servers.zk[leader], States.CONNECTED);
// make sure it doesn't have the new value that it alone had logged
output = svrs.zk[leader].getData(path+leader, false, null);
output = servers.zk[leader].getData(path+leader, false, null);
Assert.assertEquals(
"Validating that the deposed leader has rolled back that change it had written",
output[0], 1);
// make sure the leader has the subsequent changes that were made while it was offline
output = svrs.zk[leader].getData(path+nonleader, false, null);
output = servers.zk[leader].getData(path+nonleader, false, null);
Assert.assertEquals(
"Validating that the deposed leader caught up on changes it missed",
output[0], 2);
......@@ -716,8 +729,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
@Test
public void testUpdatingEpoch() throws Exception {
// Create a cluster and restart them multiple times to bump the epoch.
int numServers = 3;
Servers servers = LaunchServers(numServers);
numServers = 3;
servers = LaunchServers(numServers);
File currentEpochFile;
for (int i = 0; i < 10; i++) {
for (int j = 0; j < numServers; j++) {
......
......@@ -60,14 +60,18 @@ public class JMXEnv {
public static void tearDown() {
try {
cc.close();
if (cc != null) {
cc.close();
}
} catch (IOException e) {
LOG.warn("Unexpected, ignoring", e);
}
cc = null;
try {
cs.stop();
if (cs != null) {
cs.stop();
}
} catch (IOException e) {
LOG.warn("Unexpected, ignoring", e);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment