Commit c3f7d60d authored by Patrick D. Hunt

ZOOKEEPER-5. Upgrade Feature in Zookeeper server.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@705421 13f79535-47bb-0310-9956-ffa450edef68
parent 713c637c
......@@ -35,6 +35,8 @@ Backward compatible changes:
BUGFIXES:
ZOOKEEPER-5. Upgrade Feature in Zookeeper server. (mahadev via phunt)
ZOOKEEPER-194. Fix terminology in zookeeperAdmin.xml
(Flavio Paiva Junqueira)
......
......@@ -38,6 +38,13 @@ stop)
echo kill | nc localhost $(grep clientPort $ZOOCFG | sed -e 's/.*=//')
echo STOPPED
;;
upgrade)
shift
echo "upgrading the servers to 3.*"
java "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp $CLASSPATH $JVMFLAGS org.apache.zookeeper.server.upgrade.UpgradeMain ${@}
echo "Upgrading ... "
;;
restart)
shift
$0 stop ${@}
......
......@@ -52,6 +52,8 @@
<property name="test.build.dir" value="${build.dir}/test" />
<property name="test.src.dir" value="${src.dir}/java/test"/>
<property name="test.log.dir" value="${test.build.dir}/logs" />
<property name="test.data.dir" value="${test.build.dir}/data" />
<property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
<property name="test.tmp.dir" value="${test.build.dir}/tmp" />
<property name="test.output" value="no" />
<property name="test.timeout" value="900000" />
......@@ -505,8 +507,15 @@
<target name="test-init" depends="jar,compile-test">
<delete dir="${test.log.dir}" />
<delete dir="${test.tmp.dir}" />
<delete dir="${test.data.upgrade.dir}" />
<delete dir="${test.data.dir}" />
<mkdir dir="${test.log.dir}" />
<mkdir dir="${test.tmp.dir}" />
<mkdir dir="${test.data.dir}" />
<mkdir dir="${test.data.upgrade.dir}" />
<copy todir="${test.data.upgrade.dir}">
<fileset dir="${basedir}/src/java/test/data/upgrade"/>
</copy>
</target>
<target name="junit.run">
......@@ -519,6 +528,7 @@
dir="${basedir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="build.test.dir" value="${test.tmp.dir}" />
<sysproperty key="test.data.dir" value="${test.data.dir}" />
<sysproperty key="log4j.configuration"
value="file:${basedir}/conf/log4j.properties" />
<classpath refid="test.classpath"/>
......@@ -656,6 +666,7 @@
maxmemory="${test.junit.maxmem}" dir="${basedir}" timeout="${test.timeout}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="build.test.dir" value="${test.tmp.dir}" />
<sysproperty key="test.data.dir" value="${test.data.dir}" />
<sysproperty key="log4j.configuration"
value="file:${basedir}/conf/log4j.properties" />
<classpath>
......
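For context, the junit forks above now receive the fixture location through the new test.data.dir system property, and test-init copies src/java/test/data/upgrade into that directory. A minimal sketch of how a test might resolve the copied upgrade data; the class name and the fallback path are assumptions for running outside ant, not part of this patch:

    import java.io.File;

    public class UpgradeTestDataSketch {
        public static void main(String[] args) {
            // build.xml passes ${test.data.dir} to the forked junit JVM as "test.data.dir";
            // "build/test/data" is only a guess at the default for running outside ant
            String base = System.getProperty("test.data.dir", "build/test/data");
            File upgradeDir = new File(base, "upgrade");   // populated from src/java/test/data/upgrade
            System.out.println(upgradeDir.getAbsolutePath());
        }
    }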
......@@ -43,7 +43,15 @@ public class DataNode implements Record {
// default rather than public constructor
}
DataNode(DataNode parent, byte data[], Long acl, StatPersisted stat) {
/**
* create a DataNode with parent, data, acls and stat
* @param parent the parent of this DataNode
* @param data the data to be set
* @param acl the acls for this node
* @param stat the stat for this node.
*/
public DataNode(DataNode parent, byte data[], Long acl, StatPersisted stat) {
this.parent = parent;
this.data = data;
this.acl = acl;
......@@ -51,6 +59,23 @@ public class DataNode implements Record {
this.children = new HashSet<String>();
}
/**
convenience method to set the children of this DataNode
@param children the paths of the children of this node
*/
public void setChildren(HashSet<String> children) {
this.children = children;
}
/**
convenience method to get the children
* @return the children of this datanode
*/
public HashSet<String> getChildren() {
return this.children;
}
DataNode parent;
byte data[];
......
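A short, hypothetical sketch of how the now-public DataNode constructor and the new children accessors fit together; the class name, payload, acl index and child name below are invented for illustration and are not part of the patch:

    import java.util.HashSet;
    import org.apache.zookeeper.data.StatPersisted;
    import org.apache.zookeeper.server.DataNode;

    public class DataNodeSketch {
        public static void main(String[] args) {
            // the Long acl argument is an index into DataTree's interned acl map; 0L is a placeholder
            DataNode parent = new DataNode(null, new byte[0], 0L, new StatPersisted());
            DataNode child = new DataNode(parent, "hello".getBytes(), 0L, new StatPersisted());
            HashSet<String> kids = new HashSet<String>();
            kids.add("child");
            parent.setChildren(kids);                        // setter added in this patch
            System.out.println(parent.getChildren().size()); // getter added in this patch, prints 1
            System.out.println(child.getChildren().isEmpty()); // constructor initializes an empty set
        }
    }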
......@@ -26,7 +26,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.jute.Index;
......@@ -75,7 +74,7 @@ public class DataTree {
/**
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private ConcurrentHashMap<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
private Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
/**
* this is map from longs to acl's. It saves acl's being stored
......@@ -112,6 +111,15 @@ public class DataTree {
return cloned;
}
public Map<Long, HashSet<String>> getEphemeralsMap() {
return ephemerals;
}
public void setEphemerals(Map<Long, HashSet<String>> ephemerals) {
this.ephemerals = ephemerals;
}
private long incrementIndex() {
return ++aclIndex;
}
......@@ -178,6 +186,17 @@ public class DataTree {
return ephemerals.keySet();
}
/**
a mutator method to allow raw construction
of a DataTree from individual DataNodes
@param path the path of the datanode
@param node the datanode corresponding to this path
*/
public void addDataNode(String path, DataNode node) {
nodes.put(path, node);
}
public DataNode getNode(String path) {
return nodes.get(path);
}
......
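The new addDataNode, getEphemeralsMap and setEphemerals hooks let the upgrade code assemble a DataTree directly instead of replaying transactions. A hedged sketch, assuming DataTree's no-arg constructor; the path, data, session id and class name are placeholders, not part of the patch:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.zookeeper.data.StatPersisted;
    import org.apache.zookeeper.server.DataNode;
    import org.apache.zookeeper.server.DataTree;

    public class DataTreeSketch {
        public static void main(String[] args) {
            DataTree tree = new DataTree();                 // assumes the no-arg constructor
            DataNode node = new DataNode(null, "v1".getBytes(), 0L, new StatPersisted());
            tree.addDataNode("/upgraded", node);            // raw insertion, no create() bookkeeping
            System.out.println(tree.getNode("/upgraded") == node);   // true

            // 0x1L and the empty path set stand in for a real session and its ephemerals
            Map<Long, HashSet<String>> eph = new ConcurrentHashMap<Long, HashSet<String>>();
            eph.put(0x1L, new HashSet<String>());
            tree.setEphemerals(eph);
            System.out.println(tree.getEphemeralsMap() == eph);      // true
        }
    }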
......@@ -43,11 +43,11 @@ public class WatchManager {
private HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
synchronized int size(){
public synchronized int size(){
return watchTable.size();
}
synchronized void addWatch(String path, Watcher watcher) {
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>();
......@@ -63,7 +63,7 @@ public class WatchManager {
paths.add(path);
}
synchronized void removeWatcher(Watcher watcher) {
public synchronized void removeWatcher(Watcher watcher) {
HashSet<String> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
......@@ -79,11 +79,11 @@ public class WatchManager {
}
}
Set<Watcher> triggerWatch(String path, EventType type) {
public Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
......
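The visibility changes above expose WatchManager outside its package. A minimal sketch of the now-public surface; only the methods shown in this hunk are used, and the path, watcher and class name are placeholders:

    import java.util.Set;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.server.WatchManager;

    public class WatchManagerSketch {
        public static void main(String[] args) {
            WatchManager wm = new WatchManager();
            Watcher w = new Watcher() {
                public void process(WatchedEvent event) {
                    System.out.println("watch fired for " + event.getPath());
                }
            };
            wm.addWatch("/some/znode", w);          // now public
            System.out.println(wm.size());          // now public, prints 1
            Set<Watcher> fired =
                wm.triggerWatch("/some/znode", EventType.NodeDataChanged);  // now public
            System.out.println(fired.size());       // the watcher registered above
        }
    }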
......@@ -49,6 +49,8 @@ public class FileTxnSnapLog {
File snapDir;
TxnLog txnLog;
SnapShot snapLog;
public final static int VERSION = 2;
public final static String version = "version-";
private static final Logger LOG = Logger.getLogger(FileTxnSnapLog.class);
......@@ -70,10 +72,16 @@ public class FileTxnSnapLog {
* @param snapDir the snapshot directory
*/
public FileTxnSnapLog(File dataDir, File snapDir) {
this.dataDir = dataDir;
this.snapDir = snapDir;
txnLog = new FileTxnLog(dataDir);
snapLog = new FileSnap(snapDir);
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
if (!this.dataDir.exists()) {
this.dataDir.mkdirs();
}
if (!this.snapDir.exists()) {
this.snapDir.mkdirs();
}
txnLog = new FileTxnLog(this.dataDir);
snapLog = new FileSnap(this.snapDir);
}
/**
......
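With the change above, FileTxnSnapLog transparently nests its files one level down in a version-2 subdirectory, creating it if needed. A hedged sketch of what the constructor now does with a configured directory; the /var/zookeeper paths and the class name are placeholders:

    import java.io.File;
    import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

    public class VersionedDirSketch {
        public static void main(String[] args) {
            File dataDir = new File("/var/zookeeper/data");        // placeholder paths
            File snapDir = new File("/var/zookeeper/snapshots");
            // the constructor appends "version-" + 2 to each directory and creates it if missing
            FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, snapDir);
            System.out.println(new File(dataDir,
                    FileTxnSnapLog.version + FileTxnSnapLog.VERSION)); // .../data/version-2
        }
    }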
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.upgrade;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.jute.Index;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersistedV1;
/**
* This class contains the data for a node in the data tree.
* <p>
* A data node contains a reference to its parent, a byte array as its data, an
* array of ACLs, a stat object, and a set of its children's paths.
*
*/
public class DataNodeV1 implements Record {
DataNodeV1() {
// default rather than public constructor
}
DataNodeV1(DataNodeV1 parent, byte data[], List<ACL> acl, StatPersistedV1 stat) {
this.parent = parent;
this.data = data;
this.acl = acl;
this.stat = stat;
this.children = new HashSet<String>();
}
/**
convenience method to set the children of this DataNode
@param children the paths of the children of this node
*/
public void setChildren(HashSet<String> children) {
this.children = children;
}
/**
convenience method to get the children
* @return the children of this datanode
*/
public HashSet<String> getChildren() {
return this.children;
}
DataNodeV1 parent;
byte data[];
List<ACL> acl;
public StatPersistedV1 stat;
HashSet<String> children = new HashSet<String>();
public void copyStat(Stat to) {
to.setAversion(stat.getAversion());
to.setCtime(stat.getCtime());
to.setCversion(stat.getCversion());
to.setCzxid(stat.getCzxid());
to.setMtime(stat.getMtime());
to.setMzxid(stat.getMzxid());
to.setVersion(stat.getVersion());
to.setEphemeralOwner(stat.getEphemeralOwner());
to.setDataLength(data.length);
to.setNumChildren(children.size());
}
public void deserialize(InputArchive archive, String tag)
throws IOException {
archive.startRecord("node");
data = archive.readBuffer("data");
Index i = archive.startVector("acl");
if (i != null) {
acl = new ArrayList<ACL>();
while (!i.done()) {
ACL a = new ACL();
a.deserialize(archive, "aclEntry");
acl.add(a);
i.incr();
}
}
archive.endVector("acl");
stat = new StatPersistedV1();
stat.deserialize(archive, "stat");
archive.endRecord("node");
}
synchronized public void serialize(OutputArchive archive, String tag)
throws IOException {
archive.startRecord(this, "node");
archive.writeBuffer(data, "data");
archive.startVector(acl, "acl");
if (acl != null) {
for (ACL a : acl) {
a.serialize(archive, "aclEntry");
}
}
archive.endVector(acl, "acl");
stat.serialize(archive, "stat");
archive.endRecord(this, "node");
}
}
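DataNodeV1 reads the pre-3.0 node layout through jute. A hedged sketch of deserializing one node from raw bytes; it lives in the same package because the no-arg constructor is package-private, the class name is invented, and BinaryInputArchive is assumed to be the archive used for the old on-disk format:

    package org.apache.zookeeper.server.upgrade;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.jute.BinaryInputArchive;

    public class DataNodeV1ReadSketch {
        public static DataNodeV1 read(byte[] oldFormatBytes) throws IOException {
            BinaryInputArchive ia =
                BinaryInputArchive.getArchive(new ByteArrayInputStream(oldFormatBytes));
            DataNodeV1 node = new DataNodeV1();   // package-private constructor
            node.deserialize(ia, "node");         // reads data, acl vector and StatPersistedV1
            return node;
        }
    }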
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.upgrade;
import java.io.File;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
This class upgrades an older database
to the new database format used by the zookeeper
servers.
To run it use
java -classpath zookeeper.jar org.apache.zookeeper.server.upgrade.UpgradeMain dataDir snapShotDir
or the zookeeper scripts: zkServer.sh upgrade dataDir snapShotDir.
It backs the old files up into dataDir/version-1 and snapShotDir/version-1,
from which they can be restored into dataDir and snapShotDir.
*/
public class UpgradeMain {
File snapShotDir;
File dataDir;
File bkupsnapShotDir;
File bkupdataDir;
File currentdataDir;
File currentsnapShotDir;
private static final Logger LOG = Logger.getLogger(UpgradeMain.class);
private static final String USAGE = "Usage: UpgradeMain dataDir snapShotDir";
private static final int LASTVERSION = 1;
private static final int CURRENTVERSION = FileTxnSnapLog.VERSION;
private static final String dirName = FileTxnSnapLog.version;
private static final String manual = "Please take manual steps to " +
"sanitize your database.\n Please read the upgrade manual";
/**
creates an upgrade instance for the two file
directories.
* @param dataDir the directory that contains the
* transaction logs
* @param snapShotDir the directory that contains
* the snapshots
*/
public UpgradeMain(File dataDir, File snapShotDir) {
this.snapShotDir = snapShotDir;
this.dataDir = dataDir;
this.bkupdataDir = new File(dataDir, dirName + LASTVERSION);
this.bkupsnapShotDir = new File(snapShotDir, dirName + LASTVERSION );
this.currentsnapShotDir = new File(snapShotDir, dirName + CURRENTVERSION);
this.currentdataDir = new File(dataDir, dirName + CURRENTVERSION);
}
/**
* create all the bkup directories and the current
* database directories
* @throws IOException
*/
private void createAllDirs() throws IOException {
String error = "backup directory " + bkupdataDir + " already exists";
LOG.info("Creating previous version data dir " + bkupdataDir);
if (!bkupdataDir.mkdirs()) {
LOG.error(error);
LOG.error(manual);
throw new IOException(error);
}
LOG.info("Creating previous version snapshot dir " + bkupdataDir);
if (!bkupsnapShotDir.mkdirs() && !bkupsnapShotDir.exists()) {
LOG.error(error);
LOG.error(manual);
throw new IOException(error);
}
error = "current directory " + currentdataDir + " already exists";
LOG.info("Creating current data dir " + currentdataDir);
if (!currentdataDir.mkdirs()) {
LOG.error(error);
LOG.error(manual);
throw new IOException(error);
}
LOG.info("Creating current snapshot dir " + currentdataDir);
if (!currentsnapShotDir.mkdirs() && !currentsnapShotDir.exists()) {
LOG.error(error);
LOG.error(manual);
throw new IOException(error);
}
}
/**
move (rename) files from srcDir to dstDir whose
names start with the given filter prefix
@param srcDir the source directory
@param dstDir the destination directory
@param filter the filename prefix of the files that
need to be moved.
* @throws IOException
*/
void copyFiles(File srcDir, File dstDir, String filter) throws IOException {
File[] list = srcDir.listFiles();
for (File file: list) {
String name = file.getName();
if (name.startsWith(filter)) {
// we need to move this file
File dest = new File(dstDir, name);
LOG.info("Renaming " + file + " to " + dest);
if (!file.renameTo(dest)) {
throw new IOException("Unable to rename "
+ file + " to " + dest);
}
}
}
}
/**
* run the upgrade
@throws IOException if the upgrade fails
*/
public void runUpgrade() throws IOException {
if (!dataDir.exists()) {
throw new IOException(dataDir + " does not exist");
}
if (!snapShotDir.exists()) {
throw new IOException(snapShotDir + " does not exist");
}
// create the backup directories
createAllDirs();
// move all the old files into the backup directories
try {
copyFiles(dataDir, bkupdataDir, "log");
copyFiles(snapShotDir, bkupsnapShotDir, "snapshot");
} catch(IOException io) {
LOG.error("Failed in backing up.");
throw io;
}
// everything is backed up
// read the old database and build
// the new data tree from it
UpgradeSnapShotV1 upgrade = new UpgradeSnapShotV1(bkupdataDir,
bkupsnapShotDir);
LOG.info("Creating new data tree");
DataTree dt = upgrade.getNewDataTree();
FileTxnSnapLog filesnapLog = new FileTxnSnapLog(dataDir,
snapShotDir);
LOG.info("snapshotting the new datatree");
filesnapLog.save(dt, upgrade.getSessionWithTimeOuts());
//done saving.
LOG.info("Upgrade is complete");
}
public static void main(String[] argv) {
if (argv.length < 2) {
LOG.error(USAGE);
System.exit(-1);
}
try {
UpgradeMain upgrade = new UpgradeMain(new File(argv[0]), new File(argv[1]));
upgrade.runUpgrade();
} catch(Throwable th) {
LOG.error("Upgrade Error: Please read the " +
"docs for manual failure recovery ", th);
}
}
}
\ No newline at end of file
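Besides the main method above, the upgrade can be driven programmatically. A short usage sketch; the directory paths and the class name are placeholders, not part of the patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.zookeeper.server.upgrade.UpgradeMain;

    public class RunUpgradeSketch {
        public static void main(String[] args) throws IOException {
            File dataDir = new File("/var/zookeeper/data");         // placeholder paths
            File snapShotDir = new File("/var/zookeeper/snapshots");
            // backs the old files up into version-1 subdirectories, then
            // writes a new snapshot under the version-2 directories
            new UpgradeMain(dataDir, snapShotDir).runUpgrade();
        }
    }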
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.upgrade;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.server.DataTree;
/**
* interface for snapshot conversion.
*
*/
public interface UpgradeSnapShot {
public DataTree getNewDataTree() throws IOException;
public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts();
}
\ No newline at end of file
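The converter UpgradeMain instantiates, UpgradeSnapShotV1, presumably implements this interface. A trivial, hypothetical implementation just to show the contract; a real converter rebuilds the tree and session timeouts from the old files:

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.zookeeper.server.DataTree;
    import org.apache.zookeeper.server.upgrade.UpgradeSnapShot;

    public class EmptyUpgradeSnapShot implements UpgradeSnapShot {
        public DataTree getNewDataTree() throws IOException {
            return new DataTree();      // assumes DataTree's no-arg constructor
        }
        public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
            return new ConcurrentHashMap<Long, Integer>();
        }
    }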
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.upgrade;