Commit 3368d143 authored by Camille Fournier's avatar Camille Fournier
Browse files

ZOOKEEPER-1863. Race condition in commit processor leading to out of order...

ZOOKEEPER-1863. Race condition in commit processor leading to out of order request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1610861 13f79535-47bb-0310-9956-ffa450edef68
parent b41f8af3
......@@ -690,6 +690,9 @@ BUGFIXES:
stat is not null (Michi Mutsuzaki via rakeshr)
ZOOKEEPER-1964. Fix Flaky Test in ReconfigTest.java (Hongchao Deng via fpj)
ZOOKEEPER-1863. Race condition in commit processor leading to out of order
request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)
IMPROVEMENTS:
......
......@@ -77,30 +77,30 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
/**
* Requests that we are holding until the commit comes in.
*/
private final LinkedBlockingQueue<Request> queuedRequests =
protected final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
/**
* Requests that have been committed.
*/
private final LinkedBlockingQueue<Request> committedRequests =
protected final LinkedBlockingQueue<Request> committedRequests =
new LinkedBlockingQueue<Request>();
/** Request for which we are currently awaiting a commit */
private final AtomicReference<Request> nextPending =
protected final AtomicReference<Request> nextPending =
new AtomicReference<Request>();
/** Request currently being committed (ie, sent off to next processor) */
private final AtomicReference<Request> currentlyCommitting =
new AtomicReference<Request>();
/** The number of requests currently being processed */
private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
RequestProcessor nextProcessor;
private volatile boolean stopped = true;
protected volatile boolean stopped = true;
private long workerShutdownTimeoutMS;
private WorkerService workerPool;
protected WorkerService workerPool;
/**
* This flag indicates whether we need to wait for a response to come back from the
......@@ -181,37 +181,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
* came in for the pending request. We can only commit a
* request when there is no other request being processed.
*/
if (!stopped && !isProcessingRequest() &&
(request = committedRequests.poll()) != null) {
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
Request pending = nextPending.get();
if (pending != null &&
pending.sessionId == request.sessionId &&
pending.cxid == request.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
pending.setHdr(request.getHdr());
pending.setTxn(request.getTxn());
pending.zxid = request.zxid;
// Set currentlyCommitting so we will block until this
// completes. Cleared by CommitWorkRequest after
// nextProcessor returns.
currentlyCommitting.set(pending);
nextPending.set(null);
sendToNextProcessor(pending);
} else {
// this request came from someone else so just
// send the commit packet
currentlyCommitting.set(request);
sendToNextProcessor(request);
}
}
processCommitted();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
......@@ -221,6 +191,56 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
LOG.info("CommitProcessor exited loop!");
}
/*
* Separated this method from the main run loop
* for test purposes (ZOOKEEPER-1863)
*/
protected void processCommitted() {
Request request;
if (!stopped && !isProcessingRequest() &&
(committedRequests.peek() != null)) {
/*
* ZOOKEEPER-1863: continue only if there is no new request
* waiting in queuedRequests or it is waiting for a
* commit.
*/
if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
return;
}
request = committedRequests.poll();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
Request pending = nextPending.get();
if (pending != null &&
pending.sessionId == request.sessionId &&
pending.cxid == request.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
pending.setHdr(request.getHdr());
pending.setTxn(request.getTxn());
pending.zxid = request.zxid;
// Set currentlyCommitting so we will block until this
// completes. Cleared by CommitWorkRequest after
// nextProcessor returns.
currentlyCommitting.set(pending);
nextPending.set(null);
sendToNextProcessor(pending);
} else {
// this request came from someone else so just
// send the commit packet
currentlyCommitting.set(request);
sendToNextProcessor(request);
}
}
}
@Override
public void start() {
int numCores = Runtime.getRuntime().availableProcessors();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommitProcessorConcurrencyTest {
protected static final Logger LOG =
LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
Boolean executedFlag = false;
MockCommitProcessor processor;
@Before
public void setUp() throws Exception {
processor = new MockCommitProcessor();
}
@After
public void tearDown() throws Exception {
processor.shutdown();
}
class MockCommitProcessor extends CommitProcessor {
MockCommitProcessor() {
super(
new RequestProcessor() {
public void processRequest(Request request)
throws RequestProcessorException {
executedFlag = true;
}
public void shutdown(){}
},
"0",
false);
}
public void testStart() {
this.stopped = false;
this.workerPool = new WorkerService(
"CommitProcWork", 1, true);
}
public void addToCommittedRequests(Request req) {
this.committedRequests.add(req);
}
public void addToNextPending(Request req) {
this.nextPending.set(req);
}
public void addToQueuedRequests(Request req) {
//this.numRequestsProcessing.incrementAndGet();
this.queuedRequests.add(req);
}
public void testProcessCommitted() {
this.processCommitted();
}
@Override
public void shutdown() {
this.workerPool.stop();
}
}
/*
* We populate the necessary data structures in the CommitProcessor
* instance and run processCommitted
*/
@Test
public void raceTest()
throws Exception {
ByteArrayOutputStream boas = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
GetDataRequest getReq = new GetDataRequest("/testrace", false);
getReq.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
Request readReq = new Request(null, 0x0, 0, OpCode.getData,
bb, new ArrayList<Id>());
boas.reset();
SyncRequest syncReq = new SyncRequest("/testrace");
syncReq.serialize(boa, "request");
bb = ByteBuffer.wrap(boas.toByteArray());
Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
bb, new ArrayList<Id>());
processor.addToCommittedRequests(writeReq);
processor.addToQueuedRequests(readReq);
processor.addToQueuedRequests(writeReq);
processor.testStart();
processor.testProcessCommitted();
Assert.assertFalse("Next request processor executed", executedFlag);
}
}
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment