diff --git a/src/visualization/abstraction_domain.py b/src/visualization/abstraction_domain.py new file mode 100644 index 0000000000000000000000000000000000000000..2252e43a326de1e8dd719711c8dade42a51f0b7d --- /dev/null +++ b/src/visualization/abstraction_domain.py @@ -0,0 +1,168 @@ +## This file is part of the simulative evaluation for the qronos observer abstractions. +## Copyright (C) 2022-2023 Tim Rheinfels <tim.rheinfels@fau.de> +## See https://gitlab.cs.fau.de/qronos-state-abstractions/simulation +## +## Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: +## +## 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. +## +## 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. +## +## 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. +## +## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +### +### @file visualization/abstraction_domain.py +### +### @brief Provides the abstraction domain visualization used throughout @cite ECRTS23-Rheinfels +### +### @author Tim Rheinfels <tim.rheinfels@fau.de> +### + +import matplotlib.pyplot as plot +import numpy as np + +from simulation.closed_loop import ClosedLoopSimulation +from simulation.disturbance import DisturbanceSimulation +from simulation.switching_sequence import StateAbstractionSwitchingSequenceSimulation +from visualization.color import scale_lightness + +### +### @brief Visualizes the results from a @ref closed_loop.ClosedLoopSimulation in the abstraction domain +### +### @param ax The matplotlib.Axes object to render to +### @param experiment The @ref closed_loop.ClosedLoopSimulation to visualize +### @param which_run The index of the run to visualize +### @param show_s Show safety outputs under the S-norm (safety norm) +### @param show_x_p Show system state under the P-norm (analysis norm) +### @param show_decisions Show markers according to the switching sequence +### @param show_cost Show averaged cost (use only for two modes) +### @param show_disturbance_area Mark the area where the disturbance occurs +### @param abstraction_kwargs kwargs passed to the rendering of the state abstraction +### +### @returns Tuple containing +### - The list of legend handles, +### - the list of legend labels, +### - the list of axes generated (split axes if @p show_cost is used) +### +def visualize_abstraction_domain(ax, experiment, which_run=0, show_s=False, show_x_p=False, show_decisions=False, show_cost=False, show_disturbance_area=False, abstraction_kwargs={}): + assert(isinstance(experiment, ClosedLoopSimulation)) + + run = experiment.get_run(which_run) + + # Get data from experiment + state_abstraction = experiment.state_abstraction_simulation.abstraction + system_model = experiment.system_model_simulation.system_model + + # Extract signals, match lengths, and convert to numpy arrays + t = np.array(run['sigma']['time_data']) + timesteps = t.size + sigma = np.concatenate(run['sigma']['signal_data'][:timesteps]) + x = np.stack(run['x']['signal_data'][:timesteps], axis=1) + s = np.stack(run['s']['signal_data'][:timesteps], axis=1) + v = np.array(run['v']['signal_data'][:timesteps]) + v_sys = np.concatenate(run['v_sys']['signal_data'][:timesteps]) + + # Plot specification + assert(np.isclose(state_abstraction.v_max, 1.0)) + ax.plot((0, timesteps-1), (1, 1), color='C1', linewidth=2, label=r'Safety Specification: $v_{max}$') + + # Plot area where disturbance occurs if requested + if show_disturbance_area: + k_start = max(experiment.disturbance_simulation.T_start, 0) + k_end = min(experiment.disturbance_simulation.T_end, timesteps-1) + if experiment.disturbance_simulation.strategy == DisturbanceSimulation.Strategy.DrawFromVolume: + color = scale_lightness('C3', 0.4) + label = r'Scenario: Disturbance in $d_k$ and $z_k$' + elif experiment.disturbance_simulation.strategy == DisturbanceSimulation.Strategy.WorstCaseMonteCarlo: + color = scale_lightness('C3', 1) + label = r'Scenario: Disturbance in $d_k$ and $z_k$' + else: + assert(False) + ax.axvspan(k_start, k_end, color=color, alpha=0.2, label=label) + + # Set default values for kwargs + if 'color' not in abstraction_kwargs: + abstraction_kwargs['color'] = 'C0' + if 'linestyle' not in abstraction_kwargs: + abstraction_kwargs['linestyle'] = '--' + if 'linewidth' not in abstraction_kwargs: + abstraction_kwargs['linewidth'] = 2 + if 'label' not in abstraction_kwargs: + abstraction_kwargs['label'] = r'Observer Abstraction: $v_k$' + + # Plot abstraction + ax.plot(t, v, **abstraction_kwargs) + + # Compute and plot state under P-norm if requested + if show_x_p: + x_p = np.zeros(timesteps) + for i in range(timesteps): + x_p[i] = state_abstraction.E_P.norm(x[:, i]) + ax.plot(t, x_p, color='k', linestyle='-.', linewidth=1, label=r'Analysis Norm: $||x_k||_P$') + + # Plot system safety output scaled to be comparable with the abstraction + ax.plot(t, state_abstraction.v_max * v_sys, color='k', linestyle=':', linewidth=2, label=r'Safety Norm: $v_{max} \cdot ||s_k||_S$') + # Compute and plot safety space + if show_s: + S_sqrt_T = np.linalg.inv(system_model.E_S.Pi_sqrt) + for i in range(system_model.n_s): + s_i = np.zeros(timesteps) + label = (i == 0) and (r'Safety Outputs: $s_k$') or None + for j in range(timesteps): + s_i[j] = state_abstraction.v_max * S_sqrt_T[i, i] * s[i, j] + ax.plot(t, s_i, color='gray', linestyle='-', linewidth=1, label=label) + + # Plot decisions and their boundaries + if show_decisions: + colors = ('C2', 'C4', 'C6') + markers = ('o', 's', 'p') + for j, mode in enumerate(state_abstraction.modes): + if j in range(experiment.system_model_simulation.system_model.n_Sigma): + idx = np.where(sigma == j) + if isinstance(experiment.switching_sequence_simulation, StateAbstractionSwitchingSequenceSimulation): + draw_onto = v[idx] + else: + draw_onto = np.zeros(t[idx].shape) + ax.scatter(t[idx], draw_onto, c=colors[j], marker=markers[j], s=40, linewidths=1, edgecolor='k', zorder=10, label=r'Mode: $\sigma_k = %d$' % j) + + if isinstance(experiment.switching_sequence_simulation, StateAbstractionSwitchingSequenceSimulation): + v_bar = experiment.switching_sequence_simulation.v_bar[j] + ax.plot((0, timesteps-1), (v_bar, v_bar), color=colors[j], linewidth=4, linestyle=':', label=r'Decision Boundary: $\bar v(%d)$' % j) + + if show_cost: + N_window = 31 + padding = int((N_window-1)/2) + ax2 = ax.twinx() + kernel = np.ones(N_window)/float(N_window) + sigma_boxcar = np.convolve(sigma.astype(float), kernel, mode='valid') + t_boxcar = t[padding:-padding] + ax2.plot(t_boxcar, 1.0-sigma_boxcar, color='gray', linewidth=1, label=r'Average Cost: $c_k$') + + # Some styling + ax.set_xlabel('Time Step $k$') + ax.set_ylabel('Abstraction Domain') + ax.set_ylim([-0.05, 1.05]) + ax.grid() + if show_cost: + ax2.set_ylabel('Average Cost') + ax2.yaxis.label.set_color('gray') + [t.set_color('gray') for t in ax2.yaxis.get_ticklines()] + [t.set_color('gray') for t in ax2.yaxis.get_ticklabels()] + ax2.set_ylim([-0.05, 1.05]) + + if not show_cost: + ax.legend() + return *ax.get_legend_handles_labels(), (ax) + + # Combine legends from the twin axes + handles = [] + labels = [] + for ax_i in (ax, ax2): + handles_i, labels_i = ax_i.get_legend_handles_labels() + handles.extend(handles_i) + labels.extend(labels_i) + ax.legend(handles, labels, loc='right', ncol=2) + + return handles, labels, (ax, ax2)