Skip to content
Snippets Groups Projects
Commit d1351f46 authored by Michael Eischer's avatar Michael Eischer
Browse files

REFITOmadaReplica: Properly enable execution on replicas without order stage

parent ae4ac9af
No related branches found
No related tags found
No related merge requests found
...@@ -71,6 +71,7 @@ public class REFITOmadaReplica extends REFITBaseReplica { ...@@ -71,6 +71,7 @@ public class REFITOmadaReplica extends REFITBaseReplica {
REFITCouplingHelper helper = new REFITCouplingHelper(this); REFITCouplingHelper helper = new REFITCouplingHelper(this);
addToInbound(REFITMessageType.EXECUTOR_FLUSH, helper); addToInbound(REFITMessageType.EXECUTOR_FLUSH, helper);
boolean hasGroup = false;
for (short groupID = 0; groupID < REFITOrderGroups.COUNT; groupID++) { for (short groupID = 0; groupID < REFITOrderGroups.COUNT; groupID++) {
boolean inGroup = REFITOrderGroups.isInGroupByID(id, groupID); boolean inGroup = REFITOrderGroups.isInGroupByID(id, groupID);
if (!inGroup && id >= REFITConfig.EXEC_GROUP_SIZE) { if (!inGroup && id >= REFITConfig.EXEC_GROUP_SIZE) {
...@@ -91,6 +92,7 @@ public class REFITOmadaReplica extends REFITBaseReplica { ...@@ -91,6 +92,7 @@ public class REFITOmadaReplica extends REFITBaseReplica {
if (!inGroup) { if (!inGroup) {
continue; continue;
} }
hasGroup = true;
orderStage[groupID] = new REFITOrderStage(this, REFITConfig.INITIAL_ORDER_PROTOCOL, groupID, null); orderStage[groupID] = new REFITOrderStage(this, REFITConfig.INITIAL_ORDER_PROTOCOL, groupID, null);
msgRouter.orderSenders[groupID].addListener(orderStage[groupID]); msgRouter.orderSenders[groupID].addListener(orderStage[groupID]);
...@@ -113,6 +115,10 @@ public class REFITOmadaReplica extends REFITBaseReplica { ...@@ -113,6 +115,10 @@ public class REFITOmadaReplica extends REFITBaseReplica {
checkpointStage.progressSender.addListener(orderStage[groupID]); checkpointStage.progressSender.addListener(orderStage[groupID]);
} }
if (!hasGroup && id < REFITConfig.EXEC_GROUP_SIZE) {
couplingStage[0].configSender.addListener(executionStage);
}
executionStage.checkpointSender.addListener(checkpointStage); executionStage.checkpointSender.addListener(checkpointStage);
checkpointStage.progressSender.addListener(executionStage); checkpointStage.progressSender.addListener(executionStage);
checkpointStage.stableCPSender.addListener(executionStage); checkpointStage.stableCPSender.addListener(executionStage);
......
...@@ -16,11 +16,13 @@ import refit.replica.REFITReplicaGroups; ...@@ -16,11 +16,13 @@ import refit.replica.REFITReplicaGroups;
import refit.replica.REFITStage; import refit.replica.REFITStage;
import refit.replica.checkpoint.REFITCheckpoint; import refit.replica.checkpoint.REFITCheckpoint;
import refit.replica.order.REFITOrderGroups; import refit.replica.order.REFITOrderGroups;
import refit.replica.order.REFITStubOrderProtocol;
import refit.scheduler.REFITSchedulerTaskType; import refit.scheduler.REFITSchedulerTaskType;
public class REFITCouplingStage extends REFITStage { public class REFITCouplingStage extends REFITStage {
public final REFITObserver<REFITExecuteRequest> requestSender; public final REFITObserver<REFITExecuteRequest> requestSender;
public final REFITObserver<REFITConfigurationNotification> configSender;
private REFITCouplingHelper helper; private REFITCouplingHelper helper;
...@@ -33,6 +35,7 @@ public class REFITCouplingStage extends REFITStage { ...@@ -33,6 +35,7 @@ public class REFITCouplingStage extends REFITStage {
public REFITCouplingStage(REFITBaseReplica replica, REFITCouplingHelper helper, int groupID) { public REFITCouplingStage(REFITBaseReplica replica, REFITCouplingHelper helper, int groupID) {
super(REFITSchedulerTaskType.COUPLING_STAGE, replica); super(REFITSchedulerTaskType.COUPLING_STAGE, replica);
requestSender = new REFITObserver<>(); requestSender = new REFITObserver<>();
configSender = new REFITObserver<>();
this.helper = helper; this.helper = helper;
this.groupID = groupID; this.groupID = groupID;
...@@ -68,6 +71,12 @@ public class REFITCouplingStage extends REFITStage { ...@@ -68,6 +71,12 @@ public class REFITCouplingStage extends REFITStage {
return "COUPL[" + groupID + "]"; return "COUPL[" + groupID + "]";
} }
@Override
public void init() {
// send configuration notification with stub protocol to execution
configSender.broadcast(new REFITConfigurationNotification(new REFITStubOrderProtocol(), -1, -1));
}
@Override @Override
protected void handleMessage(REFITMessage message) { protected void handleMessage(REFITMessage message) {
switch (message.type) { switch (message.type) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment