Commit b713620d authored by Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1653. zookeeper fails to start because of inconsistent epoch (michim via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1545883 13f79535-47bb-0310-9956-ffa450edef68
parent 6be07c7a
......@@ -164,6 +164,9 @@ BUGFIXES:
ZOOKEEPER-1817. Fix don't care for b3.4 (fpj)
ZOOKEEPER-1653. zookeeper fails to start because of inconsistent
epoch (michim via fpj)
IMPROVEMENTS:
ZOOKEEPER-1564. Allow JUnit test build with IBM Java
......
......@@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
......@@ -423,8 +424,22 @@ public class Learner {
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
// Create updatingEpoch file and remove it after current
// epoch is set. QuorumPeer.loadDataBase() uses this file to
// detect the case where the server was terminated after
// taking a snapshot but before setting the current epoch.
File updating = new File(self.getTxnFactory().getSnapDir(),
QuorumPeer.UPDATING_EPOCH_FILENAME);
if (!updating.exists() && !updating.createNewFile()) {
throw new IOException("Failed to create " +
updating.toString());
}
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
snapshotTaken = true;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
......
......@@ -444,6 +444,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
}
private void loadDataBase() {
File updating = new File(getTxnFactory().getSnapDir(),
UPDATING_EPOCH_FILENAME);
try {
zkDb.loadDataBase();
......@@ -452,6 +454,17 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
if (epochOfZxid > currentEpoch && updating.exists()) {
LOG.info("{} found. The server was terminated after " +
"taking a snapshot but before updating current " +
"epoch. Setting current epoch to {}.",
UPDATING_EPOCH_FILENAME, epochOfZxid);
setCurrentEpoch(epochOfZxid);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
}
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
......@@ -1156,6 +1169,8 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
public static final String UPDATING_EPOCH_FILENAME = "updatingEpoch";
/**
* Write a long value to disk atomically. Either succeeds or an exception
* is thrown.
......
......@@ -20,8 +20,13 @@ package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.io.IOException;
import java.net.InetSocketAddress;
......@@ -29,6 +34,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.log4j.Layout;
......@@ -44,8 +50,10 @@ import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.junit.Test;
......@@ -56,6 +64,9 @@ import org.junit.Test;
*
*/
public class QuorumPeerMainTest extends QuorumPeerTestBase {
protected static final Logger LOG =
Logger.getLogger(QuorumPeerMainTest.class);
/**
* Verify the ability to start a cluster.
*/
......@@ -669,4 +680,106 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
" to shutdown, expected " + maxwait);
}
}
/**
 * Reads the first line of {@code file} and parses it as a decimal long.
 *
 * @param file the file whose first line holds the value
 * @return the parsed value
 * @throws IOException if the file cannot be opened or read, or if its first
 *         line is missing or is not a valid long; in the latter case the
 *         underlying {@link NumberFormatException} is attached as the cause
 */
static long readLongFromFile(File file) throws IOException {
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = "";
    try {
        // readLine() returns null for an empty file; Long.parseLong(null)
        // then throws NumberFormatException, which is translated below.
        line = br.readLine();
        return Long.parseLong(line);
    } catch (NumberFormatException e) {
        // Chain the parse failure so the root cause is not lost.
        throw new IOException("Found " + line + " in " + file, e);
    } finally {
        br.close();
    }
}
/**
 * Writes {@code value} to {@code file} as decimal text through an
 * {@link AtomicFileOutputStream}.
 *
 * @param file  the destination file
 * @param value the number to store
 * @throws IOException if writing, flushing or closing fails; the failure is
 *         logged and the stream is aborted before the exception is rethrown
 */
static void writeLongToFile(File file, long value) throws IOException {
    final AtomicFileOutputStream atomicOut = new AtomicFileOutputStream(file);
    final BufferedWriter writer =
            new BufferedWriter(new OutputStreamWriter(atomicOut));
    final String text = Long.toString(value);
    try {
        writer.write(text);
        // Push the buffered characters down before closing the stream.
        writer.flush();
        atomicOut.flush();
        atomicOut.close();
    } catch (IOException ioe) {
        LOG.error("Failed to write new file " + file, ioe);
        atomicOut.abort();
        throw ioe;
    }
}
/**
 * ZOOKEEPER-1653 Make sure the server starts if the current epoch is less
 * than the epoch from last logged zxid and updatingEpoch file exists.
 *
 * The test first checks that a follower whose currentEpoch file is behind
 * the epoch of its last logged zxid fails to start, then creates the
 * updatingEpoch marker file and checks that the same follower starts and
 * that the marker file is gone afterwards.
 */
@Test
public void testUpdatingEpoch() throws Exception {
    // Create a cluster and restart them multiple times to bump the epoch.
    int numServers = 3;
    Servers servers = LaunchServers(numServers);
    File currentEpochFile;
    for (int i = 0; i < 10; i++) {
        for (int j = 0; j < numServers; j++) {
            servers.mt[j].shutdown();
        }
        waitForAll(servers.zk, States.CONNECTING);
        for (int j = 0; j < numServers; j++) {
            servers.mt[j].start();
        }
        waitForAll(servers.zk, States.CONNECTED);
    }

    // Current epoch is 11 now.
    for (int i = 0; i < numServers; i++) {
        currentEpochFile = new File(
                new File(servers.mt[i].dataDir, "version-2"),
                QuorumPeer.CURRENT_EPOCH_FILENAME);
        LOG.info("Validating current epoch: " + servers.mt[i].dataDir);
        Assert.assertEquals("Current epoch should be 11.", 11,
                readLongFromFile(currentEpochFile));
    }

    // Find a follower and get epoch from the last logged zxid.
    int followerIndex = -1;
    for (int i = 0; i < numServers; i++) {
        if (servers.mt[i].main.quorumPeer.leader == null) {
            followerIndex = i;
            break;
        }
    }
    Assert.assertTrue("Found a valid follower",
            followerIndex >= 0 && followerIndex < numServers);
    MainThread follower = servers.mt[followerIndex];
    long zxid = follower.main.quorumPeer.getLastLoggedZxid();
    long epochFromZxid = ZxidUtils.getEpochFromZxid(zxid);

    // Shutdown the cluster
    for (int i = 0; i < numServers; i++) {
        servers.mt[i].shutdown();
    }
    waitForAll(servers.zk, States.CONNECTING);

    // Make current epoch less than epoch from the last logged zxid.
    // The server should fail to start.
    File followerDataDir = new File(follower.dataDir, "version-2");
    currentEpochFile = new File(followerDataDir,
            QuorumPeer.CURRENT_EPOCH_FILENAME);
    writeLongToFile(currentEpochFile, epochFromZxid - 1);
    follower.start();
    Assert.assertTrue(
            "Follower should fail to start with an inconsistent epoch",
            follower.mainFailed.await(10, TimeUnit.SECONDS));

    // Touch the updatingEpoch file. Now the server should start.
    File updatingEpochFile = new File(followerDataDir,
            QuorumPeer.UPDATING_EPOCH_FILENAME);
    // Do not ignore a failed create: without the marker file the restart
    // below would fail for a reason unrelated to what is being tested.
    if (!updatingEpochFile.exists() && !updatingEpochFile.createNewFile()) {
        Assert.fail("Failed to create " + updatingEpochFile);
    }
    for (int i = 0; i < numServers; i++) {
        servers.mt[i].start();
    }
    waitForAll(servers.zk, States.CONNECTED);
    Assert.assertNotNull("Make sure the server started with acceptEpoch",
            follower.main.quorumPeer.getActiveServer());
    Assert.assertFalse("updatingEpoch file should get deleted",
            updatingEpochFile.exists());
}
}
......@@ -24,6 +24,7 @@ package org.apache.zookeeper.server.quorum;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
......@@ -57,6 +58,8 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
public static class MainThread implements Runnable {
final File confFile;
volatile TestQPMain main;
final File dataDir;
CountDownLatch mainFailed;
public MainThread(int myid, int clientPort, String quorumCfgSection)
throws IOException {
......@@ -70,7 +73,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
fwriter.write("initLimit=10\n");
fwriter.write("syncLimit=5\n");
File dataDir = new File(tmpDir, "data");
dataDir = new File(tmpDir, "data");
if (!dataDir.mkdir()) {
throw new IOException("Unable to mkdir " + dataDir);
}
......@@ -101,6 +104,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
main = new TestQPMain();
currentThread = new Thread(this);
currentThread.start();
mainFailed = new CountDownLatch(1);
}
public void run() {
......@@ -111,6 +115,8 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
} catch (Exception e) {
// test will still fail even though we just log/ignore
LOG.error("unexpected exception in run", e);
main.shutdown();
mainFailed.countDown();
} finally {
currentThread = null;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment