Commit d7bdb47b authored by Mahadev Konar's avatar Mahadev Konar
Browse files

ZOOKEEPER-43. Server side of auto reset watches.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@706815 13f79535-47bb-0310-9956-ffa450edef68
parent de6d9710
......@@ -30,6 +30,8 @@ Non-backward compatible changes:
ZOOKEEPER-33. Better ACL management
(mahadev)
ZOOKEEPER-43. Server side of auto reset watches.
Backward compatibile changes:
......
......@@ -87,9 +87,11 @@ public class ObservableDataTree extends DataTree implements ObservableComponent
return result;
}
public void deleteNode(String path) throws KeeperException.NoNodeException {
public void deleteNode(String path, long zxid)
throws KeeperException.NoNodeException
{
DataNode deleted=getNode(path);
super.deleteNode(path);
super.deleteNode(path, zxid);
ObserverManager.getInstance().notifyObservers(this,
new TreeEventInfo(Event.DELETE,deleted));
}
......
......@@ -60,6 +60,7 @@ import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ZooTrace;
......@@ -72,6 +73,19 @@ import org.apache.zookeeper.server.ZooTrace;
*/
public class ClientCnxn {
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
/** This controls whether automatic watch resetting is enabled.
* Clients automatically reset watches during session reconnect, this
* option allows the client to turn off this behavior by setting
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
public static boolean disableAutoWatchReset;
static {
// this var should not be public, but otw there is no easy way
// to test
disableAutoWatchReset =
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
}
private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
......@@ -446,7 +460,7 @@ public class ClientCnxn {
finishPacket(p);
}
long lastZxid;
volatile long lastZxid;
/**
* This class services the outgoing request queue and generates the heart
......@@ -479,7 +493,8 @@ public class ClientCnxn {
if (sessionTimeout <= 0) {
zooKeeper.state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
throw new IOException("Session Expired");
}
......@@ -489,6 +504,15 @@ public class ClientCnxn {
sessionPasswd = conRsp.getPasswd();
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null));
if (!disableAutoWatchReset) {
SetWatches sw = new SetWatches(lastZxid,
zooKeeper.getDataWatches(),
zooKeeper.getExistWatches(),
zooKeeper.getChildWatches());
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
}
}
void readResponse() throws IOException {
......@@ -540,27 +564,31 @@ public class ClientCnxn {
* Since requests are processed in order, we better get a response
* to the first request!
*/
if (packet.header.getXid() != replyHdr.getXid()) {
throw new IOException("Xid out of order. Got "
+ replyHdr.getXid() + " expected "
+ packet.header.getXid());
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
lastZxid = replyHdr.getZxid();
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
packet.finished = true;
try {
if (packet.header.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
throw new IOException("Xid out of order. Got "
+ replyHdr.getXid() + " expected "
+ packet.header.getXid());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
finishPacket(packet);
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
/**
......
......@@ -49,6 +49,8 @@ public class ZooDefs {
public final int ping = 11;
public final int auth = 100;
public final int setWatches = 101;
public final int createSession = -10;
......
......@@ -19,6 +19,7 @@
package org.apache.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
......@@ -99,7 +100,20 @@ public class ZooKeeper {
private final ZKWatchManager watchManager = new ZKWatchManager();
/**
List<String> getDataWatches() {
List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
return rc;
}
List<String> getExistWatches() {
List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
return rc;
}
List<String> getChildWatches() {
List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
return rc;
}
/**
* Manage watchers & handle events generated by the ClientCnxn object.
*
* We are implementing this as a nested class of ZooKeeper so that
......@@ -109,11 +123,19 @@ public class ZooKeeper {
private class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
*/
......@@ -121,58 +143,60 @@ public class ZooKeeper {
Watcher.Event.EventType type, String path) {
Set<Watcher> result = new HashSet<Watcher>();
// clear the watches if we are not connected
switch (type) {
case None:
result.add(defaultWatcher);
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (state != Watcher.Event.KeeperState.SyncConnected) {
synchronized (dataWatches) {
for (Set<Watcher> watchers : dataWatches.values()) {
for (Watcher watcher : watchers) {
result.add(watcher);
}
// clear the watches if auto watch reset is not enabled
if (ClientCnxn.disableAutoWatchReset &&
state != Watcher.Event.KeeperState.SyncConnected)
{
synchronized(dataWatches) {
dataWatches.clear();
}
dataWatches.clear();
}
synchronized (childWatches) {
for (Set<Watcher> watchers : childWatches.values()) {
for (Watcher watcher : watchers) {
result.add(watcher);
}
synchronized(existWatches) {
existWatches.clear();
}
synchronized(childWatches) {
childWatches.clear();
}
childWatches.clear();
}
}
Set<Watcher> watchers = null;
switch (type) {
case None:
result.add(defaultWatcher);
return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
watchers = dataWatches.remove(path);
addTo(dataWatches.remove(path), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(path), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
watchers = childWatches.remove(path);
addTo(childWatches.remove(path), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
watchers = dataWatches.remove(path);
addTo(dataWatches.remove(path), result);
}
Set<Watcher> cwatches;
synchronized (childWatches) {
cwatches = childWatches.remove(path);
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
addTo(existWatches.remove(path), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
if (cwatches != null) {
if (watchers == null) {
watchers = cwatches;
} else {
watchers.addAll(cwatches);
}
synchronized (childWatches) {
addTo(childWatches.remove(path), result);
}
break;
default:
......@@ -182,26 +206,24 @@ public class ZooKeeper {
throw new RuntimeException(msg);
}
result.addAll(watchers);
return result;
}
}
/**
* Register a watcher for a particular path.
*/
class WatchRegistration {
private Map<String, Set<Watcher>> watches;
abstract class WatchRegistration {
private Watcher watcher;
private String path;
public WatchRegistration(Map<String, Set<Watcher>> watches,
Watcher watcher, String path)
public WatchRegistration(Watcher watcher, String path)
{
this.watches = watches;
this.watcher = watcher;
this.path = path;
}
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
......@@ -209,6 +231,7 @@ public class ZooKeeper {
*/
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(path);
if (watchers == null) {
......@@ -234,17 +257,43 @@ public class ZooKeeper {
* even in the case where NONODE result code is returned.
*/
class ExistsWatchRegistration extends WatchRegistration {
public ExistsWatchRegistration(Map<String, Set<Watcher>> watches,
Watcher watcher, String path)
{
super(watches, watcher, path);
public ExistsWatchRegistration(Watcher watcher, String path) {
super(watcher, path);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
@Override
protected boolean shouldAddWatch(int rc) {
return rc == 0 || rc == KeeperException.Code.NoNode;
}
}
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String path) {
super(watcher, path);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
class ChildWatchRegistration extends WatchRegistration {
public ChildWatchRegistration(Watcher watcher, String path) {
super(watcher, path);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.childWatches;
}
}
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
......@@ -534,8 +583,7 @@ public class ZooKeeper {
SetDataResponse response = new SetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
path);
wcb = new ExistsWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
......@@ -589,8 +637,7 @@ public class ZooKeeper {
SetDataResponse response = new SetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
path);
wcb = new ExistsWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
......@@ -634,8 +681,7 @@ public class ZooKeeper {
GetDataResponse response = new GetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new WatchRegistration(watchManager.dataWatches, watcher,
path);
wcb = new DataWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
......@@ -685,8 +731,7 @@ public class ZooKeeper {
GetDataResponse response = new GetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new WatchRegistration(watchManager.dataWatches, watcher,
path);
wcb = new DataWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
......@@ -899,8 +944,7 @@ public class ZooKeeper {
GetChildrenResponse response = new GetChildrenResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new WatchRegistration(watchManager.childWatches, watcher,
path);
wcb = new ChildWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
......@@ -950,8 +994,7 @@ public class ZooKeeper {
GetChildrenResponse response = new GetChildrenResponse();
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new WatchRegistration(watchManager.childWatches, watcher,
path);
wcb = new ChildWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
......
......@@ -83,13 +83,19 @@ public class ZooKeeperMain {
}
private static void printStat(Stat stat) {
System.err.println("ctime = " + new Date(stat.getCtime()).toString());
System.err.println("ctime = " + new Date(stat.getMtime()).toString());
System.err.println("cversion = " + stat.getCversion());
System.err.println("cZxid = " + stat.getCzxid());
System.err.println("ctime = " + new Date(stat.getCtime()).toString());
System.err.println("mZxid = " + stat.getMzxid());
System.err.println("mtime = " + new Date(stat.getMtime()).toString());
System.err.println("pZxid = " + stat.getPzxid());
System.err.println("cversion = " + stat.getCversion());
System.err.println("dataVersion = " + stat.getVersion());
System.err.println("aclVersion = " + stat.getAversion());
System.err.println("ephemeralOwner = " + stat.getEphemeralOwner());
System.err.println("dataLength = " + stat.getDataLength());
System.err.println("numChildren = " + stat.getNumChildren());
}
public static void main(String args[]) throws NumberFormatException,
......
......@@ -93,6 +93,7 @@ public class DataNode implements Record {
to.setCzxid(stat.getCzxid());
to.setMtime(stat.getMtime());
to.setMzxid(stat.getMzxid());
to.setPzxid(stat.getPzxid());
to.setVersion(stat.getVersion());
to.setEphemeralOwner(stat.getEphemeralOwner());
to.setDataLength(data.length);
......
......@@ -34,14 +34,17 @@ import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
......@@ -229,6 +232,7 @@ public class DataTree {
to.setCzxid(from.getCzxid());
to.setMtime(from.getMtime());
to.setMzxid(from.getMzxid());
to.setPzxid(from.getPzxid());
to.setVersion(from.getVersion());
to.setEphemeralOwner(from.getEphemeralOwner());
}
......@@ -240,6 +244,7 @@ public class DataTree {
to.setCzxid(from.getCzxid());
to.setMtime(from.getMtime());
to.setMzxid(from.getMzxid());
to.setPzxid(from.getPzxid());
to.setVersion(from.getVersion());
to.setEphemeralOwner(from.getEphemeralOwner());
to.setDataLength(from.getDataLength());
......@@ -285,6 +290,7 @@ public class DataTree {
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
......@@ -299,6 +305,7 @@ public class DataTree {
int cver = parent.stat.getCversion();
cver++;
parent.stat.setCversion(cver);
parent.stat.setPzxid(zxid);
Long longval = convertAcls(acl);
DataNode child = new DataNode(parent, data, longval, stat);
parent.children.add(childName);
......@@ -321,10 +328,13 @@ public class DataTree {
/**
* remove the path from the datatree
* @param path the path to be deleted
* @param path the path to of the node to be deleted
* @param zxid the current zxid
* @throws KeeperException.NoNodeException
*/
public void deleteNode(String path) throws KeeperException.NoNodeException {
public void deleteNode(String path, long zxid)
throws KeeperException.NoNodeException
{
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
......@@ -340,6 +350,7 @@ public class DataTree {
synchronized (parent) {
parent.children.remove(childName);
parent.stat.setCversion(parent.stat.getCversion() + 1);
parent.stat.setPzxid(zxid);
long eowner = node.stat.getEphemeralOwner();
if (eowner != 0) {
HashSet<String> nodes = ephemerals.get(eowner);
......@@ -522,7 +533,7 @@ public class DataTree {
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
debug = "Delete transaction for " + deleteTxn.getPath();
deleteNode(deleteTxn.getPath());
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
......@@ -537,7 +548,7 @@ public class DataTree {
setACLTxn.getVersion());
break;
case OpCode.closeSession:
killSession(header.getClientId());
killSession(header.getClientId(), header.getZxid());
break;
case OpCode.error:
ErrorTxn errTxn = (ErrorTxn) txn;
......@@ -555,7 +566,7 @@ public class DataTree {
return rc;
}
void killSession(long session) {
void killSession(long session, long zxid) {
// the list is already removed from the ephemerals
// so we do not have to worry about synchronyzing on
// the list. This is only called from FinalRequestProcessor
......@@ -566,7 +577,7 @@ public class DataTree {
if (list != null) {
for (String path : list) {
try {
deleteNode(path);
deleteNode(path, zxid);
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Deleting ephemeral node "
......@@ -727,4 +738,61 @@ public class DataTree {
// dataWatches = null;
// childWatches = null;
}
public void setWatches(long relativeZxid, List<String> dataWatches,
List<String> existWatches, List<String> childWatches, Watcher watcher) {
for(String path: dataWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
if (node == null) {
e = new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path);
} else if (node.stat.getCzxid() > relativeZxid) {
e = new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path);