{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Fault Environment Example\n", "This tutorial demonstrates how to configure and use a simple BSK-RL environment to model faults in a system with four reaction wheels (RWs).\n", "\n", "## Load Modules" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from typing import Iterable\n", "\n", "from Basilisk.architecture import bskLogging\n", "from Basilisk.utilities import macros, orbitalMotion, simIncludeRW\n", "from Basilisk.simulation import reactionWheelStateEffector\n", "from Basilisk.fswAlgorithms import rwNullSpace\n", "from Basilisk.architecture import messaging\n", "from bsk_rl import SatelliteTasking, act, data, obs, scene, sats\n", "from bsk_rl.sim import dyn, fsw, world\n", "from bsk_rl.utils.orbital import random_orbit, random_unit_vector\n", "from bsk_rl.utils.functional import default_args\n", "\n", "bskLogging.setDefaultLogLevel(bskLogging.BSK_WARNING)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Making Faults Cases\n", "Creating a fault base class and defining individual fault types enables modeling multiple kinds of faults within a single satellite. In this example, a power draw limit is applied to RWs, causing it to operate at reduced speed compared to nominal conditions. By default, while a torque limit is enforced, there are no restrictions on power draw. `time` is used to define the time at which the fault occurs, `reducedLimit` specifies the power draw limit in watts, and `wheel_Idx` indicates which RW is affected by the fault. It can be set to a value from 1 to 4, or to all to apply the fault to every RW." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class FaultObject:\n", " def __init__(self, name, time, verbose=True, **kwargs):\n", " self.name = name\n", " self.time = time\n", " self.verbose = verbose\n", " self.message = None\n", " self.message_printed = False\n", "\n", " def execute(self, satellite):\n", " raise NotImplementedError(\n", " f\"{self.name} does not have a custom execute function!\"\n", " )\n", "\n", " def print_message(self, message, satellite):\n", " if not self.message_printed:\n", " satellite.logger.info(message)\n", " self.message_printed = True\n", "\n", " def addFaultToSimulation(self, satellite, listIdx):\n", " self.uniqueFaultIdx = listIdx # Index in the faultList array.\n", " satellite.simulator.createNewEvent(\n", " f\"add{self.name}Fault\",\n", " satellite.dynamics.dyn_rate,\n", " True,\n", " [f\"self.TotalSim.CurrentNanos>={self.time}\"],\n", " [\n", " f\"self.faultList[{self.uniqueFaultIdx}].execute({satellite._satellite_command})\",\n", " f\"self.faultList[{self.uniqueFaultIdx}].print({satellite._satellite_command})\",\n", " ],\n", " )\n", "\n", "\n", "class RwPowerFault(FaultObject):\n", " def __init__(self, name, time, reducedLimit, wheelIdx):\n", " super().__init__(name, time)\n", " self.reducedLimit = reducedLimit\n", "\n", " if isinstance(wheelIdx, float):\n", " # int needed around wheelIdx because np.random.choice doesn't return\n", " # a type int, and the index will not register in execute without it.\n", " self.wheelIdx = int(wheelIdx)\n", " elif isinstance(wheelIdx, int) or wheelIdx == \"all\":\n", " # option to trigger the fault in all wheels reflecting a larger power issue\n", " self.wheelIdx = wheelIdx\n", " else:\n", " raise ValueError(\n", " \"Fault parameter 'wheelIdx' must either be a number corresponding to a reaction wheel or the string 'all'\"\n", " )\n", "\n", " def execute(self, satellite):\n", " dynModels = satellite.dynamics\n", " if self.wheelIdx == 1:\n", " dynModels.rwFactory.rwList[\"RW1\"].P_max = self.reducedLimit\n", " elif self.wheelIdx == 2:\n", " dynModels.rwFactory.rwList[\"RW2\"].P_max = self.reducedLimit\n", " elif self.wheelIdx == 3:\n", " dynModels.rwFactory.rwList[\"RW3\"].P_max = self.reducedLimit\n", " elif self.wheelIdx == 4:\n", " dynModels.rwFactory.rwList[\"RW4\"].P_max = self.reducedLimit\n", " elif self.wheelIdx == \"all\":\n", " # option to trigger the fault in all wheels (not supported for all fault types)\n", " dynModels.rwFactory.rwList[\"RW1\"].P_max = self.reducedLimit\n", " dynModels.rwFactory.rwList[\"RW2\"].P_max = self.reducedLimit\n", " dynModels.rwFactory.rwList[\"RW3\"].P_max = self.reducedLimit\n", " dynModels.rwFactory.rwList[\"RW4\"].P_max = self.reducedLimit\n", "\n", " def print(self, satellite):\n", " if self.wheelIdx == \"all\":\n", " self.message = f\"RW Power Fault: all RW's power limit reduced to {self.reducedLimit} Watts at {self.time*macros.NANO2MIN} minutes!\"\n", " else:\n", " self.message = f\"RW Power Fault: RW{self.wheelIdx}'s power limit reduced to {self.reducedLimit} Watts at {self.time*macros.NANO2MIN} minutes!\"\n", " super().print_message(self.message, satellite)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configure the Simulation Models\n", "* [Dynamics model](../api_reference/sim/dyn.rst): `FullFeaturedDynModel` is used as the base class, and `setup_reaction_wheel_dyn_effector` is overridden to support four RWs. Two additional properties are added: the angle between the Sun and the solar panel, and the speed fraction of each RW." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class CustomDynModel(dyn.FullFeaturedDynModel):\n", "\n", " @property\n", " def solar_angle_norm(self) -> float:\n", " sun_vec_N = (\n", " self.world.gravFactory.spiceObject.planetStateOutMsgs[self.world.sun_index]\n", " .read()\n", " .PositionVector\n", " )\n", " sun_vec_N_hat = sun_vec_N / np.linalg.norm(sun_vec_N)\n", " solar_panel_vec_B = np.array([0, 0, -1])\n", " mat = np.transpose(self.BN)\n", " solar_panel_vec_N = np.matmul(mat, solar_panel_vec_B)\n", " error_angle = np.arccos(np.dot(solar_panel_vec_N, sun_vec_N_hat))\n", "\n", " return error_angle / np.pi\n", "\n", " @property\n", " def wheel_speeds_frac(self):\n", " rw_speed = self.wheel_speeds\n", " return rw_speed[0:4] / (self.maxWheelSpeed * macros.rpm2radsec)\n", "\n", " @default_args(\n", " wheelSpeeds=lambda: np.random.uniform(-1500, 1500, 4),\n", " maxWheelSpeed=np.inf,\n", " u_max=0.200,\n", " )\n", " def setup_reaction_wheel_dyn_effector(\n", " self,\n", " wheelSpeeds: Iterable[float],\n", " maxWheelSpeed: float,\n", " u_max: float,\n", " priority: int = 997,\n", " **kwargs,\n", " ) -> None:\n", " \"\"\"Set the RW state effector parameters.\n", "\n", " Args:\n", " wheelSpeeds: Initial speeds of each wheel [RPM]\n", " maxWheelSpeed: Failure speed for wheels [RPM]\n", " u_max: Torque producible by wheel [N*m]\n", " priority: Model priority.\n", " kwargs: Ignored\n", " \"\"\"\n", "\n", " def balancedHR16Triad(\n", " useRandom=False, randomBounds=(-400, 400), wheelSpeeds=[500, 500, 500, 500]\n", " ):\n", " \"\"\"Create a set of three HR16 reaction wheels.\n", "\n", " Args:\n", " useRandom: Use random values for wheel speeds.\n", " randomBounds: Bounds for random wheel speeds.\n", " wheelSpeeds: Fixed wheel speeds.\n", "\n", " Returns:\n", " tuple:\n", " * **rwStateEffector**: Reaction wheel state effector instance.\n", " * **rwFactory**: Factory containing defined reaction wheels.\n", " * **wheelSpeeds**: Wheel speeds.\n", " \"\"\"\n", " rwFactory = simIncludeRW.rwFactory()\n", "\n", " if useRandom:\n", " wheelSpeeds = np.random.uniform(randomBounds[0], randomBounds[1], 4)\n", " c = 3 ** (-0.5)\n", " rwFactory.create(\n", " \"Honeywell_HR16\",\n", " [1, 0, 0],\n", " maxMomentum=50.0,\n", " Omega=wheelSpeeds[0],\n", " )\n", " rwFactory.create(\n", " \"Honeywell_HR16\",\n", " [0, 1, 0],\n", " maxMomentum=50.0,\n", " Omega=wheelSpeeds[1],\n", " )\n", " rwFactory.create(\n", " \"Honeywell_HR16\",\n", " [0, 0, 1],\n", " maxMomentum=50.0,\n", " Omega=wheelSpeeds[2],\n", " )\n", " rwFactory.create(\n", " \"Honeywell_HR16\",\n", " [c, c, c],\n", " maxMomentum=50.0,\n", " Omega=wheelSpeeds[3],\n", " )\n", "\n", " rwStateEffector = reactionWheelStateEffector.ReactionWheelStateEffector()\n", "\n", " return rwStateEffector, rwFactory, wheelSpeeds\n", "\n", " self.maxWheelSpeed = maxWheelSpeed\n", " self.rwStateEffector, self.rwFactory, _ = balancedHR16Triad(\n", " useRandom=False,\n", " wheelSpeeds=wheelSpeeds,\n", " )\n", " for RW in self.rwFactory.rwList.values():\n", " RW.u_max = u_max\n", " self.rwStateEffector.ModelTag = \"ReactionWheels\"\n", " self.rwFactory.addToSpacecraft(\n", " self.scObject.ModelTag, self.rwStateEffector, self.scObject\n", " )\n", " self.simulator.AddModelToTask(\n", " self.task_name, self.rwStateEffector, ModelPriority=priority\n", " )\n", "\n", " self.Gs = np.array(\n", " [\n", " [1, 0, 0, 1 / np.sqrt(3)], # RW1 and RW4 x-components\n", " [0, 1, 0, 1 / np.sqrt(3)], # RW2 and RW4 y-components\n", " [0, 0, 1, 1 / np.sqrt(3)], # RW3 and RW4 z-components\n", " ]\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* [Flight software model](../api_reference/sim/fsw.rst): A custom flight software model is defined to support four RWs. It is based on the `SteeringImagerFSWModel`, with the main modification being the addition of the `rwNullSpace` module. Due to the redundancy of having four RWs, there are infinitely many solutions for mapping the required body control torque to individual RW torques. To address this, once the control torque is computed, the RW null space is used to decelerate the wheels without applying additional torque to the spacecraft." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class CustomSteeringImagerFSWModel(fsw.SteeringImagerFSWModel):\n", " def __init__(self, *args, **kwargs) -> None:\n", " \"\"\"Convenience type that combines the imaging FSW model with MRP steering for four reaction wheels.\"\"\"\n", " super().__init__(*args, **kwargs)\n", "\n", " def _set_config_msgs(self) -> None:\n", " super()._set_config_msgs()\n", " self._set_rw_constellation_msg()\n", "\n", " def _set_rw_constellation_msg(self) -> None:\n", " \"\"\"Set the reaction wheel constellation message.\"\"\"\n", " rwConstellationConfig = messaging.RWConstellationMsgPayload()\n", " rwConstellationConfig.numRW = self.dynamics.rwFactory.getNumOfDevices()\n", " rwConfigElementList = []\n", " for i in range(4):\n", " rwConfigElementMsg = messaging.RWConfigElementMsgPayload()\n", " rwConfigElementMsg.gsHat_B = self.dynamics.Gs[:, i]\n", " rwConfigElementMsg.Js = self.dynamics.rwFactory.rwList[f\"RW{i+1}\"].Js\n", " rwConfigElementMsg.uMax = self.dynamics.rwFactory.rwList[f\"RW{i+1}\"].u_max\n", " rwConfigElementList.append(rwConfigElementMsg)\n", " rwConstellationConfig.reactionWheels = rwConfigElementList\n", " self.rwConstellationConfigInMsg = messaging.RWConstellationMsg().write(\n", " rwConstellationConfig\n", " )\n", "\n", " def _set_gateway_msgs(self) -> None:\n", " \"\"\"Create C-wrapped gateway messages.\"\"\"\n", " self.attRefMsg = messaging.AttRefMsg_C()\n", " self.attGuidMsg = messaging.AttGuidMsg_C()\n", "\n", " self._zero_gateway_msgs()\n", "\n", " # connect gateway FSW effector command msgs with the dynamics\n", " self.dynamics.rwStateEffector.rwMotorCmdInMsg.subscribeTo(\n", " self.rwNullSpace.rwMotorTorqueOutMsg\n", " )\n", " self.dynamics.thrusterSet.cmdsInMsg.subscribeTo(\n", " self.thrDump.thrusterOnTimeOutMsg\n", " )\n", "\n", " class MRPControlTask(fsw.SteeringImagerFSWModel.MRPControlTask):\n", " def _create_module_data(self) -> None:\n", " super()._create_module_data()\n", "\n", " self.rwNullSpace = self.fsw.rwNullSpace = rwNullSpace.rwNullSpace()\n", " self.rwNullSpace.ModelTag = \"rwNullSpace\"\n", "\n", " def _setup_fsw_objects(self, **kwargs) -> None:\n", " super()._setup_fsw_objects(**kwargs)\n", " self.set_rw_null_space(**kwargs)\n", "\n", " @default_args(OmegaGain=0.3)\n", " def set_rw_null_space(\n", " self,\n", " OmegaGain: float,\n", " **kwargs,\n", " ) -> None:\n", " \"\"\"Define the null space to slow down the wheels.\"\"\"\n", " self.rwNullSpace.rwMotorTorqueInMsg.subscribeTo(\n", " self.rwMotorTorque.rwMotorTorqueOutMsg\n", " )\n", " self.rwNullSpace.rwSpeedsInMsg.subscribeTo(\n", " self.fsw.dynamics.rwStateEffector.rwSpeedOutMsg\n", " )\n", " self.rwNullSpace.rwConfigInMsg.subscribeTo(\n", " self.fsw.rwConstellationConfigInMsg\n", " )\n", " self.rwNullSpace.OmegaGain = OmegaGain\n", " self._add_model_to_task(self.rwNullSpace, priority=1193)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configure the Satellite\n", "* [Observations](../api_reference/obs/index.rst): \n", " - SatProperties: Body angular velocity, instrument pointing direction, body position, body velocity, battery charge (properties in [flight software model](../api_reference/sim/fsw.rst) or [dynamics model](../api_reference/sim/dyn.rst)). Also, customized dynamics property in CustomDynModel above: Angle between the sun and the solar panel and four RW speed fraction. \n", " - OpportunityProperties: Target's priority, normalized location, and target angle (upcoming 32 targets).\n", " - Time: Simulation time.\n", " - Eclipse: Next eclipse start and end times. \n", "* [Actions](../api_reference/act/index.rst):\n", " - Desat: Manage momentum for the RWs for 60 seconds.\n", " - Charge: Enter a sun-pointing charging mode for 60 seconds.\n", " - Image: Image target from upcoming 32 targets\n", "\n", "The fault is introduced by overriding the `reset_post_sim_init` function. The probability of the fault occurring can be specified using the `fault_chance` argument, and the time of occurrence can be set using the `fault_time` argument." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class CustomSatComposed(sats.ImagingSatellite):\n", " observation_spec = [\n", " obs.SatProperties(\n", " dict(prop=\"omega_BP_P\", norm=0.03),\n", " dict(prop=\"c_hat_P\"),\n", " dict(prop=\"r_BN_P\", norm=orbitalMotion.REQ_EARTH * 1e3),\n", " dict(prop=\"v_BN_P\", norm=7616.5),\n", " dict(prop=\"battery_charge_fraction\"),\n", " dict(prop=\"solar_angle_norm\"),\n", " dict(prop=\"wheel_speeds_frac\"),\n", " ),\n", " obs.OpportunityProperties(\n", " dict(prop=\"priority\"),\n", " dict(prop=\"r_LP_P\", norm=orbitalMotion.REQ_EARTH * 1e3),\n", " dict(prop=\"target_angle\", norm=np.pi),\n", " type=\"target\",\n", " n_ahead_observe=32,\n", " ),\n", " obs.Time(),\n", " obs.Eclipse(norm=5700),\n", " ]\n", "\n", " action_spec = [\n", " act.Desat(duration=60.0),\n", " act.Charge(duration=60.0),\n", " act.Image(n_ahead_image=32),\n", " ]\n", "\n", " # Modified the constructor to include the fault chance and list\n", " def __init__(self, *args, fault_chance=0, fault_time=0.0, **kwargs) -> None:\n", " super().__init__(*args, **kwargs)\n", " self.fault_chance = fault_chance\n", " self.fault_time = fault_time\n", " self.faultList = [] # List to store faults\n", "\n", " def reset_post_sim_init(self) -> None:\n", " super().reset_post_sim_init()\n", "\n", " if np.random.random() < self.fault_chance:\n", " powerFault = RwPowerFault(\n", " \"rwPowerLimited\", self.fault_time, reducedLimit=1.0, wheelIdx=1\n", " )\n", " self.faultList = [powerFault]\n", " self.simulator.faultList = self.faultList\n", " for i in range(len(self.faultList)):\n", " self.faultList[i].addFaultToSimulation(self, i)\n", "\n", " dyn_type = CustomDynModel\n", " fsw_type = CustomSteeringImagerFSWModel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Configure Satellite Pareameters\n", "When instantiating a satellite, these parameters can be overriden with a constant or \n", "rerandomized every time the environment is reset using the ``sat_args`` dictionary." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataStorageCapacity = 20 * 8e6 * 100\n", "batteryStorageCapacity = 80.0 * 3600 * 2\n", "sat_args = CustomSatComposed.default_sat_args(\n", " oe=random_orbit,\n", " imageAttErrorRequirement=0.01,\n", " imageRateErrorRequirement=0.01,\n", " batteryStorageCapacity=batteryStorageCapacity,\n", " storedCharge_Init=lambda: np.random.uniform(0.4, 1.0) * batteryStorageCapacity,\n", " u_max=0.2, # More realistic values than 1.0\n", " K1=0.5, # Updated value to have smooth and more predictable control\n", " nHat_B=np.array([0, 0, -1]),\n", " imageTargetMinimumElevation=np.radians(45),\n", " rwBasePower=20,\n", " maxWheelSpeed=1500,\n", " storageInit=lambda: np.random.randint(\n", " 0 * dataStorageCapacity,\n", " 0.01 * dataStorageCapacity,\n", " ),\n", " wheelSpeeds=lambda: np.random.uniform(-900, 900, 4),\n", " disturbance_vector=lambda: random_unit_vector(),\n", ")\n", "\n", "# Make the satellites\n", "satellites = []\n", "satellites.append(\n", " CustomSatComposed(\n", " \"EO\",\n", " sat_args,\n", " fault_chance=1.0,\n", " fault_time=0.0, # Fault occurs at 0.0 (nano seconds)\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Making and interacting the Environment\n", "For this example, the single-agent [SatelliteTasking](../api_reference/index.rst) environment is used. n addition to the configured satellite, the environment requires a [scenario](../api_reference/scene/index.rst), which defines the context in which the satellite operates. In this case, the scenario uses `UniformTargets`, placing 1000 uniformly distributed targets across the Earth’s surface. The environment also takes a [rewarder](../api_reference/data/index.rst), which defines how data collected from the scenario is rewarded. Here, `UniqueImageReward` is used, which assigns rewards based on the sum of the priorities of uniquely imaged targets in each episode." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "env = SatelliteTasking(\n", " satellite=satellites,\n", " terminate_on_time_limit=True,\n", " world_type=world.GroundStationWorldModel,\n", " world_args=world.GroundStationWorldModel.default_world_args(),\n", " scenario=scene.UniformTargets(n_targets=1000),\n", " rewarder=data.UniqueImageReward(),\n", " sim_rate=0.5,\n", " max_step_duration=300.0,\n", " time_limit=95 * 60 * 3,\n", " log_level=\"INFO\",\n", " failure_penalty=0,\n", " # disable_env_checker=True, # For debugging\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, the environment is reset. A seed is provided to ensure reproducibility of the results; it can be removed to enable randomized testing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "observation, info = env.reset(seed=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The composed satellite action space returns a human-readable action map and each satellite has the same action space and similar observation space." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Actions:\", satellites[0].action_description)\n", "print(\"States:\", env.unwrapped.satellites[0].observation_description, \"\\n\")\n", "\n", "# Using the composed satellite features also provides a human-readable state:\n", "for satellite in env.unwrapped.satellites:\n", " for k, v in satellite.observation_builder.obs_dict().items():\n", " print(f\"{k}: {v}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The simulation runs until either the battery is depleted, a RW exceeds its maximum speed (both considered failures), or a timeout occurs (which simply stops the simulation)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total_reward = 0.0\n", "while True:\n", "\n", " observation, reward, terminated, truncated, info = env.step(\n", " env.action_space.sample()\n", " )\n", " total_reward += reward\n", " if terminated or truncated:\n", " print(\"Episode complete.\")\n", " break\n", "\n", "print(\"Total reward:\", total_reward)" ] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.12" } }, "nbformat": 4, "nbformat_minor": 2 }