Commit a6a1fadd authored by Alexander Shraer's avatar Alexander Shraer
Browse files

ZOOKEEPER-1992. Backward compatibility of the static configuration file...

ZOOKEEPER-1992. Backward compatibility of the static configuration file (Hongchao Deng via Alex Shraer)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1615037 13f79535-47bb-0310-9956-ffa450edef68
parent 4bb76bd2
......@@ -39,6 +39,8 @@ NEW FEATURES:
BUGFIXES:
ZOOKEEPER-1992. Backward compatibility of the static configuration file (Hongchao Deng via Alex Shraer).
ZOOKEEPER-1900. NullPointerException in truncate (Camille Fournier)
ZOOKEEPER-786. Exception in ZooKeeper.toString
......
......@@ -1315,7 +1315,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
connectNewPeers();
if (writeToDisk) {
try {
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename + ".next", null, false, qv);
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename + ".next", null, false, qv, false);
} catch(IOException e){
LOG.error("Error closing file: ", e.getMessage());
}
......@@ -1344,7 +1344,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
if (configBackwardCompatibility) {
setDynamicConfigFilename(configFilename + ".dynamic");
}
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename, configBackwardCompatibility, qv);
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename,
configBackwardCompatibility, qv,
needEraseClientInfoFromStaticConfig(prevQV, qv));
configBackwardCompatibility = false;
} catch(IOException e){
LOG.error("Error closing file: ", e.getMessage());
......@@ -1366,11 +1368,19 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
setClientAddress(qs.clientAddr);
}
return prevQV;
}
}
private boolean needEraseClientInfoFromStaticConfig(QuorumVerifier oldQV,
QuorumVerifier newQV) {
QuorumServer myOldSpec = oldQV.getAllMembers().get(getId());
QuorumServer myNewSpec = newQV.getAllMembers().get(getId());
return (myNewSpec != null && myNewSpec.clientAddr != null
&& (myOldSpec == null || myOldSpec.clientAddr == null));
}
/**
* Get an instance of LeaderElection
*/
public Election getElectionAlg(){
return electionAlg;
}
......
......@@ -300,9 +300,12 @@ public class QuorumPeerConfig {
* @param configFileStr
* @param configBackwardCompatibilityMode
* @param qv
* @param needEraseStaticClientInfo indicates whether we need to erase the clientPort
* and clientPortAddress from static config file.
*/
public static void writeDynamicConfig(String dynamicConfigFilename, String configFileStr,
boolean configBackwardCompatibilityMode, final QuorumVerifier qv) throws IOException {
public static void writeDynamicConfig(String dynamicConfigFilename, String configFileStr,
final boolean configBackwardCompatibilityMode, final QuorumVerifier qv,
final boolean needEraseStaticClientInfo) throws IOException {
final String actualDynamicConfigFilename = dynamicConfigFilename;
new AtomicFileWritingIdiom(new File(actualDynamicConfigFilename), new OutputStreamStatement() {
......@@ -313,41 +316,68 @@ public class QuorumPeerConfig {
}
});
// the following is for users who run without a dynamic config file (old config file)
// if the configuration changes (reconfiguration executes), we create a dynamic config
// file, remove all the dynamic definitions from the config file and add a pointer
// we create a dynamic config file, remove all the dynamic definitions from the config file and add a pointer
// to the config file. The dynamic config file's name will be the same as the config file's
// with ".dynamic" appended to it
if (configBackwardCompatibilityMode) {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(configFileStr);
final Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
} finally {
in.close();
}
new AtomicFileWritingIdiom(new File(configFileStr), new WriterStatement() {
@Override
public void write(Writer out) throws IOException {
for (Entry<Object, Object> entry : cfg.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (!key.startsWith("server.") && !key.startsWith("group")
&& !key.startsWith("weight") && !key.equals("clientPort") && !key.equals("clientPortAddress")){
out.write(key.concat("=").concat(value).concat("\n"));
}
}
out.write("dynamicConfigFile=".concat(actualDynamicConfigFilename).concat("\n"));
}
});
}
}
if (!configBackwardCompatibilityMode && !needEraseStaticClientInfo)
return;
editStaticConfig(configFileStr, actualDynamicConfigFilename,
configBackwardCompatibilityMode, needEraseStaticClientInfo);
}
private static void editStaticConfig(final String configFileStr,
final String dynamicFileStr,
final boolean backwardCompatible,
final boolean eraseClientPortAddress)
throws IOException {
// Some tests may not have a static config file.
if (configFileStr == null)
return;
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(configFileStr);
final Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
} finally {
in.close();
}
new AtomicFileWritingIdiom(new File(configFileStr), new WriterStatement() {
@Override
public void write(Writer out) throws IOException {
for (Entry<Object, Object> entry : cfg.entrySet()) {
String key = entry.getKey().toString().trim();
if (key.startsWith("server.")
|| key.startsWith("group")
|| key.startsWith("weight")
|| (eraseClientPortAddress
&& (key.startsWith("clientPort")
|| key.startsWith("clientPortAddress")))) {
// not writing them back to static file
continue;
}
String value = entry.getValue().toString().trim();
out.write(key.concat("=").concat(value).concat("\n"));
}
if ( ! backwardCompatible )
return;
out.write("dynamicConfigFile=".concat(dynamicFileStr).concat("\n"));
}
});
}
public static void deleteFile(String filename){
File f = new File(filename);
if (f.exists()) {
......@@ -375,7 +405,7 @@ public class QuorumPeerConfig {
/**
* Parse dynamic configuration file and return
* quorumVerifier for new configuration.
* @param zkProp Properties to parse from.
* @param dynamicConfigProp Properties to parse from.
* @throws IOException
* @throws ConfigException
*/
......@@ -467,12 +497,15 @@ public class QuorumPeerConfig {
QuorumServer qs = quorumVerifier.getAllMembers().get(serverId);
if (clientPortAddress!=null && qs!=null && qs.clientAddr!=null){
if ((!clientPortAddress.getAddress().isAnyLocalAddress()
if ((!clientPortAddress.getAddress().isAnyLocalAddress()
&& !clientPortAddress.equals(qs.clientAddr)) ||
(clientPortAddress.getAddress().isAnyLocalAddress()
&& clientPortAddress.getPort()!=qs.clientAddr.getPort()))
throw new ConfigException("client address for this server (id = " + serverId + ") in static config file is " + clientPortAddress + " is different from client address found in dynamic file: " + qs.clientAddr);
}
throw new ConfigException("client address for this server (id = " + serverId + ") in static config file is " + clientPortAddress + " is different from client address found in dynamic file: " + qs.clientAddr);
else {
editStaticConfig(configFileStr, null, false, true);
}
}
if (qs!=null && qs.clientAddr != null) clientPortAddress = qs.clientAddr;
// Warn about inconsistent peer type
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
import org.junit.Assert;
import org.junit.Test;
public class DynamicConfigBackwardCompatibilityTest extends QuorumPeerTestBase {
/**
* This test checks that when started with a single static config file the
* servers will create a valid dynamic config file.
*/
@Test
public void testConfigFileBackwardCompatibility() throws Exception {
ClientBase.setupTestEnv();
final int SERVER_COUNT = 3;
final int clientPorts[] = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
String server;
ArrayList<String> allServers = new ArrayList<String>();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=localhost:" + PortAssignment.unique()
+ ":" + PortAssignment.unique() + ":participant;localhost:"
+ clientPorts[i];
allServers.add(server);
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
// Start the servers with the static config file, without the dynamic
// config file.
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
false);
// check that a dynamic configuration file wasn't created
Assert.assertFalse(mt[i].dynamicConfigFile.exists());
mt[i].start();
}
// Check that the servers are up, have the right config and can process
// operations
// Check that the static config was split to static and dynamic files
// correctly
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT));
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
ClientBase.CONNECTION_TIMEOUT, this);
Assert.assertTrue(mt[i].dynamicConfigFile.exists());
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
// check that static config file doesn't include membership info
// and has a pointer to the dynamic configuration file
Properties cfg = readPropertiesFromFile(mt[i].confFile);
for (int j = 0; j < SERVER_COUNT; j++) {
Assert.assertFalse(cfg.containsKey("server." + j));
}
Assert.assertTrue(cfg.containsKey("dynamicConfigFile"));
Assert.assertFalse(cfg.containsKey("clientPort"));
// check that the dynamic configuration file contains the membership
// info
cfg = readPropertiesFromFile(mt[i].dynamicConfigFile);
for (int j = 0; j < SERVER_COUNT; j++) {
String serverLine = cfg.getProperty("server." + j, "");
Assert.assertEquals(allServers.get(j), "server." + j + "="
+ serverLine);
}
Assert.assertFalse(cfg.containsKey("dynamicConfigFile"));
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
// now shut down the servers and restart them
for (int i = 0; i < SERVER_COUNT; i++) {
zk[i].close();
mt[i].shutdown();
}
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].start();
}
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT));
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
ClientBase.CONNECTION_TIMEOUT, this);
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
zk[i].close();
}
}
private Properties readPropertiesFromFile(File file) throws IOException {
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(file);
try {
cfg.load(in);
} finally {
in.close();
}
return cfg;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ReconfigLegacyTest extends QuorumPeerTestBase {
private static final int SERVER_COUNT = 3;
@Before
public void setup() {
ClientBase.setupTestEnv();
}
/**
* This test checks that when started with a single static config file the
* servers will create a valid dynamic config file. Also checks that when
* the static config includes a clientPort but the dynamic definition also
* includes it, the static definition is erased.
*/
@Test
public void testConfigFileBackwardCompatibility() throws Exception {
final int clientPorts[] = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
String server;
ArrayList<String> allServers = new ArrayList<String>();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=localhost:" + PortAssignment.unique()
+ ":" + PortAssignment.unique() + ":participant;localhost:"
+ clientPorts[i];
allServers.add(server);
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
// Start the servers with a static config file, without a dynamic
// config file.
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
// check that a dynamic configuration file doesn't exist
Assert.assertFalse(mt[i].dynamicConfigFile.exists());
mt[i].start();
}
// Check that the servers are up, have the right config and can process operations.
// Check that the static config was split into static and dynamic files correctly.
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT));
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
ClientBase.CONNECTION_TIMEOUT, this);
Assert.assertTrue(mt[i].dynamicConfigFile.exists());
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
// check that static config file doesn't include membership info
// and has a pointer to the dynamic configuration file
Properties cfg = readPropertiesFromFile(mt[i].confFile);
for (int j = 0; j < SERVER_COUNT; j++) {
Assert.assertFalse(cfg.containsKey("server." + j));
}
Assert.assertTrue(cfg.containsKey("dynamicConfigFile"));
Assert.assertFalse(cfg.containsKey("clientPort"));
// check that the dynamic configuration file contains the membership info
cfg = readPropertiesFromFile(mt[i].dynamicConfigFile);
for (int j = 0; j < SERVER_COUNT; j++) {
String serverLine = cfg.getProperty("server." + j, "");
Assert.assertEquals(allServers.get(j), "server." + j + "="
+ serverLine);
}
Assert.assertFalse(cfg.containsKey("dynamicConfigFile"));
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
// now shut down the servers and restart them
for (int i = 0; i < SERVER_COUNT; i++) {
zk[i].close();
mt[i].shutdown();
}
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].start();
}
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT));
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
ClientBase.CONNECTION_TIMEOUT, this);
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
zk[i].close();
}
}
/**
* {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1992}
* 1. When a server starts from old style static config, without a client port in the server
* specification, it should keep the client port in static config file.
* 2. After port reconfig, the old port should be removed from static file
* and new port added to dynamic file.
* @throws Exception
*/
@Test
public void testReconfigRemoveClientFromStatic() throws Exception {
final int clientPorts[] = new int[SERVER_COUNT];
final int quorumPorts[] = new int[SERVER_COUNT];
final int electionPorts[] = new int[SERVER_COUNT];
final int changedServerId = 0;
final int newClientPort = PortAssignment.unique();
StringBuilder sb = new StringBuilder();
ArrayList<String> allServers = new ArrayList<String>();
ArrayList<String> newServers = new ArrayList<String>();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
quorumPorts[i] = PortAssignment.unique();
electionPorts[i] = PortAssignment.unique();
String server = "server." + i + "=localhost:" + quorumPorts[i]
+":" + electionPorts[i] + ":participant";
allServers.add(server);
sb.append(server + "\n");
if(i == changedServerId) {
newServers.add(server + ";0.0.0.0:" + newClientPort);
} else {
newServers.add(server);
}
}
String quorumCfgSection = sb.toString();
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
// Start the servers with a static config file, without a dynamic config file.
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, false);
// check that a dynamic configuration file doesn't exist
Assert.assertFalse(mt[i].dynamicConfigFile.exists());
mt[i].start();
}
// Check that when a server starts from old style config, it should keep the client
// port in static config file.
for (int i = 0; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT));
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
ClientBase.CONNECTION_TIMEOUT, this);
ReconfigTest.testServerHasConfig(zk[i], allServers, null);
Properties cfg = readPropertiesFromFile(mt[i].confFile);
Assert.assertTrue(cfg.containsKey("dynamicConfigFile"));
Assert.assertTrue(cfg.containsKey("clientPort"));
}
ReconfigTest.testNormalOperation(zk[0], zk[1]);
ReconfigTest.reconfig(zk[1], null, null, newServers, -1);
ReconfigTest.testNormalOperation(zk[0], zk[1]);
// Sleep since writing the config files may take time.
Thread.sleep(1000);
// Check that new dynamic config includes the updated client port.
// Check that server changedServerId erased clientPort from static config.
// Check that other servers still have clientPort in static config.
for (int i = 0; i < SERVER_COUNT; i++) {
ReconfigTest.testServerHasConfig(zk[i], newServers, null);
Properties staticCfg = readPropertiesFromFile(mt[i].confFile);
if (i == changedServerId) {
Assert.assertFalse(staticCfg.containsKey("clientPort"));
} else {
Assert.assertTrue(staticCfg.containsKey("clientPort"));
}
}
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
zk[i].close();
}
}
private Properties readPropertiesFromFile(File file) throws IOException {
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(file);