Source code for test_recorderLargeHistory

#
#  Copyright (c) 2026, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
#   Unit Test Script
#   Module Name: messaging Recorder (large-history storage)
#   Author: robotrocketscience (https://github.com/robotrocketscience)
#   Creation Date: June 30, 2026
#

import numpy as np

from Basilisk.architecture import messaging


[docs] def test_recorder_large_history(): """Record a long history that crosses the storage container's growth boundaries and verify every sample round-trips intact. The Recorder stores its history in a std::deque (see issue #788). This test guards that appending well past the points where the previous std::vector storage would reallocate (2**16 = 65536 and beyond) preserves both the recorded times and the recorded payload fields exactly. It is a pure correctness-at-scale check with no timing assertions, so it is deterministic and not flaky. The recorder is driven directly (rather than through a full simulation) purely so the large sample count stays fast. """ sampleCount = 70000 # number of recorded samples; > 2**16 to cross the old reallocation boundary timeStep = 1000000 # [ns] simulated time between successive recordings payload = messaging.CModuleTemplateMsgPayload() msg = messaging.CModuleTemplateMsg() recorder = msg.recorder() # default minimum recording interval of 0 -> records on every update recorder.Reset(0) expectedTimes = np.empty(sampleCount, dtype=np.uint64) expectedData = np.empty((sampleCount, 3), dtype=float) for i in range(sampleCount): # Stamp each sample with a unique, exactly-representable value so any dropped, # duplicated, or corrupted entry across a growth boundary is detected. sampleVector = [float(i), float(2 * i), float(3 * i)] payload.dataVector = sampleVector currentTime = i * timeStep # [ns] msg.write(payload, currentTime) recorder.UpdateState(currentTime) expectedTimes[i] = currentTime expectedData[i] = sampleVector # Every sample must be present, in order, and unmodified. assert len(recorder.times()) == sampleCount np.testing.assert_array_equal(recorder.times(), expectedTimes) np.testing.assert_array_equal(recorder.dataVector, expectedData)
if __name__ == "__main__": test_recorder_large_history() print("test_recorder_large_history passed")