Commit 6be07c7a authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1817. Fix don't care for b3.4 (fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1544858 13f79535-47bb-0310-9956-ffa450edef68
parent e8932816
......@@ -162,6 +162,8 @@ BUGFIXES:
ZOOKEEPER-1597. Windows build failing (michim via fpj)
ZOOKEEPER-1817. Fix don't care for b3.4 (fpj)
IMPROVEMENTS:
ZOOKEEPER-1564. Allow JUnit test build with IBM Java
......
......@@ -66,14 +66,6 @@ public class FastLeaderElection implements Election {
*/
final static int maxNotificationInterval = 60000;
/**
* This value is passed to the methods that check the quorum
* majority of an established ensemble for those values that
* should not be taken into account in the comparison
* (electionEpoch and zxid).
*/
final static int IGNOREVALUE = -1;
/**
* Connection manager. Fast leader election uses TCP for
......@@ -330,7 +322,8 @@ public class FastLeaderElection implements Election {
* Version added in 3.4.6
*/
n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0;
n.version = (response.buffer.remaining() >= 4) ?
response.buffer.getInt() : 0x0;
/*
* Print notification info
......@@ -377,14 +370,29 @@ public class FastLeaderElection implements Election {
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
ToSend notmsg;
if(n.version > 0x0) {
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);
}
}
......@@ -625,7 +633,7 @@ public class FastLeaderElection implements Election {
* @param l Identifier of the vote received last
* @param zxid zxid of the vote received last
*/
private boolean termPredicate(
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
......@@ -655,7 +663,7 @@ public class FastLeaderElection implements Election {
* @param leader leader id
* @param electionEpoch epoch id
*/
private boolean checkLeader(
protected boolean checkLeader(
HashMap<Long, Vote> votes,
long leader,
long electionEpoch){
......@@ -678,6 +686,30 @@ public class FastLeaderElection implements Election {
return predicate;
}
/**
 * Combined predicate used to decide that a leader has been elected.
 * It is meaningful only in the context of lookForLeader and exists as
 * a separate method so that it can be exercised by tests.
 *
 * @param recv map of received votes, handed to termPredicate
 * @param ooe map containing out of election votes (LEADING or FOLLOWING),
 *            handed to checkLeader
 * @param n notification whose fields make up the vote under test
 * @return true only when termPredicate accepts the vote built from n
 *         against recv and checkLeader accepts n.leader and
 *         n.electionEpoch against ooe
 */
protected boolean ooePredicate(HashMap<Long,Vote> recv,
                               HashMap<Long,Vote> ooe,
                               Notification n) {
    Vote candidate = new Vote(n.version,
                              n.leader,
                              n.zxid,
                              n.electionEpoch,
                              n.peerEpoch,
                              n.state);
    if (!termPredicate(recv, candidate)) {
        // checkLeader is only consulted once the vote itself is accepted.
        return false;
    }
    return checkLeader(ooe, n.leader, n.electionEpoch);
}
synchronized void updateProposal(long leader, long zxid, long epoch){
if(LOG.isDebugEnabled()){
......@@ -881,7 +913,9 @@ public class FastLeaderElection implements Election {
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
......@@ -897,51 +931,52 @@ public class FastLeaderElection implements Election {
* together.
*/
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
* Only peer epoch is used to check that the votes come
* from the same ensemble. This is because there is at
* least one corner case in which the ensemble can be
* created with inconsistent zxid and election epoch
* info. However, given that only one ensemble can be
* running at a single point in time and that each
* epoch is used only once, using only the epoch to
* compare the votes is sufficient.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
......
......@@ -204,6 +204,11 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
* This is who I think the leader currently is.
*/
volatile private Vote currentVote;
/**
* Counterpart of currentVote kept for backward compatibility; when it
* is null, getBCVote() falls back to currentVote.
*/
volatile private Vote bcVote;
public synchronized Vote getCurrentVote(){
return currentVote;
......@@ -211,8 +216,20 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
public synchronized void setCurrentVote(Vote v){
currentVote = v;
}
}
/**
 * Returns the backward-compatibility vote, or the current vote when no
 * backward-compatibility vote has been set.
 */
synchronized Vote getBCVote() {
    return (bcVote != null) ? bcVote : currentVote;
}
/**
 * Stores the backward-compatibility vote.
 *
 * @param v vote to store; may be null to clear it
 */
synchronized void setBCVote(Vote v) {
    this.bcVote = v;
}
volatile boolean running = true;
/**
......@@ -715,6 +732,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
};
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
......@@ -727,6 +745,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
......@@ -1202,6 +1221,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
*/
protected void updateElectionVote(long newEpoch) {
Vote currentVote = getCurrentVote();
setBCVote(currentVote);
if (currentVote != null) {
setCurrentVote(new Vote(currentVote.getId(),
currentVote.getZxid(),
......
......@@ -19,9 +19,12 @@
package org.apache.zookeeper.server.quorum;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Vote {
private static final Logger LOG = LoggerFactory.getLogger(Vote.class);
public Vote(long id,
long zxid) {
......@@ -125,19 +128,42 @@ public class Vote {
return false;
}
Vote other = (Vote) o;
return (id == other.id
/*
* There are two things going on in the logic below.
* First, we compare votes of servers out of election
* using only id and peer epoch. Second, if one version
* is 0x0 and the other isn't, then we only use the
* leader id. This case is here to enable rolling upgrades.
*
* {@see https://issues.apache.org/jira/browse/ZOOKEEPER-1805}
*/
if ((state == ServerState.LOOKING) ||
(other.state == ServerState.LOOKING)) {
return (id == other.id
&& zxid == other.zxid
&& electionEpoch == other.electionEpoch
&& peerEpoch == other.peerEpoch);
} else {
if ((version > 0x0) ^ (other.version > 0x0)) {
return id == other.id;
} else {
return (id == other.id
&& peerEpoch == other.peerEpoch);
}
}
}
/**
 * Returns a hash code derived from the leader id only.
 *
 * equals() can report two votes as equal when just their ids match
 * (and, in other branches, without comparing zxid), so zxid must not
 * contribute to the hash: equal votes are required to produce equal
 * hash codes. The id is the one field every equals() branch compares.
 */
@Override
public int hashCode() {
    return Long.valueOf(id).hashCode();
}
public String toString() {
return "(" + id + ", " + Long.toHexString(zxid) + ", " + Long.toHexString(peerEpoch) + ")";
return String.format("(%d, %s, %s)",
id,
Long.toHexString(zxid),
Long.toHexString(peerEpoch));
}
}
......@@ -20,6 +20,10 @@ package org.apache.zookeeper.server.quorum.flexible;
import java.util.HashSet;
//import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a validator for majority quorums. The
......@@ -27,6 +31,8 @@ import java.util.HashSet;
*
*/
public class QuorumMaj implements QuorumVerifier {
private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
int half;
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.server.quorum.FastLeaderElection;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.FLETest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FLEDontCareTest {
protected static final Logger LOG = LoggerFactory.getLogger(FLEDontCareTest.class);
class MockFLE extends FastLeaderElection {
MockFLE(QuorumPeer peer, QuorumCnxManager cnxManager) {
super(peer, cnxManager);
}
public boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
return super.termPredicate(votes, vote);
}
public boolean checkLeader(HashMap<Long,Vote> votes, long leader, long electionEpoch) {
return super.checkLeader(votes, leader, electionEpoch);
}
public boolean ooePredicate(HashMap<Long,Vote> recv,
HashMap<Long,Vote> ooe,
FastLeaderElection.Notification n) {
return super.ooePredicate(recv, ooe, n);
}
}
HashMap<Long,QuorumServer> peers;
QuorumPeer peer;
File tmpdir;
/**
 * Creates a temporary directory, registers five quorum servers
 * (ids 0 through 4) on 127.0.0.1 with unique ports, and builds the
 * QuorumPeer shared by the tests.
 */
@Before
public void setUp() throws Exception {
    tmpdir = ClientBase.createTmpDir();
    peers = new HashMap<Long,QuorumServer>();
    for (int serverId = 0; serverId < 5; serverId++) {
        Long sid = Long.valueOf(serverId);
        InetSocketAddress address =
                new InetSocketAddress("127.0.0.1", PortAssignment.unique());
        peers.put(sid, new QuorumServer(sid, address));
    }
    int uniquePort = PortAssignment.unique();
    peer = new QuorumPeer(peers, tmpdir, tmpdir, uniquePort, 3, 3, 1000, 2, 2);
}
/**
 * Removes the temporary directory created by setUp.
 *
 * JUnit runs After methods even when a Before method throws, so tmpdir
 * may still be null here. A failed deletion is logged rather than
 * silently ignored.
 */
@After
public void tearDown(){
    if (tmpdir == null) {
        return;
    }
    if (!tmpdir.delete()) {
        LOG.warn("Unable to delete temporary directory {}", tmpdir);
    }
}
/**
 * Four FOLLOWING/LEADING votes for leader 4 that share peer epoch 2 but
 * differ in zxid and election epoch must still satisfy termPredicate.
 */
@Test
public void testDontCare() {
    QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
    MockFLE election = new MockFLE(peer, cnxManager);

    HashMap<Long, Vote> ensemble = new HashMap<Long, Vote>();
    ensemble.put(0L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 1), 1, 2, ServerState.FOLLOWING));
    ensemble.put(1L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 2), 1, 2, ServerState.FOLLOWING));
    ensemble.put(3L, new Vote(0x1, 4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.FOLLOWING));
    ensemble.put(4L, new Vote(0x1, 4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.LEADING));

    Vote candidate = new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.FOLLOWING);
    boolean hasQuorum = election.termPredicate(ensemble, candidate);
    Assert.assertTrue(hasQuorum);
}
/**
 * Mixes votes built with an explicit version of 0x1 (servers 0 and 1)
 * with votes built without a version argument (servers 3 and 4), all
 * for leader 4, and expects termPredicate to accept the candidate.
 */
@Test
public void testDontCareVersion() {
    QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
    MockFLE election = new MockFLE(peer, cnxManager);

    HashMap<Long, Vote> ensemble = new HashMap<Long, Vote>();
    ensemble.put(0L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.FOLLOWING));
    ensemble.put(1L, new Vote(0x1, 4L, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.FOLLOWING));
    ensemble.put(3L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.FOLLOWING));
    ensemble.put(4L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.LEADING));

    Vote candidate = new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.FOLLOWING);
    boolean hasQuorum = election.termPredicate(ensemble, candidate);
    Assert.assertTrue(hasQuorum);
}
/**
 * Three LOOKING votes and one LEADING vote that agree on leader, zxid,
 * election epoch and peer epoch must satisfy termPredicate for a
 * matching LOOKING candidate.
 */
@Test
public void testLookingNormal() {
    QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
    MockFLE election = new MockFLE(peer, cnxManager);

    HashMap<Long, Vote> ensemble = new HashMap<Long, Vote>();
    ensemble.put(0L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LOOKING));
    ensemble.put(1L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LOOKING));
    ensemble.put(3L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LOOKING));
    ensemble.put(4L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LEADING));

    Vote candidate = new Vote(4L, ZxidUtils.makeZxid(2, 1), 1, 1, ServerState.LOOKING);
    boolean hasQuorum = election.termPredicate(ensemble, candidate);
    Assert.assertTrue(hasQuorum);
}
/**
 * Votes for leader 4 that come from different election epochs must not
 * satisfy termPredicate for a LOOKING candidate.
 */
@Test
public void testLookingDiffRounds() {
    QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
    MockFLE election = new MockFLE(peer, cnxManager);

    HashMap<Long, Vote> ensemble = new HashMap<Long, Vote>();
    ensemble.put(0L, new Vote(4L, ZxidUtils.makeZxid(1, 1), 1, 1, ServerState.LOOKING));
    ensemble.put(1L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.LOOKING));
    ensemble.put(3L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 3, 2, ServerState.LOOKING));
    ensemble.put(4L, new Vote(4L, ZxidUtils.makeZxid(2, 1), 3, 2, ServerState.LEADING));

    Vote candidate = new Vote(4L, ZxidUtils.makeZxid(2, 1), 2, 2, ServerState.LOOKING);
    boolean hasQuorum = election.termPredicate(ensemble, candidate);
    Assert.assertFalse(hasQuorum);
}
/**
 * Helper method that builds a notification from the given values and
 * records the corresponding vote in outofelection under the sender id.
 *
 * @param version value assigned to the notification's version
 * @param leader value assigned to the notification's leader
 * @param zxid value assigned to the notification's zxid
 * @param electionEpoch value assigned to the notification's electionEpoch
 * @param state value assigned to the notification's state
 * @param sid value assigned to the notification's sid; also the key used
 *            in outofelection
 * @param peerEpoch value assigned to the notification's peerEpoch
 * @param outofelection map that receives the vote built from these values
 * @return the populated notification
 */
