Commit f622d253 authored by fpj's avatar fpj
Browse files

Adding extensions. -F

parent 601207e1
......@@ -518,6 +518,7 @@ public class ClientCnxn {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
if(watcher == null) continue; // Modification for EZK: Bug fix preventing a NullPointerException
try {
watcher.process(pair.event);
} catch (Throwable t) {
......
......@@ -131,6 +131,8 @@ public abstract class KeeperException extends Exception {
return new SessionMovedException();
case NOTREADONLY:
return new NotReadOnlyException();
case EZKEXTENSIONERROR: // Modification for EZK: Added EZKExtensionException
return new EZKExtensionException();
case OK:
default:
......@@ -345,7 +347,9 @@ public abstract class KeeperException extends Exception {
/** Session moved to another server, so operation is ignored */
SESSIONMOVED (-118),
/** State-changing request is passed to read-only server */
NOTREADONLY (-119);
NOTREADONLY (-119),
/** Error while registering or executing an extension */
EZKEXTENSIONERROR (-120); // Modification for EZK: Added EZKExtensionException
private static final Map<Integer,Code> lookup
= new HashMap<Integer,Code>();
......@@ -422,6 +426,8 @@ public abstract class KeeperException extends Exception {
return "Session moved";
case NOTREADONLY:
return "Not a read-only call";
case EZKEXTENSIONERROR: // Modification for EZK: Added EZKExtensionException
return "Extension error";
default:
return "Unknown error " + code;
}
......@@ -699,4 +705,15 @@ public abstract class KeeperException extends Exception {
super(Code.UNIMPLEMENTED);
}
}
// Modification for EZK: Added EZKExtensionException
public static class EZKExtensionException extends KeeperException {
public EZKExtensionException() {
super(Code.EZKEXTENSIONERROR);
}
public EZKExtensionException(String message) {
super(Code.EZKEXTENSIONERROR, message);
}
}
}
......@@ -53,6 +53,9 @@ public class ZooDefs {
public final int check = 13;
public final int multi = 14;
// Modification for EZK: Add op code for result
public final int ezkResult = 15;
public final int auth = 100;
......
......@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
......@@ -51,6 +50,7 @@ import org.apache.zookeeper.common.PathTrie;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.extension.EZKConfig;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
......@@ -63,6 +63,7 @@ import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class maintains the tree data structure. It doesn't have any networking
* or client connection code in it so that it can be tested in a stand alone
......@@ -726,6 +727,7 @@ public class DataTree {
}
}
// Modification for EZK: Added data field
static public class ProcessTxnResult {
public long clientId;
......@@ -742,7 +744,9 @@ public class DataTree {
public Stat stat;
public List<ProcessTxnResult> multiResult;
public byte[] data;
/**
* Equality is defined as the clientId and the cxid being the same. This
* allows us to use hash tables to track completion of transactions.
......@@ -776,7 +780,7 @@ public class DataTree {
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
......@@ -795,6 +799,12 @@ public class DataTree {
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(), header.getTime());
// Modification for EZK: Register extension on followers
if(EZKConfig.IS_EXTENSIBLE) {
EZKExtensionManager.instance.finishExtensionRegistration(createTxn.getPath(), createTxn.getData());
}
break;
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
......@@ -841,6 +851,7 @@ public class DataTree {
for (Txn subtxn : txns) {
ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
Record record = null;
switch (subtxn.getType()) {
case OpCode.create:
record = new CreateTxn();
......@@ -859,6 +870,11 @@ public class DataTree {
record = new CheckVersionTxn();
break;
default:
// Modification for EZK: Allow SetACLTxns to be included in a multi transaction
if(EZKConfig.IS_EXTENSIBLE) {
if(subtxn.getType() == OpCode.setACL) record = new SetACLTxn();
break;
}
throw new IOException("Invalid type of op: " + subtxn.getType());
}
assert(record != null);
......
......@@ -22,8 +22,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
......@@ -32,41 +32,44 @@ import java.util.Locale;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.Record;
import org.apache.jute.BinaryOutputArchive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MultiTransactionRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.extension.EZKConfig;
import org.apache.zookeeper.proto.CheckVersionRequest;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.CheckVersionRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.MultiTxn;
import org.apache.zookeeper.txn.SetACLTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.Txn;
import org.apache.zookeeper.txn.MultiTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This request processor is generally at the start of a RequestProcessor
......@@ -104,6 +107,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
+ " cport:" + zks.getClientPort() + "):");
this.nextProcessor = nextProcessor;
this.zks = zks;
if(EZKConfig.IS_EXTENSIBLE) EZKExtensionManager.instance.initPrep(zks, this); // Modification for EZK: Give extension manager access to prep request processor
}
/**
......@@ -163,10 +167,15 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
synchronized(n) {
acl = n.acl;
children = n.getChildren();
if(EZKConfig.IS_EXTENSIBLE) {
// Modification for EZK: Added the following line to copy the set of children
children = ((children != null) ? new HashSet<String>(children) : new HashSet<String>());
}
}
lastChange = new ChangeRecord(-1, path, n.stat,
children != null ? children.size() : 0,
zks.getZKDatabase().convertLong(acl));
zks.getZKDatabase().convertLong(acl), n.data, children);
}
}
}
......@@ -317,7 +326,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
* @param request
* @param record
*/
@SuppressWarnings("unchecked")
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
......@@ -380,10 +388,11 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
}
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
if(EZKConfig.IS_EXTENSIBLE) parentRecord.children.add(path.substring(lastSlash + 1)); // Modification for EZK: Update children
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
0, listACL, createRequest.getData(), new HashSet<String>()));
break;
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
......@@ -411,9 +420,10 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
request.txn = new DeleteTxn(path);
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount--;
if(EZKConfig.IS_EXTENSIBLE) parentRecord.children.remove(path.substring(lastSlash + 1)); // Modification for EZK: Update children
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
null, -1, null));
null, -1, null, null, null));
break;
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
......@@ -432,6 +442,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
version = currentVersion + 1;
request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
if(EZKConfig.IS_EXTENSIBLE) nodeRecord.data = setDataRequest.getData(); // Modification for EZK: Update data
nodeRecord.stat.setVersion(version);
addChangeRecord(nodeRecord);
break;
......@@ -485,7 +496,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
}
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
path2Delete, null, 0, null, null, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
......@@ -520,7 +531,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
*
* @param request
*/
@SuppressWarnings("unchecked")
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
......@@ -528,6 +538,32 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
request.txn = null;
try {
// Modification for EZK: Hand over control to the extension manager
if(EZKConfig.IS_EXTENSIBLE) {
Record ezkTxn = null;
try {
List<Txn> txns = EZKExtensionManager.instance.invokePreprocessor(request);
if(txns != null) {
request = new Request(request.cnxn, request.sessionId, request.cxid, OpCode.multi, request.request, request.authInfo);
request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), zks.getTime(), OpCode.multi);
ezkTxn = new MultiTxn(txns);
}
} catch(KeeperException ke) {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), zks.getTime(), OpCode.error);
ezkTxn = new ErrorTxn(ke.code().intValue());
}
// Forward Txn to next request processor if the request has been handled by the extension manager
if(ezkTxn != null) {
Object owner = request.getOwner();
request.setOwner(owner);
request.txn = ezkTxn;
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
return;
}
}
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
......
......@@ -21,15 +21,18 @@ package org.apache.zookeeper.server;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.extension.EZKConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class manages watches. It allows watches to be associated with a string
......@@ -113,6 +116,17 @@ public class WatchManager {
}
}
}
// Modification for EZK: Handle watches
if(EZKConfig.IS_EXTENSIBLE) {
for(Iterator<Watcher> iterator = watchers.iterator(); iterator.hasNext(); ) {
Watcher watcher = iterator.next();
if(supress != null && supress.contains(watcher)) continue;
boolean handled = EZKExtensionManager.instance.invokeWatchProcessor(type, path, watcher);
if(handled) iterator.remove();
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
......
......@@ -34,13 +34,12 @@ import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.extension.EZKConfig;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
......@@ -49,6 +48,9 @@ import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class maintains the in memory database of zookeeper
......@@ -221,6 +223,7 @@ public class ZKDatabase {
};
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
if(EZKConfig.IS_EXTENSIBLE) EZKExtensionManager.instance.init(this);
initialized = true;
return zxid;
}
......
......@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import javax.security.sasl.SaslException;
......@@ -44,6 +45,7 @@ import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.extension.EZKConfig;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
......@@ -67,6 +69,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
......@@ -490,18 +493,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return requestsInProcess;
}
// Modification for EZK: Added data field
// Modification for EZK: Added set of children
/**
* This structure is used to facilitate information sharing between PrepRP
* and FinalRP.
*/
static class ChangeRecord {
ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
List<ACL> acl) {
List<ACL> acl, byte[] data, Set<String> children) {
this.zxid = zxid;
this.path = path;
this.stat = stat;
this.childCount = childCount;
this.acl = acl;
this.data = EZKConfig.IS_EXTENSIBLE ? data : null;
this.children = EZKConfig.IS_EXTENSIBLE ? children : null;
}
long zxid;
......@@ -514,6 +521,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
List<ACL> acl; /* Make sure to create a new object when changing */
byte[] data;
Set<String> children;
@SuppressWarnings("unchecked")
ChangeRecord duplicate(long zxid) {
StatPersisted stat = new StatPersisted();
......@@ -521,7 +532,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
DataTree.copyStatPersisted(this.stat, stat);
}
return new ChangeRecord(zxid, path, stat, childCount,
acl == null ? new ArrayList<ACL>() : new ArrayList(acl));
acl == null ? new ArrayList<ACL>() : new ArrayList(acl), data, children);
}
}
......@@ -1003,6 +1014,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
if(EZKConfig.IS_EXTENSIBLE) EZKExtensionManager.instance.deleteExtension(sessionId);
sessionTracker.removeSession(sessionId);
}
return rc;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment