Commit e3909eaf authored by Mahadev Konar's avatar Mahadev Konar
Browse files

ZOOKEEPER-204. SetWatches needs to be the first message after auth messages to...

ZOOKEEPER-204. SetWatches needs to be the first message after auth messages to the server (ben via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/branches/branch-3.0@718926 13f79535-47bb-0310-9956-ffa450edef68
parent 622691c4
......@@ -30,6 +30,9 @@ BUGFIXES:
ZOOKEEPER-226. fix exists calls that fail on server if node has null data.
(mahadev)
ZOOKEEPER-204. SetWatches needs to be the first message after auth messages
to the server (ben via mahadev)
Release 3.0.0 - 2008-10-21
Non-backward compatible changes:
......
......@@ -336,87 +336,96 @@ public class ClientCnxn {
try {
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
break;
}
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair)event;
for (Watcher watcher: pair.watchers) {
watcher.process(pair.event);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String path = p.path;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
try {
if (event == eventOfDeath) {
break;
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, path, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, path, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, path, p.ctx,
((SetACLResponse) p.response)
.getStat());
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
} else {
cb.processResult(rc, path, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx,
rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, path, p.ctx, null, null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp.getAcl(),
rsp.getStat());
} else {
cb.processResult(rc, path, p.ctx, null, null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, path, p.ctx, null);
} else {
Packet p = (Packet) event;
int rc = 0;
String path = p.path;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb
.processResult(rc, path, p.ctx, rsp
.getPath());
} else {
cb.processResult(rc, path, p.ctx, null);
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, path, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, path, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, path, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, path, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, path, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, path, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, path, p.ctx, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, path, p.ctx, rsp
.getPath());
} else {
cb.processResult(rc, path, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, path, p.ctx);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, path, p.ctx);
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
} catch (InterruptedException e) {
......@@ -504,15 +513,6 @@ public class ClientCnxn {
sessionPasswd = conRsp.getPasswd();
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null));
if (!disableAutoWatchReset) {
SetWatches sw = new SetWatches(lastZxid,
zooKeeper.getDataWatches(),
zooKeeper.getExistWatches(),
zooKeeper.getChildWatches());
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
}
}
void readResponse() throws IOException {
......@@ -702,6 +702,20 @@ public class ClientCnxn {
bb.putInt(bb.capacity() - 4);
bb.rewind();
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
if (!disableAutoWatchReset) {
SetWatches sw = new SetWatches(lastZxid,
zooKeeper.getDataWatches(),
zooKeeper.getExistWatches(),
zooKeeper.getChildWatches());
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
null);
outgoingQueue.addFirst(packet);
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
......
......@@ -192,8 +192,11 @@ public class ZooKeeper {
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
addTo(existWatches.remove(path), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
Set<Watcher> list = existWatches.remove(path);
if (list != null) {
addTo(existWatches.remove(path), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(path), result);
......
package org.apache.zookeeper;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.List;
public class TestableZooKeeper extends ZooKeeper {
public TestableZooKeeper(String host, int sessionTimeout,
Watcher watcher) throws IOException {
super(host, sessionTimeout, watcher);
}
@Override
public List<String> getChildWatches() {
return super.getChildWatches();
}
@Override
public List<String> getDataWatches() {
return super.getDataWatches();
}
@Override
public List<String> getExistWatches() {
return super.getExistWatches();
}
/**
* Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
* for the given number of milliseconds.
* @param ms the number of milliseconds to pause.
*/
public void pauseCnxn(final long ms) {
new Thread() {
public void run() {
synchronized(cnxn) {
try {
try {
((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
} catch (IOException e) {
e.printStackTrace();
}
Thread.sleep(ms);
} catch (InterruptedException e) {
}
}
}
}.start();
}
}
......@@ -33,6 +33,7 @@ import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
......@@ -126,10 +127,10 @@ public abstract class ClientBase extends TestCase {
return createClient(watcher, hp);
}
protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
throws IOException, InterruptedException
{
ZooKeeper zk = new ZooKeeper(hp, 9000, watcher);
TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher);
if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
......
......@@ -27,7 +27,10 @@ import org.apache.log4j.Logger;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
......@@ -40,6 +43,14 @@ import org.junit.Test;
public class WatcherTest extends ClientBase {
protected static final Logger LOG = Logger.getLogger(WatcherTest.class);
private final class MyStatCallback implements StatCallback {
int rc;
public void processResult(int rc, String path, Object ctx, Stat stat) {
((int[])ctx)[0]++;
this.rc = rc;
}
}
private class MyWatcher extends CountdownWatcher {
LinkedBlockingQueue<WatchedEvent> events =
new LinkedBlockingQueue<WatchedEvent>();
......@@ -118,6 +129,57 @@ public class WatcherTest extends ClientBase {
}
}
final static int COUNT = 100;
boolean hasSeenDelete = true;
/**
* This test checks that watches for pending requests do not get triggered,
* but watches set by previous requests do.
*
* @throws Exception
*/
@Test
public void testWatchAutoResetWithPending() throws Exception {
MyWatcher watches[] = new MyWatcher[COUNT];
MyStatCallback cbs[] = new MyStatCallback[COUNT];
MyWatcher watcher = new MyWatcher();
int count[] = new int[1];
TestableZooKeeper zk = createClient(watcher, hostPort);
ZooKeeper zk2 = createClient(watcher, hostPort);
zk2.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
for(int i = 0; i < COUNT/2; i++) {
watches[i] = new MyWatcher();
cbs[i] = new MyStatCallback();
zk.exists("/test", watches[i], cbs[i], count);
}
zk.exists("/test", false);
zk.pauseCnxn(4000);
Thread.sleep(50);
zk2.close();
stopServer();
watches[0].waitForDisconnected(3000);
for(int i = COUNT/2; i < COUNT; i++) {
watches[i] = new MyWatcher();
cbs[i] = new MyStatCallback();
zk.exists("/test", watches[i], cbs[i], count);
}
startServer();
watches[49].waitForConnected(4000);
assertEquals(null, zk.exists("/test", false));
Thread.sleep(10);
for(int i = 0; i < COUNT/2; i++) {
assertEquals("For " + i, 1, watches[i].events.size());
}
for(int i = COUNT/2; i < COUNT; i++) {
if (cbs[i].rc == 0) {
assertEquals("For " +i, 1, watches[i].events.size());
} else {
assertEquals("For " +i, 0, watches[i].events.size());
}
}
assertEquals(COUNT, count[0]);
zk.close();
}
@Test
public void testWatcherAutoResetWithGlobal() throws Exception {
ZooKeeper zk = null;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment