"""FMI 2.0 Co-Simulation FMU wrapper.
This module exposes :class:`FMUComponent`, a :class:`~syssimx.core.base.CoSimComponent`
implementation backed by an FMI 2.0 co-simulation FMU via ``fmpy``. The component
derives input/output ports from the FMU model description, handles units for REAL
signals, applies parameter starts during initialization, and supports direct
feedthrough evaluation.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any, Literal, cast
from ..core.base import CoSimComponent
from ..core.port import PortSpec, PortType
from ..utilities.units import QuantityClass, QuantityType, to_pint_unit
_FMPY_IMPORT_ERROR: ModuleNotFoundError | None = None
try:
from fmpy import extract, read_model_description
from fmpy.fmi2 import FMU2Slave
from fmpy.model_description import ModelDescription, ScalarVariable
except ModuleNotFoundError as e:
extract = None # type: ignore[assignment]
read_model_description = None # type: ignore[assignment]
FMU2Slave = None # type: ignore[assignment]
ModelDescription = None # type: ignore[assignment]
ScalarVariable = None # type: ignore[assignment]
_FMPY_IMPORT_ERROR = e
[docs]
def _require_fmpy() -> None:
if _FMPY_IMPORT_ERROR is not None:
raise ModuleNotFoundError(
"Optional dependency 'fmpy' is required for FMUComponent. "
"Install with: pip install syssimx[fmu] (or pip install fmpy)."
) from _FMPY_IMPORT_ERROR
_LOG = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# FMU Component Class
# ----------------------------------------------------------------------------
[docs]
class FMUComponent(CoSimComponent):
"""FMU co-simulation wrapper implementing the CoSimComponent interface.
Supports FMI 2.0 co-simulation FMUs, typed input/output ports (REAL, INT,
BOOL, STRING), and unit-aware REAL ports via ``pint``.
"""
# State of all FMU variables
state: dict[str, Any]
_md: ModelDescription
_md_vars: dict[str, ScalarVariable]
_instance: FMU2Slave | None
_unzipdir: str | None
# Direct Feedthrough for output dependencies
direct_feedthrough: dict[str, set[str]]
# Cached value references per base type and causality
_vrs_in_real: dict[str, int]
_vrs_in_int: dict[str, int]
_vrs_in_bool: dict[str, int]
_vrs_in_str: dict[str, int]
_vrs_in_bytes: dict[str, int]
_vrs_out_real: dict[str, int]
_vrs_out_int: dict[str, int]
_vrs_out_bool: dict[str, int]
_vrs_out_str: dict[str, int]
_vrs_out_bytes: dict[str, int]
[docs]
def __init__(self, name: str, fmu_path: str, group: str | None = None):
"""Create the FMU component and derive port specifications.
Args:
name: Component name.
fmu_path: Path to the FMU file.
group: Optional component group for organization.
Raises:
RuntimeError: If the FMU is not a co-simulation FMU.
NotImplementedError: If the FMU uses an unsupported FMI version.
"""
_require_fmpy()
super().__init__(name=name, group=group)
self._path = str(Path(fmu_path).resolve())
self._md = read_model_description(self._path)
self._md_vars = {var.name: var for var in self._md.modelVariables}
if not self._md.coSimulation:
raise RuntimeError(f"FMU '{fmu_path}' is not a Co-Simulation FMU.")
if self._md.fmiVersion != "2.0":
raise NotImplementedError(
f"FMU '{fmu_path}' has unsupported FMI version '{self._md.fmiVersion}'. Only FMI 2.0 is supported."
)
self._instance = None
self._unzipdir = None
# Port specification containers
self.input_specs: dict[str, PortSpec] = {}
self.output_specs: dict[str, PortSpec] = {}
self.direct_feedthrough = {}
self._detect_direct_feedthrough()
self._analyze_model_structure()
# Build port specifications from model description (Real, Int, Bool, Str, Bytes)
self._build_port_specs()
self._initialize_ports_from_specs()
# Cache value references map
self._vrs_in_real = {}
self._vrs_in_int = {}
self._vrs_in_bool = {}
self._vrs_in_str = {}
self._vrs_out_real = {}
self._vrs_out_int = {}
self._vrs_out_bool = {}
self._vrs_out_str = {}
self._build_value_reference_map()
# Parameters dictionary
self.parameters: dict[str, Any] = self.get_default_parameters()
# ----------------------------------------------------------------------------
# Get default parameters from model description
# ----------------------------------------------------------------------------
[docs]
def get_default_parameters(self) -> dict[str, Any]:
"""Return default parameter values from the FMU model description.
Returns:
Dict of parameter names to default values.
"""
defaults: dict[str, Any] = {}
for var in self._md.modelVariables:
if var.causality not in ("parameter", "calculatedParameter", "structuralParameter"):
continue
name = var.name
unit = to_pint_unit(var.unit) if var.unit else None
type = _port_type_from_var(var)
value = _convert_start_value(var.start, type, unit)
defaults[name] = value
return defaults
# ----------------------------------------------------------------------------
# Build helper for port specifications and value reference maps
# ----------------------------------------------------------------------------
[docs]
def _build_port_specs(self) -> None:
"""Build input/output PortSpec objects from the model description."""
for var in self._md.modelVariables:
if var.causality not in ("input", "output"):
continue
direction: Literal["in", "out"] = "in" if var.causality == "input" else "out"
port_type = _port_type_from_var(var)
if port_type is None:
continue
unit = None
if port_type == PortType.REAL and var.unit:
unit = str(to_pint_unit(var.unit)) if var.unit else None
spec = PortSpec(
name=var.name,
type=port_type,
direction=direction,
unit=unit,
description=var.description,
)
if direction == "in":
self.input_specs.update({var.name: spec})
else:
self.output_specs.update({var.name: spec})
[docs]
def _build_value_reference_map(self) -> None:
"""Cache value references for fast typed access."""
for var in self._md.modelVariables:
causality = var.causality
if causality not in ("input", "output"):
continue
port_type = _port_type_from_var(var)
value_reference = var.valueReference
if causality == "input":
if port_type == PortType.REAL:
self._vrs_in_real[var.name] = value_reference
elif port_type == PortType.INT:
self._vrs_in_int[var.name] = value_reference
elif port_type == PortType.BOOL:
self._vrs_in_bool[var.name] = value_reference
elif port_type == PortType.STRING:
self._vrs_in_str[var.name] = value_reference
elif causality == "output":
if port_type == PortType.REAL:
self._vrs_out_real[var.name] = value_reference
elif port_type == PortType.INT:
self._vrs_out_int[var.name] = value_reference
elif port_type == PortType.BOOL:
self._vrs_out_bool[var.name] = value_reference
elif port_type == PortType.STRING:
self._vrs_out_str[var.name] = value_reference
[docs]
def _analyze_model_structure(self) -> None:
"""Populate model structure dependencies for outputs/derivatives/initials."""
for output in self._md.outputs:
out_var = output.variable.name
deps = []
for dep in output.dependencies:
deps.append(dep.name)
self.model_structure["outputs"][out_var] = deps
for derivative in self._md.derivatives:
der_var = derivative.variable.name
deps = []
for dep in derivative.dependencies:
deps.append(dep.name)
self.model_structure["derivatives"][der_var] = deps
for init_unknown in self._md.initialUnknowns:
init_var = init_unknown.variable.name
deps = []
for dep in init_unknown.dependencies:
deps.append(dep.name)
self.model_structure["initialUnknowns"][init_var] = deps
[docs]
def _detect_direct_feedthrough(self) -> dict[str, set[str]]:
"""Detect direct feedthrough dependencies for FMU outputs."""
feedthrough: dict[str, set[str]] = {}
for unknown in self._md.outputs:
feedthrough[unknown.variable.name] = set()
for dep in unknown.dependencies:
if dep.causality == "input":
feedthrough[unknown.variable.name].add(dep.name)
self.direct_feedthrough = feedthrough
return feedthrough
[docs]
def _require_instance(self) -> FMU2Slave:
"""Return the FMU instance or raise if not initialized."""
if self._instance is None:
raise RuntimeError(f"{self.name}: FMU instance not initialized")
return self._instance
# ----------------------------------------------------------------------------
# Initialization
# ----------------------------------------------------------------------------
[docs]
def _initialize_component(self, t0: float) -> None:
"""Initialize and instantiate the FMU for simulation.
1. Extracts the FMU archive.
2. Creates the FMU2Slave instance.
3. Sets up the experiment with the start time.
4. Enters initialization mode.
5. Applies parameter start values.
6. Applies input start values from PortStates.
7. Exits initialization mode.
Args:
t0: Simulation start time.
"""
self._unzipdir = extract(self._path)
self._instance = FMU2Slave(
instanceName=self.name,
guid=self._md.guid,
unzipDirectory=self._unzipdir,
modelIdentifier=self._md.coSimulation.modelIdentifier,
)
self._instance.instantiate()
self._instance.setupExperiment(startTime=t0)
self._instance.enterInitializationMode()
self._apply_parameters_starts()
self._apply_input_starts()
self._instance.exitInitializationMode()
[docs]
def reinitialize_instance(self, t0: float) -> None:
"""Recreate the FMU instance and enter initialized state at ``t0``.
This is intentionally different from FMI ``reset()``. Some FMUs do not
support reliable rollback through ``fmi2Reset`` after stepping, but can
be restored by creating a fresh instance and applying reconstructed
initial conditions through parameters.
"""
self._instance.instantiate()
self._instance.setupExperiment(startTime=t0)
self._instance.enterInitializationMode()
self._apply_parameters_starts()
self._apply_input_starts()
self._instance.exitInitializationMode()
# ----------------------------------------------------------------------------
# Initialization helpers
# ----------------------------------------------------------------------------
[docs]
def _apply_parameters_starts(self) -> None:
"""Apply parameter start values to the FMU instance."""
instance = self._require_instance()
# batch parameters by base type
real_vrs: list[int] = []
real_vals: list[float] = []
int_vrs: list[int] = []
int_vals: list[int] = []
bool_vrs: list[int] = []
bool_vals: list[int] = []
str_vrs: list[int] = []
str_vals: list[str] = []
for name, value in self.parameters.items():
if value is None:
continue
var = self._md_vars.get(name)
if var is None:
continue
if var._python_type is float:
real_vrs.append(var.valueReference)
value = value.magnitude if isinstance(value, QuantityClass) else value
real_vals.append(float(value))
elif var._python_type is int:
int_vrs.append(var.valueReference)
int_vals.append(int(value))
elif var._python_type is bool:
bool_vrs.append(var.valueReference)
bool_vals.append(1 if bool(value) else 0)
elif var._python_type is str:
str_vrs.append(var.valueReference)
str_vals.append(str(value))
if real_vrs:
instance.setReal(real_vrs, real_vals)
if int_vrs:
instance.setInteger(int_vrs, int_vals)
if bool_vrs:
instance.setBoolean(bool_vrs, bool_vals)
if str_vrs:
instance.setString(str_vrs, str_vals)
# ----------------------------------------------------------------------------
# Parameter and Initial Condition Handling
# ----------------------------------------------------------------------------
[docs]
def set_parameters(self, **parameters) -> None:
"""Set parameter values with validation and unit handling.
Args:
**parameters: Parameter name/value pairs.
Raises:
KeyError: If a parameter name is unknown.
TypeError: If a value does not match the parameter type or unit.
"""
for name, value in parameters.items():
if name not in self.parameters:
raise KeyError(
f"Unknown parameter '{name}' for component '{self.name}'. "
f"Available parameters: {list(self.parameters.keys())}"
)
self.parameters[name] = self._validate_parameter(name, value)
[docs]
def get_parameters(self, *names: str) -> dict[str, Any]:
"""Return parameter values, optionally limited to specific names.
Args:
*names: Optional parameter names to fetch.
Raises:
KeyError: If a requested parameter name is unknown.
"""
if not names:
return self.parameters.copy()
result = {}
for name in names:
if name not in self.parameters:
raise KeyError(
f"Unknown parameter '{name}' for component '{self.name}'. "
f"Available parameters: {list(self.parameters.keys())}"
)
result[name] = self.parameters[name]
return result
# ----------------------------------------------------------------------------
# Input/output methods
# ----------------------------------------------------------------------------
[docs]
def get_outputs(self) -> dict[str, Any]:
"""Return current output values as a dict of name to value."""
return {
name: out_port.get()
for name, out_port in self.outputs.items()
if out_port.get() is not None
}
[docs]
def _update_output_states(
self, t: float | None = None, event_names: list[str] | None = None
) -> None:
"""Refresh output PortStates from the FMU instance.
Args:
t: Optional timestamp to record with outputs.
event_names: Reserved for event-based updates (unused).
"""
instance = self._require_instance()
# For each base type, batch get and set into PortStates as Quantities (REAL) or raw types
if self._vrs_out_real:
vrs = list(self._vrs_out_real.values())
vals = instance.getReal(vrs)
for name, val in zip(self._vrs_out_real.keys(), vals):
spec = self.output_specs[name]
q = (val * to_pint_unit(spec.unit)) if spec.unit else val
self.outputs[name].set(q, t=t)
if self._vrs_out_int:
vrs = list(self._vrs_out_int.values())
vals = instance.getInteger(vrs)
for name, val in zip(self._vrs_out_int.keys(), vals):
self.outputs[name].set(int(val), t=t)
if self._vrs_out_bool:
vrs = list(self._vrs_out_bool.values())
vals = instance.getBoolean(vrs)
for name, val in zip(self._vrs_out_bool.keys(), vals):
self.outputs[name].set(bool(val), t=t)
if self._vrs_out_str:
vrs = list(self._vrs_out_str.values())
vals = instance.getString(vrs)
for name, val in zip(self._vrs_out_str.keys(), vals):
self.outputs[name].set(str(val), t=t)
# ----------------------------------------------------------------------------
# Time stepping method
# ----------------------------------------------------------------------------
[docs]
def _do_step_internal(self, t: float, dt: float) -> None:
"""Perform a single time step in the FMU.
Args:
t: Current communication point.
dt: Communication step size.
"""
self._require_instance().doStep(currentCommunicationPoint=t, communicationStepSize=dt)
# ----------------------------------------------------------------------------
# State methods for setting and getting simulation state
# ----------------------------------------------------------------------------
[docs]
def set_state(self, state: dict[str, Any], t: float) -> None:
"""Restore FMU state from a serialized variable dictionary.
Args:
state: Mapping of variable names to stored attributes/values.
t: Time to reinitialize the FMU at.
"""
instance = self._require_instance()
instance.instantiate()
instance.setupExperiment(startTime=t)
instance.enterInitializationMode()
params = {}
inputs = {}
for var_name, attr in state.items():
if var_name in self.parameters:
params[var_name] = attr["value"]
if var_name in self.inputs:
inputs[var_name] = attr["value"]
self.set_parameters(**params)
self._apply_parameters_starts()
self.set_inputs(inputs, t=t)
instance.exitInitializationMode()
self._update_output_states(t)
[docs]
def get_state(self) -> dict[str, dict[str, Any]]:
"""Return FMU variables as a serialized dictionary.
The dictionary includes variables that are not fixed and not local,
along with their units and current values pulled from the FMU instance.
Returns:
Dict mapping variable names to unit/value attributes.
"""
instance = self._require_instance()
state: dict[str, dict[str, Any]] = {}
for var in self._md.modelVariables:
if var.variability == "fixed" or var.causality == "local":
continue
name = var.name
state[name] = {}
state[name]["unit"] = var.unit
vr = var.valueReference
if var.type == "Real":
val = instance.getReal([vr])[0]
# if unit:
# val = val * ureg(unit)
state[name]["value"] = val
elif var.type == "Integer":
val = instance.getInteger([vr])[0]
state[name]["value"] = val
elif var.type == "Boolean":
val = instance.getBoolean([vr])[0]
state[name]["value"] = bool(val)
elif var.type == "String":
val = instance.getString([vr])[0]
state[name]["value"] = val
self.state = state
return self.state
# ----------------------------------------------------------------------------
# Evaluate outputs without time step for direct feedthrough
# ----------------------------------------------------------------------------
[docs]
def evaluate_outputs(self, inputs: dict[str, Any], t: float | None = None) -> dict[str, Any]:
"""Evaluate outputs for a given input set without advancing time.
Args:
inputs: Input values to apply.
t: Optional time to use for evaluation.
Returns:
Mapping of output names to raw (unitless) values.
"""
# 1) Apply inputs without touching histories
self.set_inputs(inputs, t=None)
# 2) Do a zero-time step to update outputs
step_time = self.t if t is None else t
self._do_step_internal(t=step_time, dt=0.0)
# 3) Read outputs
self._update_output_states(t=None)
# 4) Return outputs as raw values (Quantities converted to magnitudes)
outputs = self.get_outputs()
for key, value in outputs.items():
value = value.magnitude if isinstance(value, QuantityClass) else value
outputs[key] = value
return outputs
# ----------------------------------------------------------------------------
# Reset method
# ----------------------------------------------------------------------------
[docs]
def reset(self) -> None:
"""Reset the component and release the FMU instance.
After reset, the component can be reinitialized via ``initialize()``.
"""
if self._instance is not None:
self._instance = None
# Clear runtime state (including _is_initialized via super)
super().reset()
self.inputs.clear()
self.outputs.clear()
self.parameters.clear()
self.parameters = self.get_default_parameters()
[docs]
def soft_reset(self, t0: float = 0.0) -> None:
"""Reset the FMU to initial state without releasing the instance.
Uses FMI ``reset()`` to return to the instantiated state, then
reinitializes with parameter and input starts.
Args:
t0: New start time for the simulation.
"""
if self._instance is None:
raise RuntimeError(f"{self.name}: Cannot soft_reset - FMU not initialized")
# FMI reset() returns to Instantiated state
self._instance.reset()
# Must go through full initialization sequence
self._instance.setupExperiment(startTime=t0)
self._instance.enterInitializationMode()
self._apply_parameters_starts()
self._apply_input_starts()
self._instance.exitInitializationMode()
# Clear and update state
self.history.clear()
self.t = t0
self._update_output_states(t=t0)
self._record_outputs(t0)
[docs]
def _validate_parameter(self, name: str, value: Any) -> QuantityType | float | int | bool | str:
"""Validate and normalize a parameter value.
Args:
name: Parameter name.
value: Proposed parameter value.
Returns:
Validated value, converted to the expected type/unit.
Raises:
TypeError: If the value is incompatible with the parameter type or unit.
"""
model_var = self._md_vars.get(name)
if model_var is None:
raise KeyError(f"{self.name}: Unknown parameter '{name}'")
type = _port_type_from_var(model_var)
unit = model_var.unit
if type == PortType.REAL:
if not isinstance(value, (float, int, QuantityClass)):
raise TypeError(f"{self.name}: Parameter '{name}' must be float.")
if unit:
if isinstance(value, QuantityClass):
unit = to_pint_unit(unit)
if value.is_compatible_with(unit):
return cast(QuantityType, value.to(unit))
else:
raise TypeError(
f"{self.name}: Parameter '{name}' must have unit compatible with '{unit}'."
)
else:
q = float(value) * to_pint_unit(unit)
return cast(QuantityType, q)
return float(value)
elif type == PortType.INT:
if not isinstance(value, int):
raise TypeError(f"{self.name}: Parameter '{name}' must be int.")
return int(value)
elif type == PortType.BOOL:
if not isinstance(value, bool):
raise TypeError(f"{self.name}: Parameter '{name}' must be bool.")
return bool(value)
elif type == PortType.STRING:
if not isinstance(value, str):
raise TypeError(f"{self.name}: Parameter '{name}' must be str.")
return str(value)
raise NotImplementedError(f"{self.name}: Unsupported parameter type for '{name}'.")
# ----------------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------------
[docs]
def _port_type_from_var(var: ScalarVariable) -> PortType:
"""Determine PortType from ScalarVariable type.
Args:
var (ScalarVariable): The model variable to determine the port type for.
Returns:
PortType: The corresponding PortType.
Raises:
NotImplementedError: If the variable type is unsupported.
"""
if var._python_type is float:
return PortType.REAL
elif var._python_type is int:
return PortType.INT
elif var._python_type is bool:
return PortType.BOOL
elif var._python_type is str:
return PortType.STRING
else:
raise NotImplementedError(
f"Unsupported variable type '{var._python_type}' for variable '{var.name}'."
)
[docs]
def _convert_start_value(
start: str | None, port_type: PortType, unit: str | None
) -> QuantityType | float | int | bool | str | None:
"""Convert start value string to appropriate type based on PortType and unit.
Args:
start (str | None): The start value as a string.
port_type (PortType): The type of the port.
unit (str | None): The unit of the port, if applicable.
Returns:
Union[float, int, bool, str, None]: The converted start value in the appropriate type.
"""
if start is None:
return None
if port_type == PortType.REAL:
val = float(start)
if unit:
val = val * to_pint_unit(unit)
return val
elif port_type == PortType.INT:
return int(start)
elif port_type == PortType.BOOL:
return start.lower() in ("true", "1")
elif port_type == PortType.STRING:
return str(start)
else:
return None
[docs]
def _in_vr_map_for_type(comp: FMUComponent, pt: PortType) -> dict[str, int]:
"""Return the input value-reference map for a port type."""
return (
comp._vrs_in_real
if pt == PortType.REAL
else comp._vrs_in_int
if pt == PortType.INT
else comp._vrs_in_bool
if pt == PortType.BOOL
else comp._vrs_in_str
)