#
# Copyright (c) 2026, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import pytest
import numpy as np
from Basilisk.architecture import messaging
from Basilisk.architecture import sysModel
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import macros
[docs]
def test_update_time_interval():
"""Test changing a recorder's minimum update time during a paused simulation."""
sc_sim = SimulationBaseClass.SimBaseClass()
sim_process = sc_sim.CreateNewProcess("testProcess")
task_time = macros.sec2nano(1.0) # [ns]
sim_process.addTask(sc_sim.CreateNewTask("testTask", task_time))
msg_payload = messaging.CModuleTemplateMsgPayload()
msg = messaging.CModuleTemplateMsg().write(msg_payload)
initial_record_time = macros.sec2nano(10.0) # [ns]
msg_recorder = msg.recorder(initial_record_time)
sc_sim.AddModelToTask("testTask", msg_recorder)
sc_sim.InitializeSimulation()
first_stop_time = macros.sec2nano(5.0) # [ns]
sc_sim.ConfigureStopTime(first_stop_time)
sc_sim.ExecuteSimulation()
short_record_time = macros.sec2nano(3.0) # [ns]
second_stop_time = macros.sec2nano(12.0) # [ns]
msg_recorder.updateTimeInterval(short_record_time)
sc_sim.ConfigureStopTime(second_stop_time)
sc_sim.ExecuteSimulation()
long_record_time = macros.sec2nano(8.0) # [ns]
final_stop_time = macros.sec2nano(20.0) # [ns]
msg_recorder.updateTimeInterval(long_record_time)
sc_sim.ConfigureStopTime(final_stop_time)
sc_sim.ExecuteSimulation()
first_record_time = macros.sec2nano(0.0) # [ns]
resumed_record_time = macros.sec2nano(6.0) # [ns]
second_resumed_record_time = macros.sec2nano(9.0) # [ns]
expected_times = np.array([
first_record_time,
resumed_record_time,
second_resumed_record_time,
second_stop_time,
final_stop_time,
]) # [ns]
np.testing.assert_array_equal(msg_recorder.times(), expected_times)
[docs]
def test_record_on_change():
"""Test recording changed message payloads after the minimum update time."""
sc_sim = SimulationBaseClass.SimBaseClass()
sim_process = sc_sim.CreateNewProcess("testProcess")
task_time = macros.sec2nano(1.0) # [ns]
sim_process.addTask(sc_sim.CreateNewTask("testTask", task_time))
test_module = ChangingPayloadModule()
sc_sim.AddModelToTask("testTask", test_module)
minimum_record_time = macros.sec2nano(4.0) # [ns]
msg_recorder = test_module.dataOutMsg.recorder(minimum_record_time)
msg_recorder.recordOnChange()
sc_sim.AddModelToTask("testTask", msg_recorder)
sc_sim.InitializeSimulation()
final_stop_time = macros.sec2nano(13.0) # [ns]
sc_sim.ConfigureStopTime(final_stop_time)
sc_sim.ExecuteSimulation()
first_record_time = macros.sec2nano(0.0) # [ns]
first_eligible_change_time = macros.sec2nano(8.0) # [ns]
second_eligible_change_time = macros.sec2nano(12.0) # [ns]
expected_times = np.array([
first_record_time,
first_eligible_change_time,
second_eligible_change_time,
]) # [ns]
expected_values = np.array([
[0, 0, 0],
[1, 0, 0],
[2, 0, 0],
])
np.testing.assert_array_equal(msg_recorder.times(), expected_times)
np.testing.assert_array_equal(msg_recorder.dataVector, expected_values)
[docs]
def test_record_on_change_update_time_interval_after_skipped_record():
"""Test interval updates after a change-only recorder skips an unchanged payload."""
sc_sim = SimulationBaseClass.SimBaseClass()
sim_process = sc_sim.CreateNewProcess("testProcess")
task_time = macros.sec2nano(1.0) # [ns]
sim_process.addTask(sc_sim.CreateNewTask("testTask", task_time))
test_module = ChangingPayloadModule()
sc_sim.AddModelToTask("testTask", test_module)
initial_record_time = macros.sec2nano(4.0) # [ns]
msg_recorder = test_module.dataOutMsg.recorder(initial_record_time)
msg_recorder.recordOnChange()
sc_sim.AddModelToTask("testTask", msg_recorder)
sc_sim.InitializeSimulation()
first_stop_time = macros.sec2nano(6.0) # [ns]
sc_sim.ConfigureStopTime(first_stop_time)
sc_sim.ExecuteSimulation()
updated_record_time = macros.sec2nano(10.0) # [ns]
second_stop_time = macros.sec2nano(13.0) # [ns]
msg_recorder.updateTimeInterval(updated_record_time)
sc_sim.ConfigureStopTime(second_stop_time)
sc_sim.ExecuteSimulation()
first_record_time = macros.sec2nano(0.0) # [ns]
np.testing.assert_array_equal(msg_recorder.times(), np.array([first_record_time]))
final_stop_time = macros.sec2nano(14.0) # [ns]
sc_sim.ConfigureStopTime(final_stop_time)
sc_sim.ExecuteSimulation()
expected_times = np.array([
first_record_time,
final_stop_time,
]) # [ns]
expected_values = np.array([
[0, 0, 0],
[2, 0, 0],
])
np.testing.assert_array_equal(msg_recorder.times(), expected_times)
np.testing.assert_array_equal(msg_recorder.dataVector, expected_values)
[docs]
def test_record_on_change_unsupported_payload():
"""recordOnChange() on a payload with no equality support raises BasiliskError.
VizUserInputMsgPayload contains std::vector<VizEventReply>, which the equality
generator rejects and no manual specialization exists. Enabling change-only
recording should fail explicitly instead of falling back to interval recording.
"""
msg_recorder = messaging.VizUserInputMsg().recorder()
with pytest.raises(Exception, match="does not support equality comparison.*PayloadEqualityTraits"):
msg_recorder.recordOnChange()
msg_recorder.recordOnChange(False)
[docs]
def test_record_on_change_camera_image_payload():
"""Test CameraImageMsgPayload change-only recording with shallow metadata equality.
CameraImageMsgPayload equality compares payload fields, including the image
pointer address, but it does not compare the pointed-to image bytes.
"""
sc_sim = SimulationBaseClass.SimBaseClass()
sim_process = sc_sim.CreateNewProcess("testProcess")
task_time = macros.sec2nano(1.0) # [ns]
sim_process.addTask(sc_sim.CreateNewTask("testTask", task_time))
test_module = ChangingCameraModule()
sc_sim.AddModelToTask("testTask", test_module)
minimum_record_time = macros.sec2nano(4.0) # [ns]
msg_recorder = test_module.cameraOutMsg.recorder(minimum_record_time)
msg_recorder.recordOnChange()
sc_sim.AddModelToTask("testTask", msg_recorder)
sc_sim.InitializeSimulation()
final_stop_time = macros.sec2nano(10.0) # [ns]
sc_sim.ConfigureStopTime(final_stop_time)
sc_sim.ExecuteSimulation()
first_record_time = macros.sec2nano(0.0) # [ns]
first_eligible_change_time = macros.sec2nano(8.0) # [ns]
expected_times = np.array([
first_record_time,
first_eligible_change_time,
]) # [ns]
expected_time_tags = np.array([
macros.sec2nano(0.0),
test_module.image_time_tag,
]) # [ns]
np.testing.assert_array_equal(msg_recorder.times(), expected_times)
np.testing.assert_array_equal(msg_recorder.timeTag, expected_time_tags)
[docs]
class ChangingCameraModule(sysModel.SysModel):
"""Write a CameraImageMsgPayload whose metadata changes at a fixed time."""
def __init__(self):
super().__init__()
self.cameraOutMsg = messaging.CameraImageMsg()
self.change_time = macros.sec2nano(5.0) # [ns]
self.image_time_tag = macros.sec2nano(5.0) # [ns]
def Reset(self, CurrentSimNanos):
self.write_payload(CurrentSimNanos)
def UpdateState(self, CurrentSimNanos):
self.write_payload(CurrentSimNanos)
def write_payload(self, CurrentSimNanos):
payload = self.cameraOutMsg.zeroMsgPayload
payload.valid = 1
payload.cameraID = 7
payload.imageBufferLength = 10 # [bytes]
payload.imageType = 1
if CurrentSimNanos >= self.change_time:
payload.timeTag = self.image_time_tag
self.cameraOutMsg.write(payload, CurrentSimNanos, self.moduleID)
[docs]
class ChangingPayloadModule(sysModel.SysModel):
"""Write a payload that changes at fixed simulation times."""
def __init__(self):
super().__init__()
self.dataOutMsg = messaging.CModuleTemplateMsg()
self.first_change_time = macros.sec2nano(5.0) # [ns]
self.second_change_time = macros.sec2nano(9.0) # [ns]
def Reset(self, CurrentSimNanos):
self.write_payload(CurrentSimNanos)
def UpdateState(self, CurrentSimNanos):
self.write_payload(CurrentSimNanos)
def write_payload(self, CurrentSimNanos):
payload_value = 0
if CurrentSimNanos >= self.first_change_time:
payload_value = 1
if CurrentSimNanos >= self.second_change_time:
payload_value = 2
payload = self.dataOutMsg.zeroMsgPayload
payload.dataVector = np.array([
payload_value,
0,
0,
])
self.dataOutMsg.write(payload, CurrentSimNanos, self.moduleID)
if __name__ == "__main__":
test_update_time_interval()
test_record_on_change()
test_record_on_change_update_time_interval_after_skipped_record()
test_record_on_change_unsupported_payload()
test_record_on_change_camera_image_payload()