You can now see your quota consumption at
If your GitLab username differs from your IDM username, please write an e-mail to in order to get it changed.

Commit b5016fc8 authored by Christian Dietrich's avatar Christian Dietrich

generator/SSE: alternative SSE with APP-FSM

The symbolic system execution is now able to enumerate all possible
system states with two different approaches. Either we can use the "old"
version which uses states with blocks to be executed in the next step,
like it is described in LCTES'15 (Dietrich et. al).

Or we can use the application finite state machines as basis. These
state machines are smaller (lesser nodes) than the application's
CFG. They are based on the task's ICFG.

Change-Id: I05e050ed3f39bd18ecf86b4d7bb423b4ce0825e0
parent 2f69ade0
......@@ -7,6 +7,11 @@ DOSEK_BINARY(
# This has to be done, since FSM is currently build with the APP-FSM
# method and not the CFG->GCFG method for SSE.
config_valid(VALID --systemcalls normal)
NAME bcc1_task1a_sse
SYSTEM_DESC system.oil
......@@ -15,7 +20,8 @@ DOSEK_BINARY(
GENERATOR_ARGS -fgen-asserts -fsse
NAME bcc1_task1b
from .common import *
from .Analysis import Analysis
from .AtomicBasicBlock import E, S
from .AtomicBasicBlock import E, S, AtomicBasicBlock
from .Function import Function
from datastructures.fsm import *
from collections import defaultdict
......@@ -32,7 +32,7 @@ class ApplicationFSM(Analysis, GraphObject):
for subtask in self.system_graph.subtasks:
wrapper = GraphObjectContainer(str(subtask), color='red')
nodes = {}
for trans in subtask.conf.fsm.transitions:
for trans in subtask.fsm.transitions:
if not trans.source in nodes:
nodes[trans.source] = Node(None, str(trans.source), color='black')
if not in nodes:
......@@ -54,7 +54,12 @@ class ApplicationFSM(Analysis, GraphObject):
def block_functor(in_edge, cur_abb):
transitions = []
# ICFG Entry transition
# Here we do the following. Each Edge becomes a state in our finite state machine.
# So from
# ---[in_edge]--> [] --[out_edge]-->
# we make
# [in_edge] --[ == ABB]--> [out_edge]
# Entry and exit nodes are handleded differently
if cur_abb == subtask.entry_abb:
in_edge = None
out_edge = cur_abb.get_outgoing_edges(self.get_edge_filter())[0]
......@@ -63,6 +68,12 @@ class ApplicationFSM(Analysis, GraphObject):
for out_edge in cur_abb.get_outgoing_edges(self.get_edge_filter()):
# Every Edge becomes a state, every state becomes an edge
transitions.append(Transition(in_edge, out_edge,
# Exit nodes are different
if cur_abb == subtask.exit_abb:
out_edge = None
# Every Edge becomes a state, every state becomes an edge
transitions.append(Transition(in_edge, -1, None))
event = Event(cur_abb, transitions)
......@@ -76,7 +87,7 @@ class ApplicationFSM(Analysis, GraphObject):
ret.initial_state = None
return ret
def __epsilon_elimination(self, fsm):
def epsilon_elimination(self, fsm):
"""The epsilon elimination pass makes every computation
event/transition to an epsilon transition. With the
standard FSM epslion elimination, we reduce the FSM
......@@ -91,7 +102,7 @@ class ApplicationFSM(Analysis, GraphObject):
# epsilon set is the set of states, that can be reached from
# one state by using only epsilon transitions. In our case,
# all computation blocks are epsilon transitions.
def is_epsilon_transition(transition):
def is_epsilon_transition(trans):
return trans.event.isA(S.computation)
changed = True
while changed:
......@@ -138,8 +149,125 @@ class ApplicationFSM(Analysis, GraphObject):
app_fsm = self.__to_fsm(subtask)
app_fsm = self.__epsilon_elimination(app_fsm)
app_fsm = self.epsilon_elimination(app_fsm)
subtask.fsm = app_fsm
subtask.ApplicationFSMIterator = ApplicationFSMIterator.create
# Rewrite AlarmSubtask
if self.system_graph.AlarmHandlerSubtask:
fsm = self.system_graph.AlarmHandlerSubtask.fsm
for E in
if not[S.CheckAlarm, S.iret]):
old_states = set([T.source for T in E.transitions])
new_state = [T.source for T in E.transitions][0]
def renamer(state):
if state in old_states:
return new_state
return state
fsm.rename(states = renamer)
tgt = {(T.source, T for T in E.transitions}
E.transitions = tgt.values()
for trans in self.system_graph.idle_subtask.fsm.transitions:
if trans.event.isA(S.Idle): = trans.source
fsm_iter_cache = {}
class ApplicationFSMIterator:
"""The Application FSM Iterator is a wrapper for APP-FSM states to be
used by the symbolic system execution. It has a similar
interface to the AtomicBasicBlock.
def create(subtask, state):
if not (subtask, state) in fsm_iter_cache:
it = ApplicationFSMIterator()
it.__create__(subtask, state)
fsm_iter_cache[(subtask, state)] = it
return fsm_iter_cache[(subtask, state)]
def __init__(self):
def __create__(self, subtask, state):
self.subtask = subtask
self.__state = state
self.dynamic_priority = self.__consistent(
lambda x: x.dynamic_priority
computations = self.__originating_blocks(True)
if computations:
self.interrupt_block_all = self.__consistent(
lambda x: x.interrupt_block_all
self.interrupt_block_os = self.__consistent(
lambda x: x.interrupt_block_os
self.possible_systemcalls = set()
for trans in self.subtask.fsm.get_outgoing_transitions(self.__state):
if self.__state != self.subtask.fsm.initial_state \
and self.subtask.is_user_thread():
# Besides the initial state, we give back an additional
# computation block as possible system-call.
edges = self.subtask.fsm.state_mapping[self.__state]
computations = [x.event for x in edges if x.event.isA(S.computation)]
abb = AtomicBasicBlock(self.subtask.system_graph)
abb.make_it_a_syscall(S.CheckIRQ, computations)
self.possible_systemcalls = list(self.possible_systemcalls)
def __originating_blocks(self, only_computation = False):
blocks = [x.event for x in self.subtask.fsm.state_mapping[self.__state]]
if only_computation:
blocks = [x for x in blocks if x.isA(S.computation)]
return blocks
def __consistent(self, seq, mapper=lambda x:x):
ret = [mapper(x) for x in seq]
assert len(ret) > 0, "To have a consistent value, at least one value must be present"
assert all([ret[0] == ret[i] for i in range(1, len(ret))]), "Inconsistent %s" % seq
return ret[0]
def __repr__(self):
return "<It %s:%s prio:%d>" %(, self.__state, self.dynamic_priority)
return "<It %s:%s prio>" %(, self.__state)
def isA(self, cls):
return self.__consistent(
lambda x: x.isA(cls)
def path(self):
return "%s/%d" % (self.subtask, self.__state)
def abb_id(self):
return self.__state
def get_blocks(self):
return self.__originating_blocks()
def next_iterators(self, event):
ret = []
for trans in self.subtask.fsm.get_outgoing_transitions(self.__state):
if trans.event == event:
# No computation block should ever arrive here as event
assert not event.isA(S.computation)
return ret
subtask.conf.fsm = app_fsm
......@@ -102,8 +102,10 @@ class DynamicPriorityAnalysis(Analysis):
# Blocks within a non-preemtable subtask cannot be
# rescheduled (except for ISR2)
if not abb.function.subtask.conf.preemptable and \
not abb.isA(S.kickoff):
not abb.isA(S.kickoff) and \
not abb.subtask.conf.is_isr:
dynamic_priority = RES_SCHEDULER.conf.static_priority
# Each abb has a dynamic priority
abb.dynamic_priority = dynamic_priority
......@@ -112,7 +114,7 @@ class DynamicPriorityAnalysis(Analysis):
# StartOS), the priority is the idle priority.
for syscall in self.system_graph.syscalls:
precessors = syscall.get_incoming_nodes(E.task_level)
if syscall.isA(S.kickoff) or syscall.isA(S.WaitEvent):
if syscall.isA(S.kickoff):
syscall.dynamic_priority = syscall.function.subtask.static_priority
elif len(precessors) == 1:
syscall.dynamic_priority = precessors[0].dynamic_priority
......@@ -19,45 +19,51 @@ class PreciseSystemState():
__slots__ = ["frozen", "states", "call_stack", "continuations",
"current_abb", "__hash", "__next_states", "__prev_states", "events"]
def __init__(self, system_graph):
def __init__(self, system_graph, init = True):
PreciseSystemState.system_graph = system_graph
self.frozen = False
self.__hash = None
self.__next_states = []
self.__prev_states = []
if not init:
size = len(self.system_graph.subtasks)
self.states = [0] * size
self.continuations = [None] * size
self.current_abb = None
self.__hash = None
self.__next_states = []
self.__prev_states = [] = [0] * size
self.call_stack = []
for i in range(0, size):
def add_cfg_edge(self, block, ty):
self.__next_states.append((ty, block))
block.__prev_states.append((ty, self))
def new(self):
return PreciseSystemState(self.system_graph)
def add_cfg_edge(self, block, ty, label = None):
self.__next_states.append((ty, block, label))
block.__prev_states.append((ty, self, label))
def remove_cfg_edge(self, block, ty):
for i, elem in enumerate(self.__next_states):
if elem == (ty, block):
if tuple(elem[0:2]) == (ty, block):
del self.__next_states[i]
for i, elem in enumerate(block.__prev_states):
if elem == (ty, self):
if tuple(elem[0:2]) == (ty, self):
del block.__prev_states[i]
def get_outgoing_nodes(self, ty):
return [block for Ty,block in self.__next_states if Ty == ty]
return [block for Ty,block,label in self.__next_states if Ty == ty]
def get_incoming_nodes(self, ty):
return [block for Ty,block in self.__prev_states if Ty == ty]
return [block for Ty,block,label in self.__prev_states if Ty == ty]
def new(self):
return PreciseSystemState(self.system_graph)
def get_outgoing_edges(self, ty):
return [(block, label) for Ty,block,label in self.__next_states if Ty == ty]
def copy_if_needed(self, multiple=None):
if self.frozen:
......@@ -74,17 +80,18 @@ class PreciseSystemState():
# Increase the copy counter
SystemState.copy_count += 1
state =
# For faster copying, we use the do-not-init constructor and
# do everything ourselves here.
state = PreciseSystemState(self.system_graph, init = False)
state.current_abb = self.current_abb
def copyto(dst, src):
dst[:] = src
copyto(state.states, self.states)
copyto(state.continuations, self.continuations)
# Shallow copies
state.states = self.states[:]
state.continuations = self.continuations[:] =[:]
state.call_stack = []
for subtask_id in range(0, len(self.states)):
state.call_stack[subtask_id] = list(self.call_stack[subtask_id])
return state
......@@ -116,6 +123,9 @@ class PreciseSystemState():
def is_surely_ready(self, subtask):
return self.states[subtask.subtask_id] == self.READY
def is_surely_waiting(self, subtask):
return self.states[subtask.subtask_id] == self.WAITING
def is_maybe_ready(self, subtask):
return self.READY & self.states[subtask.subtask_id]
......@@ -129,9 +139,13 @@ class PreciseSystemState():
def get_continuations(self, subtask):
if type(subtask) == int:
return set([self.continuations[subtask]])
return set([self.continuations[subtask.subtask_id]])
def get_continuation(self, subtask):
if type(subtask) == int:
return self.continuations[subtask]
return self.continuations[subtask.subtask_id]
def set_continuation(self, subtask, abb):
assert not self.frozen
self.continuations[subtask.subtask_id] = abb
......@@ -160,36 +174,72 @@ class PreciseSystemState():
return self.call_stack[subtask.subtask_id]
# Events
def __events(self, subtask, set_events = None, set_block_list = None):
events =[subtask.subtask_id] & 0xffff
block_list = ([subtask.subtask_id] >> 16) & 0xffff
if not set_events is None:
assert (set_events & 0xffff == set_events)[subtask.subtask_id] = (set_events | (block_list << 16))
events = set_events
if not set_block_list is None:
assert (set_block_list & 0xffff == set_block_list)[subtask.subtask_id] = (events | (set_block_list << 16))
block_list = set_block_list
return (events, block_list)
def get_events(self, subtask):
"""Return a tuple (None, set)"""
events =[subtask.subtask_id]
events, _ = self.__events(subtask)
return (subtask.event_mask_valid ^ events, events)
def set_events(self, subtask, event_list):
mask = Event.combine_event_masks(event_list)[subtask.subtask_id] |= mask
self.__events(subtask, set_events = mask)
def clear_events(self, subtask, event_list):
mask = Event.combine_event_masks(event_list)[subtask.subtask_id] &= ~mask
events, bl = self.__events(subtask)
events &= ~mask
assert bl == 0
self.__events(subtask, set_events = events)
def maybe_waiting(self, subtask, event_list):
"""Returns True, if the task may block be waiting at this point"""
def clear_block_list(self, subtask):
self.__events(subtask, set_block_list = 0)
def set_block_list(self, subtask, event_list):
mask = Event.combine_event_masks(event_list)
if ([subtask.subtask_id] & mask) == 0:
self.__events(subtask, set_block_list = mask)
def get_block_list(self, subtask):
events, bl = self.__events(subtask)
return bl
def maybe_waiting(self, subtask):
"""Returns True, if the task may block be waiting at this point"""
events, block_list = self.__events(subtask)
if (block_list & events) == 0:
return True
return False
def maybe_not_waiting(self, subtask, event_list):
def maybe_not_waiting(self, subtask):
"""Returns True, if the task may be continue without blocking"""
# In the precise system state, this is the exact opposite
return not self.maybe_waiting(subtask, event_list)
return not self.maybe_waiting(subtask)
def is_event_set(self, subtask, event):
return ([subtask.subtask_id] & event.event_mask) != 0
events, block_list = self.__events(subtask)
return (events & event.event_mask) != 0
def is_event_waiting(self, subtask, event):
events, block_list = self.__events(subtask)
return (block_list & event.event_mask) != 0
def is_event_cleared(self, subtask, event):
return ([subtask.subtask_id] & event.event_mask) == 0
events, block_list = self.__events(subtask)
return (events & event.event_mask) == 0
## Continuation accesors
def was_surely_not_kickoffed(self, subtask):
......@@ -235,7 +285,7 @@ class PreciseSystemState():
def current_subtask(self):
return self.current_abb.function.subtask
return self.current_abb.subtask
def merge_with(self, other):
"""Returns a newly created state that is the result of the merge of
......@@ -303,6 +353,7 @@ class PreciseSystemState():
for event in
SET = self.is_event_set(subtask, event)
CLEARED = self.is_event_cleared(subtask, event)
WAITING = self.is_event_waiting(subtask, event)
ret.append("%s:*" %
elif SET:
......@@ -310,6 +361,8 @@ class PreciseSystemState():
assert CLEARED
ret.append("%s:0" %
return " ".join(ret)
import logging
from .Analysis import Analysis
from .Subtask import Subtask
......@@ -34,6 +35,7 @@ class PrioritySpreadingPass(Analysis):
p[0] = prio
p[1].conf.static_priority = prio
self.prio_to_participant[prio] = p[1]
logging.debug("Prio %d: %s", prio, p[1])
prio += 1
assert participants[0][1] == self.system_graph.get(Subtask, "Idle")
......@@ -41,13 +41,18 @@ class SporadicEvent:
assert state.is_surely_suspended(self.handler), (state, self.handler)
# Save current IP
current_subtask = state.current_abb.function.subtask
current_subtask = state.current_abb.subtask
copy_state.set_continuation(current_subtask, state.current_abb)
# Dispatch to Event Handler
copy_state.set_continuation(self.handler, self.handler.entry_abb)
entry_abb = self.handler.entry_abb
# If we use the APP-FSM method to do the SSE, then we have
# this field available in the handler (== ISR Subtask)
if hasattr(self.handler, "ApplicationFSMIterator"):
entry_abb = self.handler.ApplicationFSMIterator(self.handler, self.handler.fsm.initial_state)
copy_state.set_continuation(self.handler, entry_abb)
copy_state.current_abb = self.handler.entry_abb
copy_state.current_abb = entry_abb
return copy_state
......@@ -18,6 +18,8 @@ class Subtask(Function, SystemObject):
self._events = {}
# A mask that is the combination of all event masks
self.event_mask_valid = 0
# Field is used by ApplicationFSM
self.fsm = None
def find(self, cls, name):
if issubclass(cls, Event):
......@@ -23,11 +23,13 @@ class SymbolicSystemExecution(Analysis, GraphObject):
pass_alias = "sse"
def __init__(self):
def __init__(self, use_app_fsm = False):
GraphObject.__init__(self, "SymbolicSystemState", root = True)
self.states = None
self.states_by_abb = None
# Should the application fsm pass be used as a basis
self.use_app_fsm = use_app_fsm
# A member for the RunningTask analysis
self.running_task = None
......@@ -42,6 +44,8 @@ class SymbolicSystemExecution(Analysis, GraphObject):
def requires(self):
# We require all possible system edges to be contructed
if self.use_app_fsm:
return ["ApplicationFSM"]
return ["DynamicPriorityAnalysis", "InterruptControlAnalysis"]
def get_edge_filter(self):
......@@ -62,22 +66,22 @@ class SymbolicSystemExecution(Analysis, GraphObject):
M[state] = wrapper
cont = GraphObjectContainer(str(abb), color="red",
subobjects=subobjs) = {"ABB": str(abb), "func":} = {"ABB": abb.path()} = abb
for state, wrapper in M.items():
for next_state in state.get_outgoing_nodes(StateTransition):
cont.edges.append(Edge(wrapper, M[next_state], color="red"))
for next_state in state.get_outgoing_nodes(SavedStateTransition):
cont.edges.append(Edge(wrapper, M[next_state], color="green"))
for next_state, label in state.get_outgoing_edges(SavedStateTransition):
cont.edges.append(Edge(wrapper, M[next_state], color="green", label=label))
ret = list(sorted(ret, key=lambda x:
return ret
def state_transition(self, source_block, source_state, target_block, target_state):
def state_transition(self, syscall, source_block, source_state, target_block, target_state):
# print(source_block.path(),"->", target_block.path())
# Do not allow self loops
#if source_state == target_state:
......@@ -88,26 +92,40 @@ class SymbolicSystemExecution(Analysis, GraphObject):
assert False
if target_state in self.states:
real_instance = self.states[target_state]
source_state.add_cfg_edge(real_instance, StateTransition)
source_state.add_cfg_edge(real_instance, StateTransition, label=syscall)
self.working_stack.push((source_state, target_state))
self.working_stack.push((syscall, source_state, target_state))
def state_functor(self, state):
# Do the System Call
after_states = self.system_call_semantic.do_SystemCall(
state.current_abb, state, self.transitions)
def state_functor__(state_ptr):
after_states = self.system_call_semantic.do_SystemCall(
state_ptr.current_abb, state_ptr, self.transitions)
# Schedule on each possible block
for syscall, next_state in after_states:
edges = list(self.system_call_semantic.schedule(state_ptr.current_abb, next_state.copy()))
for (source, target, new_state) in edges:
assert new_state.frozen
# Re-step system state, if syscall is None. (Happens in CheckAlarm for use_app_fsm)
if syscall == None:
for (syscall, _, target, new_state) in state_functor__(new_state):
yield (syscall, source, target, new_state)
yield (syscall, source, target, new_state)
# Schedule on each possible block
for next_state in after_states:
self.system_call_semantic.schedule(state.current_abb, next_state,
lambda source, target, new_state: \
self.state_transition(source, state, target, new_state))
for (syscall, source, target, new_state) in state_functor__(state):
# otherwise: Add state transition
self.state_transition(syscall, source