FastLeaderElection.Notification genNotification(int version,
                                                long leader,
                                                long zxid,
                                                long electionEpoch,
                                                ServerState state,
                                                long sid,
                                                long peerEpoch,
                                                HashMap<Long,Vote> outofelection) {
    FastLeaderElection.Notification notification = new FastLeaderElection.Notification();
    notification.sid = sid;
    notification.state = state;
    notification.version = version;
    notification.leader = leader;
    notification.zxid = zxid;
    notification.electionEpoch = electionEpoch;
    notification.peerEpoch = peerEpoch;

    Vote vote = new Vote(version, leader, zxid, electionEpoch, peerEpoch, state);
    outofelection.put(sid, vote);

    return notification;
}
@Test
public void testOutofElection() {
MockFLE fle = new MockFLE(peer, new QuorumCnxManager(peer));
HashMap<Long,Vote> outofelection = new HashMap<Long,Vote>();
/*
* Generates notifications emulating servers 1, 2, 4, and 5.
* Server 5 is the elected leader.
*/
genNotification( 0x0,
5,
ZxidUtils.makeZxid(15, 0),
0xa,
ServerState.FOLLOWING,
1,
0x17,
outofelection);
genNotification( 0x0,
5,
ZxidUtils.makeZxid(15, 0),
0xa,
ServerState.FOLLOWING,
2,
0x17,
outofelection);
genNotification( 0x1,
5,
ZxidUtils.makeZxid(15, 0),
0xa,
ServerState.FOLLOWING,
4,
0x18,
outofelection);
FastLeaderElection.Notification n = genNotification( 0x1,
5,
ZxidUtils.makeZxid(15, 0),
0xa,
ServerState.LEADING,
5,
0x18,
outofelection);