"""Co-simulation system orchestration and management.
This module provides the ``System`` class, which is the central orchestrator
for heterogeneous co-simulation. It manages components, connections, execution
order computation, and delegates simulation stepping to pluggable algorithms.
Key Responsibilities:
- **Component Management**: Register and organize ``CoSimComponent`` instances
- **Connection Validation**: Type and unit compatibility checking between ports
- **Graph Analysis**: Build dependency graphs and detect algebraic loops
- **Execution Ordering**: Compute parallelizable generations using topological sort
- **Algorithm Delegation**: Step simulation using Jacobi, Gauss-Seidel, or Hybrid algorithms
- **Event Routing**: Dispatch events between components via event connections
- **History Aggregation**: Collect time-series data from all components
Typical Usage:
Building and running a co-simulation::
from syssimx import System, Connection
from syssimx.components import FMUComponent
# Create system
system = System(name="ControlledPendulum")
# Add components
pendulum = FMUComponent("Pendulum", fmu_path="Pendulum.fmu")
controller = FMUComponent("PID", fmu_path="Controller.fmu")
system.add_component(pendulum)
system.add_component(controller)
# Define connections
system.add_connection(Connection(
src_comp="Pendulum", src_port="angle",
dst_comp="PID", dst_port="measurement"
))
system.add_connection(Connection(
src_comp="PID", src_port="control",
dst_comp="Pendulum", dst_port="torque"
))
# Initialize and run
system.initialize(t0=0.0)
system.run(t0=0.0, tf=10.0, dt=0.001)
# Retrieve results
history = system.get_history()
See Also:
:class:`Connection`: Signal connections between component ports
:class:`EventConnection`: Event routing between components
:mod:`syssimx.system.algorithms`: Stepping algorithm implementations
:mod:`syssimx.system.graph`: Graph analysis utilities
"""
from __future__ import annotations
import logging
import timeit
from typing import Any
import networkx as nx
from ..core.base import CoSimComponent
from ..core.events import Event
from ..core.history import SystemHistory
from ..core.port import PortSpec
from . import graph
from .algorithms.base import Algorithm
from .algorithms.gauss_seidel import GaussSeidelAlgorithm
from .algorithms.hybrid import HybridAlgorithm
from .algorithms.ijcsa import solve_algebraic_scc_ijcsa
from .connection import Connection, EventConnection
_ports_compatible = PortSpec.compatible
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# System Class
# ----------------------------------------------------------------------------
[docs]
class System:
"""Central orchestrator for heterogeneous co-simulation systems.
The ``System`` class manages a collection of interconnected
``CoSimComponent`` instances, validates their connections, computes
execution order, and coordinates simulation stepping through a
pluggable algorithm.
Attributes:
name (str): Identifier for this system.
components (dict[str, CoSimComponent]): Registered components by name.
connections (list[Connection]): Signal connections between components.
event_connections (list[EventConnection]): Event routing connections.
graph (nx.MultiDiGraph): Complete connection graph (including delayed).
groups (dict[str, list[CoSimComponent]]): Components organized by group.
execution_order (list[list[str]]): Topologically sorted generations.
Each generation contains components that can execute in parallel.
algebraic_loops (list[list[str]]): Detected algebraic loops (SCCs > 1).
algorithm (Algorithm): Stepping algorithm (Gauss-Seidel, Jacobi, Hybrid).
history (SystemHistory): Aggregated time-series data from all components.
is_initialized (bool): Whether ``initialize()`` has been called.
t (float): Current simulation time.
Example:
Creating a simple two-component system::
>>> system = System("Feedback")
>>> system.add_component(plant)
>>> system.add_component(controller)
>>> system.add_connection(Connection(
>>> src_comp="plant", src_port="y",
>>> dst_comp="controller", dst_port="measurement"
>>> ))
>>> system.initialize(t0=0.0)
>>> system.run(t0=0.0, tf=10.0, dt=0.01)
See Also:
:class:`Connection`: Signal connection specification
:class:`Algorithm`: Base class for stepping algorithms
"""
[docs]
def __init__(self, name: str):
"""Initialize a new co-simulation system.
Args:
name: Identifier for this system. Used in logging and
history tracking.
Example:
>>> system = System("ControlledPendulum")
>>> system.name
'ControlledPendulum'
"""
self.name = name
self.components: dict[str, CoSimComponent] = {}
self.connections: list[Connection] = []
self.event_connections: list[EventConnection] = []
self.graph = nx.MultiDiGraph() # All connections (including delayed)
self._dag = nx.DiGraph() # Zero-delay connections only
self.groups: dict[str, list[CoSimComponent]] = {}
self.execution_order: list[list[str]] = [] # [ [comp names in gen0], [gen1], ...]
self.execution_idx: dict[str, int] = {} # comp name -> gen index
# Algebraic loop diagnostics (SCCs with size > 1 on the direct feed-through graph)
self.algebraic_loops: list[list[str]] = [] # Detected algebraic loops (if any)
self._scc_index: dict[str, int] = {} # comp name -> scc index
# Pre-computed connection lookups
self._incoming_by_dst: dict[str, list[Connection]] = {}
self._input_sources: dict[str, dict[str, Connection]] = {}
self._event_targets_by_source: dict[tuple[str, str], list[str]] = {}
# Algorithm
self.algorithm: Algorithm = GaussSeidelAlgorithm()
# History Management
self.history = SystemHistory(system_name=name)
self.is_initialized: bool = False
self.t = 0.0
# ----------------------------------------------------------------------------
# Algorithm
# ----------------------------------------------------------------------------
[docs]
def set_algorithm(self, algorithm: Algorithm) -> None:
"""Set the co-simulation stepping algorithm.
The algorithm determines how components are stepped during
simulation. Available algorithms include:
- ``GaussSeidelAlgorithm``: Sequential stepping (default)
- ``JacobiAlgorithm``: Parallel stepping with delayed inputs
- ``HybridAlgorithm``: Event-driven with bisection localization
Args:
algorithm: An instance implementing the ``Algorithm`` interface.
Raises:
TypeError: If ``algorithm`` doesn't implement ``Algorithm``.
Example:
>>> from syssimx.system.algorithms import JacobiAlgorithm
>>> system.set_algorithm(JacobiAlgorithm())
Note:
If components with event indicators are detected during
``initialize()``, the algorithm is automatically upgraded
to ``HybridAlgorithm``.
"""
if not isinstance(algorithm, Algorithm):
raise TypeError("algorithm must implement Algorithm")
self.algorithm = algorithm
# ----------------------------------------------------------------------------
# Register components
# ----------------------------------------------------------------------------
[docs]
def add_component(self, component: CoSimComponent):
"""Register a component with the system.
Components must be added before connections can reference them.
Each component's history is automatically registered with the
system's history tracker.
Args:
component: A ``CoSimComponent`` instance to add. Its ``name``
attribute must be unique within this system.
Raises:
RuntimeError: If called after ``initialize()``.
ValueError: If a component with the same name already exists.
Example:
>>> pendulum = FMUComponent("Pendulum", fmu_path="model.fmu")
>>> system.add_component(pendulum)
>>> "Pendulum" in system.components
True
Note:
If the component has a ``group`` attribute set, it will be
added to ``system.groups[group]`` for visual organization.
"""
if self.is_initialized:
raise RuntimeError("Cannot add components after system initialization.")
if component.name in self.components:
raise ValueError(f"Component '{component.name}' already in system.")
self.components[component.name] = component
# Register component history
self.history.add_component(component.name, component.history)
if component.group:
self.groups.setdefault(component.group, []).append(component)
# ----------------------------------------------------------------------------
# Connections
# ----------------------------------------------------------------------------
[docs]
def _validate_connection(self, c: Connection) -> None:
# 1) existence
if c.src_comp not in self.components or c.dst_comp not in self.components:
raise ValueError(
f"Both '{c.src_comp}' and '{c.dst_comp}' must be added to the system before connecting them."
)
src = self.components[c.src_comp]
dst = self.components[c.dst_comp]
# 2) port existence
if c.src_port not in src.output_specs:
raise KeyError(
f"Source port '{c.src_port}' is not an OUTPUT port of component '{src.name}'."
)
if c.dst_port not in dst.input_specs:
raise KeyError(
f"Destination port '{c.dst_port}' is not an INPUT port of component '{dst.name}'."
)
src_ps: PortSpec = src.output_specs[c.src_port]
dst_ps: PortSpec = dst.input_specs[c.dst_port]
# 3) type and unit compatibility
if not _ports_compatible(src_ps, dst_ps):
src_unit = src_ps.unit if src_ps.unit else "unitless"
dst_unit = dst_ps.unit if dst_ps.unit else "unitless"
raise TypeError(
f"Port incompatibility: {c.src_comp}.{c.src_port} ({src_ps.type}, {src_unit}) "
f"-> {c.dst_comp}.{c.dst_port} ({dst_ps.type}, {dst_unit})"
)
# 5) duplicate and single-assignment checks
for existing in self.connections:
same_source = (
existing.src_comp == c.src_comp
and existing.src_port == c.src_port
)
same_destination = (
existing.dst_comp == c.dst_comp
and existing.dst_port == c.dst_port
)
if same_source and same_destination:
raise ValueError(
f"Duplicate connection: {c.src_comp}.{c.src_port} -> {c.dst_comp}.{c.dst_port}"
)
if same_destination:
raise ValueError(
f"Input port already connected: {c.dst_comp}.{c.dst_port} is already driven by "
f"{existing.src_comp}.{existing.src_port}; cannot also connect "
f"{c.src_comp}.{c.src_port}."
)
[docs]
def add_connection(self, connection: Connection) -> None:
"""Add a signal connection between two components.
Validates port existence, type compatibility, and unit
compatibility before registering the connection.
Args:
connection: A ``Connection`` specifying source and destination
component/port pairs.
Raises:
ValueError: If source or destination component not in system,
or if connection is a duplicate.
KeyError: If specified ports don't exist on the components.
TypeError: If port types or units are incompatible.
Example:
>>> system.add_connection(Connection(
... src_comp="Sensor", src_port="value",
... dst_comp="Controller", dst_port="measurement"
... ))
See Also:
:class:`Connection`: Connection specification class
:meth:`add_event_connection`: For event-based connections
"""
self._validate_connection(connection)
self.connections.append(connection)
self._incoming_by_dst.clear()
self._input_sources.clear()
# ----------------------------------------------------------------------------
# Event Connections
# ----------------------------------------------------------------------------
[docs]
def _validate_event_connection(self, connection: EventConnection) -> None:
"""Validate an event connection before registration.
Performs the following checks:
1. Source component exists in the system
2. Target component exists in the system
3. Event indicator is registered on the source component
4. No duplicate connections exist
Args:
connection: The ``EventConnection`` to validate.
Raises:
ValueError: If source or target component not in system,
or if connection is a duplicate.
KeyError: If the event indicator is not registered on the
source component.
Note:
Event subscription on the target is handled automatically
by ``add_event_connection()``, not during validation.
"""
if connection.src_comp not in self.components:
raise ValueError(f"Event source '{connection.src_comp}' is not in the system.")
if connection.dst_comp not in self.components:
raise ValueError(f"Event target '{connection.dst_comp}' is not in the system.")
source = self.components[connection.src_comp]
event_name = connection.src_port
# Check event indicator exists on source
if event_name not in source.event_indicators:
raise KeyError(
f"Event indicator '{event_name}' not found on source component '{source.name}'. "
f"Register it via: source.add_event_indicator(name='{event_name}', func=..., direction=...)"
)
# Check for duplicate connections
for existing in self.event_connections:
if existing.key() == connection.key():
raise ValueError(
f"Duplicate event connection: {connection.src_comp}:{connection.event_name} -> {connection.target_comp}"
)
[docs]
def add_event_connection(self, connection: EventConnection) -> None:
"""Add an event connection with automatic target subscription.
Registers an event connection and automatically subscribes the
target component to receive the event. This eliminates the need
for manual ``Event`` object creation and ``subscribe_event()`` calls.
Args:
connection: ``EventConnection`` specifying the source component's
event indicator and the target component to notify.
Raises:
ValueError: If source/target not in system or duplicate connection.
KeyError: If event indicator not registered on source component.
Example:
>>> # Ball emits 'bounce' event, floor handles it
>>> system.add_event_connection(EventConnection(
... src_comp="Ball", event_name="bounce",
... dst_comp="Floor"
... ))
Note:
The target component must implement ``_handle_events_internal()``
to respond to the event when it fires.
See Also:
:class:`EventConnection`: Event connection specification
:meth:`dispatch_event`: Manual event dispatching
"""
self._validate_event_connection(connection)
# Auto-subscribe the target component to this event
source = self.components[connection.src_comp]
target = self.components[connection.dst_comp]
event_name = connection.event_name
# Get direction from event indicator
direction = source.event_indicators[event_name].direction
# Create Event object for subscription (source, name, direction)
event = Event(name=event_name, source=connection.src_comp, direction=direction)
# Subscribe target if not already subscribed
already_subscribed = any(
e.name == event_name and e.source == connection.src_comp
for e in target.event_subscriptions
)
if not already_subscribed:
target.event_subscriptions.append(event)
# Register the connection
self.event_connections.append(connection)
key = (connection.src_comp, connection.src_port)
self._event_targets_by_source.setdefault(key, []).append(connection.dst_comp)
[docs]
def get_event_targets(self, source_comp: str, event_name: str) -> list[str]:
"""Get component names that receive a specific event.
Args:
source_comp: Name of the component emitting the event.
event_name: Name of the event indicator.
Returns:
List of component names subscribed to this event.
Returns empty list if no subscribers.
"""
return self._event_targets_by_source.get((source_comp, event_name), []).copy()
[docs]
def dispatch_event(self, event: Event, t: float, notify: bool = True) -> list[str]:
"""Dispatch an event to all subscribed components.
Resolves event listeners based on registered event connections
and optionally notifies them by calling their ``handle_event()``
method.
Args:
event: The ``Event`` object containing source and event name.
t: Time at which the event occurred.
notify: If ``True``, calls ``handle_event()`` on each target.
If ``False``, only returns the target list without notifying.
Returns:
List of component names that are subscribed to this event.
Raises:
ValueError: If the event source is not in the system.
Example:
>>> event = Event(name="threshold", source="Sensor")
>>> targets = system.dispatch_event(event, t=1.5)
>>> print(targets)
['Controller', 'Logger']
"""
if event.source not in self.components:
raise ValueError(f"Event source '{event.source}' is not in the system.")
targets = self.get_event_targets(event.source, event.name)
if notify:
for comp_name in targets:
self.components[comp_name].handle_event([event.name], t)
return targets
# ----------------------------------------------------------------------------
# Graphs
# ----------------------------------------------------------------------------
[docs]
def build_graphs(self) -> None:
"""Build dependency graphs from registered connections.
Constructs two graph representations:
1. ``self.graph``: Complete ``MultiDiGraph`` with all connections,
including delayed connections (for visualization)
2. ``self._dag``: ``DiGraph`` with only zero-delay, direct-feedthrough
connections (for execution ordering)
Also detects algebraic loops as strongly connected components (SCCs)
with more than one node on the direct-feedthrough graph.
Note:
Called automatically during ``initialize()``. The detected
algebraic loops are stored in ``self.algebraic_loops``.
See Also:
:mod:`syssimx.system.graph`: Graph construction utilities
"""
graph.build_graphs(self)
[docs]
def compute_execution_order(self) -> None:
"""Compute parallelizable execution order from the dependency graph.
Performs a topological sort on the zero-delay dependency DAG to
produce generations of components. Components within the same
generation have no dependencies on each other and can execute
in parallel.
The result is stored in ``self.execution_order`` as a list of
lists, where each inner list contains component names in one
generation.
Example:
After calling::
system.execution_order = [
['Source', 'Reference'], # Generation 0
['Controller'], # Generation 1
['Plant'], # Generation 2
['Sensor'] # Generation 3
]
See Also:
:mod:`syssimx.system.graph`: Execution order computation
"""
graph.compute_execution_order(self)
[docs]
def classify_components(self) -> dict[str, list[CoSimComponent]]:
"""Classify components by their hybrid simulation capabilities.
Categorizes all components based on whether they have event
indicators, event subscriptions, or neither. This classification
is used to determine if hybrid simulation is needed.
Returns:
Dictionary with the following keys:
- ``"event_sources"``: Components with event indicators
(require rollback support for bisection)
- ``"event_listeners"``: Components subscribed to events
(will receive event notifications)
- ``"continuous_only"``: Components without any hybrid
capabilities (pure continuous dynamics)
Note:
A component can be both an event source and an event listener.
The categorization is stored in instance attributes
``self.event_sources``, ``self.event_listeners``, and
``self.continuous_only``.
"""
self.event_sources: list[CoSimComponent] = []
self.event_listeners: list[CoSimComponent] = []
self.continuous_only: list[CoSimComponent] = []
for comp in self.components.values():
if comp.has_state_events:
self.event_sources.append(comp)
if comp.has_event_subscriptions:
self.event_listeners.append(comp)
if not comp.has_state_events and not comp.has_event_subscriptions:
self.continuous_only.append(comp)
return {
"event_sources": self.event_sources,
"event_listeners": self.event_listeners,
"continuous_only": self.continuous_only,
}
# ----------------------------------------------------------------------------
# Simulation Lifecycle
# ----------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Initialize the system and all components at start time.
Performs the complete initialization sequence:
1. Classifies components and auto-selects ``HybridAlgorithm``
if event sources are detected
2. Creates port state objects for all components so that
feedthrough detection and input propagation can work
3. Detects direct feedthrough for pure-Python components via
perturbation (FMU components already have this from their
model description)
4. Builds dependency graphs and computes execution order;
algebraic loops are identified from zero-delay edges
5. Iterates over generations in execution order:
a. Propagates initial input values from upstream outputs
into the generation's input port states
b. Initializes components (FMUs apply stored port values
via ``_apply_input_starts`` during init mode)
c. Solves algebraic loops within the generation
d. Performs a zero-step (dt=0) to establish consistent
initial outputs for downstream generations
This ordering ensures that each generation receives consistent
initial values from already-initialized upstream components
before entering FMU initialization mode.
Args:
t0: Initial simulation time in seconds.
Example:
>>> system.add_component(plant)
>>> system.add_component(controller)
>>> system.add_connection(connection)
>>> system.initialize(t0=0.0)
>>> system.is_initialized
True
Note:
Must be called after all components and connections are added,
but before ``run()``. Calling ``initialize()`` locks the system
against further component additions.
"""
self.t = t0
# 1) Classify components and auto-select hybrid algorithm
self.classify_components()
if self.event_sources:
self.algorithm = HybridAlgorithm()
# 2) Ensure all components have port states created so that
# _detect_direct_feedthrough() can call evaluate_outputs() and
# _set_inputs_for_generation can write to them before initialize().
# For FMU components ports already exist (created in __init__);
# _initialize_ports_from_specs skips already-existing ports.
for comp in self.components.values():
comp._initialize_ports_from_specs()
# 3) Detect direct feedthrough for components that haven't computed
# it yet. FMU components populate this from modelDescription.xml
# in __init__(); pure-Python components need perturbation-based
# detection here so that build_graphs() can correctly identify
# zero-delay edges and algebraic loops.
for comp in self.components.values():
if not comp.direct_feedthrough:
comp._detect_direct_feedthrough()
# 4) Build dependency graphs and compute execution order
self.build_graphs()
self.compute_execution_order()
# 5) Initialize generation by generation in execution order
for gen in self.execution_order:
# 5a) Propagate upstream outputs into this generation's input ports.
# Components are not yet initialized, so values are written
# directly to PortState objects (no FMU instance needed).
self._set_inputs_for_generation(gen, t0)
# 5b) Initialize components in this generation.
# For FMU components, _apply_input_starts() will push the
# port state values into the FMU during initialization mode.
for comp_name in gen:
self.components[comp_name].initialize(t0)
# 5c) Solve algebraic loops within this generation
gen_set = set(gen)
for loop in self.algebraic_loops:
if set(loop).issubset(gen_set):
solve_algebraic_scc_ijcsa(self, loop, t0)
# 5d) Zero-step to update outputs for downstream generations
for comp_name in gen:
self.components[comp_name].do_step(0, 0)
self.is_initialized = True
# ----------------------------------------------------------------------------
# Run System Simulation
# ----------------------------------------------------------------------------
[docs]
def run(self, t0: float, tf: float, dt: float):
"""Run the simulation from start time to end time.
Advances the simulation by repeatedly calling the algorithm's
``step()`` method with the specified time step size.
Args:
t0: Start time in seconds (should match ``initialize(t0)``).
tf: End time in seconds.
dt: Fixed time step size in seconds. The final step may be
smaller to land exactly on ``tf``.
Example:
>>> system.initialize(t0=0.0)
>>> system.run(t0=0.0, tf=10.0, dt=0.001)
>>> # Simulation complete, retrieve history
>>> history = system.get_history()
Note:
The algorithm may take sub-steps during event localization
(Hybrid algorithm) or iteration (IJCSA for algebraic loops).
The ``dt`` parameter controls the macro step size.
"""
timer = timeit.default_timer
logger.info(f"Starting simulation run from t={t0} to t={tf} with dt={dt}")
t = t0
self.t_end = tf
start_time = timer()
while t < tf - 1e-12:
dt = min(dt, tf - t)
self.algorithm.step(self, t, dt)
t += dt
end_time = timer()
logger.info(f"Simulation completed in {end_time - start_time:.2f} seconds")
# ----------------------------------------------------------------------------
# Get history of all components
# ----------------------------------------------------------------------------
[docs]
def get_history(self) -> dict[str, Any]:
"""Retrieve time-series history from all components.
Collects the recorded output history from every component in
the system, plus any event history records.
Returns:
Dictionary with the following structure:
- Keys are component names, values are tuples of
``(time_array, values_dict)`` from ``get_history_arrays()``
- Special key ``"Events"`` contains event occurrence records
Example:
>>> history = system.get_history()
>>> t, values = history["Pendulum"]
>>> plt.plot(t, values["angle"])
>>> # Access events
>>> events = history["Events"]
See Also:
:meth:`CoSimComponent.get_history_arrays`: Component history format
"""
history: dict[str, Any] = {}
for comp_name, comp in self.components.items():
history[comp_name] = comp.get_history_arrays()
history["Events"] = self.history.get_all_event_histories()
return history
# ----------------------------------------------------------------------------
# Reset System
# ----------------------------------------------------------------------------
[docs]
def reset(self) -> None:
"""Reset the system and all components to uninitialized state.
Clears all component states, histories, and internal flags,
allowing the system to be re-initialized from scratch.
Example:
>>> system.initialize(t0=0.0)
>>> system.run(t0=0.0, tf=10.0, dt=0.01)
>>> system.reset()
>>> system.is_initialized
False
"""
for comp in self.components.values():
comp.reset()
self.history.clear()
self.is_initialized = False
self.t = 0.0