Commit fff0e83a authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1667. Watch event isn't handled correctly when a client reestablish...

ZOOKEEPER-1667. Watch event isn't handled correctly when a client reestablish to a server (jacky007, fpj via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1534598 13f79535-47bb-0310-9956-ffa450edef68
parent 2d0df08e
......@@ -137,7 +137,11 @@ BUGFIXES:
ZOOKEEPER-1558. Leader should not snapshot uncommitted state (fpj)
ZOOKEEPER-1732. ZooKeeper server unable to join established ensemble (German Blanco via fpj)
ZOOKEEPER-1732. ZooKeeper server unable to join established
ensemble (German Blanco via fpj)
ZOOKEEPER-1667. Watch event isn't handled correctly when
a client reestablish to a server (jacky007, fpj via fpj)
IMPROVEMENTS:
......
......@@ -182,7 +182,7 @@
which uses a ZooKeeper service.</para>
</note>
<para>Znodes are the main enitity that a programmer access. They have
<para> A znode is the main abstraction a programmer needs to be aware of. Znodes have
several characteristics that are worth mentioning here.</para>
<section id="sc_zkDataMode_watches">
......@@ -613,6 +613,37 @@
may be missed: a watch for the existence of a znode not yet created will
be missed if the znode is created and deleted while disconnected.</para>
<section id="sc_WatchSemantics">
<title>Semantics of Watches</title>
<para> We can set watches with the three calls that read the state of
ZooKeeper: exists, getData, and getChildren. The following list details
the events that a watch can trigger and the calls that enable them:
</para>
<itemizedlist>
<listitem>
<para><emphasis role="bold">Created event:</emphasis></para>
<para>Enabled with a call to exists.</para>
</listitem>
<listitem>
<para><emphasis role="bold">Deleted event:</emphasis></para>
<para>Enabled with a call to exists, getData, and getChildren.</para>
</listitem>
<listitem>
<para><emphasis role="bold">Changed event:</emphasis></para>
<para>Enabled with a call to exists and getData.</para>
</listitem>
<listitem>
<para><emphasis role="bold">Child event:</emphasis></para>
<para>Enabled with a call to getChildren.</para>
</listitem>
</itemizedlist>
</section>
<section id="sc_WatchGuarantees">
<title>What ZooKeeper Guarantees about Watches</title>
......
......@@ -1270,8 +1270,6 @@ public class DataTree {
root = null;
nodes.clear();
ephemerals.clear();
// dataWatches = null;
// childWatches = null;
}
public void setWatches(long relativeZxid, List<String> dataWatches,
......@@ -1279,53 +1277,33 @@ public class DataTree {
Watcher watcher) {
for (String path : dataWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
if (node == null) {
e = new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path);
} else if (node.stat.getCzxid() > relativeZxid) {
e = new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path);
watcher.process(new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path));
} else if (node.stat.getMzxid() > relativeZxid) {
e = new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path);
}
if (e != null) {
watcher.process(e);
watcher.process(new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path));
} else {
this.dataWatches.addWatch(path, watcher);
}
}
for (String path : existWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
if (node == null) {
// This is the case when the watch was registered
} else if (node.stat.getMzxid() > relativeZxid) {
e = new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path);
} else {
e = new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path);
}
if (e != null) {
watcher.process(e);
if (node != null) {
watcher.process(new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path));
} else {
this.dataWatches.addWatch(path, watcher);
}
}
for (String path : childWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
if (node == null) {
e = new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path);
watcher.process(new WatchedEvent(EventType.NodeDeleted,
KeeperState.SyncConnected, path));
} else if (node.stat.getPzxid() > relativeZxid) {
e = new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected, path);
}
if (e != null) {
watcher.process(e);
watcher.process(new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected, path));
} else {
this.childWatches.addWatch(path, watcher);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.TestCase;
public class WatchEventWhenAutoReset extends TestCase {
protected static final Logger LOG = LoggerFactory
.getLogger(WatchEventWhenAutoReset.class);
// waiting time for expected condition
private static final int TIMEOUT = 30000;
static public class EventsWatcher extends CountdownWatcher {
private LinkedBlockingQueue<WatchedEvent> dataEvents = new LinkedBlockingQueue<WatchedEvent>();
@Override
public void process(WatchedEvent event) {
super.process(event);
try {
if (event.getType() != Event.EventType.None) {
dataEvents.put(event);
}
} catch (InterruptedException e) {
LOG.warn("ignoring interrupt during EventsWatcher process");
}
}
public void assertEvent(long timeout, EventType eventType) {
try {
WatchedEvent event = dataEvents.poll(timeout,
TimeUnit.MILLISECONDS);
Assert.assertNotNull("do not receive a " + eventType, event);
Assert.assertEquals(eventType, event.getType());
} catch (InterruptedException e) {
LOG.warn("ignoring interrupt during EventsWatcher assertEvent");
}
}
}
private ZooKeeper createClient(QuorumUtil qu, int id, EventsWatcher watcher)
throws IOException {
String hostPort = "127.0.0.1:" + qu.getPeer(id).clientPort;
ZooKeeper zk = new ZooKeeper(hostPort, TIMEOUT, watcher);
try {
watcher.waitForConnected(TIMEOUT);
} catch (InterruptedException e) {
// ignoring the interrupt
} catch (TimeoutException e) {
fail("can not connect to " + hostPort);
}
return zk;
}
private ZooKeeper createClient(QuorumUtil qu, int id) throws IOException {
return createClient(qu, id, new EventsWatcher());
}
@Test
public void testNodeDataChanged() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
EventsWatcher watcher = new EventsWatcher();
ZooKeeper zk1 = createClient(qu, 1, watcher);
ZooKeeper zk2 = createClient(qu, 2);
String path = "/test-changed";
zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.getData(path, watcher, null);
qu.shutdown(1);
zk2.delete(path, -1);
zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT);
watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);
zk1.exists(path, watcher);
qu.shutdown(1);
zk2.delete(path, -1);
zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDataChanged);
qu.shutdownAll();
}
@Test
public void testNodeCreated() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
EventsWatcher watcher = new EventsWatcher();
ZooKeeper zk1 = createClient(qu, 1, watcher);
ZooKeeper zk2 = createClient(qu, 2);
String path = "/test1-created";
zk1.exists(path, watcher);
qu.shutdown(1);
zk2.create(path, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeCreated);
qu.shutdownAll();
}
@Test
public void testNodeDeleted() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
EventsWatcher watcher = new EventsWatcher();
ZooKeeper zk1 = createClient(qu, 1, watcher);
ZooKeeper zk2 = createClient(qu, 2);
String path = "/test-deleted";
zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.getData(path, watcher, null);
qu.shutdown(1);
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.exists(path, watcher);
qu.shutdown(1);
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.getChildren(path, watcher);
qu.shutdown(1);
zk2.delete(path, -1);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeDeleted);
qu.shutdownAll();
}
@Test
public void testNodeChildrenChanged() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
EventsWatcher watcher = new EventsWatcher();
ZooKeeper zk1 = createClient(qu, 1, watcher);
ZooKeeper zk2 = createClient(qu, 2);
String path = "/test-children-changed";
zk1.create(path, new byte[1], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk1.getChildren(path, watcher);
qu.shutdown(1);
zk2.create(path + "/children-1", new byte[2],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
qu.start(1);
watcher.waitForConnected(TIMEOUT * 1000L);
watcher.assertEvent(TIMEOUT, EventType.NodeChildrenChanged);
qu.shutdownAll();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment