Source code for scenarioBasicOrbitStream

#
#  ISC License
#
#  Copyright (c) 2016, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#

r"""
Overview
--------

This script duplicates the basic orbit simulation in the scenario :ref:`scenarioBasicOrbit`.
The difference is that this version allows for the Basilisk simulation data to be live streamed to the
:ref:`vizard` visualization program, with optional 2-way communication with Vizard (live user inputs to
the simulation).

The script is found in the folder ``basilisk/examples`` and executed by using::

    python3 scenarioBasicOrbitStream.py

To enable live data streaming and/or broadcast streaming, the ``enableUnityVisualization()`` method is provided
with ``liveStream`` and ``broadcastStream`` argument using::

    vizSupport.enableUnityVisualization(scSim, simTaskName, scObject
                                        , liveStream=True
                                        , broadcastStream=True)

When starting Basilisk simulation it prints now to the terminal that it is waiting for Vizard connections::

    Waiting for Vizard at tcp://0.0.0.0:5556

Basilisk will bind to all network interfaces. Open the Vizard application and enter ``tcp://localhost:5556``
in the connection field (or the appropriate IP if connecting from a different machine/container).
Select "Direct Communication" mode as well as "Live Streaming".  After this the Basilisk simulation resumes
and will live stream the data to Vizard.

.. figure:: /_images/static/vizard-ImgStream.jpg
   :align: center
   :scale: 50 %

   Vizard Direct Communication Panel Illustration


To avoid the simulation running too quickly, this tutorial example script includes the ``clock_sync`` module that
enables a 50x realtime mode using::

    clockSync = clock_synch.ClockSynch()
    clockSync.accelFactor = 50.0
    scSim.AddModelToTask(simTaskName, clockSync)

This way a 10s simulation time step will take 0.2 seconds with the 50x speed up factor.

"""


#
# Basilisk Scenario Script and Integrated Test
#
# Purpose:  Integrated test of the spacecraft() and gravity modules.  Illustrates
#           a 3-DOV spacecraft on a range of orbit types with live Vizard data streaming
#           and 2-way communication with Vizard.
# Author:   Hanspeter Schaub
# Creation Date:  Sept. 29, 2019
# Author:   Jack Fox, May 12, 2025

import os

import matplotlib.pyplot as plt
import numpy as np
import datetime as dt

# The path to the location of Basilisk
# Used to get the location of supporting data.
from Basilisk import __path__

bskPath = __path__[0]
fileName = os.path.basename(os.path.splitext(__file__)[0])

# import simulation related support
from Basilisk.simulation import spacecraft

# general support file with common unit test functions
# import general simulation support files
from Basilisk.utilities import (
    SimulationBaseClass,
    macros,
    orbitalMotion,
    simIncludeGravBody,
    unitTestSupport,
    vizSupport,
    simIncludeThruster,
)
from Basilisk.simulation import simSynch
from Basilisk.architecture import messaging
from Basilisk.simulation import thrusterDynamicEffector
from Basilisk.utilities.supportDataTools.dataFetcher import get_path, DataFile

try:
    from Basilisk.simulation import vizInterface
except ImportError:
    pass


