import pytest
import numpy as np
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import unitTestSupport
from Basilisk.simulation import spacecraft
from Basilisk.utilities import macros
from Basilisk.utilities import simIncludeRW
from Basilisk.simulation import reactionWheelStateEffector
from Basilisk.architecture import messaging
import gc
import psutil
import os
[docs]
def getMemoryUsage():
"""Get memory usage of current process in MB"""
process = psutil.Process(os.getpid())
gc.collect() # Force collection before measurement
memory_info = process.memory_info()
return memory_info.rss / 1024 / 1024 # RSS in MB
[docs]
def create_and_run_simulation():
"""Create and run a simulation with RW setup"""
# Create simulation variable names
unitTaskName = "unitTask" # arbitrary name (don't change)
unitProcessName = "TestProcess" # arbitrary name (don't change)
# Create simulation
unitTestSim = SimulationBaseClass.SimBaseClass()
testProc = unitTestSim.CreateNewProcess(unitProcessName)
testProc.addTask(unitTestSim.CreateNewTask(unitTaskName, macros.sec2nano(0.1)))
# Create spacecraft and RW objects
scObject = spacecraft.Spacecraft()
rwFactory = simIncludeRW.rwFactory()
# Create 3 reaction wheels
for i in range(3):
rwFactory.create('Honeywell_HR16',
[1, 0, 0] if i == 0 else [0, 1, 0] if i == 1 else [0, 0, 1],
Omega=500.,
maxMomentum=50.)
# Create RW state effector
rwStateEffector = reactionWheelStateEffector.ReactionWheelStateEffector()
rwStateEffector.ModelTag = "ReactionWheel"
rwFactory.addToSpacecraft("ReactionWheels", rwStateEffector, scObject)
# Add models to task
unitTestSim.AddModelToTask(unitTaskName, rwStateEffector)
unitTestSim.AddModelToTask(unitTaskName, scObject)
# Add logging
rwLogs = []
for item in range(rwFactory.getNumOfDevices()):
rwLogs.append(rwStateEffector.rwOutMsgs[item].recorder())
unitTestSim.AddModelToTask(unitTaskName, rwLogs[item])
# Run simulation
unitTestSim.InitializeSimulation()
unitTestSim.ConfigureStopTime(macros.sec2nano(0.5))
unitTestSim.ExecuteSimulation()
# Cleanup in reverse order of creation
# Clear logs first
for log in rwLogs:
del log
# Clear message subscriptions
for msg in rwStateEffector.rwOutMsgs:
if hasattr(msg, 'unsubscribeAll'):
msg.unsubscribeAll()
# Clear references in reverse order
del rwStateEffector
del rwFactory
del scObject
del testProc
del unitTestSim
gc.collect()
[docs]
@pytest.mark.parametrize("num_iterations,max_allowed_growth", [
(25, 3.0), # Reduced from 50 to 25 iterations
(50, 3.0), # Reduced from 100 to 50 iterations
])
def test_rw_memory_leak(num_iterations, max_allowed_growth):
"""Test for memory leaks in reaction wheel implementation"""
initial_memory = getMemoryUsage()
memory_measurements = []
# Run multiple iterations
for i in range(num_iterations):
create_and_run_simulation()
# Take measurement after forced GC
gc.collect()
current_memory = getMemoryUsage()
memory_measurements.append(current_memory)
if (i + 1) % 10 == 0:
print(f"Iteration {i+1}/{num_iterations}, Memory: {current_memory:.2f} MB")
print(f"Delta from start: {current_memory - initial_memory:.2f} MB")
# Calculate memory statistics
memory_growth = memory_measurements[-1] - initial_memory
memory_trend = np.polyfit(range(len(memory_measurements)), memory_measurements, 1)[0]
# More detailed failure messages
if memory_growth >= max_allowed_growth:
pytest.fail(f"Memory growth ({memory_growth:.2f} MB) exceeds maximum allowed "
f"({max_allowed_growth:.2f} MB)\nTrend: {memory_trend:.4f} MB/iteration")
if memory_trend >= 0.05:
pytest.fail(f"Memory growth trend ({memory_trend:.4f} MB/iteration) indicates "
f"potential leak\nTotal growth: {memory_growth:.2f} MB")
if __name__ == "__main__":
test_rw_memory_leak(50, 3.0)