Commit 4c643cd3 authored by Michi Mutsuzaki's avatar Michi Mutsuzaki
Browse files

ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected...

ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected client (shralex via michim)


git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1611474 13f79535-47bb-0310-9956-ffa450edef68
parent b42a1c22
......@@ -696,6 +696,9 @@ BUGFIXES:
ZOOKEEPER-1966. VS and line breaks (Orion Hodson via fpj)
ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected
client (shralex via michim)
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
......
......@@ -470,11 +470,13 @@ public:
// Assert next server is in the 'new' list
size_t found = newComing.find(next);
CPPUNIT_ASSERT(found != string::npos);
CPPUNIT_ASSERT_MESSAGE(next + " not in newComing list",
found != string::npos);
// Assert not in seen list then append
found = seen.find(next);
CPPUNIT_ASSERT(found == string::npos);
CPPUNIT_ASSERT_MESSAGE(next + " in seen list",
found == string::npos);
seen += found + ", ";
}
......
......@@ -377,7 +377,12 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
@Override
void testableCloseSocket() throws IOException {
LOG.info("testableCloseSocket() called");
((SocketChannel) sockKey.channel()).socket().close();
// sockKey may be concurrently accessed by multiple
// threads. We use tmp here to avoid a race condition
SelectionKey tmp = sockKey;
if (tmp!=null) {
((SocketChannel) tmp.channel()).socket().close();
}
}
@Override
......
......@@ -152,8 +152,9 @@ public final class StaticHostProvider implements HostProvider {
@Override
public boolean updateServerList(Collection<InetSocketAddress> serverAddresses,
InetSocketAddress currentHost) {
public synchronized boolean updateServerList(
Collection<InetSocketAddress> serverAddresses,
InetSocketAddress currentHost) {
// Resolve server addresses and shuffle them
List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses);
if (resolvedList.isEmpty()) {
......@@ -162,74 +163,106 @@ public final class StaticHostProvider implements HostProvider {
}
// Check if client's current server is in the new list of servers
boolean myServerInNewConfig = false;
InetSocketAddress myServer = currentHost;
// choose "current" server according to the client rebalancing algorithm
if (reconfigMode) {
myServer = next(0);
}
// if the client is not currently connected to any server
if (myServer == null) {
// reconfigMode = false (next shouldn't return null).
if (lastIndex >= 0) {
// take the last server to which we were connected
myServer = this.serverAddresses.get(lastIndex);
} else {
// take the first server on the list
myServer = this.serverAddresses.get(0);
}
}
for (InetSocketAddress addr : resolvedList) {
if (addr.getPort() == currentHost.getPort() &&
((addr.getAddress()!=null && currentHost.getAddress()!=null &&
addr.getAddress().equals(currentHost.getAddress()))
|| addr.getHostName().equals(currentHost.getHostName()))) {
myServerInNewConfig = true;
break;
}
if (addr.getPort() == myServer.getPort()
&& ((addr.getAddress() != null
&& myServer.getAddress() != null && addr
.getAddress().equals(myServer.getAddress())) || addr
.getHostName().equals(myServer.getHostName()))) {
myServerInNewConfig = true;
break;
}
}
synchronized(this) {
reconfigMode = true;
newServers.clear();
oldServers.clear();
// Divide the new servers into oldServers that were in the previous list
// and newServers that were not in the previous list
for (InetSocketAddress resolvedAddress : resolvedList) {
if (this.serverAddresses.contains(resolvedAddress)) {
oldServers.add(resolvedAddress);
} else {
newServers.add(resolvedAddress);
}
}
reconfigMode = true;
newServers.clear();
oldServers.clear();
// Divide the new servers into oldServers that were in the previous list
// and newServers that were not in the previous list
for (InetSocketAddress resolvedAddress : resolvedList) {
if (this.serverAddresses.contains(resolvedAddress)) {
oldServers.add(resolvedAddress);
} else {
newServers.add(resolvedAddress);
}
}
int numOld = oldServers.size();
int numNew = newServers.size();
// number of servers increased
if (numOld + numNew > this.serverAddresses.size()) {
if (myServerInNewConfig) {
// my server is in new config, but load should be decreased.
// Need to decide if this client
// is moving to one of the new servers
if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
.size()) / (numOld + numNew))) {
pNew = 1;
pOld = 0;
} else {
// do nothing special - stay with the current server
reconfigMode = false;
}
} else {
// my server is not in new config, and load on old servers must
// be decreased, so connect to
// one of the new servers
int numOld = oldServers.size();
int numNew = newServers.size();
// number of servers increased
if (numOld + numNew > this.serverAddresses.size()) {
if (myServerInNewConfig) {
// my server is in new config, but load should be decreased.
// Need to decide if this client
// is moving to one of the new servers
if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
.size()) / (numOld + numNew))) {
pNew = 1;
pOld = 0;
}
} else { // number of servers stayed the same or decreased
if (myServerInNewConfig) {
// my server is in new config, and load should be increased, so
// stay with this server and do nothing special
reconfigMode = false;
} else {
pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
/ ((numOld + numNew) * (this.serverAddresses.size() - numOld));
pNew = 1 - pOld;
// do nothing special - stay with the current server
reconfigMode = false;
}
} else {
// my server is not in new config, and load on old servers must
// be decreased, so connect to
// one of the new servers
pNew = 1;
pOld = 0;
}
} else { // number of servers stayed the same or decreased
if (myServerInNewConfig) {
// my server is in new config, and load should be increased, so
// stay with this server and do nothing special
reconfigMode = false;
} else {
pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
/ ((numOld + numNew) * (this.serverAddresses.size() - numOld));
pNew = 1 - pOld;
}
}
this.serverAddresses = resolvedList;
currentIndexOld = -1;
currentIndexNew = -1;
if (!reconfigMode) {
currentIndex = resolvedList.indexOf(getServerAtCurrentIndex());
} else {
currentIndex = -1;
lastIndex = -1;
return reconfigMode;
}
this.serverAddresses = resolvedList;
currentIndexOld = -1;
currentIndexNew = -1;
lastIndex = currentIndex;
return reconfigMode;
}
public synchronized InetSocketAddress getServerAtIndex(int i) {
if (i < 0 || i >= serverAddresses.size()) return null;
return serverAddresses.get(i);
}
public synchronized InetSocketAddress getServerAtCurrentIndex() {
return getServerAtIndex(currentIndex);
}
public synchronized int size() {
......@@ -279,7 +312,10 @@ public final class StaticHostProvider implements HostProvider {
synchronized(this) {
if (reconfigMode) {
addr = nextHostInReconfigMode();
if (addr != null) return addr;
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return addr;
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
......
......@@ -123,21 +123,25 @@ public class StaticHostProviderTest extends ZKTestCase {
// Number of machines becomes smaller, my server is in the new cluster
boolean disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(!disconnectRequired);
hostProvider.onConnected();
// Number of machines stayed the same, my server is in the new cluster
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(!disconnectRequired);
hostProvider.onConnected();
// Number of machines became smaller, my server is not in the new
// cluster
newList = getServerAddresses((byte) 2); // 10.10.10.2:1236, 10.10.10.1:1235
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
hostProvider.onConnected();
// Number of machines stayed the same, my server is not in the new
// cluster
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
hostProvider.onConnected();
// Number of machines increased, my server is not in the new cluster
newList = new ArrayList<InetSocketAddress>(3);
......@@ -147,6 +151,7 @@ public class StaticHostProviderTest extends ZKTestCase {
myServer = new InetSocketAddress(InetAddress.getByAddress(new byte[]{10, 10, 10, 1}), 1235);
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
hostProvider.onConnected();
// Number of machines increased, my server is in the new cluster
// Here whether to move or not depends on the difference of cluster
......@@ -162,6 +167,7 @@ public class StaticHostProviderTest extends ZKTestCase {
if (disconnectRequired)
numDisconnects++;
}
hostProvider.onConnected();
// should be numClients/10 in expectation, we test that its numClients/10 +- slackPercent
assertTrue(numDisconnects < upperboundCPS(numClients, 10));
......@@ -227,6 +233,7 @@ public class StaticHostProviderTest extends ZKTestCase {
}
assertEquals(first, hostProvider.next(0));
hostProvider.onConnected();
}
@Test
......@@ -242,6 +249,7 @@ public class StaticHostProviderTest extends ZKTestCase {
hostProviderArray[i] = getHostProvider((byte) 9);
curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 9; i++) {
......@@ -257,6 +265,7 @@ public class StaticHostProviderTest extends ZKTestCase {
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 8; i++) {
......@@ -273,6 +282,7 @@ public class StaticHostProviderTest extends ZKTestCase {
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 6; i++) {
......@@ -295,6 +305,7 @@ public class StaticHostProviderTest extends ZKTestCase {
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
assertTrue(numClientsPerHost[0] == 0);
......@@ -312,6 +323,130 @@ public class StaticHostProviderTest extends ZKTestCase {
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 9; i++) {
assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
}
}
@Test
public void testNoCurrentHostDuringNormalMode() throws UnknownHostException {
// Start with 9 servers and 10000 clients
boolean disconnectRequired;
StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
int[] numClientsPerHost = new int[9];
// initialization
for (int i = 0; i < numClients; i++) {
hostProviderArray[i] = getHostProvider((byte) 9);
if (i >= (numClients / 2)) {
curHostForEachClient[i] = hostProviderArray[i].next(0);
} else {
// its supposed to be the first server on serverList.
// we'll set it later, see below (*)
curHostForEachClient[i] = null;
}
}
// remove hosts 7 and 8 (the last two in a list of 9 hosts)
Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
for (int i = 0; i < numClients; i++) {
// tests the case currentHost == null && lastIndex == -1
// calls next for clients with index < numClients/2
disconnectRequired = hostProviderArray[i].updateServerList(newList,
curHostForEachClient[i]);
if (disconnectRequired)
curHostForEachClient[i] = hostProviderArray[i].next(0);
else if (curHostForEachClient[i] == null) {
// (*) setting it to what it should be
curHostForEachClient[i] = hostProviderArray[i]
.getServerAtIndex(0);
}
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
// sets lastIndex, resets reconfigMode
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 7; i++) {
assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 7));
assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 7));
numClientsPerHost[i] = 0; // prepare for next test
}
assertTrue(numClientsPerHost[7] == 0);
assertTrue(numClientsPerHost[8] == 0);
// add back server 7
newList = getServerAddresses((byte) 8);
for (int i = 0; i < numClients; i++) {
InetSocketAddress myServer = (i < (numClients / 2)) ? null
: curHostForEachClient[i];
// tests the case currentHost == null && lastIndex >= 0
disconnectRequired = hostProviderArray[i].updateServerList(newList,
myServer);
if (disconnectRequired)
curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 8; i++) {
assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
}
}
@Test
public void testReconfigDuringReconfigMode() throws UnknownHostException {
// Start with 9 servers and 10000 clients
boolean disconnectRequired;
StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
int[] numClientsPerHost = new int[9];
// initialization
for (int i = 0; i < numClients; i++) {
hostProviderArray[i] = getHostProvider((byte) 9);
curHostForEachClient[i] = hostProviderArray[i].next(0);
}
// remove hosts 7 and 8 (the last two in a list of 9 hosts)
Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
for (int i = 0; i < numClients; i++) {
// sets reconfigMode
hostProviderArray[i].updateServerList(newList,
curHostForEachClient[i]);
}
// add back servers 7 and 8 while still in reconfigMode (we didn't call
// next)
newList = getServerAddresses((byte) 9);
for (int i = 0; i < numClients; i++) {
InetSocketAddress myServer = (i < (numClients / 2)) ? null
: curHostForEachClient[i];
// for i < (numClients/2) this tests the case currentHost == null &&
// reconfigMode = true
// for i >= (numClients/2) this tests the case currentHost!=null &&
// reconfigMode = true
disconnectRequired = hostProviderArray[i].updateServerList(newList,
myServer);
if (disconnectRequired)
curHostForEachClient[i] = hostProviderArray[i].next(0);
else {
// currentIndex was set by the call to updateServerList, which
// called next
curHostForEachClient[i] = hostProviderArray[i]
.getServerAtCurrentIndex();
}
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
hostProviderArray[i].onConnected();
}
for (int i = 0; i < 9; i++) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment