Commit f411e56f authored by Patrick D. Hunt's avatar Patrick D. Hunt
Browse files

ZOOKEEPER-1972. Fix invalid volatile long/int increment (++) (Hongchao Deng via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1612015 13f79535-47bb-0310-9956-ffa450edef68
parent 94210618
......@@ -711,6 +711,9 @@ BUGFIXES:
ZOOKEEPER-1807. Observers spam each other creating connections to the
election addr (Alex Shraer via fpj)
ZOOKEEPER-1972. Fix invalid volatile long/int increment (++)
(Hongchao Deng via phunt)
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
......
......@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -284,7 +285,7 @@ public class AuthFastLeaderElection implements Election {
// Receive challenge request
ToSend c = new ToSend(ToSend.mType.challenge, tag,
current.getId(), current.getZxid(),
logicalclock, self.getPeerState(),
logicalclock.get(), self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
sendqueue.offer(c);
......@@ -328,7 +329,7 @@ public class AuthFastLeaderElection implements Election {
ToSend a = new ToSend(ToSend.mType.ack,
tag, current.getId(),
current.getZxid(),
logicalclock, self.getPeerState(),
logicalclock.get(), self.getPeerState(),
addr);
sendqueue.offer(a);
......@@ -347,7 +348,7 @@ public class AuthFastLeaderElection implements Election {
ToSend a = new ToSend(ToSend.mType.ack, tag,
current.getId(), current.getZxid(),
logicalclock, self.getPeerState(),
logicalclock.get(), self.getPeerState(),
(InetSocketAddress) responsePacket
.getSocketAddress());
......@@ -662,7 +663,7 @@ public class AuthFastLeaderElection implements Election {
* Return message to queue for another attempt later if
* epoch hasn't changed.
*/
if (m.epoch == logicalclock) {
if (m.epoch == logicalclock.get()) {
challengeMap.remove(m.tag);
sendqueue.offer(m);
}
......@@ -734,7 +735,7 @@ public class AuthFastLeaderElection implements Election {
QuorumPeer self;
int port;
volatile long logicalclock; /* Election instance */
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
DatagramSocket mySocket;
long proposedLeader;
long proposedZxid;
......@@ -769,7 +770,7 @@ public class AuthFastLeaderElection implements Election {
}
private void leaveInstance() {
logicalclock++;
logicalclock.incrementAndGet();
}
private void sendNotifications() {
......@@ -777,7 +778,7 @@ public class AuthFastLeaderElection implements Election {
ToSend notmsg = new ToSend(ToSend.mType.notification,
AuthFastLeaderElection.sequencer++, proposedLeader,
proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING,
self.getView().get(server.id).electionAddr);
sendqueue.offer(notmsg);
......@@ -843,7 +844,7 @@ public class AuthFastLeaderElection implements Election {
HashMap<InetSocketAddress, Vote> outofelection =
new HashMap<InetSocketAddress, Vote>();
logicalclock++;
logicalclock.incrementAndGet();
proposedLeader = self.getId();
proposedZxid = self.getLastLoggedZxid();
......@@ -873,15 +874,15 @@ public class AuthFastLeaderElection implements Election {
} else
switch (n.state) {
case LOOKING:
if (n.epoch > logicalclock) {
logicalclock = n.epoch;
if (n.epoch > logicalclock.get()) {
logicalclock.set( n.epoch );
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
proposedZxid = n.zxid;
}
sendNotifications();
} else if (n.epoch < logicalclock) {
} else if (n.epoch < logicalclock.get()) {
break;
} else if (totalOrderPredicate(n.leader, n.zxid)) {
proposedLeader = n.leader;
......
......@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.ZooKeeperThread;
......@@ -321,7 +322,7 @@ public class FastLeaderElection implements Election {
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
......@@ -382,13 +383,13 @@ public class FastLeaderElection implements Election {
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock)){
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
......@@ -526,7 +527,7 @@ public class FastLeaderElection implements Election {
QuorumPeer self;
Messenger messenger;
volatile long logicalclock; /* Election instance */
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
long proposedLeader;
long proposedZxid;
long proposedEpoch;
......@@ -536,7 +537,7 @@ public class FastLeaderElection implements Election {
* Returns the current vlue of the logical clock counter
*/
public long getLogicalClock(){
return logicalclock;
return logicalclock.get();
}
static ByteBuffer buildMsg(int state,
......@@ -663,13 +664,13 @@ public class FastLeaderElection implements Election {
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
......@@ -776,7 +777,7 @@ public class FastLeaderElection implements Election {
if(leader != self.getId()){
if(votes.get(leader) == null) predicate = false;
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
} else if(logicalclock != electionEpoch) {
} else if(logicalclock.get() != electionEpoch) {
predicate = false;
}
......@@ -880,7 +881,7 @@ public class FastLeaderElection implements Election {
int notTimeout = finalizeWait;
synchronized(this){
logicalclock++;
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
......@@ -928,8 +929,8 @@ public class FastLeaderElection implements Election {
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) {
logicalclock = n.electionEpoch;
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
......@@ -940,11 +941,11 @@ public class FastLeaderElection implements Election {
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
......@@ -964,7 +965,7 @@ public class FastLeaderElection implements Election {
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
......@@ -1000,7 +1001,7 @@ public class FastLeaderElection implements Election {
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
......@@ -1034,7 +1035,7 @@ public class FastLeaderElection implements Election {
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock = n.electionEpoch;
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
......
......@@ -414,7 +414,7 @@ public class Leader {
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick = 0;
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
......@@ -558,7 +558,7 @@ public class Leader {
}
if (!tickSkip) {
self.tick++;
self.tick.incrementAndGet();
}
// We use an instance of SyncedLearnerTracker to
......
......@@ -322,7 +322,7 @@ public class LearnerHandler extends ZooKeeperThread {
@Override
public void run() {
try {
tickOfNextAckDeadline = leader.self.tick
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
......@@ -499,7 +499,7 @@ public class LearnerHandler extends ZooKeeperThread {
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
......@@ -931,7 +931,7 @@ public class LearnerHandler extends ZooKeeperThread {
public boolean synced() {
return isAlive()
&& leader.self.tick <= tickOfNextAckDeadline;
&& leader.self.tick.get() <= tickOfNextAckDeadline;
}
/**
......
......@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.KeeperException.NoNodeException;
......@@ -434,7 +435,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
/**
* The current tick
*/
protected volatile int tick;
protected AtomicInteger tick = new AtomicInteger();
/**
* Whether or not to listen on all IPs for the two quorum ports
......@@ -1257,7 +1258,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
* Get the current tick
*/
public int getTick() {
return tick;
return tick.get();
}
public QuorumVerifier configFromString(String s) throws IOException, ConfigException{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment