Source code for syssimx.core.port

"""Port specification and port state management for co-simulation components.

This module defines the PortType enum, PortSpec class for immutable port
specifications, and PortState class for mutable port state with type-safe
get/set operations including unit handling.

Overview of classes:
- PortType: Enum of supported port data types (REAL, INT, BOOL, STRING, EVENT)
- PortSpec: Immutable specification of a port (name, type, direction, unit, description)
- PortState: Mutable state of a port (spec, current value, last update time)
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any, Literal, cast

import numpy as np

from ..utilities.units import (
    QuantityClass,
    QuantityType,
    UnitType,
    to_pint_unit,
    ureg,
)


# -------------------------------------------------------------------
# Port Types and Specifications - immutable port definitions
# -------------------------------------------------------------------
[docs] class PortType(StrEnum): """Supported port data types for co-simulation components. Attributes: REAL: Floating-point values, optionally with physical units. INT: Integer values. BOOL: Boolean values (True/False). STRING: String values. EVENT: Boolean trigger for discrete events. """ REAL = "real" INT = "int" BOOL = "bool" STRING = "string" EVENT = "event" # Boolean trigger for discrete events
Direction = Literal["in", "out"] PortValue = float | int | bool | str | QuantityType
[docs] def _validate_unit(unit: str | UnitType | None, port_name: str) -> None: """Validate that a unit string or Unit is recognized by the framework registry.""" if unit is None or unit == "": return try: _ = to_pint_unit(unit) except Exception as e: raise ValueError(f"Port '{port_name}': Invalid unit '{unit}': {e}") from e
[docs] def _coerce_numpy_scalar(value: Any) -> Any: """Convert numpy scalar types to Python native types. This allows seamless use of numpy arrays in port operations. Returns the original value if not a numpy scalar. """ if isinstance(value, np.floating): return float(value) if isinstance(value, np.integer): return int(value) if isinstance(value, np.bool_): return bool(value) return value
[docs] @dataclass(frozen=True) class PortSpec: """ Specification of a port in a CoSimComponent (immutable). Defines the name, type, direction, optional unit, and description of the port. Units are validated at construction time to catch errors early. Attributes: name: Unique identifier for the port within a component. type: Data type of the port (REAL, INT, BOOL, STRING, EVENT). direction: Whether this is an input ("in") or output ("out") port. unit: Physical unit string or Unit for REAL ports (e.g., "m/s", "N*m"). description: Human-readable description of the port's purpose. """ name: str type: PortType direction: Direction unit: str | UnitType | None = None description: str | None = None def __post_init__(self) -> None: """Validate port specification at construction time.""" # Only REAL ports can have units if self.type != PortType.REAL and self.unit is not None: raise ValueError( f"Port '{self.name}': Only REAL ports can have units, " f"got {self.type.value.upper()} with unit '{self.unit}'" ) # Validate unit string is recognized by pint _validate_unit(self.unit, self.name) def __str__(self) -> str: """Human-readable string representation.""" unit_str = f" [{self.unit}]" if self.unit else "" return f"{self.name} ({self.type.value}, {self.direction}){unit_str}"
[docs] def validate_value(self, value: Any, *, coerce_numpy: bool = True) -> Any: """ Validate that the given value matches the port's type and unit. Args: value: The value to validate. coerce_numpy: If True, convert numpy scalars to Python native types. Returns: The (possibly coerced) value if valid. Raises: TypeError: If value type doesn't match port type. ValueError: If unit validation fails. DimensionalityError: If Quantity has incompatible dimensions. """ if coerce_numpy: value = _coerce_numpy_scalar(value) # Type-specific validation if self.type == PortType.REAL: return self._validate_real(value) elif self.type == PortType.INT: return self._validate_int(value) elif self.type == PortType.BOOL: return self._validate_bool(value) elif self.type == PortType.STRING: return self._validate_string(value) elif self.type == PortType.EVENT: return self._validate_event(value) else: raise ValueError(f"Unknown port type: {self.type}")
[docs] def _validate_real(self, value: Any) -> float | QuantityType: """Validate REAL port value.""" if not isinstance(value, (float, int, QuantityClass)): raise TypeError( f"Port '{self.name}': REAL expects float, int, or Quantity, " f"got {type(value).__name__}" ) # Unit compatibility check for Quantity if self.unit and isinstance(value, QuantityClass): _ = value.to(self.unit) # Raises DimensionalityError if incompatible if isinstance(value, QuantityClass): return cast(QuantityType, value) return float(value)
[docs] def _validate_int(self, value: Any) -> int: """Validate INT port value.""" # Use 'type(value) is int' to reject bool (which is subclass of int) if type(value) is not int: raise TypeError(f"Port '{self.name}': INT expects int, got {type(value).__name__}") return value
[docs] def _validate_bool(self, value: Any) -> bool: """Validate BOOL port value.""" if not isinstance(value, bool): raise TypeError(f"Port '{self.name}': BOOL expects bool, got {type(value).__name__}") return value
[docs] def _validate_string(self, value: Any) -> str: """Validate STRING port value.""" if not isinstance(value, str): raise TypeError(f"Port '{self.name}': STRING expects str, got {type(value).__name__}") return value
[docs] def _validate_event(self, value: Any) -> bool: """Validate EVENT port value (boolean trigger).""" if not isinstance(value, bool): raise TypeError(f"Port '{self.name}': EVENT expects bool, got {type(value).__name__}") return value
[docs] @staticmethod def compatible(spec1: PortSpec, spec2: PortSpec) -> bool: """Check if two PortSpecs are compatible for connection. Compatibility rules: - Types must match exactly (REAL-REAL, INT-INT, etc.) - For REAL ports: both must have units, and units must be dimensionally compatible - For REAL ports: both having no unit is also compatible Args: spec1: Source port specification (typically output). spec2: Destination port specification (typically input). Returns: True if ports can be connected, False otherwise. """ # Type must match exactly if spec1.type != spec2.type: return False # For non-REAL types, type match is sufficient if spec1.type != PortType.REAL: return True # REAL type: check unit compatibility return PortSpec._check_unit_compatibility(spec1.unit, spec2.unit)
[docs] @staticmethod def _check_unit_compatibility( unit1: str | UnitType | None, unit2: str | UnitType | None ) -> bool: """Check if two unit specifications are compatible. Rules: - Both None: compatible (dimensionless) - One None, one has unit: incompatible (ambiguous) - Both have units: check dimensional compatibility """ if unit1 is None and unit2 is None: return True if unit1 is None or unit2 is None: return False # Asymmetric: one has unit, other doesn't try: u1 = to_pint_unit(unit1) u2 = to_pint_unit(unit2) (1 * u1).to(u2) return True except Exception: return False
# ------------------------------------------------------------------- # Port State - mutable state of a port # -------------------------------------------------------------------
[docs] @dataclass class PortState: """Mutable state of a port in a CoSimComponent. Holds the specification, current value, and last update time. Provides type-safe get/set operations with automatic unit conversion. Attributes: spec: Immutable specification defining port properties. value: Current value of the port. t_last: Simulation time of last value update. """ spec: PortSpec value: PortValue | None = field(default=None) t_last: float | None = field(default=None) def __post_init__(self) -> None: """Initialize with type-appropriate default value if None.""" if self.value is None: self.value = self._get_default_value() def __str__(self) -> str: """Human-readable string representation.""" time_str = f" @ t={self.t_last}" if self.t_last is not None else "" return f"{self.spec.name} = {self.value}{time_str}"
[docs] def _get_default_value(self) -> PortValue: """Get default value based on port type.""" defaults: dict[PortType, PortValue] = { PortType.REAL: 0.0 if self.spec.unit is None else 0.0 * ureg(str(self.spec.unit)), PortType.INT: 0, PortType.BOOL: False, PortType.STRING: "", PortType.EVENT: False, } return defaults[self.spec.type]
[docs] def set(self, value: PortValue, t: float | None = None) -> None: """Set the port's value with validation and unit conversion. Args: value: New value to set. For REAL ports with units, Quantities are automatically converted to the port's unit. t: Simulation time of this update (optional). Raises: TypeError: If value type doesn't match port type. DimensionalityError: If Quantity has incompatible dimensions. """ # Validate and potentially coerce numpy types validated_value = self.spec.validate_value(value, coerce_numpy=True) # Handle REAL port unit conversion if self.spec.type == PortType.REAL: self.value = self._convert_real_value(validated_value) else: self.value = validated_value if t is not None: self.t_last = t
[docs] def _convert_real_value(self, value: float | int | QuantityType) -> float | QuantityType: """Convert and store REAL value with proper unit handling.""" if isinstance(value, QuantityClass): if self.spec.unit is None: return cast(QuantityType, value) return cast(QuantityType, value.to(self.spec.unit)) else: real_value = float(value) if self.spec.unit is None: return real_value return cast(QuantityType, real_value * ureg(str(self.spec.unit)))
[docs] def get(self, as_unit: str | None = None) -> PortValue | None: """Get the current value of the port. Args: as_unit: For REAL ports, convert to this unit before returning. Returns: Current port value, possibly converted to requested unit. """ if self.value is None: return None if self.spec.type == PortType.REAL and as_unit is not None: return self._get_with_unit_conversion(as_unit) return self.value
[docs] def _get_with_unit_conversion(self, target_unit: str) -> PortValue: """Get REAL value converted to target unit.""" if isinstance(self.value, QuantityClass): return cast(QuantityType, self.value.to(target_unit)) if self.spec.unit is not None: unit = to_pint_unit(self.spec.unit) return cast(QuantityType, (cast(float, self.value) * unit).to(target_unit)) # No unit on port, return raw value return cast(float, self.value)
[docs] def compatible_with(self, other: PortSpec) -> bool: """Check if this port's state can connect to another port specification. Uses the same compatibility rules as PortSpec.compatible(). Args: other: Target port specification to check compatibility with. Returns: True if connection is valid, False otherwise. """ return PortSpec.compatible(self.spec, other)
[docs] def reset(self) -> None: """Reset port to default value and clear timestamp.""" self.value = self._get_default_value() self.t_last = None