diff --git a/src/refit/replica/REFITOmadaReplica.java b/src/refit/replica/REFITOmadaReplica.java index ed24cbd2006aaeb319f648bbee814d2b85ed0d59..39cdb4edf7b915ec74e4dc1442a603bc2ade51ae 100644 --- a/src/refit/replica/REFITOmadaReplica.java +++ b/src/refit/replica/REFITOmadaReplica.java @@ -71,6 +71,7 @@ public class REFITOmadaReplica extends REFITBaseReplica { REFITCouplingHelper helper = new REFITCouplingHelper(this); addToInbound(REFITMessageType.EXECUTOR_FLUSH, helper); + boolean hasGroup = false; for (short groupID = 0; groupID < REFITOrderGroups.COUNT; groupID++) { boolean inGroup = REFITOrderGroups.isInGroupByID(id, groupID); if (!inGroup && id >= REFITConfig.EXEC_GROUP_SIZE) { @@ -91,6 +92,7 @@ public class REFITOmadaReplica extends REFITBaseReplica { if (!inGroup) { continue; } + hasGroup = true; orderStage[groupID] = new REFITOrderStage(this, REFITConfig.INITIAL_ORDER_PROTOCOL, groupID, null); msgRouter.orderSenders[groupID].addListener(orderStage[groupID]); @@ -113,6 +115,10 @@ public class REFITOmadaReplica extends REFITBaseReplica { checkpointStage.progressSender.addListener(orderStage[groupID]); } + if (!hasGroup && id < REFITConfig.EXEC_GROUP_SIZE) { + couplingStage[0].configSender.addListener(executionStage); + } + executionStage.checkpointSender.addListener(checkpointStage); checkpointStage.progressSender.addListener(executionStage); checkpointStage.stableCPSender.addListener(executionStage); diff --git a/src/refit/replica/coupling/REFITCouplingStage.java b/src/refit/replica/coupling/REFITCouplingStage.java index 88ce8bfc8664b301490350f5f72d239006179cd4..c7c7e329df181f6372d70f19805ab19906df1859 100644 --- a/src/refit/replica/coupling/REFITCouplingStage.java +++ b/src/refit/replica/coupling/REFITCouplingStage.java @@ -16,11 +16,13 @@ import refit.replica.REFITReplicaGroups; import refit.replica.REFITStage; import refit.replica.checkpoint.REFITCheckpoint; import refit.replica.order.REFITOrderGroups; +import refit.replica.order.REFITStubOrderProtocol; import refit.scheduler.REFITSchedulerTaskType; public class REFITCouplingStage extends REFITStage { public final REFITObserver<REFITExecuteRequest> requestSender; + public final REFITObserver<REFITConfigurationNotification> configSender; private REFITCouplingHelper helper; @@ -33,6 +35,7 @@ public class REFITCouplingStage extends REFITStage { public REFITCouplingStage(REFITBaseReplica replica, REFITCouplingHelper helper, int groupID) { super(REFITSchedulerTaskType.COUPLING_STAGE, replica); requestSender = new REFITObserver<>(); + configSender = new REFITObserver<>(); this.helper = helper; this.groupID = groupID; @@ -68,6 +71,12 @@ public class REFITCouplingStage extends REFITStage { return "COUPL[" + groupID + "]"; } + @Override + public void init() { + // send configuration notification with stub protocol to execution + configSender.broadcast(new REFITConfigurationNotification(new REFITStubOrderProtocol(), -1, -1)); + } + @Override protected void handleMessage(REFITMessage message) { switch (message.type) {