Skip to content
Snippets Groups Projects
REFITTimingTracker.java 9.06 KiB
Newer Older
  • Learn to ignore specific revisions
  • Michael Eischer's avatar
    Michael Eischer committed
    package refit.replica.order;
    
    import java.util.*;
    
    import refit.config.REFITConfig;
    import refit.config.REFITLogger;
    import refit.message.REFITInstruction;
    import refit.message.REFITInstruction.REFITFlushRequestNotification;
    import refit.message.REFITMessage;
    import refit.replica.REFITBaseReplica;
    import refit.replica.REFITObserver;
    import refit.replica.REFITStage;
    import refit.scheduler.REFITSchedulerTaskType;
    import refit.util.REFITAssert;
    import refit.util.REFITTime;
    
    /**
    	 * Order stage that paces the proposals of this replica's own order group relative to the
    	 * other order groups, and measures the round-trip time to the other groups' leaders.
    	 * <p>
    	 * Prepares of all groups are reported via {@link #receivedPrepare}; commits sent by other
    	 * groups' leaders for instances of the own group are reported via {@link #receivedCommit}.
    	 * When this replica is the leader of its group in a stable view, {@link #stageComplete()}
    	 * periodically broadcasts a {@link REFITFlushRequestNotification} through {@link #flushSender},
    	 * spaced according to {@link #getNextProposalAction()}.
    	 */
    public class REFITTimingTracker extends REFITStage {

    	/** Pacing decision for the own group's next proposal, from "stop" to "twice as fast". */
    	enum ProposalAction {
    		STOP, SLOW, NORMAL, FAST, VERY_FAST
    	}


    	/** Observer used to broadcast the flush requests that trigger the own group's proposals. */
    	public final REFITObserver<REFITFlushRequestNotification> flushSender;

    	private final short ownGroup;
    	private final short replicaID;

    	/** Per-group bookkeeping, keyed by group ID in [0, REFITOrderGroups.COUNT). */
    	private final Map<Short, GroupData> groupDataMap;
    	/**
    	 * One-way delay estimates in ms for each pair of groups. Initialized from
    	 * REFITConfig.EXPECTED_GROUP_NETWORK_DELAYS_MS and updated symmetrically from measured pings.
    	 */
    	private final int[][] oneWayDelays;
    	private final int[] currentView;
    	private final boolean[] isViewStable;

    	/** Absolute time (ms) of the next proposal; Long.MAX_VALUE while proposals are stopped. */
    	private long nextProposalTime;

    	public REFITTimingTracker(REFITBaseReplica replica, short ownGroup) {
    		super(REFITSchedulerTaskType.ORDER_STAGE, replica);
    		flushSender = new REFITObserver<>();

    		this.ownGroup = ownGroup;
    		this.replicaID = replica.id;

    		groupDataMap = new HashMap<>();
    		for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
    			final GroupData groupData = new GroupData();
    			// "Nothing seen yet" sentinel for the highest prepare of group i.
    			// NOTE(review): group 0 starts at 0 rather than -COUNT, so a prepare for instance 0
    			// (if that instance belongs to group 0) is ignored by receivedPrepare -- confirm intended.
    			groupData.highestPrepare = i == 0 ? 0 : i - REFITOrderGroups.COUNT;
    			groupData.highestPrepareArrivalTime = 0L;
    			groupDataMap.put(i, groupData);
    		}

    		// private copy, because measured delays overwrite entries at runtime
    		final int[][] delays = REFITConfig.EXPECTED_GROUP_NETWORK_DELAYS_MS;
    		oneWayDelays = new int[delays.length][];
    		for (int i = 0; i < oneWayDelays.length; i++) {
    			oneWayDelays[i] = Arrays.copyOf(delays[i], delays[i].length);
    		}
    		currentView = new int[REFITOrderGroups.COUNT];
    		isViewStable = new boolean[REFITOrderGroups.COUNT];
    		nextProposalTime = Long.MAX_VALUE;
    	}

    	@Override
    	public String toString() {
    		return "TT";
    	}

    	/**
    	 * Records the current view of a group and whether that view is stable.
    	 *
    	 * @param groupID  group whose view changed
    	 * @param newView  the group's current view number
    	 * @param isStable whether the view is stable
    	 */
    	public void updateView(short groupID, int newView, boolean isStable) {
    		currentView[groupID] = newView;
    		isViewStable[groupID] = isStable;
    	}

    	/**
    	 * Records the arrival of a prepare for {@code instanceId}. Prepares at or below the highest
    	 * prepare already seen for the instance's group are ignored. Prepares sent by this replica
    	 * are additionally queued for every other group, so that the matching commit from that
    	 * group's leader can be turned into a round-trip sample.
    	 *
    	 * @param instanceId instance the prepare belongs to
    	 * @param from       replica that sent the prepare
    	 */
    	public void receivedPrepare(long instanceId, short from) {
    		final short group = REFITOrderGroups.getGroupID(instanceId);
    		final GroupData groupData = groupDataMap.get(group);

    		// ignore old or duplicate prepare
    		if (instanceId <= groupData.highestPrepare) return;

    		final long ct = REFITTime.currentTimeMillis.getAsLong();

    		// Save arrival time
    		groupData.highestPrepare = instanceId;
    		groupData.highestPrepareArrivalTime = ct;

    		if (from == replicaID) {
    			// track own prepares to measure the round trip to each other group's leader
    			for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
    				if (i == ownGroup) continue;
    				final GroupData gd = groupDataMap.get(i);
    				gd.prepareNum.add(instanceId);
    				gd.prepareTime.add(ct);
    			}
    		}
    	}

    	/**
    	 * Records a commit for an instance of the own group sent by {@code otherReplica}. Only
    	 * commits from the current leader of the sender's group are used. Each one that matches a
    	 * queued own prepare yields a round-trip sample. After REFITConfig.NETWORK_PING_ITERATIONS
    	 * samples, half of the smallest sample becomes the one-way delay between the own group and
    	 * the sender's group.
    	 *
    	 * @param instanceId   instance the commit belongs to; must belong to the own group
    	 * @param otherReplica replica that sent the commit
    	 */
    	public void receivedCommit(long instanceId, short otherReplica) {
    		final short otherGroup = REFITOrderGroups.getPrimaryGroup(otherReplica);
    		final short groupLeader = REFITOrderGroups.getGroupPrimary(otherGroup, currentView[otherGroup]);

    		// only measure ping times to the other leader
    		if (groupLeader != otherReplica) return;

    		final short group = REFITOrderGroups.getGroupID(instanceId);
    		REFITAssert.assertTrue(group == ownGroup);

    		final GroupData groupData = groupDataMap.get(otherGroup);

    		// drop own prepares older than this commit; their commits are delayed or missing
    		while (!groupData.prepareNum.isEmpty() && groupData.prepareNum.peek() < instanceId) {
    			groupData.prepareNum.removeFirst();
    			groupData.prepareTime.removeFirst();
    		}
    		// only use commit if we have a matching prepare
    		if (groupData.prepareNum.isEmpty() || groupData.prepareNum.peek() != instanceId) return;

    		// consume the matching prepare so that a repeated commit cannot add a second sample
    		groupData.prepareNum.removeFirst();
    		final long prepareTime = groupData.prepareTime.removeFirst();

    		final long ct = REFITTime.currentTimeMillis.getAsLong();
    		// FIXME: ignore pings during view-changes
    		groupData.pingTimes.add((int) (ct - prepareTime));

    		if (groupData.pingTimes.size() >= REFITConfig.NETWORK_PING_ITERATIONS) {
    			// half of the fastest observed round trip approximates the one-way delay
    			final int lowestOneWayDelay = Collections.min(groupData.pingTimes) / 2;
    			// index with otherGroup: 'group' equals ownGroup here (asserted above)
    			oneWayDelays[ownGroup][otherGroup] = lowestOneWayDelay;
    			oneWayDelays[otherGroup][ownGroup] = lowestOneWayDelay;
    			REFITLogger.logWarning(this, "Update delay between " + ownGroup + " and " + otherGroup + " to " + lowestOneWayDelay + " ms");

    			groupData.pingTimes.clear();
    		}
    	}

    	@Override
    	protected void handleMessage(REFITMessage message) {
    		switch (message.type) {
    		case INSTRUCTION:
    			handleInstruction(((REFITInstruction) message));
    			break;
    		default:
    			REFITLogger.logError(this, "drop message of unexpected type " + message.type);
    		}
    	}

    	private void handleInstruction(REFITInstruction instruction) {
    		switch (instruction.instructionType) {
    		case INITIAL_CONNECTIONS_COMPLETED:
    			// just needed to trigger stageComplete()
    			break;
    		default:
    			REFITLogger.logError(this, "drop instruction of unexpected type " + instruction.instructionType);
    		}
    	}

    	/**
    	 * If this replica leads its own group in a stable view and the next proposal time has
    	 * arrived, broadcasts a flush request up to {@code highestPrepare + COUNT + 1} of the own
    	 * group. It then schedules the next proposal according to {@link #getNextProposalAction()}.
    	 * A timeout is always set no later than two proposal intervals from now, so that a stopped
    	 * pacing is re-evaluated.
    	 */
    	@Override
    	protected void stageComplete() {
    		// Nothing to do if not leader or view not yet stable
    		if (REFITOrderGroups.getGroupPrimary(ownGroup, currentView[ownGroup]) != replicaID || !isViewStable[ownGroup])
    			return;

    		// At start or after a previous proposal stop has ended, propose immediately
    		if (nextProposalTime == Long.MAX_VALUE && getNextProposalAction() != ProposalAction.STOP) {
    			nextProposalTime = REFITTime.currentTimeMillis.getAsLong();
    		}
    		// wait for turn
    		if (REFITTime.currentTimeMillis.getAsLong() < nextProposalTime) return;

    		final long currentSeq = groupDataMap.get(ownGroup).highestPrepare;
    		final long newLimit = currentSeq + REFITOrderGroups.COUNT + 1;
    		flushSender.broadcast(new REFITFlushRequestNotification(newLimit, newLimit, newLimit));

    		// re-evaluated after the broadcast, which may have updated the tracked state
    		final ProposalAction nextProposalAction = getNextProposalAction();
    		switch (nextProposalAction) {
    		case STOP:
    			nextProposalTime = Long.MAX_VALUE;
    			break;
    		case SLOW:
    			nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS + 1;
    			break;
    		case NORMAL:
    			nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS;
    			break;
    		case FAST:
    			nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS - 1;
    			break;
    		case VERY_FAST:
    			nextProposalTime += REFITConfig.PROPOSALS_INTERVAL_MS / 2;
    			break;
    		}

    		final long currentTime = REFITTime.currentTimeMillis.getAsLong();
    		setTimeoutAbsolute(Math.min(nextProposalTime, currentTime + 2 * REFITConfig.PROPOSALS_INTERVAL_MS));
    	}

    	/**
    	 * Estimates, in ms, how far {@code ownGroup}'s proposals are ahead of {@code group}'s.
    	 * A positive value means the own group is ahead. The estimate starts from the difference in
    	 * known proposal counts times PROPOSALS_INTERVAL_MS. It then subtracts the one-way delay from
    	 * {@code group} and the time since {@code group}'s last prepare arrived, capped at two
    	 * intervals.
    	 *
    	 * @return 0 if {@code group == ownGroup}, otherwise the estimated offset in ms
    	 */
    	private long getEstimatedCurrentState(final short group, final short ownGroup) {
    		if (group == ownGroup) {
    			return 0;
    		}

    		final GroupData groupData = groupDataMap.get(group);
    		final long lastKnownState = (groupData.highestPrepare + 1) / REFITOrderGroups.COUNT;

    		final GroupData ownGroupData = groupDataMap.get(ownGroup);
    		final long ownPrepare = (ownGroupData.highestPrepare + 1) / REFITOrderGroups.COUNT;

    		// expected time difference based on the known proposals
    		final long seqTimeOffset = (ownPrepare - lastKnownState) * REFITConfig.PROPOSALS_INTERVAL_MS;

    		final long delay = oneWayDelays[group][ownGroup];
    		// time since latest proposal arrived, limited to one missing proposal
    		final long currentTime = REFITTime.currentTimeMillis.getAsLong();
    		final long timePassed = Math.min(2 * REFITConfig.PROPOSALS_INTERVAL_MS, currentTime - groupData.highestPrepareArrivalTime);

    		return seqTimeOffset - delay - timePassed;
    	}

    	/**
    	 * Chooses the pacing of the own group's next proposal. The choice depends on the largest
    	 * estimated offset of the own group relative to any other group (see
    	 * {@link #getEstimatedCurrentState}), compared against multiples of PROPOSALS_INTERVAL_MS.
    	 *
    	 * @return the pacing action for the next proposal
    	 */
    	public ProposalAction getNextProposalAction() {
    		long slowestOtherGroupOffset = Long.MIN_VALUE;
    		for (short i = 0; i < REFITOrderGroups.COUNT; i++) {
    			if (i == ownGroup) continue;
    			slowestOtherGroupOffset = Math.max(slowestOtherGroupOffset, getEstimatedCurrentState(i, ownGroup));
    		}

    		final int pi = REFITConfig.PROPOSALS_INTERVAL_MS;
    		if (slowestOtherGroupOffset < -10 * pi) {
    			// If we are very far behind the next slowest (e.g. after view-change), catch up fast
    			return ProposalAction.VERY_FAST;
    		} else if (slowestOtherGroupOffset < -pi / 2) {
    			// If we are a bit behind, speed up
    			return ProposalAction.FAST;
    		} else if (slowestOtherGroupOffset < pi / 2) {
    			// If we share the last place with at least one other group, continue normally
    			return ProposalAction.NORMAL;
    		} else if (slowestOtherGroupOffset < 20 * pi) {
    			// Slow down if we are at least one prepare ahead
    			return ProposalAction.SLOW;
    		} else {
    			// Stop if we are more than 20 prepares ahead
    			return ProposalAction.STOP;
    		}
    	}

    	/** Mutable per-group tracking state. */
    	private static class GroupData {
    		/** Own prepares not yet matched by a commit from this group's leader, oldest first. */
    		final ArrayDeque<Long> prepareNum = new ArrayDeque<>();
    		/** Local time (ms) each entry of prepareNum was recorded; kept parallel to prepareNum. */
    		final ArrayDeque<Long> prepareTime = new ArrayDeque<>();
    		/** Highest prepare instance seen for this group. */
    		long highestPrepare;
    		/** Local time (ms) at which highestPrepare arrived; 0 if none yet. */
    		long highestPrepareArrivalTime;
    		/** Round-trip samples (ms) collected since the last delay update. */
    		final List<Integer> pingTimes = new ArrayList<>();
    	}

    }