#
# ISC License
#
# Copyright (c) 2026, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
"""
Benchmark attitude-control scenarios across C++, Python, and Numba implementations.
This script measures two key timings for each simulation run:
init -- InitializeSimulation (one-time setup; includes Numba cache load)
exec -- ExecuteSimulation (steady-state runtime cost)
The minimum timing is used as the primary performance metric because it best
approximates achievable throughput with minimal OS scheduling noise.
Two scenario groups are evaluated:
Pointing:
Compare a single control module implemented in C++, Python, and Numba.
Feedback:
Compare full FSW + sensor stacks implemented in C++ vs Numba.
"""
from __future__ import annotations
import contextlib
import io
import os
import statistics
import sys
import time
from dataclasses import dataclass
from typing import Callable, Any, Literal
sys.path.insert(0, os.path.dirname(__file__))
import scenarioAttitudeFeedback
import scenarioAttitudeFeedbackNumba
import scenarioAttitudePointing
import scenarioAttitudePointingNumba
import scenarioAttitudePointingPy
from Basilisk.utilities import SimulationBaseClass
sep = "=" * 72
[docs]
@dataclass(frozen=True)
class TimingStats:
"""Container for summary statistics of a timing sample."""
min: float
median: float
max: float
stdev: float
[docs]
@dataclass(frozen=True)
class BenchmarkResult:
"""Stores timing statistics for a single benchmark configuration."""
label: str
init: TimingStats
exec: TimingStats
[docs]
@dataclass(frozen=True)
class Scenario:
"""
Represents a benchmarkable simulation scenario.
Attributes:
label: Human-readable name for reporting.
run: Callable that executes the scenario once.
"""
label: str
run: Callable[[], Any]
[docs]
def computeStats(samples: list[float]) -> TimingStats:
"""
Compute descriptive statistics for a list of timing samples.
Args:
samples: List of timing measurements.
Returns:
TimingStats object containing min, median, max, and stdev.
Raises:
ValueError: If the input list is empty.
"""
if not samples:
raise ValueError("Cannot compute statistics of an empty sample list.")
return TimingStats(
min=min(samples),
median=statistics.median(samples),
max=max(samples),
stdev=statistics.stdev(samples) if len(samples) > 1 else 0.0,
)
[docs]
@contextlib.contextmanager
def suppressOutput():
"""
Context manager that suppresses stdout and stderr.
This prevents scenario print statements from interfering with
benchmark timing and output readability.
"""
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
yield
[docs]
@contextlib.contextmanager
def patchedSimTimers():
"""
Context manager that patches simulation methods to measure execution time.
Temporarily replaces:
- InitializeSimulation
- ExecuteSimulation
to capture timing information for each run.
Yields:
dict: Mutable dictionary with keys 'init' and 'exec' storing durations.
Guarantees:
Original methods are restored after exiting the context.
"""
timings = {"init": 0.0, "exec": 0.0}
origInit = SimulationBaseClass.SimBaseClass.InitializeSimulation
origExec = SimulationBaseClass.SimBaseClass.ExecuteSimulation
def timedInit(self):
"""Wrapped InitializeSimulation that records execution time."""
t0 = time.perf_counter()
origInit(self)
timings["init"] = time.perf_counter() - t0
def timedExec(self):
"""Wrapped ExecuteSimulation that records execution time."""
t0 = time.perf_counter()
origExec(self)
timings["exec"] = time.perf_counter() - t0
SimulationBaseClass.SimBaseClass.InitializeSimulation = timedInit
SimulationBaseClass.SimBaseClass.ExecuteSimulation = timedExec
try:
yield timings
finally:
SimulationBaseClass.SimBaseClass.InitializeSimulation = origInit
SimulationBaseClass.SimBaseClass.ExecuteSimulation = origExec
[docs]
def measureScenario(scenario: Scenario, nRuns: int = 10) -> BenchmarkResult:
"""
Benchmark a scenario over multiple runs.
Args:
scenario: Scenario object containing label and run function.
nRuns: Number of repetitions.
Returns:
BenchmarkResult containing timing statistics for init and exec.
"""
initTimes: list[float] = []
execTimes: list[float] = []
with patchedSimTimers() as timings:
for _ in range(nRuns):
with suppressOutput():
scenario.run()
initTimes.append(timings["init"])
execTimes.append(timings["exec"])
return BenchmarkResult(
label=scenario.label,
init=computeStats(initTimes),
exec=computeStats(execTimes),
)
[docs]
def printResultsTable(results: list[BenchmarkResult], title: str, labelCol: int = 28) -> None:
"""
Print a single compact benchmark table for one scenario family.
Args:
results: Benchmark results to print.
title: Title of the scenario family.
labelCol: Width of the label column.
"""
execBaseline = results[0].exec.min
print(title)
print(
f" {'Module':<{labelCol}} "
f"{'exec min':>10} {'exec med':>10} {'exec max':>10} {'exec min rel':>12} "
f"{'init min':>10} {'init med':>10} {'init max':>10}"
)
print(
f" {'-' * labelCol} "
f"{'-' * 10} {'-' * 10} {'-' * 10} {'-' * 12} "
f"{'-' * 10} {'-' * 10} {'-' * 10}"
)
for result in results:
execRatio = result.exec.min / execBaseline if execBaseline > 0.0 else float("inf")
print(
f" {result.label:<{labelCol}} "
f"{formatSeconds(result.exec.min):>10} "
f"{formatSeconds(result.exec.median):>10} "
f"{formatSeconds(result.exec.max):>10} "
f"{execRatio:>11.3f}x "
f"{formatSeconds(result.init.min):>10} "
f"{formatSeconds(result.init.median):>10} "
f"{formatSeconds(result.init.max):>10} "
)
print()
[docs]
def runGroup(title: str, scenarios: list[Scenario], nRuns: int = 10) -> None:
"""
Execute and report a group of benchmark scenarios.
Args:
title: Title describing the scenario family.
scenarios: Scenarios to benchmark.
nRuns: Number of runs per scenario.
"""
results = [measureScenario(s, nRuns=nRuns) for s in scenarios]
printResultsTable(results, f"{title} ({nRuns} runs)")
[docs]
def run(case: Literal["pointing", "feedback"] = "feedback", nRuns: int = 10) -> None:
"""Main scenario entry point"""
if case == "pointing":
runGroup(
title="Pointing - single control module replaced",
scenarios=[
Scenario("C++ (all C++)", lambda: scenarioAttitudePointing.run(False, False)),
Scenario("Python (1 Py module)", lambda: scenarioAttitudePointingPy.run(False)),
Scenario("Numba (1 Nb module)", lambda: scenarioAttitudePointingNumba.run(False)),
],
nRuns=nRuns,
)
elif case == "feedback":
runGroup(
title="Feedback - all 4 FSW+sensor modules replaced",
scenarios=[
Scenario(
"C++ (all C++)",
lambda: scenarioAttitudeFeedback.run(False, False, False, False, False),
),
Scenario(
"Numba (4 Nb modules)",
lambda: scenarioAttitudeFeedbackNumba.run(False, False, False, False),
),
],
nRuns=nRuns,
)
if __name__ == "__main__":
"""Entry point: run all benchmark scenario groups."""
for case in ("pointing", "feedback"):
run(case)