"""History management for ports, components, and systems.
This module provides classes to store, retrieve, and manage
history data for port variables, components, and entire systems
during co-simulation runs.
Overview of Classes:
- ``PortHistory``: Stores time series data for a single port variable.
- ``ComponentHistory``: Manages PortHistory instances for all ports of a component.
- ``SystemHistory``: Manages ComponentHistory instances for all components in a
system, along with event recording.
"""
from __future__ import annotations
import csv
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, cast
import numpy as np
from ..utilities.units import QuantityClass, QuantityType, ureg
from .events import DenseTime
logger = logging.getLogger(__name__)
# -------------------------------------------------------------------
# Port History - stores time series data for a single port variable
# -------------------------------------------------------------------
[docs]
@dataclass
class PortHistory:
"""Stores and manages time-series history of a port's values.
This class maintains a chronological record of values associated with timestamps,
with optional unit support and conversion capabilities. It provides efficient
storage using lists internally while exposing numpy arrays for analysis.
Attributes:
port_name: The name of the port this history belongs to.
unit: The unit of measurement for the values stored in this history.
Example:
>>> history = PortHistory(port_name="temperature", unit="celsius")
>>> history.append(0.0, 25.0)
>>> history.append(1.0, 26.5)
>>> print(history.time)
[0. 1.]
>>> print(history.values)
[25. 26.5]
"""
port_name: str
unit: str | None = None
_timestamps: list[float] = field(default_factory=list)
_values: list[float | int | bool | str] = field(default_factory=list)
[docs]
def append(self, t: float, value: float | int | bool | str | QuantityType) -> None:
"""Append a new time-value pair to the history."""
self._timestamps.append(t)
if isinstance(value, QuantityClass):
self._values.append(value.magnitude)
else:
self._values.append(cast(float | int | bool | str, value))
[docs]
def clear(self) -> None:
"""Clear the history."""
self._timestamps.clear()
self._values.clear()
@property
def time(self) -> np.ndarray:
"""Get timestamps as a numpy array."""
return np.array(self._timestamps)
@property
def values(self) -> np.ndarray:
"""Get values as a numpy array."""
return np.array(self._values)
[docs]
def get_values(self, as_unit: str | None = None) -> np.ndarray:
"""Get values as a numpy array, converting to the specified unit if applicable."""
vals = self.values
if as_unit and self.unit and as_unit != self.unit:
# Convert entire array at once
converted = ureg.Quantity(vals, self.unit).to(as_unit)
return cast(np.ndarray, np.asarray(converted.magnitude))
return vals
[docs]
def to_dict(self, as_unit: str | None = None) -> dict[str, Any]:
"""Get the history as a dictionary with 'time' array, 'values' array, and unit."""
return {
"time": self.time,
"values": self.get_values(as_unit),
"unit": as_unit if as_unit else self.unit,
}
[docs]
def to_tuple(self, as_unit: str | None = None) -> tuple[np.ndarray, np.ndarray, str | None]:
"""Export history as tuple (time, values, unit)."""
if as_unit:
return self.time, self.get_values(as_unit), as_unit
return self.time, self.get_values(as_unit), self.unit
def __len__(self) -> int:
"""Get the number of entries in the history."""
return len(self._timestamps)
def __bool__(self) -> bool:
"""Check if the history has any entries."""
return len(self) > 0
# -------------------------------------------------------------------
# Component History - stores time series data for all ports of a component
# -------------------------------------------------------------------
[docs]
@dataclass
class ComponentHistory:
"""Manages the collection of port histories for a single component.
This class provides convenient access patterns for retrieving and
manipulating trajectory data across all ports of a component.
Attributes:
component_name: Name of the component whose ports are tracked.
Example:
>>> history = ComponentHistory(component_name="Tank1")
>>> history.add_port("temperature", unit="K")
>>> history.append("temperature", t=0.0, value=300.0)
>>> history.append("temperature", t=1.0, value=305.0)
>>> port_hist = history.get_port_history("temperature")
"""
component_name: str
_port_histories: dict[str, PortHistory] = field(default_factory=dict)
[docs]
def add_port(self, port_name: str, unit: str | None = None) -> None:
"""Register a port for history tracking."""
if port_name not in self._port_histories:
self._port_histories[port_name] = PortHistory(port_name=port_name, unit=unit)
[docs]
def append(self, port_name: str, t: float, value: Any) -> None:
"""Append value to port history."""
if port_name not in self._port_histories:
raise KeyError(f"Port '{port_name}' not registered for history tracking")
self._port_histories[port_name].append(t, value)
[docs]
def get_port_history(self, port_name: str) -> PortHistory:
"""Get history object for a specific port."""
if port_name not in self._port_histories:
raise KeyError(f"No history for port '{port_name}'")
return self._port_histories[port_name]
[docs]
def get_all_histories(self) -> dict[str, PortHistory]:
"""Get all port histories as dictionary."""
return dict(self._port_histories)
[docs]
def to_dict(
self, port_names: list[str] | None = None, units: dict[str, str] | None = None
) -> dict[str, dict[str, Any]]:
"""Export histories as nested dictionary.
Args:
port_names: List of ports to export. If None, exports all.
units: Dict mapping port names to desired units for conversion.
Returns:
Dict mapping port names to their history dicts.
"""
ports = port_names if port_names else list(self._port_histories.keys())
result = {}
for port in ports:
if port not in self._port_histories:
continue
target_unit = units.get(port) if units else None
result[port] = self._port_histories[port].to_dict(as_unit=target_unit)
return result
[docs]
def to_arrays(
self, port_names: list[str] | None = None, units: dict[str, str] | None = None
) -> tuple[np.ndarray, dict[str, np.ndarray]]:
"""Export as common time array and dict of value arrays.
Args:
port_names: List of ports to export. If None, exports all.
units: Dict mapping port names to desired units.
Returns:
Tuple of (time_array, {port_name: values_array})
"""
ports = port_names if port_names else list(self._port_histories.keys())
if not ports:
return np.array([]), {}
# Get common time array from first port
time = self._port_histories[ports[0]].time
values_dict = {}
for port in ports:
if port not in self._port_histories:
continue
target_unit = units.get(port) if units else None
values_dict[port] = self._port_histories[port].get_values(as_unit=target_unit)
return time, values_dict
[docs]
def clear(self, port_names: list[str] | None = None) -> None:
"""Clear history for specified ports or all ports."""
ports = port_names if port_names else list(self._port_histories.keys())
for port in ports:
if port in self._port_histories:
self._port_histories[port].clear()
def __len__(self) -> int:
"""Number of ports with history."""
return len(self._port_histories)
def __contains__(self, port_name: str) -> bool:
"""Check if port has history."""
return port_name in self._port_histories
# -------------------------------------------------------------------
# System History - stores time series data for components in a system
# -------------------------------------------------------------------
[docs]
@dataclass
class SystemHistory:
"""Manages simulation history data for an entire system.
This class serves as a container for component-level histories and event recordings
in a system simulation. It provides methods for storing, retrieving, and persisting
simulation results across multiple components and their ports.
Attributes:
system_name: Name identifier for the system.
Key Features:
- Component history management: Register and retrieve histories for individual components
- Event recording: Track discrete event occurrences with timestamps
- Flexible data retrieval: Export histories in nested or flat dictionary formats
- Unit conversion: Support for automatic unit conversion during data retrieval
- Persistence: Save/load simulation results to/from CSV files
Example:
>>> history = SystemHistory(system_name="my_system")
>>> history.add_component("motor", motor_history)
>>> history.record_event("motor", "start", 0.5)
>>> trajectory = history.get_port_trajectory("motor", "speed")
>>> history.save_csv(Path("results.csv"))
"""
system_name: str
_component_histories: dict[str, ComponentHistory] = field(default_factory=dict)
_event_histories: dict[tuple[str, str], list[DenseTime]] = field(
default_factory=dict
) # (component, event_name) -> times
[docs]
def add_component(self, component_name: str, component_history: ComponentHistory) -> None:
"""Register a component's history."""
self._component_histories[component_name] = component_history
[docs]
def get_component_history(self, component_name: str) -> ComponentHistory:
"""Get history object for a specific component."""
if component_name not in self._component_histories:
raise KeyError(f"No history for component '{component_name}'")
return self._component_histories[component_name]
[docs]
def get_all_histories(self) -> dict[str, ComponentHistory]:
"""Get all component histories as dictionary."""
return dict(self._component_histories)
[docs]
def record_event(self, component_name: str, event_name: str, t: DenseTime) -> None:
"""Record an event occurrence time."""
key = (component_name, event_name)
if key not in self._event_histories:
self._event_histories[key] = []
self._event_histories[key].append(t)
[docs]
def get_event_history(self, component_name: str, event_name: str) -> list[DenseTime]:
"""Get recorded times for a specific event."""
key = (component_name, event_name)
return self._event_histories.get(key, [])
[docs]
def get_all_event_histories(self) -> dict[tuple[str, str], list[DenseTime]]:
"""Get all recorded event histories."""
return dict(self._event_histories)
# ----------------------------------------------------------------------------
# Retrieval Methods
# ----------------------------------------------------------------------------
[docs]
def to_dict(
self,
component_names: list[str] | None = None,
port_names: dict[str, list[str]] | None = None,
units: dict[str, dict[str, str]] | None = None,
format: str = "nested",
) -> dict[str, Any]:
"""Retrieve histories from components in the system.
Args:
component_names: List of components to retrieve. If None, retrieves all.
port_names: Dict mapping component names to lists of port names to retrieve.
If None, retrieves all ports for each component.
units: Nested dict {comp_name: {port_name: target_unit}} for unit conversion.
format: 'nested' or 'flat'
- 'nested': {'comp1': {'port1': {...}, 'port2': {...}}, 'comp2': {...}}
- 'flat': {'comp1.port1': {...}, 'comp2.port2': {...}}
Returns:
Dictionary with component/port histories based on format.
"""
comps = component_names if component_names else list(self._component_histories.keys())
if format == "nested":
result = {}
for comp_name in comps:
if comp_name not in self._component_histories:
continue
comp_hist = self._component_histories[comp_name]
ports = port_names.get(comp_name) if port_names else None
comp_units = units.get(comp_name) if units else None
result[comp_name] = comp_hist.to_dict(port_names=ports, units=comp_units)
return result
elif format == "flat":
result = {}
for comp_name in comps:
if comp_name not in self._component_histories:
continue
comp_hist = self._component_histories[comp_name]
ports = port_names.get(comp_name) if port_names else None
comp_units = units.get(comp_name) if units else None
comp_data = comp_hist.to_dict(port_names=ports, units=comp_units)
# Flatten: comp.port as key
for port_name, port_data in comp_data.items():
flat_key = f"{comp_name}.{port_name}"
result[flat_key] = port_data
return result
else:
raise ValueError(f"Unknown format '{format}'. Use 'nested' or 'flat'.")
[docs]
def to_arrays(
self,
component_names: list[str] | None = None,
port_names: dict[str, list[str]] | None = None,
units: dict[str, dict[str, str]] | None = None,
) -> dict[str, tuple[np.ndarray, dict[str, np.ndarray]]]:
"""Get histories as numpy arrays per component.
Args:
component_names: List of components to retrieve. If None, retrieves all.
port_names: Dict mapping component names to lists of port names.
units: Nested dict for unit conversion.
Returns:
Dict mapping component names to (time_array, values_dict) tuples.
"""
comps = component_names if component_names else list(self._component_histories.keys())
result = {}
for comp_name in comps:
if comp_name not in self._component_histories:
continue
comp_hist = self._component_histories[comp_name]
ports = port_names.get(comp_name) if port_names else None
comp_units = units.get(comp_name) if units else None
result[comp_name] = comp_hist.to_arrays(port_names=ports, units=comp_units)
return result
[docs]
def get_port_trajectory(
self, component: str, port: str, as_unit: str | None = None
) -> tuple[np.ndarray, np.ndarray]:
"""Get time and value arrays for a specific port (convenience method).
Args:
component: Component name
port: Port name
as_unit: Target unit for conversion
Returns:
Tuple of (time_array, values_array)
"""
if component not in self._component_histories:
raise KeyError(f"Component '{component}' not found in history")
comp_hist = self._component_histories[component]
port_hist = comp_hist.get_port_history(port)
return port_hist.time, port_hist.get_values(as_unit=as_unit)
[docs]
def clear(
self,
component_names: list[str] | None = None,
port_names: dict[str, list[str]] | None = None,
) -> None:
"""Clear histories for specified components and ports."""
comps = component_names if component_names else list(self._component_histories.keys())
for comp_name in comps:
if comp_name not in self._component_histories:
continue
comp_hist = self._component_histories[comp_name]
ports = port_names.get(comp_name) if port_names else None
comp_hist.clear(port_names=ports)
# Clear event histories if component is specified
self._event_histories.clear()
# -------------------------------------------------------------------
# Persistence Methods
# -------------------------------------------------------------------
[docs]
def save_csv(
self,
filepath: Path,
component_names: list[str] | None = None,
port_names: dict[str, list[str]] | None = None,
units: dict[str, dict[str, str]] | None = None,
long_format: bool = False,
) -> None:
"""Save as single CSV file.
Args:
filepath: Output CSV file path
component_names: Components to save. If None, saves all.
port_names: Specific ports to save per component.
units: Unit conversion specifications.
long_format: If True, uses long/tidy format (time, component, port, value).
If False, uses wide format (time, comp1.port1, comp1.port2, ...).
Format:
time, ref.q_ref, pendulum.q, pendulum.omega, ...
0.0, 1.57, 0.0, 0.0, ...
0.01, 1.55, 0.015, 0.3, ...
"""
filepath.parent.mkdir(parents=True, exist_ok=True)
# Get histories in flat format
histories = self.to_dict(
component_names=component_names, port_names=port_names, units=units, format="flat"
)
if not histories:
logger.warning("No data to save.")
return
# Get all time points (assuming all signals have same time base)
first_key = next(iter(histories.keys()))
time_array = histories[first_key]["time"]
# Build header rows
signal_names = list(histories.keys())
units_row = []
for signal in signal_names:
unit = histories[signal].get("unit", "-")
units_row.append(unit if unit else "-")
# Write CSV
with open(filepath, "w", newline="") as f:
writer = csv.writer(f)
# Header row 1: signal names
writer.writerow(["time"] + signal_names)
# Header row 2: units
writer.writerow(["s"] + units_row)
# Data rows
n_points = len(time_array)
for i in range(n_points):
row = [time_array[i]]
for signal in signal_names:
row.append(histories[signal]["values"][i])
writer.writerow(row)
logger.info("Results saved to: %s", filepath)
[docs]
@staticmethod
def load_csv(filepath: Path) -> dict[str, Any]:
"""Load CSV file (auto-detects wide or long format).
Args
filepath: Path to CSV file
Returns:
Dictionary with loaded data in nested format
"""
if not filepath.exists():
raise FileNotFoundError(f"File not found: {filepath}")
data: dict[str, dict[str, dict[str, Any]]] = {}
with open(filepath) as f:
reader = csv.reader(f)
headers = next(reader) # signal names
units_row = next(reader) # units
# Read all data rows
rows = list(reader)
time_vals = np.array([float(row[0]) for row in rows])
# Parse each signal column
for col_idx, signal_name in enumerate(headers[1:], 1):
values = np.array([float(row[col_idx]) for row in rows])
unit = units_row[col_idx] if units_row[col_idx] != "-" else None
# Parse component.port
parts = signal_name.split(".", 1)
if len(parts) == 2:
comp_name, port_name = parts
else:
comp_name = "default"
port_name = signal_name
# Organize into nested structure
if comp_name not in data:
data[comp_name] = {}
data[comp_name][port_name] = {"time": time_vals, "values": values, "unit": unit}
return data