Commit d0a172f1 authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1807. Observers spam each other creating connections to the election...

ZOOKEEPER-1807. Observers spam each other creating connections to the election addr (Alex Shraer via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1611765 13f79535-47bb-0310-9956-ffa450edef68
parent 468d241f
......@@ -708,6 +708,9 @@ BUGFIXES:
ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward
create2 Requests (Chris Chen via rakeshr)
ZOOKEEPER-1807. Observers spam each other creating connections to the
election addr (Alex Shraer via fpj)
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
......
......@@ -315,7 +315,7 @@ public class FastLeaderElection implements Election {
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if(!self.getVotingView().containsKey(response.sid)){
if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
......@@ -658,7 +658,7 @@ public class FastLeaderElection implements Election {
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
for (long sid : self.getAllKnownServerIds()) {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
......@@ -718,31 +718,36 @@ public class FastLeaderElection implements Election {
}
/**
* Termination predicate. Given a set of votes, determines if
* have sufficient to declare the end of the election round.
*
* @param votes Set of votes
* @param vote Identifier of the vote received last
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last
*/
private boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have
* different zxids for a server depending on timing.
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
&& vote.equals(entry.getValue())){
set.add(entry.getKey());
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
return voteSet.hasAllQuorums();
}
/**
* In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
......@@ -914,10 +919,10 @@ public class FastLeaderElection implements Election {
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) {
}
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* Only proceed if the vote comes from a replica in the current or next
* voting view.
*/
switch (n.state) {
......
......@@ -1107,12 +1107,14 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
return getQuorumVerifier().getObservingMembers();
}
public synchronized Set<Long> getAllKnownServerIds(){
Set<Long> tmp = new HashSet<Long>(getQuorumVerifier().getAllMembers().keySet());
if (getLastSeenQuorumVerifier()!=null) {
tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet());
}
return tmp;
public synchronized Set<Long> getCurrentAndNextConfigVoters() {
Set<Long> voterIds = new HashSet<Long>(getQuorumVerifier()
.getVotingMembers().keySet());
if (getLastSeenQuorumVerifier() != null) {
voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers()
.keySet());
}
return voterIds;
}
/**
......
......@@ -55,21 +55,6 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
}
}
public static class MainThreadReconfigRecovery extends MainThread {
final File nextDynamicConfigFile;
public MainThreadReconfigRecovery(int myid, int clientPort,
String currentQuorumCfgSection, String nextQuorumCfgSection)
throws IOException {
super(myid, clientPort, currentQuorumCfgSection);
nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
fwriter.write(nextQuorumCfgSection + "\n");
fwriter.flush();
fwriter.close();
}
}
public static class MainThread implements Runnable {
final File confFile;
final File dynamicConfigFile;
......@@ -141,7 +126,16 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
fwriter.flush();
fwriter.close();
}
public void writeTempDynamicConfigFile(String nextQuorumCfgSection)
throws IOException {
File nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
fwriter.write(nextQuorumCfgSection + "\n");
fwriter.flush();
fwriter.close();
}
Thread currentThread;
synchronized public void start() {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment