Source code for RetentionPolicy

import pandas as pd
from Basilisk.utilities import unitTestSupport


class VariableRetentionParameters:
    """
    Represents a variable's logging parameters.

    Args:
        varName: name of the variable to log
        varRate: rate at which the variable is logged
        startIndex: first index handed to the simulation's variable logger
        stopIndex: last index handed to the simulation's variable logger
        varType: type label of the logged variable (defaults to ``'double'``)
    """

    def __init__(self, varName, varRate, startIndex=0, stopIndex=0, varType='double'):
        # Plain value holder: every argument is stored unchanged under the
        # attribute of the same name.
        self.varName, self.varRate = varName, varRate
        self.startIndex, self.stopIndex = startIndex, stopIndex
        self.varType = varType
class MessageRetentionParameters:
    """
    Represents a message's logging parameters.

    Args:
        name: name of the message recorder
        retainedVars: iterable of the message variable names to record
    """

    def __init__(self, name, retainedVars):
        # The recorder name is exposed as ``msgRecName``; the variable list is
        # kept by reference, not copied.
        self.msgRecName, self.retainedVars = name, retainedVars
class RetentionPolicy:
    """
    This policy controls what simulation data is saved and how it is stored.

    Note that the simulation data array will have the message time prepended
    as the first column.

    Args:
        rate: default logging rate applied by :meth:`addVariableLog` when the
            caller does not pass an explicit ``logRate``.
    """

    def __init__(self, rate=int(1E10)):
        self.logRate = rate
        self.messageLogList = []      # MessageRetentionParameters entries
        self.varLogList = []          # VariableRetentionParameters entries
        self.dataCallback = None      # optional callable(data, policy)
        self.retentionFunctions = []  # callables(simInstance) -> dict

    def addMessageLog(self, name, retainedVars):
        """
        Request that variables of a message recorder be retained.

        Args:
            name: name of the message recorder (key into ``simInstance.msgRecList``)
            retainedVars: iterable of the recorder's variable names to retain
        """
        self.messageLogList.append(MessageRetentionParameters(name, retainedVars))

    def addVariableLog(self, variableName, startIndex=0, stopIndex=0, varType='double', logRate=None):
        """
        Request that a simulation variable be logged and retained.

        Args:
            variableName: name of the variable to log
            startIndex: first index forwarded to the simulation's variable logger
            stopIndex: last index forwarded to the simulation's variable logger
            varType: type label of the variable
            logRate: logging rate for this variable; ``None`` selects the
                policy-wide ``logRate``
        """
        if logRate is None:
            logRate = self.logRate
        varContainer = VariableRetentionParameters(variableName, logRate, startIndex, stopIndex, varType)
        self.varLogList.append(varContainer)

    def addLogsToSim(self, simInstance):
        """
        Register every variable log of this policy on ``simInstance``.

        Only ``varLogList`` is processed here; message logs are not registered
        by this method and are read back from ``simInstance.msgRecList`` in
        :meth:`getDataForRetention`.
        """
        for variable in self.varLogList:
            simInstance.AddVariableForLogging(variable.varName, variable.varRate,
                                              variable.startIndex, variable.stopIndex,
                                              variable.varType)

    def addRetentionFunction(self, function):
        """
        Add a custom retention function.

        Args:
            function: callable taking the simulation instance and returning a
                dict; its items are stored under ``"custom"`` in the retained data.
        """
        self.retentionFunctions.append(function)

    def setDataCallback(self, dataCallback):
        """
        Set the callback invoked by :meth:`executeCallback`.

        Args:
            dataCallback: callable invoked as ``dataCallback(data, retentionPolicy)``
        """
        self.dataCallback = dataCallback

    def executeCallback(self, data):
        """Invoke the data callback with ``data`` and this policy, if one is set."""
        if self.dataCallback is not None:
            self.dataCallback(data, self)

    @staticmethod
    def addRetentionPoliciesToSim(simInstance, retentionPolicies):
        """
        Adds the variable logs of each retention policy to a simInstance

        Args:
            simInstance: The simulation instance to add logs to.
            retentionPolicies: RetentionPolicy[] list that defines the data to log.
        """
        for retentionPolicy in retentionPolicies:
            retentionPolicy.addLogsToSim(simInstance)

    # TODO handle duplicates somehow?

    @staticmethod
    def getDataForRetention(simInstance, retentionPolicies):
        """
        Returns the data that should be retained given a simInstance and the retentionPolicies

        Args:
            simInstance: The simulation instance to retrieve data from
            retentionPolicies: A list of RetentionPolicy objects defining the data to retain

        Returns:
            Retained Data in the form of a dictionary with three sub-dictionaries
            for messages, variables and the output of custom retention functions::

                {
                    "messages": {
                        "messageName.variableName": [value1,value2,value3]
                    },
                    "variables": {
                        "variableName": [value1,value2,value3]
                    },
                    "custom": {
                        "key": value
                    }
                }

            When several policies (or retention functions) produce the same
            key, the entry written last replaces the earlier one.
        """
        data = {"messages": {}, "variables": {}, "custom": {}}

        for retentionPolicy in retentionPolicies:
            for msgParam in retentionPolicy.messageLogList:
                # Look the recorder up once per message instead of once per variable.
                recorder = simInstance.msgRecList[msgParam.msgRecName]
                # record the message recording times
                msgTimes = recorder.times()
                # record the message variables
                for varName in msgParam.retainedVars:
                    # To ensure the current datashaders utilities continue to work, the
                    # retained data is combined with the time information as it was in
                    # BSK1.x releases.
                    msgData = unitTestSupport.addTimeColumn(msgTimes, getattr(recorder, varName))
                    data["messages"][msgParam.msgRecName + "." + varName] = msgData

            for variable in retentionPolicy.varLogList:
                data["variables"][variable.varName] = simInstance.GetLogVariableData(variable.varName)

            for func in retentionPolicy.retentionFunctions:
                for key, value in func(simInstance).items():
                    data["custom"][key] = value

        return data