Source code for pythonVariableLogger

import keyword
from typing import Callable, Any, Dict

import numpy as np

from Basilisk.architecture import sysModel

LoggingFunction = Callable[[int], Any]

[docs] class PythonVariableLogger(sysModel.SysModel): """Logs results of arbitrary functions keyed by name. Usage:: log = PythonVariableLogger({ "a": lambda CurrentSimNanos: CurrentSimNanos**2, "b": lambda CurrentSimNanos: CurrentSimNanos**3, }) # run sim times = log.times() # array of timestamps timesSquared = log["a"] # or log.a timesCubed = log["b"] # or log.b """ def __init__( self, logging_functions: Dict[str, LoggingFunction], min_log_period: int = 0, ) -> None: if min_log_period < 0: raise ValueError("min_log_period must be >= 0") super().__init__() self.logging_functions = logging_functions self.min_log_period = min_log_period self._next_update_time = 0 self.clear() def clear(self) -> None: self._times: list = [] self._variables: Dict[str, list] = {k: [] for k in self.logging_functions} def times(self) -> np.ndarray: return np.array(self._times) def __getitem__(self, name: str) -> np.ndarray: try: variables = object.__getattribute__(self, "_variables") except AttributeError as err: raise AttributeError( f"'{type(self).__name__}' object has no attribute '{name}'" ) from err if name not in variables: raise KeyError( f"Logger is not logging '{name}'. " f"Available: {', '.join(variables)}" ) return np.array(variables[name]) def __getattr__(self, name: str) -> np.ndarray: # Only called for names not found through normal lookup try: return self[name] except KeyError as err: raise AttributeError(str(err)) from err def Reset(self, CurrentSimNanos: int) -> None: self.clear() self._next_update_time = CurrentSimNanos return super().Reset(CurrentSimNanos) def UpdateState(self, CurrentSimNanos: int) -> None: if CurrentSimNanos < self._next_update_time: return super().UpdateState(CurrentSimNanos) self._times.append(CurrentSimNanos) for name, fn in self.logging_functions.items(): try: val = np.array(fn(CurrentSimNanos)).squeeze() except Exception as ex: self.bskLogger.bskLog( sysModel.BSK_ERROR, f"Error logging '{name}' in '{self.ModelTag}': {ex}", ) val = None self._variables[name].append(val) self._next_update_time += self.min_log_period return super().UpdateState(CurrentSimNanos)