[docs] def run( show_plots, liveStream, broadcastStream, timeStep, orbitCase, useSphericalHarmonics, planetCase, ): """ At the end of the python script you can specify the following example parameters. Args: show_plots (bool): Determines if the script should display plots liveStream (bool): Determines if the script should use live data streaming broadcastStream (bool): Determines if the script should broadcast messages for listener Vizards to pick up. timeStep (double): Integration update time in seconds orbitCase (str): ====== ============================ String Definition ====== ============================ 'LEO' Low Earth Orbit 'GEO' Geosynchronous Orbit 'GTO' Geostationary Transfer Orbit ====== ============================ useSphericalHarmonics (Bool): False to use first order gravity approximation: :math:`\\frac{GMm}{r^2}` planetCase (str): {'Earth', 'Mars'} """ # Create simulation variable names simTaskName = "simTask" simProcessName = "simProcess" # Create a sim module as an empty container scSim = SimulationBaseClass.SimBaseClass() # # create the simulation process # dynProcess = scSim.CreateNewProcess(simProcessName) # create the dynamics task and specify the integration update time simulationTimeStep = macros.sec2nano(timeStep) dynProcess.addTask(scSim.CreateNewTask(simTaskName, simulationTimeStep)) # # setup the simulation tasks/objects # # initialize spacecraft object and set properties scObject = spacecraft.Spacecraft() scObject.ModelTag = "bskSat" I = [60.0, 0.0, 0.0, 0.0, 30.0, 0.0, 0.0, 0.0, 40.0] scObject.hub.mHub = 50.0 # kg - spacecraft mass scObject.hub.IHubPntBc_B = unitTestSupport.np2EigenMatrix3d(I) # add spacecraft object to the simulation process scSim.AddModelToTask(simTaskName, scObject) # setup Gravity Body gravFactory = simIncludeGravBody.gravBodyFactory() if planetCase == "Mars": planet = gravFactory.createMarsBarycenter() planet.isCentralBody = True # ensure this is the central gravitational body if useSphericalHarmonics: ggm2b_path = get_path(DataFile.LocalGravData.GGM2BData) planet.useSphericalHarmonicsGravityModel(str(ggm2b_path), 100) else: # Earth planet = gravFactory.createEarth() planet.isCentralBody = True # ensure this is the central gravitational body if useSphericalHarmonics: ggm03s_j2_only_path = get_path(DataFile.LocalGravData.GGM03S_J2_only) planet.useSphericalHarmonicsGravityModel(str(ggm03s_j2_only_path), 2) mu = planet.mu # attach gravity model to spacecraft gravFactory.addBodiesTo(scObject) # # setup orbit and simulation time # # setup the orbit using classical orbit elements oe = orbitalMotion.ClassicElements() rLEO = 7000.0 * 1000 # meters rGEO = 42000.0 * 1000 # meters if orbitCase == "GEO": oe.a = rGEO oe.e = 0.00001 oe.i = 0.0 * macros.D2R elif orbitCase == "GTO": oe.a = (rLEO + rGEO) / 2.0 oe.e = 1.0 - rLEO / oe.a oe.i = 0.0 * macros.D2R else: # LEO case, default case 0 oe.a = 2.5 * rLEO oe.e = 0.10 oe.i = 33.3 * macros.D2R oe.Omega = 48.2 * macros.D2R oe.omega = 347.8 * macros.D2R oe.f = 85.3 * macros.D2R rN, vN = orbitalMotion.elem2rv(mu, oe) oe = orbitalMotion.rv2elem( mu, rN, vN ) # this stores consistent initial orbit elements # with circular or equatorial orbit, some angles are arbitrary # # initialize Spacecraft States with the initialization variables # scObject.hub.r_CN_NInit = rN # m - r_BN_N scObject.hub.v_CN_NInit = vN # m/s - v_BN_N # Configure thruster thrusterSet = thrusterDynamicEffector.ThrusterDynamicEffector() scSim.AddModelToTask(simTaskName, thrusterSet) # Make a fresh thruster factory instance, this is critical to run multiple times thFactory = simIncludeThruster.thrusterFactory() thFactory.create("MOOG_Monarc_22_6", [0, 0, 0], [0, -1.5, 0]) thrModelTag = "ACSThrusterDynamics" thFactory.addToSpacecraft(thrModelTag, thrusterSet, scObject) thrMsgData = messaging.THRArrayOnTimeCmdMsgPayload(OnTimeRequest=[0, 0, 0]) thrMsg = messaging.THRArrayOnTimeCmdMsg() thrMsg.write(thrMsgData) thrusterSet.cmdsInMsg.subscribeTo(thrMsg) # set the simulation time n = np.sqrt(mu / oe.a / oe.a / oe.a) P = 2.0 * np.pi / n if useSphericalHarmonics: simulationTime = macros.sec2nano(3.0 * P) else: simulationTime = macros.sec2nano(1 * P) # # Setup data logging before the simulation is initialized # if useSphericalHarmonics: numDataPoints = 400 else: numDataPoints = 100 samplingTime = unitTestSupport.samplingTime( simulationTime, simulationTimeStep, numDataPoints ) dataLog = scObject.scStateOutMsg.recorder(samplingTime) scSim.AddModelToTask(simTaskName, dataLog) if vizSupport.vizFound: # create spacecraft data container scData = vizInterface.VizSpacecraftData() scData.spacecraftName = scObject.ModelTag scData.scStateInMsg.subscribeTo(scObject.scStateOutMsg) if liveStream: clockSync = simSynch.ClockSynch() clockSync.accelFactor = 50.0 scSim.AddModelToTask(simTaskName, clockSync) # Configure Vizard, using liveStream and broadcastStream options viz = vizSupport.enableUnityVisualization( scSim, simTaskName, scObject, thrEffectorList=thrusterSet, thrColors=vizSupport.toRGBA255("white"), liveStream=liveStream, broadcastStream=broadcastStream, ) # Set key listeners viz.settings.keyboardLiveInput = "bxpqz" # To set 2-way port: viz.reqComProtocol = "tcp" viz.reqComAddress = "0.0.0.0" viz.reqPortNumber = "5556" # To set broadcast port: viz.pubComProtocol = "tcp" viz.pubComAddress = "0.0.0.0" viz.pubPortNumber = "5570" # Pre-instantiate panels infopanel = vizInterface.VizEventDialog() infopanel.eventHandlerID = "INFO PANEL" infopanel.displayString = """This is an information panel. Vizard is reporting 'b', 'p', 'x' and 'z' \ keystrokes back to BSK, which you can hook up to specific sim states. In this case, \ press 'b' to show a burn panel. If you initiate a burn, press 'x' to stop the burn. \ Press 'p' to pause the simulation, 'z' to stop the simulation, 'q' to stop the simulation and cleanly quit Vizard.""" infopanel.durationOfDisplay = 0 # stay open infopanel.dialogFormat = "CAUTION" burnpanel = vizInterface.VizEventDialog() burnpanel.eventHandlerID = "OPTION PANEL" burnpanel.displayString = "This panel accepts a user response. Initiate burn?" burnpanel.durationOfDisplay = 0 # stay open burnpanel.hideOnSelection = True burnpanel.userOptions.append("Yes") burnpanel.userOptions.append("No") burnpanel.useConfirmationPanel = True burnpanel.dialogFormat = "WARNING" hudpanel = vizInterface.VizEventDialog() hudpanel.eventHandlerID = "HUD" hudpanel.durationOfDisplay = 0 # stay open # Del viz.vizEventDialogs[:] at the start of the sim viz.vizEventDialogs.clear() # "Subscriber" Vizards will pick up the main settings at this frequency viz.broadcastSettingsSendDelay = 2 # seconds # # initialize Simulation: This function clears the simulation log, and runs the self_init() # cross_init() and reset() routines on each module. # If the routine InitializeSimulationAndDiscover() is run instead of InitializeSimulation(), # then the all messages are auto-discovered that are shared across different BSK threads. # scSim.InitializeSimulation() # This is the execution loop. BSK executes at a frequency governed by [n * simulationTimeStep]. incrementalStopTime = 0 # Scenario specific flag continueBurn = False priorKeyPressTime = dt.datetime.now() while incrementalStopTime < simulationTime: currState = scObject.scStateOutMsg.read() alt = np.linalg.norm(currState.r_BN_N) - planet.radEquator velNorm = np.linalg.norm(currState.v_BN_N) if vizSupport.vizFound: hudpanel.displayString = f"HUD\nAltitude: {alt / 1000:.2f} km\nInertial Velocity: {velNorm / 1000:.2f} km/s" viz.vizEventDialogs.append(hudpanel) # Here, I only want to run a single BSK timestep before checking for user responses. # If the 'end' flag is set, exit the scenario. # If the 'pause' flag is set, only update vizInterface module and reset clockSync. if vizSupport.endFlag: exit(0) elif vizSupport.pauseFlag: viz.UpdateState(incrementalStopTime) clockSync.Reset(0) else: incrementalStopTime += simulationTimeStep scSim.ConfigureStopTime(incrementalStopTime) scSim.ExecuteSimulation() # Retrieve copy of user input message from Vizard if liveStream and vizSupport.vizFound: userInputs = viz.userInputMsg.read() keyInputs = userInputs.keyboardInput eventInputs = userInputs.vizEventReplies # Parse keyboard inputs, perform actions now = dt.datetime.now() if ( now - priorKeyPressTime ).total_seconds() > 1.0: # check that 1s has passed since last key press if "b" in keyInputs: print("key - b") priorKeyPressTime = now if not continueBurn: print("burn panel") viz.vizEventDialogs.append(burnpanel) if "q" in keyInputs: print("key - q") # Set terminateVizard flag, send to Vizard to cleanly close Vizard and end scenario viz.liveSettings.terminateVizard = True viz.UpdateState(incrementalStopTime) exit(0) if "x" in keyInputs: print("key - x") priorKeyPressTime = now if continueBurn: print("Stopping burn.") continueBurn = False thrMsgData.OnTimeRequest = [0, 0, 0] thrMsg.write(thrMsgData, incrementalStopTime) if "z" in keyInputs: print("key - z") priorKeyPressTime = now vizSupport.endFlag = True # End scenario if "p" in keyInputs: print("key - p") priorKeyPressTime = now vizSupport.pauseFlag = not vizSupport.pauseFlag # Toggle # Parse panel responses for response in eventInputs: if response.eventHandlerID == "INFO PANEL": # Can add any other behavior here print("Panel 1 closed") if response.eventHandlerID == "OPTION PANEL": print("Panel 2 response: " + response.reply) if response.reply == "Yes": print("Initiating burn.") continueBurn = True else: print("Cancelling burn.") # Append info panel if incrementalStopTime == 100 * simulationTimeStep: viz.vizEventDialogs.append(infopanel) # Turn on thrusters if continueBurn: thrMsgData.OnTimeRequest = [100, 100, 100] thrMsg.write(thrMsgData, incrementalStopTime) # # retrieve the logged data # posData = dataLog.r_BN_N velData = dataLog.v_BN_N np.set_printoptions(precision=16) # # plot the results # # draw the inertial position vector components plt.close("all") # clears out plots from earlier test runs plt.figure(1) fig = plt.gcf() ax = fig.gca() ax.ticklabel_format(useOffset=False, style="plain") for idx in range(3): plt.plot( dataLog.times() * macros.NANO2SEC / P, posData[:, idx] / 1000.0, color=unitTestSupport.getLineColor(idx, 3), label="$r_{BN," + str(idx) + "}$", ) plt.legend(loc="lower right") plt.xlabel("Time [orbits]") plt.ylabel("Inertial Position [km]") figureList = {} pltName = fileName + "1" + orbitCase + str(int(useSphericalHarmonics)) + planetCase figureList[pltName] = plt.figure(1) if useSphericalHarmonics is False: # draw orbit in perifocal frame b = oe.a * np.sqrt(1 - oe.e * oe.e) p = oe.a * (1 - oe.e * oe.e) plt.figure(2, figsize=tuple(np.array((1.0, b / oe.a)) * 4.75), dpi=100) plt.axis(np.array([-oe.rApoap, oe.rPeriap, -b, b]) / 1000 * 1.25) # draw the planet fig = plt.gcf() ax = fig.gca() if planetCase == "Mars": planetColor = "#884400" else: planetColor = "#008800" planetRadius = planet.radEquator / 1000 ax.add_artist(plt.Circle((0, 0), planetRadius, color=planetColor)) # draw the actual orbit rData = [] fData = [] for idx in range(0, len(posData)): oeData = orbitalMotion.rv2elem(mu, posData[idx], velData[idx]) rData.append(oeData.rmag) fData.append(oeData.f + oeData.omega - oe.omega) plt.plot( rData * np.cos(fData) / 1000, rData * np.sin(fData) / 1000, color="#aa0000", linewidth=3.0, ) # draw the full osculating orbit from the initial conditions fData = np.linspace(0, 2 * np.pi, 100) rData = [] for idx in range(0, len(fData)): rData.append(p / (1 + oe.e * np.cos(fData[idx]))) plt.plot( rData * np.cos(fData) / 1000, rData * np.sin(fData) / 1000, "--", color="#555555", ) plt.xlabel("$i_e$ Cord. [km]") plt.ylabel("$i_p$ Cord. [km]") plt.grid() else: plt.figure(2) fig = plt.gcf() ax = fig.gca() ax.ticklabel_format(useOffset=False, style="plain") smaData = [] for idx in range(0, len(posData)): oeData = orbitalMotion.rv2elem(mu, posData[idx], velData[idx]) smaData.append(oeData.a / 1000.0) plt.plot( posData[:, 0] * macros.NANO2SEC / P, smaData, color="#aa0000", ) plt.xlabel("Time [orbits]") plt.ylabel("SMA [km]") pltName = fileName + "2" + orbitCase + str(int(useSphericalHarmonics)) + planetCase figureList[pltName] = plt.figure(2) if show_plots: plt.show() # close the plots being saved off to avoid over-writing old and new figures plt.close("all") return figureList
# # This statement below ensures that the unit test scrip can be run as a # stand-along python script # if __name__ == "__main__": run( False, # show_plots True, # liveStream True, # broadcastStream 1.0, # time step (s) "LEO", # orbit Case (LEO, GTO, GEO) False, # useSphericalHarmonics "Earth", # planetCase (Earth, Mars) )