Commit 38e21d3b authored by Tobias Distler's avatar Tobias Distler
Browse files

Adding extensions.

parent f622d253
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script cleans up old transaction logs and snapshots
#
#
# If this scripted is run out of /usr/bin or some other system bin directory
# it should be linked to and not copied. Things like java jar files are found
# relative to the canonical path of this script.
#
# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh
fi
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH:$ZOOBINDIR/../build/recipes/value/classes" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.recipes.value.EZKSharedValueDemo $ZOOBINDIR/../src/recipes/value/src/java/ "$@"
package org.apache.zookeeper.extension;
import java.util.List;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLRequest;
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.server.EZKExtensionGate;
import org.apache.zookeeper.server.Request;
public abstract class EZKBaseExtension implements EZKExtension {
protected EZKExtensionGate extensionGate;
@Override
public final void setExtensionGate(EZKExtensionGate extensionGate) {
this.extensionGate = extensionGate;
}
// ################
// # SUBSCRIPTION #
// ################
@Override
public boolean matchesOperation(int requestType, String path) {
return false;
}
@Override
public boolean matchesEvent(EventType type, String path) {
return false;
}
// #############
// # EXECUTION #
// #############
@Override
public void init() throws KeeperException {
// Override in sub classes if necessary
}
@Override
public final void handleOperation(Request request, Record record) throws KeeperException {
switch(request.type) {
case OpCode.create:
CreateRequest createRequest = (CreateRequest) record;
String path = create(createRequest.getPath(), createRequest.getData(), createRequest.getAcl(), CreateMode.fromFlag(createRequest.getFlags()));
extensionGate.setResult(new CreateResponse(path));
break;
case OpCode.delete:
DeleteRequest deleteRequest = (DeleteRequest) record;
delete(deleteRequest.getPath(), deleteRequest.getVersion());
break;
case OpCode.exists:
ExistsRequest existsRequest = (ExistsRequest) record;
Stat existsStat = exists(existsRequest.getPath(), existsRequest.getWatch());
if(existsStat == null) existsStat = EZKExtensionGate.EXCEPTION_STAT;
extensionGate.setResult(new ExistsResponse(existsStat));
break;
case OpCode.getACL:
GetACLRequest aclRequest = (GetACLRequest) record;
Stat stat = new Stat();
List<ACL> acls = getACL(aclRequest.getPath(), stat);
extensionGate.setResult(new GetACLResponse(acls, stat));
break;
case OpCode.getChildren:
GetChildrenRequest getChildrenRequest = (GetChildrenRequest) record;
List<String> children = getChildren(getChildrenRequest.getPath(), getChildrenRequest.getWatch(), null);
extensionGate.setResult(new GetChildrenResponse(children));
break;
case OpCode.getChildren2:
GetChildren2Request getChildren2Request = (GetChildren2Request) record;
stat = new Stat();
List<String> children2 = getChildren(getChildren2Request.getPath(), getChildren2Request.getWatch(), stat);
extensionGate.setResult(new GetChildren2Response(children2, stat));
break;
case OpCode.getData:
GetDataRequest getDataRequest = (GetDataRequest) record;
Stat getDataStat = new Stat();
byte[] data = getData(getDataRequest.getPath(), getDataRequest.getWatch(), getDataStat);
extensionGate.setResult(new GetDataResponse(data, getDataStat));
break;
case OpCode.setACL:
SetACLRequest setACLRequest = (SetACLRequest) record;
stat = setACL(setACLRequest.getPath(), setACLRequest.getAcl(), setACLRequest.getVersion());
extensionGate.setResult(new SetACLResponse(stat));
break;
case OpCode.setData:
SetDataRequest setDataRequest = (SetDataRequest) record;
stat = setData(setDataRequest.getPath(), setDataRequest.getData(), setDataRequest.getVersion());
extensionGate.setResult(new SetDataResponse(stat));
break;
default:
System.err.println("Operation not implemented in base extension: " + request.type);
}
}
@Override
public boolean handleEvent(EventType type, String path) {
// Override in sub classes if necessary
return false;
}
protected String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException {
return extensionGate.create(path, data, acl, createMode);
}
protected void delete(final String path, int version) throws KeeperException {
extensionGate.delete(path, version);
}
protected Stat exists(String path, boolean watch) throws KeeperException {
return extensionGate.exists(path, watch);
}
protected List<ACL> getACL(String path, Stat stat) throws KeeperException {
return extensionGate.getACL(path, stat);
}
protected byte[] getData(String path, boolean watch, Stat stat) throws KeeperException {
return extensionGate.getData(path, watch, stat);
}
protected List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException {
return extensionGate.getChildren(path, watch, stat);
}
protected Stat setACL(String path, List<ACL> acls, int version) throws KeeperException {
return extensionGate.setACL(path, acls, version);
}
protected Stat setData(String path, byte[] data, int version) throws KeeperException {
return extensionGate.setData(path, data, version);
}
}
package org.apache.zookeeper.extension;
public class EZKConfig {
// Extensibility switch
public static final boolean IS_EXTENSIBLE = true;
// Sandbox configuration
public static final boolean ENABLE_SANDBOX = false;
public static final int MAX_EXTENSION_CODE_SIZE = 8192;
public static final int MAX_SORT_LIST_SIZE = 10000;
}
package org.apache.zookeeper.extension;
import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.server.EZKExtensionGate;
import org.apache.zookeeper.server.Request;
public interface EZKExtension {
public void setExtensionGate(EZKExtensionGate extensionGate);
public boolean matchesOperation(int requestType, String path);
public boolean matchesEvent(EventType type, String path);
public void init() throws KeeperException;
public void handleOperation(Request request, Record record) throws KeeperException;
public boolean handleEvent(EventType type, String path);
}
package org.apache.zookeeper.extension;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.EZKExtensionCompiler;
import org.apache.zookeeper.server.EZKExtensionLoader;
import org.apache.zookeeper.server.EZKExtensionManager;
public class EZKExtensionRegistration {
private static String loadExtensionCode(Class<? extends EZKExtension> extensionClass, String codeBasePath) throws KeeperException {
// Check file
String codePath = codeBasePath + File.separatorChar + extensionClass.getName().replace('.', File.separatorChar) + ".java";
File file = new File(codePath);
if(!file.isFile()) throw new KeeperException.BadArgumentsException(codePath + " is not a file");
// Get extension code
try {
BufferedReader stream = new BufferedReader(new FileReader(file));
String extensionCode = "";
for(String line = stream.readLine(); line != null; line = stream.readLine()) extensionCode += line + "\n";
stream.close();
return extensionCode;
} catch(Exception e) {
e.printStackTrace();
throw new KeeperException.EZKExtensionException(e.toString());
}
}
public static boolean checkExtension(Class<? extends EZKExtension> extensionClass, String codeBasePath) throws KeeperException {
// Compile extension
String extensionCode = loadExtensionCode(extensionClass, codeBasePath);
EZKExtensionCompiler compiler = new EZKExtensionCompiler(extensionClass.getName(), extensionCode);
byte[] extensionBinary = compiler.compileExtension();
// Load extension
EZKExtensionLoader extensionLoader = new EZKExtensionLoader(extensionClass.getName(), extensionBinary);
extensionLoader.loadExtension();
return true;
}
public static boolean registerExtension(ZooKeeper zooKeeper, Class<? extends EZKExtension> extensionClass, String codeBasePath) throws KeeperException, InterruptedException {
try {
String extensionCode = loadExtensionCode(extensionClass, codeBasePath);
zooKeeper.create(EZKExtensionManager.EZK_EXTENSION_MANAGER_NODE + "/" + extensionClass.getName(), extensionCode.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch(KeeperException.NodeExistsException nee) {
return false;
}
}
}
package org.apache.zookeeper.extension;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLRequest;
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetChildren2Request;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenRequest;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.server.Request;
public class EZKUtils {
public static String getPath(Request request, Record record) {
switch(request.type) {
case OpCode.create:
return ((CreateRequest) record).getPath();
case OpCode.delete:
return ((DeleteRequest) record).getPath();
case OpCode.exists:
return ((ExistsRequest) record).getPath();
case OpCode.getACL:
return ((GetACLRequest) record).getPath();
case OpCode.getChildren:
return ((GetChildrenRequest) record).getPath();
case OpCode.getChildren2:
return ((GetChildren2Request) record).getPath();
case OpCode.getData:
return ((GetDataRequest) record).getPath();
case OpCode.setData:
return ((SetDataRequest) record).getPath();
case OpCode.setACL:
return ((SetACLRequest) record).getPath();
default:
System.err.println("Unhandled request type: " + request.type);
return null;
}
}
public static Record createRequest(int type) {
switch(type) {
case OpCode.create:
return new CreateRequest();
case OpCode.delete:
return new DeleteRequest();
case OpCode.exists:
return new ExistsRequest();
case OpCode.getACL:
return new GetACLRequest();
case OpCode.getChildren:
return new GetChildrenRequest();
case OpCode.getChildren2:
return new GetChildren2Request();
case OpCode.getData:
return new GetDataRequest();
case OpCode.setData:
return new SetDataRequest();
case OpCode.setACL:
return new SetACLRequest();
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.ping:
return null;
default:
System.err.println("Unhandled operation: " + type);
return null;
}
}
public static Record createResponse(int type) {
switch(type) {
case OpCode.create:
return new CreateResponse();
case OpCode.delete:
return null;
case OpCode.exists:
return new ExistsResponse();
case OpCode.getACL:
return new GetACLResponse();
case OpCode.getChildren:
return new GetChildrenResponse();
case OpCode.getChildren2:
return new GetChildren2Response();
case OpCode.getData:
return new GetDataResponse();
case OpCode.setData:
return new SetDataResponse();
case OpCode.setACL:
return new SetACLResponse();
default:
System.err.println("Unhandled operation: " + type);
return null;
}
}
public static Stat copyStat(StatPersisted src, Stat dst, int dataLength, int childCount) {
if((src == null) || (dst == null)) return null;
dst.setCzxid(src.getCzxid());
dst.setMzxid(src.getMzxid());
dst.setCtime(src.getCtime());
dst.setMtime(src.getMtime());
dst.setVersion(src.getVersion());
dst.setCversion(src.getCversion());
dst.setAversion(src.getAversion());
dst.setEphemeralOwner(src.getEphemeralOwner());
dst.setDataLength(dataLength);
dst.setNumChildren(childCount);
dst.setPzxid(src.getPzxid());
return dst;
}
public static byte[] serializeRecord(Record record) throws KeeperException.MarshallingErrorException {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
record.serialize(boa, "");
baos.close();
return baos.toByteArray();
} catch(IOException ioe) {
throw new KeeperException.MarshallingErrorException();
}
}
}
......@@ -872,8 +872,10 @@ public class DataTree {
default:
// Modification for EZK: Allow SetACLTxns to be included in a multi transaction
if(EZKConfig.IS_EXTENSIBLE) {
if(subtxn.getType() == OpCode.setACL) record = new SetACLTxn();
break;
if(subtxn.getType() == OpCode.setACL) {
record = new SetACLTxn();
break;
}
}
throw new IOException("Invalid type of op: " + subtxn.getType());
}
......
package org.apache.zookeeper.server;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.extension.EZKBaseExtension;
import org.apache.zookeeper.extension.EZKExtension;
public class EZKBuiltInExtension extends EZKBaseExtension {
// ################
// # SUBSCRIPTION #
// ################
@Override
public boolean matchesOperation(int requestType, String path) {
if(path == null) return false;
return path.startsWith(EZKExtensionManager.EZK_EXTENSION_MANAGER_NODE);
}
@Override
public boolean matchesEvent(EventType type, String path) {
return false;
}
// #############
// # EXECUTION #
// #############
@Override
protected String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException {
// Create extension-manager node if it does not yet exist
Stat ezkNodeStat = extensionGate.exists(EZKExtensionManager.EZK_EXTENSION_MANAGER_NODE, false);
if(ezkNodeStat == null) extensionGate.create(EZKExtensionManager.EZK_EXTENSION_MANAGER_NODE, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Create extension node
long sessionID = ((EZKPrepExtensionGate) extensionGate).getCurrentRequest().sessionId;
Stat extensionNodeStat = extensionGate.exists(path + "-" + sessionID, false);
if(extensionNodeStat == null) extensionGate.create(path + "-" + sessionID, data, acl, CreateMode.EPHEMERAL);
// Register extension
String extensionName = path.substring(path.lastIndexOf('/') + 1);
EZKExtension extension = EZKExtensionManager.instance.registerExtension(extensionName, new String(data));
// Initialize extension
extension.init();
return "Extension registered";
}
}
package org.apache.zookeeper.server;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.tools.FileObject;
import javax.tools.JavaCompiler;
import javax.tools.JavaCompiler.CompilationTask;
import javax.tools.JavaFileManager;
import javax.tools.JavaFileObject;
import javax.tools.JavaFileObject.Kind;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.extension.EZKConfig;
public class EZKExtensionCompiler {
private final ExtensionSource source;
private ExtensionBinary extensionBinary;
public EZKExtensionCompiler(String extensionName, String extensionCode) {
this.source = new ExtensionSource(extensionName, extensionCode);
}
// #################
// # CODE ANALYSIS #
// #################
private static final Pattern FOR_EACH_PATTERN = Pattern.compile("for\\([a-zA-Z0-9]+ [a-zA-Z0-9]+: [a-zA-Z0-9]+\\)");
private void analyzeExtension() throws KeeperException {
// Check extension size
if(source.getCode().length() > EZKConfig.MAX_EXTENSION_CODE_SIZE) throw new KeeperException.EZKExtensionException("An extension's source code may comprise at most " + EZKConfig.MAX_EXTENSION_CODE_SIZE + " characters: found " + source.getCode().length() + " characters");
// Analyze extension line by line