Core Modules#
This module contains the core classes for co-simulation components.
CoSimComponent Base Class#
Co-simulation component base classes and interfaces.
This module defines the abstract base class CoSimComponent that all
co-simulation components must inherit from. It provides a unified interface
for initialization, stepping, I/O handling, state management, and event
detection in heterogeneous co-simulation scenarios.
- Architecture Overview:
The component lifecycle follows these phases:
Construction: Create component with name and optional label/group
Configuration: Set parameters via
set_parameters()Initialization: Call
initialize(t0)to set up ports and stateSimulation Loop: Repeated calls to
set_inputs(),do_step(),get_outputs()Cleanup: Call
reset()for re-initialization orfree()for resource release
- State Management API:
Components provide two distinct state management interfaces:
- Physical State Transfer (for mode switching and inspection):
get_state(): Export human-readable physical variablesset_state(): Import and initialize from physical variablesFormat:
Dict[str, Dict[str, Any]]with ‘value’ and ‘unit’ keysUse: Transferring state between different models, debugging
- Time Rollback (for event detection via bisection):
snapshot_state(): Capture complete internal solver state (opaque)restore_state(): Restore exact solver state at previous timeFormat: Opaque (component-specific, may include solver internals)
Use: Reverting to previous time during hybrid simulation
- Key Rule:
These are separate concerns. Snapshots cannot be adapted across models and should only be used for rollback within the same component instance.
Example
Basic component usage pattern:
from syssimx.core.base import CoSimComponent
# Create and configure
comp = MyComponent("sensor", label="Temperature Sensor")
comp.set_parameters(gain=2.0, offset=0.5)
# Initialize at t=0
comp.initialize(t0=0.0)
# Simulation loop
for t in np.arange(0, 10, 0.01):
comp.set_inputs({'u': input_signal[t]})
comp.do_step(t, dt=0.01)
outputs = comp.get_outputs()
# Retrieve history
time, values = comp.get_history_arrays()
See also
syssimx.components.FMUComponent: FMI 2.0 wrappersyssimx.components.FEMComponent: Finite element wrappersyssimx.components.OpenSimComponent: Biomechanics wrapper
- class syssimx.core.base.CoSimComponent[source]#
Bases:
ABCAbstract base class for all co-simulation components.
This class defines the unified interface that all simulation components must implement to participate in a co-simulation. It handles port management, history recording, parameter configuration, and provides hooks for hybrid event detection.
- Subclasses must implement the following abstract methods:
_initialize_component(t0): Component-specific initialization_do_step_internal(t, dt): Time step computation_update_output_states(t, event_names): Update output port valuesget_state(): Export physical stateset_state(state, t): Import physical state
- history#
Time-series recorder for output values.
- Type:
- direct_feedthrough#
Maps outputs to inputs with algebraic dependency (no state dynamics between them).
- model_structure#
FMI-style dependency structure for outputs, derivatives, and initial unknowns.
- event_indicators#
Registered zero-crossing functions for state event detection.
- Type:
- internal_event_hints#
Timing hints from micro-stepping components to improve event localization.
- Type:
- event_commutativity#
Whether event pairs can be handled in any order.
Example
Creating a custom component:
class Integrator(CoSimComponent): def __init__(self, name: str): super().__init__(name) self.input_specs = {'u': PortSpec('u', PortType.REAL, 'in')} self.output_specs = {'y': PortSpec('y', PortType.REAL, 'out')} self.parameters = {'gain': 1.0} self._state = 0.0 def _initialize_component(self, t0: float) -> None: self._state = 0.0 def _do_step_internal(self, t: float, dt: float) -> None: self._state += self.parameters['gain'] * self.inputs['u'].get() * dt def _update_output_states(self, t: float, event_names=None) -> None: self.outputs['y'].set(self._state, t=t) def get_state(self) -> dict: return {'y': {'value': self._state, 'unit': '1'}} def set_state(self, state: dict, t: float) -> None: self._state = state['y']['value'] self.t = t
See also
PortSpec: Port specification dataclassPortState: Mutable port state containerComponentHistory: History recording utility- __init__(name, label=None, group=None)[source]#
Initialize a new co-simulation component.
- Parameters:
name (str) – Unique identifier for this component. Must be unique within the containing System. Used for connection definitions and graph construction.
label (str | None) – Human-readable display name for visualization and logging. Defaults to
nameif not provided.group (str | None) – Optional category for grouping related components (e.g., ‘sensors’, ‘actuators’, ‘controllers’). Used for filtering and organization in large systems.
Example
>>> comp = MyComponent("pid_1", label="PID Controller", group="controllers") >>> comp.name 'pid_1' >>> comp.label 'PID Controller'
- event_indicators: dict[str, EventIndicator]#
- internal_event_hints: list[InternalEventInfo]#
- set_parameters(**parameters)[source]#
Set one or more component parameters before initialization.
Parameters control component behavior and must be set before calling
initialize(). Available parameters are defined in the component’s__init__method.- Parameters:
**parameters (Any) – Keyword arguments mapping parameter names to their values. All names must exist in
self.parameters.- Raises:
KeyError – If any parameter name is not recognized.
ValueError – If parameter validation fails (see
_validate_parameter()hook).
- Return type:
None
Example
>>> comp.set_parameters(mass=1.5, length=0.4) >>> comp.set_parameters(gain=2.0) # Single parameter >>> comp.get_parameters() {'mass': 1.5, 'length': 0.4, 'gain': 2.0}
Note
Override
_validate_parameter()in subclasses to add custom validation logic (e.g., range checking).See also
get_parameters(): Retrieve parameter values_validate_parameter(): Validation hook
- get_parameters(*names)[source]#
Retrieve one or more parameter values.
- Parameters:
*names (str) – Parameter names to retrieve. If no names are provided, returns all parameters.
- Returns:
Dictionary mapping parameter names to their current values. Returns a copy to prevent external modification.
- Raises:
KeyError – If any requested parameter name is not recognized.
- Return type:
Example
>>> comp.get_parameters('mass', 'length') {'mass': 1.5, 'length': 0.4} >>> comp.get_parameters() # All parameters {'mass': 1.5, 'length': 0.4, 'gain': 2.0}
See also
set_parameters(): Set parameter values
- _validate_parameter(name, value)[source]#
Subclass hook for custom parameter validation.
Override this method to add validation logic such as range checking, type verification, or cross-parameter constraints. Called by
set_parameters()before storing the value.- Parameters:
- Raises:
ValueError – If validation fails. Include a descriptive message explaining the constraint.
- Return type:
Example
>>> def _validate_parameter(self, name: str, value: Any) -> Any: ... if name == 'mass' and value <= 0: ... raise ValueError("Mass must be positive") ... if name == 'damping' and not (0 <= value <= 1): ... raise ValueError("Damping must be in [0, 1]") ... return super()._validate_parameter(name, value)
Note
Always call
super()._validate_parameter(name, value)at the end to allow parent class validation.
- _initialize_ports_from_specs()[source]#
Create port state objects from port specifications.
Iterates through
input_specsandoutput_specsto create correspondingPortStateinstances. Output ports are also registered with the component’s history tracker.Called automatically during
initialize()if port specs exist.Note
Port specifications (
PortSpec) define the port interface (name, type, direction, unit). Port states (PortState) hold the actual runtime values.See also
PortSpec: Port specification dataclassPortState: Mutable port value container- Return type:
None
- initialize(t0)[source]#
Initialize the component at the specified start time.
Prepares the component for simulation by setting up ports, event indicators, and internal state. This method must be called once before the simulation loop begins.
The initialization sequence:
Sets internal time to
t0Creates
PortStateobjects from port specificationsCreates event output ports for registered indicators
Calls
_initialize_component(t0)subclass hookUpdates output states and records initial values
If already initialized, returns immediately (idempotent).
- Parameters:
t0 (float) – Initial simulation time in seconds.
- Return type:
None
Example
>>> comp.set_parameters(mass=1.5) >>> comp.initialize(t0=0.0) >>> comp.t 0.0 >>> comp.get_outputs() {'y': 0.0}
Note
Call
reset()before re-initializing with different parameters or initial time.See also
reset(): Clear state for re-initialization_initialize_component(): Subclass initialization hook
- _initialize_event_ports()[source]#
Create output ports for all registered event indicators.
Automatically creates
PortSpecandPortStateobjects for each event indicator registered before initialization. Event ports are boolean outputs that signal when an event has occurred.Note
Called automatically during
initialize(). Event indicators registered after initialization create their ports immediately inadd_event_indicator().- Return type:
None
- abstractmethod _initialize_component(t0)[source]#
Subclass hook for component-specific initialization.
Override this method to perform initialization tasks such as:
Loading model files or external resources
Setting initial state values
Configuring internal solvers
Validating parameters
Called automatically by
initialize()after ports are created but before outputs are updated and recorded.- Parameters:
t0 (float) – Initial simulation time in seconds.
- Return type:
None
Example
>>> def _initialize_component(self, t0: float) -> None: ... self._state = np.zeros(self.n_states) ... self._solver = RK45(self._ode_func, t0, self._state)
Note
Port specifications (
input_specs,output_specs) should be defined in__init__, not here. This method is for runtime initialization that depends on parameters.
- set_inputs(signals, t=None)[source]#
Set input port values from a dictionary of signals.
This method updates the component’s input ports with new values. Override in subclasses for type conversions, unit handling, or special input processing.
- Parameters:
- Raises:
KeyError – If any key in
signalsdoesn’t match a registered input port name.- Return type:
None
Example
>>> comp.set_inputs({'u': 1.5, 'v': 2.0}, t=0.5) >>> comp.set_inputs({'u': 0.0}) # Timestamp optional
Note
Called automatically by the master algorithm before each step. The System class handles unit conversions via Connection objects before calling this method.
- evaluate_outputs(inputs, t=None)[source]#
Evaluate outputs for given inputs without advancing time.
This method computes outputs based on the provided inputs at the current time, without performing a time step. Essential for solving algebraic loops where outputs depend algebraically on inputs.
Components with direct feedthrough must override this method to perform a zero-step evaluation (dt=0) that updates outputs based on the current inputs.
- Parameters:
- Returns:
Dictionary mapping output port names to their computed values.
- Return type:
Example
>>> # For algebraic loop solving (IJCSA algorithm) >>> outputs = comp.evaluate_outputs({'u': trial_value}, t=0.5) >>> y = outputs['y']
Note
The default implementation simply sets inputs and returns current outputs. Override for components with true algebraic feedthrough where outputs must be recomputed based on new inputs.
See also
direct_feedthrough: Declares input-output dependencieshas_direct_feedthrough: Check for any feedthrough
- get_outputs()[source]#
Return current output values as a dictionary.
Retrieves the current value of all output ports that have been set. Ports with
Nonevalues are excluded from the result.- Returns:
Dictionary mapping output port names to their current values. Only includes ports with non-None values.
- Return type:
Example
>>> comp.do_step(t=0.0, dt=0.01) >>> outputs = comp.get_outputs() >>> print(outputs) {'y': 1.5, 'dy': 0.1}
Note
For type-safe access with units, use
outputs[name].get()directly on the PortState object.
- do_step(t, dt)[source]#
Execute a single simulation time step.
Advances the component’s simulation from time
ttot + dt. This is the main simulation interface called by the master algorithm during the simulation loop.The method performs these operations in order:
Calls
_do_step_internal(t, dt)for actual computationUpdates
self.ttot + dtCalls
_update_output_states()to refresh outputsCalls
_record_outputs()to save to history
- Parameters:
- Return type:
None
Example
>>> comp.initialize(t0=0.0) >>> for i in range(1000): ... t = i * 0.001 ... comp.set_inputs({'u': signal[i]}, t=t) ... comp.do_step(t, dt=0.001)
Note
Inputs should be set via
set_inputs()before callingdo_step(). The master algorithm handles this automatically.See also
_do_step_internal(): Override for custom stepping logic
- abstractmethod _do_step_internal(t, dt)[source]#
Subclass hook for time step computation.
Override this method to implement the component’s simulation logic. This is where state integration, solver calls, and internal computations happen.
- Parameters:
- Return type:
None
Example
Simple Euler integration:
def _do_step_internal(self, t: float, dt: float) -> None: u = self.inputs['u'].get() self._state += self._derivative(self._state, u) * dt
Note
Do NOT update
self.there;do_step()handles thatDo NOT call
_update_output_states(); done bydo_step()For event detection, call
report_internal_event()if micro-stepping detects a zero-crossing
See also
do_step(): Public interface that calls this methodreport_internal_event(): Report events from micro-stepping
- abstractmethod _update_output_states(t=None, event_names=[])[source]#
Subclass hook to update output port values from internal state.
Override this method to read internal state variables and write them to output ports using
self.outputs[name].set(value, t=t). Called automatically afterinitialize()anddo_step().- Parameters:
- Return type:
None
Example
>>> def _update_output_states(self, t=None, event_names=None): ... self.outputs['position'].set(self._x, t=t) ... self.outputs['velocity'].set(self._v, t=t) ... # Set event port if bounce just occurred ... if event_names and 'bounce' in event_names: ... self.outputs['bounce'].set(True, t=t) ... else: ... self.outputs['bounce'].set(False, t=t)
Note
Called after
_initialize_component()during initCalled after
_do_step_internal()during steppingCalled after
_handle_events_internal()during eventsAlways use
port.set(value, t=t)to include timestamp
- set_state(state, t)[source]#
Set physical state from human-readable format.
Initializes the component from physical variables exported by
get_state(). Used for mode switching inMultiComponentand for debugging/testing. May perform unit conversions.This is the “physical state transfer” interface, distinct from
snapshot_state()/restore_state()which handle opaque solver state for time rollback.- Parameters:
- Return type:
None
Example
>>> state = { ... 'q': {'value': 0.5, 'unit': 'rad'}, ... 'omega': {'value': 1.0, 'unit': 'rad/s'} ... } >>> component.set_state(state, t=0.0) >>> component.t 0.0
Note
For mode switching, state may be adapted from another model
Does NOT preserve solver-specific internal artifacts
May require re-initialization of internal solvers
See also
get_state(): Export state in matching formatrestore_state(): Opaque state for rollback (different use)
- get_state()[source]#
Export current physical state in human-readable format.
Returns the component’s physical state variables (positions, velocities, etc.) in a format suitable for inspection, debugging, or transferring to another model during mode switching.
This is the “physical state transfer” interface. It does NOT include solver-specific internal artifacts. For complete state capture for rollback, use
snapshot_state().- Returns:
‘value’: The numeric value (float or array)
’unit’: The physical unit string (e.g., ‘rad’, ‘m/s’)
- Return type:
Dictionary mapping state variable names to dicts containing
Example
>>> state = component.get_state() >>> print(state) {'q': {'value': 0.5, 'unit': 'rad'}, 'omega': {'value': 1.0, 'unit': 'rad/s'}}
Note
The returned format matches what
set_state()expects, enabling state round-tripping and model switching.See also
set_state(): Import state in matching formatsnapshot_state(): Opaque state for rollback
- snapshot_state()[source]#
Capture complete internal state for time rollback.
Creates an opaque snapshot preserving ALL solver state needed to restore the component to its exact condition at the current time. This includes internal solver artifacts not exposed by
get_state().This is required for event detection in hybrid co-simulation, where bisection search needs to roll back and retry time steps at different points.
- Returns:
Opaque snapshot object. Format is component-specific and should be treated as a black box.
- Raises:
NotImplementedError – If the component doesn’t support rollback. Check
supports_rollbackbefore calling.- Return type:
Example
>>> snapshot = component.snapshot_state() >>> component.do_step(t, dt) >>> # Oops, event detected - roll back >>> component.restore_state(snapshot, t)
Warning
Snapshot format is internal and may change between versions
Cannot transfer snapshots between component instances
Only for rollback within the SAME component
Use
get_state()/set_state()for model switching
See also
restore_state(): Restore from snapshotsupports_rollback: Check capability before usingget_state(): Human-readable state export
- restore_state(snapshot, t)[source]#
Restore component to exact state from a snapshot.
Reverses time by restoring the component to the state captured by
snapshot_state(). After restoration, subsequentdo_step()calls behave as if the component never advanced past timet.Used internally by the hybrid master algorithm during bisection search for event localization.
- Parameters:
- Raises:
NotImplementedError – If the component doesn’t support rollback.
ValueError – If the snapshot is incompatible (wrong component, wrong mode for
MultiComponent).
- Return type:
None
Example
>>> t_before = component.t >>> snapshot = component.snapshot_state() >>> component.do_step(t_before, dt) >>> # Roll back to before the step >>> component.restore_state(snapshot, t_before) >>> component.t == t_before True
Warning
Snapshot must be from the SAME component instance
Do not use across models (use
set_state()instead)For
MultiComponent, active mode must match snapshot
See also
snapshot_state(): Create snapshotsset_state(): Physical state transfer between models
- property supports_rollback: bool#
Check if the component supports state snapshot and restore.
Components that support rollback can participate in hybrid co-simulation with event detection. The hybrid master algorithm requires rollback capability to perform bisection search for precise event localization.
- Returns:
Trueif bothsnapshot_state()andrestore_state()are implemented (overridden from base class).Falseotherwise.
Example
>>> if component.supports_rollback: ... component.add_event_indicator('zero_crossing', func) ... else: ... print("Cannot add events without rollback support")
Note
Automatically detects method overrides using introspection. No manual flag setting required.
See also
snapshot_state(): Capture state for rollbackrestore_state(): Restore from snapshot
- _record_outputs(t)[source]#
Record current output values to the history buffer.
Appends the current value of each output port to the component’s history for later retrieval. Called automatically after
initialize()anddo_step().- Parameters:
t (float) – Current simulation time to associate with the recorded values.
- Return type:
None
Note
Only ports with non-None values are recorded. This method is called internally and typically should not be overridden.
- get_history(port_names=None, units=None)[source]#
Retrieve time-series history of output port values.
Returns the recorded history from simulation as a dictionary structure. Optionally filter by port names and convert units.
- Parameters:
port_names (list[str] | None) – List of port names to retrieve. If
None, returns history for all output ports.units (dict[str, str] | None) – Dictionary mapping port names to desired output units (e.g.,
{'angle': 'deg'}). Pint is used for conversion. IfNone, values are returned in their original units.
- Returns:
‘time’: List of time values
’values’: List of recorded values at each time
’unit’: The unit of the values (after conversion)
- Return type:
Dictionary mapping port names to history dictionaries with
Example
>>> history = comp.get_history(['y', 'dy']) >>> print(history['y']['time'][:3]) [0.0, 0.01, 0.02] >>> print(history['y']['values'][:3]) [0.0, 0.1, 0.19]
>>> # With unit conversion >>> history = comp.get_history(['angle'], units={'angle': 'deg'})
See also
get_history_arrays(): Get history as NumPy arrays
- get_history_arrays(port_names=None, units=None)[source]#
Retrieve time-series history as NumPy arrays.
Returns the recorded history optimized for numerical analysis and plotting. More efficient than
get_history()for large datasets.- Parameters:
- Returns:
time_array: 1D NumPy array of time points
values_dict: Dict mapping port names to 1D NumPy arrays
- Return type:
Tuple of
(time_array, values_dict)where
Example
>>> t, values = comp.get_history_arrays(['y', 'dy']) >>> plt.plot(t, values['y'], label='Position') >>> plt.plot(t, values['dy'], label='Velocity')
See also
get_history(): Get history as nested dictionaries
- add_event_indicator(name, func, direction=0)[source]#
Register an event indicator function for zero-crossing detection.
Event indicators are scalar functions of component state that trigger events when they cross zero. The hybrid master algorithm monitors these indicators and uses bisection search to locate the precise crossing time.
Can be called before or after
initialize(). If called before initialization, the event output port will be created duringinitialize(). If called after, the port is created immediately.- Parameters:
name (str) – Unique name for the event indicator. This name is used to identify the event in handlers and subscriptions.
func (Callable[[CoSimComponent], float]) – Callable that takes the component instance and returns a float. The event triggers when this value crosses zero.
direction (int) – Direction of zero-crossing to detect: - -1: Falling edge only (positive → negative) - 0: Both directions (default) - +1: Rising edge only (negative → positive)
- Raises:
RuntimeError – If the component does not support rollback (required for bisection-based event localization).
KeyError – If an indicator with the same name already exists.
ValueError – If direction is not -1, 0, or +1.
- Return type:
None
Example
Detect when velocity crosses zero (bouncing ball):
def velocity_indicator(comp: CoSimComponent) -> float: return comp.outputs['v'].get() ball.add_event_indicator('bounce', velocity_indicator, direction=-1)
See also
evaluate_event_indicators(): Evaluate all indicatorsdetect_event_crossings(): Check for crossingshas_state_events: Check if indicators are registered
- property has_state_events: bool#
Check if state event indicators are registered.
State events are triggered by zero-crossings of indicator functions during simulation. Components with state events require the hybrid master algorithm for proper event detection and handling.
- Returns:
True if at least one event indicator is registered via
add_event_indicator(). False otherwise.
Example
>>> ball.add_event_indicator('bounce', velocity_indicator) >>> ball.has_state_events True
See also
add_event_indicator(): Register event indicatorsevent_indicators: Dictionary of registered indicators
- evaluate_event_indicators()[source]#
Evaluate all registered event indicators at the current state.
Computes the current value of each event indicator function. These values are compared between time steps to detect zero crossings.
- Returns:
Dictionary mapping event indicator names to their current float values. Positive, negative, or zero values indicate the position relative to the event threshold.
- Return type:
Example
>>> indicators_before = comp.evaluate_event_indicators() >>> comp.do_step(t, dt) >>> indicators_after = comp.evaluate_event_indicators() >>> events = comp.detect_event_crossings(indicators_before, indicators_after)
Note
Called by the hybrid master algorithm before and after each macro step to detect state events.
- detect_event_crossings(previous, current, sign_tolerance=1e-10)[source]#
Detect zero-crossings between previous and current indicator values.
Compares indicator values before and after a time step to identify which indicators have crossed zero. Respects the direction setting of each indicator (rising, falling, or both).
- Parameters:
previous (dict[str, float]) – Indicator values at the start of the step, as returned by
evaluate_event_indicators()beforedo_step().current (dict[str, float]) – Indicator values at the end of the step, as returned by
evaluate_event_indicators()afterdo_step().sign_tolerance (float) – Values with absolute magnitude smaller than this threshold are treated as zero for sign determination. Defaults to 1e-10.
- Returns:
List of event indicator names that experienced a zero-crossing during the step, filtered by each indicator’s direction setting.
- Return type:
Example
>>> prev = comp.evaluate_event_indicators() # {'bounce': 0.5} >>> comp.do_step(t, dt) >>> curr = comp.evaluate_event_indicators() # {'bounce': -0.2} >>> events = comp.detect_event_crossings(prev, curr) >>> print(events) ['bounce']
Note
This method only detects that a crossing occurred somewhere in the interval. Use bisection search to locate the precise time.
- report_internal_event(event_name, t_before, t_after, indicator_before=None, indicator_after=None)[source]#
Report an event detected during internal micro-stepping.
Call this from
_do_step_internal()when an event is detected during internal sub-stepping. The master algorithm uses these hints to narrow the bisection interval, improving event localization efficiency.- Parameters:
event_name (str) – Name of the event indicator that crossed zero. Must match a registered event indicator name.
t_before (float) – Last micro-step time before the event was detected. Should be within the current macro step interval.
t_after (float) – First micro-step time after the event was detected. Together with
t_before, defines a bracket for bisection.indicator_before (float | None) – Indicator value at
t_before. Optional but improves localization accuracy if available.indicator_after (float | None) – Indicator value at
t_after. Optional but improves localization accuracy if available.
- Return type:
None
Example
Inside an adaptive integrator:: >>> >>> def _do_step_internal(self, t, dt): >>> h = dt / 100 # Internal micro-step >>> for i in range(100): >>> t_i = t + i * h >>> #… integration logic … >>> if velocity_before > 0 and velocity_after < 0: >>> self.report_internal_event( >>> ‘bounce’, t_i, t_i + h, >>> velocity_before, velocity_after >>> )
Note
Hints are consumed by
get_internal_event_hints()and cleared after retrieval. Multiple hints can be reported per step.See also
get_internal_event_hints(): Retrieve and clear hintsInternalEventInfo: Hint data structure
- get_internal_event_hints()[source]#
Retrieve and clear internal event timing hints.
Returns event hints reported by the component during micro-stepping and clears the internal buffer. Called by the master algorithm after each macro step to incorporate micro-step event information into the bisection search.
- Returns:
List of
InternalEventInfoobjects containing timing brackets for events detected during micro-stepping. The list is cleared after this call.- Return type:
Example
>>> comp.do_step(t, dt) >>> hints = comp.get_internal_event_hints() >>> for hint in hints: ... print(f"{hint.event_name}: [{hint.t_before}, {hint.t_after}]")
Note
Each call clears the hints buffer. Call only once per step to avoid losing information.
See also
report_internal_event(): Add hints during stepping
- subscribe_event(event)[source]#
Register a subscription to an external event.
Allows this component to receive notifications when another component emits a named event. The subscription is pattern-based: specify the source component and event name to listen for.
- Parameters:
event (Event) – An
Eventobject specifying the source component and event name to subscribe to. Thetimefield must beNone(subscriptions are not time-specific).- Raises:
ValueError – If
event.timeis notNone.KeyError – If an identical subscription already exists.
- Return type:
None
Example
Subscribe to a sensor’s threshold event:: >>> >>> from syssimx.core.events import Event >>> >>> threshold_event = Event(name=’high_temp’, source=’sensor_1’) >>> controller.subscribe_event(threshold_event)
Note
When the subscribed event fires,
handle_event()is called with the event name in theevent_nameslist.See also
handle_event(): Called when subscribed events occurhas_event_subscriptions: Check for subscriptions
- property has_event_subscriptions: bool#
Check if this component subscribes to external events.
Event subscriptions allow a component to receive and react to events emitted by other components in the system.
- Returns:
True if at least one event subscription is registered via
subscribe_event(). False otherwise.
See also
subscribe_event(): Register event subscriptionsevent_subscriptions: List of subscribed events
- handle_event(event_names, t)[source]#
Process detected events and update component state.
Called by the master algorithm when events are detected and localized. Invokes the subclass event handler, updates outputs, and records the post-event state.
- Parameters:
- Return type:
None
Example
Handling a bounce event:
def _handle_events_internal(self, event_names, t): if 'bounce' in event_names: # Reverse velocity with coefficient of restitution v = self.outputs['v'].get() self._velocity = -0.8 * v
Note
Override
_handle_events_internal()to implement custom event handling logic. This wrapper method ensures outputs are properly updated and recorded after event handling.See also
_handle_events_internal(): Subclass hook for event logic
- _handle_events_internal(event_names, t)[source]#
Subclass hook for custom event handling logic.
Override this method to implement state discontinuities, mode switches, or other event-driven behavior. Called by
handle_event()when events are detected.- Parameters:
- Return type:
None
Example
Implementing a bouncing ball:
def _handle_events_internal(self, event_names, t): if 'ground_contact' in event_names: self._velocity *= -self.parameters['restitution'] self._position = 0.0 # Reset to ground level
Note
Called before
_update_output_states()Multiple events may fire simultaneously
State changes here should be reflected in subsequent outputs
- reset()[source]#
Reset component to clean state for re-initialization.
Clears the simulation time, history buffer, and initialization flag. After calling
reset(), the component can be re-initialized withinitialize()for a new simulation run.Subclasses should call
super().reset()and additionally clear any internal state variables.Example
>>> comp.initialize(t0=0.0) >>> comp.do_step(0.0, 0.01) >>> comp.reset() >>> comp.initialize(t0=5.0) # Fresh start at t=5
Note
Does not release resources (use
free()for that). The component remains usable after reset.See also
free(): Release resources permanentlyinitialize(): Re-initialize after reset- Return type:
None
- free()[source]#
Release component resources permanently.
Override this method in subclasses to clean up external resources such as FMU instances, file handles, network connections, or native library memory.
Example
>>> system.run(t0=0, tf=10, dt=0.01) >>> for comp in system.components.values(): ... comp.free() # Clean up all resources
Note
After calling
free(), the component may not be usable. For reuse, callreset()instead.See also
reset(): Reset for re-initialization without freeing- Return type:
None
- _abc_impl = <_abc._abc_data object>#
- _detect_direct_feedthrough()[source]#
Detect direct feedthrough dependencies between inputs and outputs.
Analyzes the component’s behavior to determine which output ports depend algebraically on which input ports. This information is used to identify potential algebraic loops in the system.
- Returns:
Dictionary mapping output port names to sets of input port names that have direct feedthrough to that output. If an output has no direct feedthrough, it maps to an empty set.
- Return type:
Example
>>> comp.direct_feedthrough = comp._detect_direct_feedthrough() >>> print(comp.direct_feedthrough) {'y': {'u', 'v'}, 'z': {'u'}, 'w': set()}
Note
Called during initialization to populate
self.direct_feedthroughUses finite difference perturbation to test dependencies
May be overridden in subclasses for efficiency
- property reactive_inputs: set[str]#
Input port names with direct algebraic feedthrough to outputs.
An input is “reactive” if changing its value immediately affects at least one output without requiring a time step. This information is used by the System to detect algebraic loops.
- Returns:
Set of input port names that have direct feedthrough to any output. Empty set if no feedthrough exists.
Example
>>> comp.direct_feedthrough = {'y': {'u', 'v'}, 'z': {'u'}} >>> comp.reactive_inputs {'u', 'v'}
See also
direct_feedthrough: Full input-output dependency maphas_direct_feedthrough: Boolean check for any feedthrough
- property has_direct_feedthrough: bool#
Check if any output depends algebraically on any input.
Direct feedthrough occurs when an output depends directly on an input in the same time step, without intermediate state dynamics. Components with direct feedthrough create potential algebraic loops when connected in cycles.
- Returns:
True if at least one input-output pair has direct feedthrough. False if all outputs depend only on internal state.
Example
>>> # A pure gain has direct feedthrough: y = k * u >>> gain.has_direct_feedthrough True >>> # An integrator has no feedthrough: y = integral(u) >>> integrator.has_direct_feedthrough False
See also
direct_feedthrough: Detailed dependency mappingreactive_inputs: Set of inputs with feedthrough
Ports#
Port specification and port state management for co-simulation components.
This module defines the PortType enum, PortSpec class for immutable port specifications, and PortState class for mutable port state with type-safe get/set operations including unit handling.
Overview of classes: - PortType: Enum of supported port data types (REAL, INT, BOOL, STRING, EVENT) - PortSpec: Immutable specification of a port (name, type, direction, unit, description) - PortState: Mutable state of a port (spec, current value, last update time)
- class syssimx.core.port.PortType[source]#
Bases:
StrEnumSupported port data types for co-simulation components.
- REAL#
Floating-point values, optionally with physical units.
- INT#
Integer values.
- BOOL#
Boolean values (True/False).
- STRING#
String values.
- EVENT#
Boolean trigger for discrete events.
- REAL = 'real'#
- INT = 'int'#
- BOOL = 'bool'#
- STRING = 'string'#
- EVENT = 'event'#
- _generate_next_value_(start, count, last_values)#
Return the lower-cased version of the member name.
- __new__(value)#
- syssimx.core.port._validate_unit(unit, port_name)[source]#
Validate that a unit string or Unit is recognized by the framework registry.
- syssimx.core.port._coerce_numpy_scalar(value)[source]#
Convert numpy scalar types to Python native types.
This allows seamless use of numpy arrays in port operations. Returns the original value if not a numpy scalar.
- class syssimx.core.port.PortSpec[source]#
Bases:
objectSpecification of a port in a CoSimComponent (immutable).
Defines the name, type, direction, optional unit, and description of the port. Units are validated at construction time to catch errors early.
- type#
Data type of the port (REAL, INT, BOOL, STRING, EVENT).
- direction#
Whether this is an input (“in”) or output (“out”) port.
- Type:
Literal[‘in’, ‘out’]
- unit#
Physical unit string or Unit for REAL ports (e.g., “m/s”, “N*m”).
- Type:
str | pint.registry.Unit | None
- validate_value(value, *, coerce_numpy=True)[source]#
Validate that the given value matches the port’s type and unit.
- Parameters:
- Returns:
The (possibly coerced) value if valid.
- Raises:
TypeError – If value type doesn’t match port type.
ValueError – If unit validation fails.
DimensionalityError – If Quantity has incompatible dimensions.
- Return type:
- static compatible(spec1, spec2)[source]#
Check if two PortSpecs are compatible for connection.
Compatibility rules: - Types must match exactly (REAL-REAL, INT-INT, etc.) - For REAL ports: both must have units, and units must be dimensionally compatible - For REAL ports: both having no unit is also compatible
- class syssimx.core.port.PortState[source]#
Bases:
objectMutable state of a port in a CoSimComponent.
Holds the specification, current value, and last update time. Provides type-safe get/set operations with automatic unit conversion.
- spec#
Immutable specification defining port properties.
- set(value, t=None)[source]#
Set the port’s value with validation and unit conversion.
- Parameters:
- Raises:
TypeError – If value type doesn’t match port type.
DimensionalityError – If Quantity has incompatible dimensions.
- Return type:
None
Events#
Hybrid functionalities for event handling in syssimx.
This module provides classes and functions to manage events in a co-simulation environment.
Overview Functions:
_sign(value: float, tol: float = 1e-10) -> int: Returns the sign of a value with a tolerance.
Dataclasses:
DenseTime: Represents a dense time instant with real and discrete values.Event: Represents an event with name, source, time, and direction.EventIndicator: Represents an event indicator for zero-crossing detection.InternalEventInfo: Information about an event detected during internal micro-stepping.
- class syssimx.core.events.DenseTime[source]#
Bases:
objectRepresents a dense time instant with real and discrete values.
Dense time allows distinguishing multiple events occurring at the same real time instant by using a discrete micro-step counter.
- class syssimx.core.events.Event[source]#
Bases:
objectRepresents an event in the co-simulation environment.
Events are used for signaling discrete occurrences that may affect the simulation flow, such as instantaneous state changes.
- time#
Time instant when the event occurs (DenseTime)
- Type:
- class syssimx.core.events.EventIndicator[source]#
Bases:
objectRepresents an event indicator for zero-crossing detection.
Event indicators are functions that evaluate to a float value. When the value crosses zero, an event is triggered. The direction of the crossing can be specified to filter which crossings trigger events.
- name#
Name of the event indicator
- function#
Callable that takes a CoSimComponent and returns a float
- direction#
Direction of zero-crossing to trigger events (-1: falling, 0: both, +1: rising)
Example
>>> def temp_indicator(component): >>> return component.get_temperature() - component.threshold >>> event_indicator = EventIndicator( >>> name="high_temp", >>> function=temp_indicator, >>> direction=1 >>> )
- __init__(name, function, direction=0)[source]#
- Parameters:
name (str)
function (Callable[[CoSimComponent], float])
direction (int)
- evaluate(component)[source]#
Evaluate the event indicator function.
- Parameters:
component (CoSimComponent)
- Return type:
- class syssimx.core.events.InternalEventInfo[source]#
Bases:
objectInformation about an event detected during internal micro-stepping.
Components that use internal micro-stepping (e.g., FEM models) can detect events more precisely than the master algorithm. This class allows them to communicate timing hints to the master algorithm for more efficient event localization.
- t_before#
Time of the last micro-step before the event (indicator was positive/negative)
- Type:
Example
>>> internal_event = InternalEventInfo( >>> event_name="high_temp", >>> t_before=1.234, >>> t_after=1.235, >>> indicator_before=0.5, >>> indicator_after=-0.3 >>> )
History#
History management for ports, components, and systems.
This module provides classes to store, retrieve, and manage history data for port variables, components, and entire systems during co-simulation runs.
Overview of Classes:
PortHistory: Stores time series data for a single port variable.ComponentHistory: Manages PortHistory instances for all ports of a component.SystemHistory: Manages ComponentHistory instances for all components in a system, along with event recording.
- class syssimx.core.history.PortHistory[source]#
Bases:
objectStores and manages time-series history of a port’s values.
This class maintains a chronological record of values associated with timestamps, with optional unit support and conversion capabilities. It provides efficient storage using lists internally while exposing numpy arrays for analysis.
Example
>>> history = PortHistory(port_name="temperature", unit="celsius") >>> history.append(0.0, 25.0) >>> history.append(1.0, 26.5) >>> print(history.time) [0. 1.] >>> print(history.values) [25. 26.5]
- get_values(as_unit=None)[source]#
Get values as a numpy array, converting to the specified unit if applicable.
- to_dict(as_unit=None)[source]#
Get the history as a dictionary with ‘time’ array, ‘values’ array, and unit.
- class syssimx.core.history.ComponentHistory[source]#
Bases:
objectManages the collection of port histories for a single component.
This class provides convenient access patterns for retrieving and manipulating trajectory data across all ports of a component.
Example
>>> history = ComponentHistory(component_name="Tank1") >>> history.add_port("temperature", unit="K") >>> history.append("temperature", t=0.0, value=300.0) >>> history.append("temperature", t=1.0, value=305.0) >>> port_hist = history.get_port_history("temperature")
- _port_histories: dict[str, PortHistory]#
- get_port_history(port_name)[source]#
Get history object for a specific port.
- Parameters:
port_name (str)
- Return type:
- to_arrays(port_names=None, units=None)[source]#
Export as common time array and dict of value arrays.
- __init__(component_name, _port_histories=<factory>)#
- Parameters:
component_name (str)
_port_histories (dict[str, PortHistory])
- Return type:
None
- class syssimx.core.history.SystemHistory[source]#
Bases:
objectManages simulation history data for an entire system.
This class serves as a container for component-level histories and event recordings in a system simulation. It provides methods for storing, retrieving, and persisting simulation results across multiple components and their ports.
Key Features:
Component history management: Register and retrieve histories for individual components
Event recording: Track discrete event occurrences with timestamps
Flexible data retrieval: Export histories in nested or flat dictionary formats
Unit conversion: Support for automatic unit conversion during data retrieval
Persistence: Save/load simulation results to/from CSV files
Example
>>> history = SystemHistory(system_name="my_system") >>> history.add_component("motor", motor_history) >>> history.record_event("motor", "start", 0.5) >>> trajectory = history.get_port_trajectory("motor", "speed") >>> history.save_csv(Path("results.csv"))
- _component_histories: dict[str, ComponentHistory]#
- add_component(component_name, component_history)[source]#
Register a component’s history.
- Parameters:
component_name (str)
component_history (ComponentHistory)
- Return type:
None
- get_component_history(component_name)[source]#
Get history object for a specific component.
- Parameters:
component_name (str)
- Return type:
- to_dict(component_names=None, port_names=None, units=None, format='nested')[source]#
Retrieve histories from components in the system.
- Parameters:
component_names (list[str] | None) – List of components to retrieve. If None, retrieves all.
port_names (dict[str, list[str]] | None) – Dict mapping component names to lists of port names to retrieve. If None, retrieves all ports for each component.
units (dict[str, dict[str, str]] | None) – Nested dict {comp_name: {port_name: target_unit}} for unit conversion.
format (str) – ‘nested’ or ‘flat’ - ‘nested’: {‘comp1’: {‘port1’: {…}, ‘port2’: {…}}, ‘comp2’: {…}} - ‘flat’: {‘comp1.port1’: {…}, ‘comp2.port2’: {…}}
- Returns:
Dictionary with component/port histories based on format.
- Return type:
- to_arrays(component_names=None, port_names=None, units=None)[source]#
Get histories as numpy arrays per component.
- Parameters:
- Returns:
Dict mapping component names to (time_array, values_dict) tuples.
- Return type:
- get_port_trajectory(component, port, as_unit=None)[source]#
Get time and value arrays for a specific port (convenience method).
- clear(component_names=None, port_names=None)[source]#
Clear histories for specified components and ports.
- save_csv(filepath, component_names=None, port_names=None, units=None, long_format=False)[source]#
Save as single CSV file.
- Parameters:
filepath (Path) – Output CSV file path
component_names (list[str] | None) – Components to save. If None, saves all.
port_names (dict[str, list[str]] | None) – Specific ports to save per component.
units (dict[str, dict[str, str]] | None) – Unit conversion specifications.
long_format (bool) – If True, uses long/tidy format (time, component, port, value). If False, uses wide format (time, comp1.port1, comp1.port2, …).
- Return type:
None
- Format:
time, ref.q_ref, pendulum.q, pendulum.omega, … 0.0, 1.57, 0.0, 0.0, … 0.01, 1.55, 0.015, 0.3, …
Multi-Component#
Multi-component wrapper for heterogeneous model switching.
This module provides the MultiComponent class for wrapping multiple
interchangeable simulation models (e.g., FEM, OpenSim, FMU pendulum) under a
unified interface. It enables dynamic mode switching during simulation
with automatic state synchronization between models.
- Key Features:
Dynamic Mode Switching: Switch between different simulation models at runtime based on custom criteria (time, cached outputs, events)
State Synchronization: Automatic state transfer and adaptation when switching between models with different interfaces
Hysteresis Protection: Configurable dwell time to prevent rapid chattering between modes
Port Unification: Validates that all sub-models have compatible port interfaces
Event Delegation: Transparently delegates hybrid event detection to the currently active sub-component
- Typical Use Cases:
Multi-fidelity simulation: Switch between high-fidelity FEM and reduced-order models based on accuracy requirements
Contact dynamics: Use detailed contact model only when contact is imminent, otherwise use simpler dynamics
Adaptive resolution: Increase model complexity in regions of interest, decrease elsewhere
Example
Creating a multi-model pendulum:
class MasterPendulum(MultiComponent):
def __init__(self, fem, opensim, fmu):
super().__init__(
name="Pendulum",
models={"FEM": fem, "OpenSim": opensim, "FMU": fmu},
initial_mode="FEM",
)
def _adapt_state(self, state, target_mode):
if target_mode == "FMU":
return {'q0': state['q'], 'omega0': state['omega']}
return state
# Use with mode selector
pendulum = MasterPendulum(fem, opensim, fmu)
pendulum.mode_selector = lambda t: "FEM" if t < 1.0 else "FMU"
pendulum.hysteresis = Hysteresis(dwell_time=0.05)
See also
CoSimComponent: Base class for all components
Hysteresis: Mode switching debounce utility
- class syssimx.core.multi_comp.StateAdapter[source]#
Bases:
ProtocolProtocol for adapting state between components with different interfaces.
Implement this protocol to provide custom state translation logic when switching between models that use different state variable names, units, or representations.
Example
>>> class FMUAdapter: ... def adapt_state(self, source_state, target_component): ... # FMU uses 'q0', 'omega0' instead of 'q', 'omega' ... return { ... 'q0': source_state['q'], ... 'omega0': source_state['omega'] ... }
- adapt_state(source_state, target_component)[source]#
Convert state from source format to target component’s format.
- Parameters:
source_state (dict[str, Any]) – State dictionary from the source component, typically in the format returned by
get_state().target_component (CoSimComponent) – The component that will receive the adapted state via
set_state().
- Returns:
Adapted state dictionary compatible with the target component’s
set_state()method.- Return type:
- __init__(*args, **kwargs)#
- _abc_impl = <_abc._abc_data object>#
- _is_protocol = True#
- class syssimx.core.multi_comp.Hysteresis[source]#
Bases:
objectMinimum dwell time between mode switches.
Prevents chattering by enforcing a minimum elapsed time between consecutive switches. The caller decides whether the proposed mode differs from the current one. This class only answers the timing question “is the dwell window still open?”.
- last_switch_time#
Timestamp of the most recent switch. Initialized to
-infso the first switch is always allowed.- Type:
Example
>>> hyst = Hysteresis(dwell_time=0.05) >>> hyst.in_dwell_window(t=0.02) False # No prior switch yet >>> hyst.record_switch(t=0.10) >>> hyst.in_dwell_window(t=0.12) True # Only 20 ms since last switch >>> hyst.in_dwell_window(t=0.20) False # 100 ms elapsed, window closed
- __init__(dwell_time=0.01)[source]#
Initialize hysteresis with the given dwell time.
- Parameters:
dwell_time (float) – Minimum time in seconds between mode switches. Defaults to 0.01 (10 ms).
- class syssimx.core.multi_comp.MultiComponent[source]#
Bases:
CoSimComponentAbstract base class for components wrapping multiple interchangeable models.
MultiComponentenables dynamic switching between different simulation models during runtime while presenting a unified interface to the rest of the co-simulation system. Each sub-model (“mode”) can use a different solver, fidelity level, or physics representation.- Subclass Responsibilities:
Construct the sub-components and pass them to
super().__init__through themodelsargument together withinitial_mode.Override
_adapt_state()for component-specific state translation.(Optional) Set
self.mode_selectorfor custom switching logic.(Optional) Set
self.hysteresisfor chattering prevention.
- Base Class Handles:
Port unification (validates all models have compatible ports)
Mode switching with hysteresis protection
State synchronization during mode transitions
Input/output delegation to the active component
Event indicator delegation for hybrid simulation
- models#
Registry mapping mode keys (e.g., “FEM”, “OpenSim”) to component instances. Populated in
__init__and fixed for the lifetime of the wrapper.- Type:
dict[ModeKey, CoSimComponent]
- active_mode#
Key of the currently active model.
- Type:
ModeKey
- active_comp#
Reference to the currently active component instance. Always set after
__init__.- Type:
- mode_selector#
Function
(t) -> ModeKeythat determines which mode should be active. Selectors that need state information must read cached output ports rather than callingget_state(), which can be expensive for high-fidelity models. IfNone, no automatic switching occurs.- Type:
Callable | None
- hysteresis#
Optional hysteresis controller to prevent rapid mode switching.
- Type:
Hysteresis | None
- state_adapters#
Optional per-mode state adapters for complex translation logic.
- Type:
dict[ModeKey, StateAdapter]
Example
Minimal subclass implementation:
class DualPendulum(MultiComponent): def __init__(self, detailed, simplified): super().__init__( "Pendulum", models={"detailed": detailed, "simplified": simplified}, initial_mode="detailed", ) def _adapt_state(self, state, target_mode): # Both models use same state format return state
See also
CoSimComponent: Parent class with full interface docsHysteresis: Mode switching debounce utilityStateAdapter: Protocol for state translation- __init__(name, models, initial_mode, group=None)[source]#
Initialize a multi-component wrapper.
- Parameters:
name (str) – Unique identifier for this component in the system.
models (dict[str, CoSimComponent]) – Mapping of mode keys to component instances. Must contain at least
initial_modeand must not be empty.initial_mode (str) – Key of the model to activate initially. Must be a key in
models.group (str | None) – Optional category for component organization.
- Raises:
ValueError – If
modelsis empty orinitial_modeis not a key inmodels.
Example
>>> super().__init__( ... name="Pendulum", ... models={"FEM": fem, "FMU": fmu}, ... initial_mode="FEM", ... group="Plant", ... )
- models: dict[str, CoSimComponent]#
- active_comp: CoSimComponent#
- hysteresis: Hysteresis | None#
- state_adapters: dict[str, StateAdapter]#
- _adapt_state(state, target_mode)[source]#
Adapt state dictionary for the target model’s interface.
Subclasses must override this method to translate state between models that use different variable names, units, or representations. Called during mode switching to transform the current model’s state into a format the target model can accept.
- Parameters:
- Returns:
Adapted state dictionary compatible with the target model’s
set_state()method.- Raises:
NotImplementedError – If not overridden by subclass.
- Return type:
Example
>>> def _adapt_state(self, state, target_mode): ... if target_mode == "FMU": ... # FMU uses initial condition naming ... return { ... 'q0': state['q'], ... 'omega0': state['omega'], ... 'torque': state['torque'] ... } ... return state # Other models use standard naming
Note
This is the primary extension point for handling heterogeneous model interfaces. If models share identical state formats, simply return
stateunchanged.
- _initialize_component(t0)[source]#
Initialize all registered sub-components at time
t0.Models and the active component are fixed by
__init__. This hook only initializes each registered sub-component so that any of them is ready for activation on a later mode switch.- Parameters:
t0 (float) – Initial simulation time in seconds.
- Return type:
None
Note
All sub-components are initialized, not just the active one.
- static _validate_port_compatibility(ref_spec, spec, model_name, port_name)[source]#
Validate that two PortSpecs are compatible for MultiComponent use.
- _unify_ports()[source]#
Adopt port specifications from active component and validate compatibility.
Copies input and output port specifications from the active component to this
MultiComponent, then validates that all registered models have compatible port interfaces.- Raises:
ValueError – If any model is missing a required input or output port that exists in the active component’s specification.
- Return type:
None
Note
This ensures the
MultiComponentpresents a consistent interface regardless of which sub-model is active. All models must have at least the same ports as the active component (they may have more).
- _do_step_internal(t, dt)[source]#
Execute one macro step, switching modes first if requested.
- Parameters:
- Return type:
None
Note
Mode switching can be temporarily disabled by setting
_allow_mode_switching = False. This is used by the hybrid algorithm during trial steps so that event detection does not change the active model while a rollback snapshot is valid.
- _select_target_mode(t)[source]#
Return the desired mode at
t, honoring switching guards.Returns the current
active_modewhen switching is disabled, when no selector is configured, or when the hysteresis dwell window is still open. Otherwise returns the selector’s proposal.
- _switch_mode(new_mode, t)[source]#
Switch to a new mode with state synchronization.
Orchestrates the transition. Validates the target, transfers the adapted state to the new active component, records the switch event for inspection, and notifies the hysteresis controller.
- Parameters:
- Raises:
ValueError – If
new_modeis not in the models registry.RuntimeError – If the target model is
None.
- Return type:
None
- _perform_state_transfer(new_comp, new_mode, t)[source]#
Move physical state from the current active model to
new_comp.Retrieves the state of the active component, replays the most recent inputs onto
new_compso it is current with the outgoing model, adapts the state for the target model, writes it tonew_comp, and promotesnew_compto be the active component.- Parameters:
new_comp (CoSimComponent) – The component instance that will become active.
new_mode (str) – Key of the target mode used by
_adapt_state().t (float) – Current simulation time.
- Returns:
The retrieved (pre-adaptation) state of the previously active component, for inclusion in the switch event log.
- Return type:
- _capture_switch_event(t, from_mode, to_mode, retrieved)[source]#
Append one record of the completed switch to
sync_events.Always logs the time, source mode, and target mode. When
self.record_switch_stateisTrue, the record also includes the pre-adaptation source state (retrieved) and a fresh snapshot of the new active component’s state (now). Thenowsnapshot callsactive_comp.get_state(), which can be expensive for high-fidelity models.record_switch_statedefaults toFalseand should be enabled only for debugging synchronization issues.
- set_inputs(signals, t=None)[source]#
Forward inputs to the active sub-component and cache them.
Only the active model receives inputs each step. The cached
(signals, t)pair is replayed onto the target model inside_perform_state_transferwhen a mode switch occurs, so the newly activated model sees the same inputs the outgoing one had.
- _update_output_states(t=None, event_names=None)[source]#
Copy output values from the active component to this wrapper.
Reads all output values from the active sub-component and writes them to this
MultiComponent’s output ports. Also handles event port updates based on which events fired.- Parameters:
- Return type:
None
Note
This ensures the
MultiComponentalways reflects the active component’s outputs, regardless of which model is active.
- evaluate_outputs(inputs, t=None)[source]#
Evaluate outputs for given inputs without advancing time.
This method computes outputs based on the provided inputs at the current time, without performing a time step. Essential for solving algebraic loops where outputs depend algebraically on inputs.
Components with direct feedthrough must override this method to perform a zero-step evaluation (dt=0) that updates outputs based on the current inputs.
- Parameters:
- Returns:
Dictionary mapping output port names to their computed values.
- Return type:
Example
>>> # For algebraic loop solving (IJCSA algorithm) >>> outputs = comp.evaluate_outputs({'u': trial_value}, t=0.5) >>> y = outputs['y']
Note
The default implementation simply sets inputs and returns current outputs. Override for components with true algebraic feedthrough where outputs must be recomputed based on new inputs.
See also
direct_feedthrough: Declares input-output dependencieshas_direct_feedthrough: Check for any feedthrough
- set_state(state, t)[source]#
Set state on the active component with adaptation.
Adapts the provided state for the active model’s interface using
_adapt_state(), then delegates to the active component.- Parameters:
- Return type:
None
See also
_adapt_state(): State translation hook
- get_state()[source]#
Get the current state from the active component.
- Returns:
State dictionary from the active sub-component, in that component’s native format.
- Return type:
Note
The returned state format depends on which model is active. Use
_adapt_state()if you need to translate to another model’s format.
- add_event_indicator(name, func, direction=0)[source]#
Register an event indicator on all sub-components.
Adds the event indicator to every sub-component that supports rollback, ensuring consistent event detection regardless of which model is active.
- Parameters:
- Return type:
None
Note
The indicator function should access state through the unified interface (e.g.,
comp.get_outputs()) rather than model-specific internals to work across all models.
- evaluate_event_indicators()[source]#
Evaluate event indicators on the active component.
Delegates to the active sub-component’s event indicator evaluation if it has state events configured.
- detect_event_crossings(previous, current, sign_tolerance=1e-10)[source]#
Detect zero-crossings on the active component.
Delegates to the active sub-component’s crossing detection if it has state events configured.
- Parameters:
- Returns:
List of indicator names that experienced crossings. Empty list if active component has no event indicators.
- Return type:
- snapshot_state()[source]#
Capture state snapshot from the active component.
Delegates to the active sub-component’s snapshot mechanism. Used for time rollback during event localization.
- Returns:
Opaque snapshot from the active component.
Warning
The snapshot is only valid for restoration to the same active component. Mode switches invalidate snapshots.
- restore_state(snapshot, t)[source]#
Restore state snapshot on the active component.
Delegates to the active sub-component’s restore mechanism. Used to roll back time during event localization bisection.
- Parameters:
snapshot – Opaque snapshot from
snapshot_state().t – Time at which the snapshot was taken.
- Return type:
None
Warning
Must restore to the same component that created the snapshot. Do not switch modes between snapshot and restore.
- property supports_rollback: bool#
Trueif the currently active sub-component supports state rollback.
- get_internal_event_hints()[source]#
Retrieve internal event hints from the active component.
Forwarding is unconditional so that hints reported by the active model during a trial step are visible to the hybrid algorithm and can short-circuit bisection.
- Returns:
List of
InternalEventInfoobjects from the active component.- Return type:
- _detect_direct_feedthrough()[source]#
Determine if all models have consistent direct feedthrough.
Checks the
direct_feedthroughproperty of all registered sub-components. If they differ, raises an error. Otherwise, sets thisMultiComponent’sdirect_feedthroughproperty accordingly.
- reset()[source]#
Reset all registered sub-components.
Calls
reset()on every non-None model in the registry, clearing their state and allowing re-initialization. Also clears the cached input replay buffer.Note
Unlike the base class, this resets ALL models, not just the active one. This ensures clean state when the
MultiComponentis re-initialized.- Return type:
None
- _abc_impl = <_abc._abc_data object>#