diff --git a/experiments/weave-log/refit-overrides b/experiments/weave-log/refit-overrides index 8729b2ba396994de39fe17025dd4121a33663a12..7d4a35ef2c90a7610b2ad107f88e55dbc131b2fb 100644 --- a/experiments/weave-log/refit-overrides +++ b/experiments/weave-log/refit-overrides @@ -21,14 +21,20 @@ replica.timeout.request = 2900 application.nodes = 50 replica.max_clients = 500 +system.mencius_mode.variants = \ + Value("true", "mencius") \ + Value("false", "weave") application.only_consistent.variants = \ - Value("false", "weave") + Value("false") \ + Value("true", "regular", requires=("mencius",)) replica.network.addresses.variants = \ + Value("server0, server2, server4", requires=("mencius",)) \ Value("server0, server1, server2, server3, server4, server5", requires=("weave",)) replica.count.variants = \ + Value("3", requires=("mencius",)) \ Value("6", requires=("weave",)) client.network.addresses.variants = \ diff --git a/experiments/weave-queue/refit-overrides b/experiments/weave-queue/refit-overrides index 91dcb750a4856a353fdba83201dc289d929882e1..f2e2b008c21d5561b035505929176ec1b650d942 100644 --- a/experiments/weave-queue/refit-overrides +++ b/experiments/weave-queue/refit-overrides @@ -19,14 +19,20 @@ replica.timeout.request = 2900 application.nodes = 50 replica.max_clients = 500 +system.mencius_mode.variants = \ + Value("true", "mencius") \ + Value("false", "weave") application.only_consistent.variants = \ - Value("false", "weave") + Value("false") \ + Value("true", "regular", requires=("mencius",)) replica.network.addresses.variants = \ + Value("server0, server2, server4", requires=("mencius",)) \ Value("server0, server1, server2, server3, server4, server5", requires=("weave",)) replica.count.variants = \ + Value("3", requires=("mencius",)) \ Value("6", requires=("weave",)) client.network.addresses.variants = \ diff --git a/scripts/config/refit-defaults b/scripts/config/refit-defaults index 9d61e8054b3b6885f9f311af96a072faaa6cff0f..dbbd64be4b88a554b1a41bbb5574351f804739ea 100644 --- a/scripts/config/refit-defaults +++ b/scripts/config/refit-defaults @@ -77,6 +77,7 @@ replica.order.wheat_weights = 0 system.location_based_groups = false replica.weave.proposal_interval_ms = 20 replica.weave.ping_iterations = 100 +system.mencius_mode = false # Spider system.spider.separate_execution_replicas = false diff --git a/src/refit/agreement/pbft/REFITPBFT.java b/src/refit/agreement/pbft/REFITPBFT.java index 72589b198c336c5c05e5f9c6964d815f29145c1c..7d85c58e626e8fe74888039e064f1295535c0701 100644 --- a/src/refit/agreement/pbft/REFITPBFT.java +++ b/src/refit/agreement/pbft/REFITPBFT.java @@ -99,6 +99,10 @@ public class REFITPBFT { return (isPrimary && (instanceState == REFITPBFTProtocolInstanceState.INITIALIZED) && proposal == null); } + public boolean hasProposal() { + return instanceState != REFITPBFTProtocolInstanceState.INITIALIZED || proposal != null; + } + @Override public boolean isPrepared() { return instanceState == REFITPBFTProtocolInstanceState.COMMITTED diff --git a/src/refit/agreement/weave/REFITWeave.java b/src/refit/agreement/weave/REFITWeave.java index 62d712df4b4b54ee4b61af2b7a5d48da8c17795e..1fa332a150e773f6000756b94577160d27383dc0 100644 --- a/src/refit/agreement/weave/REFITWeave.java +++ b/src/refit/agreement/weave/REFITWeave.java @@ -38,7 +38,7 @@ public class REFITWeave { public static class REFITWeaveProtocolInstance extends REFITOrderProtocolInstance { - private static final int COMMIT_QUORUM_SIZE = REFITConfig.FAULTS_TO_TOLERATE + 1; + private static final int COMMIT_QUORUM_SIZE = (REFITConfig.MENCIUS_MODE) ? (REFITConfig.ORDER_GROUP_SIZE / 2 + 1) : (REFITConfig.FAULTS_TO_TOLERATE + 1); private enum State { INITIALIZED, @@ -149,6 +149,16 @@ public class REFITWeave { prepare = new REFITWeaveMessagePrepare(instanceID, viewID, proposal, replica.id); + if (REFITConfig.MENCIUS_MODE && proposal.type == REFITMessageType.REQUEST_NO_OP) { + // a SKIP message can only be triggered by a message with a higher seqNr; + // the corresponding prepare or commit messages count as implicit acks and each replica + // broadcasts its prepare or commit message to all other replicas. That is each replica + // automatically and immediately sends its implicit SKIP messages + state = State.COMMITTED; + completeWithResult(proposal, null); + return; + } + orderReplicaMulticast(prepare); // Own prepare counts as commit currentCommits++; @@ -205,11 +215,20 @@ public class REFITWeave { this.prepare = prepare; proposal = prepare.request; - // Leader prepare counts as commit - currentCommits++; - state = State.PREPARED; - - prepareReceived(this.prepare.from); + if (REFITConfig.MENCIUS_MODE && proposal.type == REFITMessageType.REQUEST_NO_OP) { + prepareReceived(this.prepare.from); + state = State.COMMITTED; + // don't send commit + commitSent = true; + completeWithResult(proposal, null); + return; + } else { + // Leader prepare counts as commit + currentCommits++; + state = State.PREPARED; + + prepareReceived(this.prepare.from); + } } } } @@ -224,6 +243,8 @@ public class REFITWeave { private void receiveCommits() { if (prepare == null) return; + // nothing to do for SKIP messages + if (REFITConfig.MENCIUS_MODE && proposal.type == REFITMessageType.REQUEST_NO_OP) return; while (true) { final REFITWeaveMessageCommit commitMessage = @@ -273,6 +294,8 @@ public class REFITWeave { } private void tryComplete() { + if (REFITConfig.MENCIUS_MODE && proposal != null && proposal.type == REFITMessageType.REQUEST_NO_OP) return; + if (currentCommits >= COMMIT_QUORUM_SIZE && (isPrimary || commitSent)) { state = State.COMMITTED; // ACK can be sent only after a commit certificate is reached to ensure that the request cannot be lost diff --git a/src/refit/config/REFITConfig.java b/src/refit/config/REFITConfig.java index 046b46534ee71e4fa76c13633445a3f92b19c30e..5446cb7a6e011902d64043b652e411569375e7e4 100644 --- a/src/refit/config/REFITConfig.java +++ b/src/refit/config/REFITConfig.java @@ -158,6 +158,7 @@ public class REFITConfig { public static final boolean LOCATION_BASED_ORDER_GROUPS = config.getBoolean("system.location_based_groups"); public static final int PROPOSALS_INTERVAL_MS = config.getInt("replica.weave.proposal_interval_ms"); public static final int NETWORK_PING_ITERATIONS = config.getInt("replica.weave.ping_iterations"); + public static final boolean MENCIUS_MODE = config.getBoolean("system.mencius_mode"); // Spider public static final int MAXIMUM_EXECUTION_INSTANCES_IN_PROGRESS; @@ -363,8 +364,10 @@ public class REFITConfig { if (LOCATION[j] == i) ORDER_GROUPS[i][ctr++] = j; } ORDER_GROUP_LEADER_LIMIT[i] = ctr; - // Every group has enough replicas to tolerate f faults - REFITAssert.assertTrue(ctr == FAULTS_TO_TOLERATE + 1); + if (!REFITConfig.MENCIUS_MODE) { + // Every group has enough replicas to tolerate f faults + REFITAssert.assertTrue(ctr == FAULTS_TO_TOLERATE + 1); + } for (short j = 0; j < REFITConfig.TOTAL_NR_OF_REPLICAS; j++) { if (LOCATION[j] != i) ORDER_GROUPS[i][ctr++] = j; } diff --git a/src/refit/replica/REFITWeaveReplica.java b/src/refit/replica/REFITWeaveReplica.java index e037a10593c6fa29417f43f20a2f77f0594f520b..0f2c15c15248540e5d3e2f71bd4063cf3516eb54 100644 --- a/src/refit/replica/REFITWeaveReplica.java +++ b/src/refit/replica/REFITWeaveReplica.java @@ -12,6 +12,7 @@ import refit.replica.checkpoint.REFITCheckpointStage; import refit.replica.client.REFITClientStage; import refit.replica.execution.REFITExecutionStage; import refit.replica.fuzzycheckpoint.REFITFuzzyCheckpointStage; +import refit.replica.order.REFITMenciusSplitter; import refit.replica.order.REFITOrderGroups; import refit.replica.order.REFITOrderStage; import refit.replica.order.REFITTimingTracker; @@ -23,6 +24,7 @@ import refit.util.REFITAssert; public class REFITWeaveReplica extends REFITBaseReplica { private final REFITClientStage[] clientStage; private final REFITOrderStage[] orderStage; + private final REFITMenciusSplitter menciusSplitter; private final REFITUpdateStage updateStage; private final REFITCheckpointStage checkpointStage; @@ -75,8 +77,12 @@ public class REFITWeaveReplica extends REFITBaseReplica { orderStage = new REFITOrderStage[REFITOrderGroups.COUNT]; short myPrimaryOrderGroupID = REFITOrderGroups.getPrimaryGroup(id); - REFITTimingTracker tracker = new REFITTimingTracker(this, myPrimaryOrderGroupID); - addToInitialConnectObserver(tracker); + REFITTimingTracker tracker = (REFITConfig.MENCIUS_MODE) ? null : new REFITTimingTracker(this, myPrimaryOrderGroupID); + if (tracker != null) { + addToInitialConnectObserver(tracker); + } + + menciusSplitter = (REFITConfig.MENCIUS_MODE) ? new REFITMenciusSplitter(this) : null; for (short groupID = 0; groupID < REFITOrderGroups.COUNT; groupID++) { // also setup client and coupling stage on execution replicas @@ -84,24 +90,37 @@ public class REFITWeaveReplica extends REFITBaseReplica { msgRouter.requestSenders[groupID].addListener(clientStage[groupID]); executionStage.replySenders[groupID].addListener(clientStage[groupID]); clientStage[groupID].readRequestSender.addListener(executionStage); - orderStage[groupID] = new REFITOrderStage(this, REFITWeaveProtocol.instance, groupID, tracker); - msgRouter.orderSenders[groupID].addListener(orderStage[groupID]); - - clientStage[groupID].requestSender.addListener(orderStage[groupID]); - clientStage[groupID].panicSender.addListener(orderStage[groupID]); - - orderStage[groupID].requestSender.addListener(executionStage); - orderStage[groupID].ackSender.addListener(clientStage[groupID]); - orderStage[groupID].viewChangeSender.addListener(clientStage[groupID]); - orderStage[groupID].viewChangeSender.addListener(executionStage); - if (updateStage != null) { - orderStage[groupID].viewChangeSender.addListener(updateStage); + orderStage[groupID] = (REFITConfig.MENCIUS_MODE && groupID > 0) ? null : new REFITOrderStage(this, + REFITWeaveProtocol.instance, (REFITConfig.MENCIUS_MODE) ? -1 : groupID, tracker); + + REFITOrderStage order = orderStage[(REFITConfig.MENCIUS_MODE) ? 0 : groupID]; + msgRouter.orderSenders[groupID].addListener(order); + + clientStage[groupID].requestSender.addListener(order); + clientStage[groupID].panicSender.addListener(order); + + if (!REFITConfig.MENCIUS_MODE || groupID == 0) { + order.requestSender.addListener(executionStage); + order.viewChangeSender.addListener(executionStage); + checkpointStage.progressSender.addListener(order); + + REFITStage client = (REFITConfig.MENCIUS_MODE) ? menciusSplitter : clientStage[groupID]; + order.ackSender.addListener(client); + order.viewChangeSender.addListener(client); + if (updateStage != null) { + order.viewChangeSender.addListener(updateStage); + } } - checkpointStage.progressSender.addListener(orderStage[groupID]); + if (menciusSplitter != null) { + menciusSplitter.viewChangeSenders[groupID].addListener(clientStage[groupID]); + menciusSplitter.ackSenders[groupID].addListener(clientStage[groupID]); + } + } + if (tracker != null) { + // replica can only propose requests in it's primary group + tracker.flushSender.addListener(orderStage[myPrimaryOrderGroupID]); } - // replica can only propose requests in it's primary group - tracker.flushSender.addListener(orderStage[myPrimaryOrderGroupID]); executionStage.checkpointSender.addListener(checkpointStage); checkpointStage.progressSender.addListener(executionStage); diff --git a/src/refit/replica/client/REFITClientStage.java b/src/refit/replica/client/REFITClientStage.java index 3aeaadb716c3277a167e9b7c35bff015fc00a30d..a29fbb23e108ab4309fd0bb8627b93eed29b8264 100644 --- a/src/refit/replica/client/REFITClientStage.java +++ b/src/refit/replica/client/REFITClientStage.java @@ -211,8 +211,9 @@ public class REFITClientStage extends REFITStage { switch (instruction.instructionType) { case CONFIGURATION_NOTIFICATION: REFITConfigurationNotification notification = (REFITConfigurationNotification) instruction; + int limitedGroupID = Math.max(0, groupID); REFITOrderProtocol protocol = notification.protocol; - sendReplies = protocol.getActiveOrderRecipients(Math.max(0, groupID))[replica.id]; + sendReplies = protocol.getActiveOrderRecipients(limitedGroupID)[replica.id]; contactReplica = REFITOrderGroups.getGroupPrimary(notification.groupID, notification.viewID); if (REFITConfig.SIGNED_REQUESTS) { diff --git a/src/refit/replica/order/REFITMenciusSplitter.java b/src/refit/replica/order/REFITMenciusSplitter.java new file mode 100644 index 0000000000000000000000000000000000000000..8054151f831a1cfb1f5cf1b36f7af75d3e447df1 --- /dev/null +++ b/src/refit/replica/order/REFITMenciusSplitter.java @@ -0,0 +1,60 @@ +package refit.replica.order; + +import refit.config.REFITLogger; +import refit.message.REFITAckReply; +import refit.message.REFITInstruction; +import refit.message.REFITInstruction.REFITConfigurationNotification; +import refit.message.REFITMessage; +import refit.replica.REFITBaseReplica; +import refit.replica.REFITObserver; +import refit.replica.REFITStage; +import refit.scheduler.REFITSchedulerTaskType; + +public class REFITMenciusSplitter extends REFITStage { + + public final REFITObserver<REFITConfigurationNotification>[] viewChangeSenders; + public final REFITObserver<REFITAckReply>[] ackSenders; + + @SuppressWarnings("unchecked") + public REFITMenciusSplitter(REFITBaseReplica replica) { + super(REFITSchedulerTaskType.ORDER_STAGE, replica); + + viewChangeSenders = (REFITObserver<REFITConfigurationNotification>[]) new REFITObserver[REFITOrderGroups.COUNT]; + ackSenders = (REFITObserver<REFITAckReply>[]) new REFITObserver[REFITOrderGroups.COUNT]; + + for (int i = 0; i < viewChangeSenders.length; i++) viewChangeSenders[i] = new REFITObserver<>(); + for (int i = 0; i < ackSenders.length; i++) ackSenders[i] = new REFITObserver<>(); + } + + @Override + protected void handleMessage(REFITMessage message) { + switch (message.type) { + case ACK_REPLY: + REFITAckReply ack = (REFITAckReply) message; + if (ack.groupID >= 0) { + ackSenders[ack.groupID].broadcast(ack); + } else { + for (int i = 0; i < ackSenders.length; i++) { + ackSenders[i].broadcast(ack); + } + } + break; + case INSTRUCTION: + handleInstruction((REFITInstruction) message); + break; + default: + REFITLogger.logError(this, "drop message of unexpected type " + message.type); + } + } + + private void handleInstruction(REFITInstruction message) { + switch (message.instructionType) { + case CONFIGURATION_NOTIFICATION: + REFITConfigurationNotification config = (REFITConfigurationNotification) message; + viewChangeSenders[config.groupID].broadcast(config); + break; + default: + REFITLogger.logError(this, "drop instruction of unexpected type " + message.instructionType); + } + } +} diff --git a/src/refit/replica/order/REFITOrderProtocolInstance.java b/src/refit/replica/order/REFITOrderProtocolInstance.java index b85d12b06e9dc4c252b67bdf7cebf0711d00b103..f13bd1e1a12ae9f43e00fe4b506944a45989d8e4 100644 --- a/src/refit/replica/order/REFITOrderProtocolInstance.java +++ b/src/refit/replica/order/REFITOrderProtocolInstance.java @@ -113,4 +113,5 @@ public abstract class REFITOrderProtocolInstance { // only accepts the proposal if isProposer returns true public abstract void assignProposal(REFITOrderRequest message); + public abstract boolean hasProposal(); } diff --git a/src/refit/replica/order/REFITOrderStage.java b/src/refit/replica/order/REFITOrderStage.java index 54fa44d5f29f008f06b7adbb6189a10f8040cfea..8937e503cbcdae05cf1a3379d250c86db5984f87 100644 --- a/src/refit/replica/order/REFITOrderStage.java +++ b/src/refit/replica/order/REFITOrderStage.java @@ -4,6 +4,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import refit.agreement.weave.REFITWeaveMessages.REFITWeaveMessagePrepare; import refit.config.REFITConfig; import refit.config.REFITLogger; import refit.message.*; @@ -24,16 +25,20 @@ public class REFITOrderStage extends REFITStage { private int targetMinView; private long targetAfterSeq; private final short groupID; + private final short limitedGroupID; private static final int THROTTLE_TIMER_INTERVAL = 25; private final REFITTimingTracker timingTracker; public final REFITObserver<REFITAckReply> ackSender; + private final long[] noopedUpTo = new long[REFITOrderGroups.COUNT]; public REFITOrderStage(REFITBaseReplica replica, REFITOrderProtocol initialProtocol, short groupID, REFITTimingTracker timingTracker) { // Initialize data structures super(REFITSchedulerTaskType.ORDER_STAGE, replica); + if (groupID == -1 && !REFITConfig.MENCIUS_MODE) throw new IllegalStateException("Invalid order group id"); this.groupID = groupID; + this.limitedGroupID = (short) Math.max(0, groupID); this.readySlots = new TreeSet<>(); this.proposals = new LinkedList<>(); @@ -187,6 +192,12 @@ public class REFITOrderStage extends REFITStage { REFITOrderStageSlot slot = slots[(int) (orderMessage.uid.seqNr % slots.length)]; boolean ready = slot.insertMessage(orderMessage); if (ready) readySlots.add(slot); + + if (REFITConfig.MENCIUS_MODE && (orderMessage.type == REFITMessageType.WEAVE_PREPARE || orderMessage.type == REFITMessageType.WEAVE_COMMIT)) { + synthesizeNoops(orderMessage.uid.seqNr, orderMessage.from); + // skip own slots with lower sequence number + synthesizeNoops(orderMessage.uid.seqNr, replica.id); + } break; case INSTRUCTION: handleInstruction((REFITInstruction) message); @@ -199,6 +210,32 @@ public class REFITOrderStage extends REFITStage { } } + private void synthesizeNoops(long seqNr, short from) { + short group = REFITOrderGroups.getPrimaryGroup(from); + boolean isPrimary = REFITOrderGroups.getGroupPrimary(group, 0) == replica.id; + long last = -1; + for (long instanceID = Math.max(windowStartID, noopedUpTo[group]); (instanceID < activeSlotLimit) + && (instanceID < seqNr); instanceID++) { + if (REFITOrderGroups.getGroupID(instanceID) != group) continue; + last = instanceID; + + final REFITOrderStageSlot slot = slots[(int) (instanceID % slots.length)]; + if (slot.hasProposal()) continue; + + if (isPrimary) { + slot.assignProposal(REFITNoOpRequest.INSTANCE); + readySlots.add(slot); + } else { + short leaderID = REFITOrderGroups.getGroupPrimary(group, 0); + REFITWeaveMessagePrepare prepare = new REFITWeaveMessagePrepare(instanceID, 0, REFITNoOpRequest.INSTANCE, leaderID); +// REFITLogger.logWarning(this, "Skipping " + prepare); + boolean ready = slot.insertMessage(prepare); + if (ready) readySlots.add(slot); + } + } + noopedUpTo[group] = last; + } + private void handleInstruction(REFITInstruction instruction) { switch (instruction.instructionType) { case FLUSH_REQUEST: @@ -268,7 +305,7 @@ public class REFITOrderStage extends REFITStage { // Initialize new protocol instances for (long instanceID = initInstancesStartID; instanceID < initInstancesEndID; instanceID++) { REFITOrderStageSlot slot = slots[(int) (instanceID % slots.length)]; - boolean ready = slot.init(instanceID, REFITOrderGroups.getGroupID(instanceID) != groupID, timingTracker != null); + boolean ready = slot.init(instanceID, groupID != -1 && REFITOrderGroups.getGroupID(instanceID) != groupID, timingTracker != null); if (ready && instanceID < activeSlotLimit) readySlots.add(slot); else readySlots.remove(slot); } @@ -412,6 +449,7 @@ public class REFITOrderStage extends REFITStage { private void abort() { + if (REFITConfig.MENCIUS_MODE) throw new UnsupportedOperationException("Not implemented for mencius"); REFITLogger.logWarning(this, "abort state " + viewState + " next view " + viewID); if (viewState == REFITTransitionProtocolState.VIEW_CHANGE_STARTED) { REFITLogger.logWarning(this, "ignoring abort for view " @@ -471,7 +509,7 @@ public class REFITOrderStage extends REFITStage { viewIDUpdated = true; // TODO clean protocol handling - short contactReplicaID = REFITOrderGroups.getGroupPrimary(groupID, viewID); + short contactReplicaID = REFITOrderGroups.getGroupPrimary(limitedGroupID, viewID); REFITLogger.logEvent(this, "switch to " + protocol.getClass().getSimpleName() + " (view " + viewID + ")"); REFITLogger.logEvent(this, contactReplicaID + " is now the contact replica for group " + groupID); @@ -482,8 +520,14 @@ public class REFITOrderStage extends REFITStage { proposals.clear(); // Distribute replica-role information - REFITConfigurationNotification notification = new REFITConfigurationNotification(protocol, groupID, viewID); - viewChangeSender.broadcast(notification); + if (groupID >= 0) { + viewChangeSender.broadcast(new REFITConfigurationNotification(protocol, groupID, viewID)); + } else { + // Hack to support mencius + for (int i = 0; i < REFITOrderGroups.COUNT; i++) { + viewChangeSender.broadcast(new REFITConfigurationNotification(protocol, i, viewID)); + } + } if (timingTracker != null) timingTracker.updateView(groupID, viewID, true); } @@ -572,7 +616,7 @@ public class REFITOrderStage extends REFITStage { } private boolean fullBatchAvailable() { - if (proposals.size() >= REFITConfig.MAXIMUM_BATCH_SIZE[groupID]) { + if (proposals.size() >= REFITConfig.MAXIMUM_BATCH_SIZE[limitedGroupID]) { return true; } @@ -593,7 +637,7 @@ public class REFITOrderStage extends REFITStage { return REFITNoOpRequest.INSTANCE; } - int maximumBatchSize = REFITConfig.MAXIMUM_BATCH_SIZE[groupID]; + int maximumBatchSize = REFITConfig.MAXIMUM_BATCH_SIZE[limitedGroupID]; final REFITRequest head = proposals.peek(); diff --git a/src/refit/replica/order/REFITOrderStageSlot.java b/src/refit/replica/order/REFITOrderStageSlot.java index 0e50b240e8749264bb6faaf73af3e95087272bf4..a34175b5fda2c5fdd676334e0babe6508c524471 100644 --- a/src/refit/replica/order/REFITOrderStageSlot.java +++ b/src/refit/replica/order/REFITOrderStageSlot.java @@ -140,6 +140,10 @@ public class REFITOrderStageSlot implements Comparable<REFITOrderStageSlot> { return currentInstance != null && currentInstance.isProposer(); } + public boolean hasProposal() { + return currentInstance.hasProposal(); + } + public void assignProposal(REFITOrderRequest message) { currentInstance.assignProposal(message); }