Commit d09d545a authored by Flavio Paiva Junqueira's avatar Flavio Paiva Junqueira
Browse files

ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/branches/branch-3.4@1542489 13f79535-47bb-0310-9956-ffa450edef68
parent dcbd7522
......@@ -155,6 +155,8 @@ BUGFIXES:
ZOOKEEPER-1798. Fix race condition in testNormalObserverRun
(thawan, fpj via thawan)
ZOOKEEPER-1808. Add version to FLE notifications for 3.4 branch (fpj)
ZOOKEEPER-1812. ZooInspector reconnection always fails if first
connection fails (Benjamin Jaton via phunt)
......
......@@ -92,6 +92,13 @@ public class FastLeaderElection implements Election {
*/
static public class Notification {
/*
* Format version, introduced in 3.4.6
*/
public final static int CURRENTVERSION = 0x1;
int version;
/*
* Proposed leader
*/
......@@ -121,6 +128,39 @@ public class FastLeaderElection implements Election {
* epoch of the proposed leader
*/
long peerEpoch;
@Override
public String toString() {
return new String(Long.toHexString(version) + " (message format version), "
+ leader + " (n.leader), 0x"
+ Long.toHexString(zxid) + " (n.zxid), 0x"
+ Long.toHexString(electionEpoch) + " (n.round), " + state
+ " (n.state), " + sid + " (n.sid), 0x"
+ Long.toHexString(peerEpoch) + " (n.peerEpoch) ");
}
}
static ByteBuffer buildMsg(int state,
long leader,
long zxid,
long electionEpoch,
long epoch) {
byte requestBytes[] = new byte[40];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
* Building notification packet to send
*/
requestBuffer.clear();
requestBuffer.putInt(state);
requestBuffer.putLong(leader);
requestBuffer.putLong(zxid);
requestBuffer.putLong(electionEpoch);
requestBuffer.putLong(epoch);
requestBuffer.putInt(Notification.CURRENTVERSION);
return requestBuffer;
}
/**
......@@ -188,7 +228,7 @@ public class FastLeaderElection implements Election {
* spawns a new thread.
*/
private class Messenger {
protected class Messenger {
/**
* Receives messages from instance of QuorumCnxManager on
......@@ -250,6 +290,9 @@ public class FastLeaderElection implements Election {
boolean backCompatibility = (response.buffer.capacity() == 28);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
......@@ -265,10 +308,10 @@ public class FastLeaderElection implements Election {
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
// Instantiate Notification and set its attributes
Notification n = new Notification();
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
......@@ -283,6 +326,12 @@ public class FastLeaderElection implements Election {
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
/*
* Version added in 3.4.6
*/
n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0;
/*
* Print notification info
*/
......@@ -383,23 +432,13 @@ public class FastLeaderElection implements Election {
*
* @param m message to send
*/
private void process(ToSend m) {
byte requestBytes[] = new byte[36];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
* Building notification packet to send
*/
requestBuffer.clear();
requestBuffer.putInt(m.state.ordinal());
requestBuffer.putLong(m.leader);
requestBuffer.putLong(m.zxid);
requestBuffer.putLong(m.electionEpoch);
requestBuffer.putLong(m.peerEpoch);
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
manager.toSend(m.sid, requestBuffer);
}
}
......@@ -547,11 +586,7 @@ public class FastLeaderElection implements Election {
private void printNotification(Notification n){
LOG.info("Notification: " + n.leader + " (n.leader), 0x"
+ Long.toHexString(n.zxid) + " (n.zxid), 0x"
+ Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
+ " (n.state), " + n.sid + " (n.sid), 0x"
+ Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
LOG.info("Notification: " + n.toString()
+ self.getPeerState() + " (my state)");
}
......
......@@ -72,7 +72,7 @@ public class QuorumCnxManager {
// stale notifications to peers
static final int SEND_CAPACITY = 1;
static final int PACKETMAXSIZE = 1024 * 1024;
static final int PACKETMAXSIZE = 1024 * 512;
/*
* Maximum number of attempts to connect to a peer
*/
......@@ -129,6 +129,7 @@ public class QuorumCnxManager {
private AtomicInteger threadCnt = new AtomicInteger(0);
static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
this.sid = sid;
......
......@@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
public class Vote {
public Vote(long id, long zxid) {
public Vote(long id,
long zxid) {
this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
......@@ -31,7 +33,10 @@ public class Vote {
this.state = ServerState.LOOKING;
}
public Vote(long id, long zxid, long peerEpoch) {
public Vote(long id,
long zxid,
long peerEpoch) {
this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = -1;
......@@ -39,7 +44,11 @@ public class Vote {
this.state = ServerState.LOOKING;
}
public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
public Vote(long id,
long zxid,
long electionEpoch,
long peerEpoch) {
this.version = 0x0;
this.id = id;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
......@@ -47,7 +56,13 @@ public class Vote {
this.state = ServerState.LOOKING;
}
public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
public Vote(int version,
long id,
long zxid,
long electionEpoch,
long peerEpoch,
ServerState state) {
this.version = version;
this.id = id;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
......@@ -55,6 +70,21 @@ public class Vote {
this.peerEpoch = peerEpoch;
}
public Vote(long id,
long zxid,
long electionEpoch,
long peerEpoch,
ServerState state) {
this.id = id;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
this.state = state;
this.peerEpoch = peerEpoch;
this.version = 0x0;
}
final private int version;
final private long id;
final private long zxid;
......@@ -63,6 +93,10 @@ public class Vote {
final private long peerEpoch;
public int getVersion() {
return version;
}
public long getId() {
return id;
}
......@@ -91,10 +125,13 @@ public class Vote {
return false;
}
Vote other = (Vote) o;
return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
return (id == other.id
&& zxid == other.zxid
&& electionEpoch == other.electionEpoch
&& peerEpoch == other.peerEpoch);
}
@Override
public int hashCode() {
return (int) (id & zxid);
......
......@@ -74,20 +74,7 @@ public class CnxManagerTest extends ZKTestCase {
}
ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
byte requestBytes[] = new byte[28];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
* Building notification packet to send
*/
requestBuffer.clear();
requestBuffer.putInt(state);
requestBuffer.putLong(leader);
requestBuffer.putLong(zxid);
requestBuffer.putLong(epoch);
return requestBuffer;
return FastLeaderElection.buildMsg(state, leader, zxid, 0, epoch);
}
class CnxManagerThread extends Thread {
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zookeeper.test;
package org.apache.zookeeper.server.quorum;
import java.io.File;
import java.io.IOException;
......@@ -32,12 +32,12 @@ import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.zookeeper.test.FLETestUtils.LEThread;
public class FLEBackwardElectionRoundTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
......@@ -118,7 +118,8 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
QuorumCnxManager.Listener listener = cnxManagers[0].listener;
listener.start();
cnxManagers[0].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
ByteBuffer msg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
cnxManagers[0].toSend(0l, msg);
/*
* Start mock server 2
......@@ -128,7 +129,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
listener = cnxManagers[1].listener;
listener.start();
cnxManagers[1].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
cnxManagers[1].toSend(0l, msg);
/*
* Run another instance of leader election.
......@@ -140,8 +141,8 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
/*
* Send the same messages, this time should not make 0 the leader.
*/
cnxManagers[0].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
cnxManagers[1].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
cnxManagers[0].toSend(0l, msg);
cnxManagers[1].toSend(0l, msg);
thread.join(5000);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.FastLeaderElection.Notification;
import org.apache.zookeeper.server.quorum.FastLeaderElection.ToSend;
import org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.FLETest;
import org.apache.zookeeper.test.QuorumBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FLECompatibilityTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(FLECompatibilityTest.class);
int count;
HashMap<Long,QuorumServer> peers;
File tmpdir[];
int port[];
@Before
public void setUp() throws Exception {
count = 3;
peers = new HashMap<Long,QuorumServer>(count);
tmpdir = new File[count];
port = new int[count];
}
@After
public void tearDown() throws Exception {
}
class MockFLEMessengerBackward {
QuorumCnxManager manager;
QuorumPeer self;
long logicalclock = 1L;
LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>();
LinkedBlockingQueue<ToSend> internalqueue = new LinkedBlockingQueue<ToSend>();
LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<Notification>();
WorkerReceiver wr;
MockFLEMessengerBackward(QuorumPeer self, QuorumCnxManager manager){
this.manager = manager;
this.self = self;
this.wr = new WorkerReceiver(manager);
Thread t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
void halt() {
wr.stop = true;
}
/*
* This class has been copied from before adding versions to notifications.
*
* {@see https://issues.apache.org/jira/browse/ZOOKEEPER-1808}
*/
class WorkerReceiver implements Runnable {
volatile boolean stop;
QuorumCnxManager manager;
final long proposedLeader = 2;
final long proposedZxid = 0x1;
final long proposedEpoch = 1;
WorkerReceiver(QuorumCnxManager manager) {
this.stop = false;
this.manager = manager;
}
/*
* The vote we return here is fixed for test purposes.
*/
Vote getVote(){
return new Vote(proposedLeader, proposedZxid, proposedEpoch);
}
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try{
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
internalqueue.offer(notmsg);
} else {
// Receive new message
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}
/*
* We check for 28 bytes for backward compatibility
*/
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: "
+ response.buffer.capacity());
continue;
}
boolean backCompatibility = (response.buffer.capacity() == 28);
response.buffer.clear();
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
}
// Instantiate Notification and set its attributes
Notification n = new Notification();
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
if(!backCompatibility){
n.peerEpoch = response.buffer.getLong();
} else {
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock)){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
v.getPeerEpoch());
internalqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),