Source code for bsk_rl.utils.orbital

"""``bsk_rl.utils.orbital``:Utilities for computing orbital events."""

import logging
from typing import Iterable, Optional

import numpy as np
from Basilisk import __path__
from Basilisk.simulation import eclipse, spacecraft
from Basilisk.utilities import (
    SimulationBaseClass,
    macros,
    orbitalMotion,
    simIncludeGravBody,
)
from Basilisk.utilities.orbitalMotion import ClassicElements, elem2rv
from scipy.interpolate import interp1d

bskPath = __path__[0]

logger = logging.getLogger(__name__)


[docs] def random_orbit( i: Optional[float] = 45.0, alt: float = 500, r_body: float = 6371, e: float = 0, Omega: Optional[float] = None, omega: Optional[float] = 0, f: Optional[float] = None, ) -> ClassicElements: """Create a set of orbit elements. Parameters are fixed if specified and randomized if ``None``. Defaults to a random circular orbit at 500 km altitude and 45 deg inclination. Args: i: [deg] Inclination, randomized in ``[-pi, pi]``. alt: [km] Altitude above r_body. r_body: [km] Body radius. e: Eccentricity. Omega: [deg] LAN, randomized in ``[0, 2pi]``. omega: [deg] Argument of periapsis, randomized in ``[0, 2pi]``. f: [deg] True anomaly, randomized in ``[0, 2pi]``. Returns: ClassicElements: orbital elements """ oe = ClassicElements() oe.a = (r_body + alt) * 1e3 oe.e = e oe.i = np.radians(i) if i is not None else np.random.uniform(-np.pi, np.pi) oe.Omega = ( np.radians(Omega) if Omega is not None else np.random.uniform(0, 2 * np.pi) ) oe.omega = ( np.radians(omega) if omega is not None else np.random.uniform(0, 2 * np.pi) ) oe.f = np.radians(f) if f is not None else np.random.uniform(0, 2 * np.pi) return oe
[docs] def random_epoch(start: int = 2000, end: int = 2022): """Generate a random epoch in a year range. Date will always be in the first 28 days of the month. Args: start: Initial year. end: Final year. Returns: Epoch in ``YYYY MMM DD HH:MM:SS.SSS (UTC)`` format """ year = np.random.randint(start, end) month = np.random.choice( [ "JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC", ] ) day = np.random.randint(1, 28) # Assume 28 days for simplicity hours = np.random.randint(0, 23) minutes = np.random.randint(0, 59) seconds = np.random.randint(0, 59) milliseconds = np.random.randint(0, 999) # Combine the parts to form the datetime string epoch = ( f"{year} {month} {day:02d} " + f"{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d} (UTC)" ) return epoch
[docs] def elevation(r_sat: np.ndarray, r_target: np.ndarray) -> np.ndarray: """Find the elevation angle from a target to a satellite. Args: r_sat: Satellite position(s). r_target: Target position. Returns: Elevation angle(s) """ if r_sat.ndim == 2: return np.pi / 2 - np.arccos( np.sum(r_target * (r_sat - r_target), 1) / (np.linalg.norm(r_target) * np.linalg.norm(r_sat - r_target, axis=1)) ) else: return np.pi / 2 - np.arccos( np.sum(r_target * (r_sat - r_target)) / (np.linalg.norm(r_target) * np.linalg.norm(r_sat - r_target)) )
[docs] def walker_delta( n_spacecraft: int, n_planes: int, rel_phasing: float, altitude: float, inc: float, clustersize: int = 1, clusterspacing: float = 0, ) -> list[orbitalMotion.ClassicElements]: """Compute the initial orbit conditions of a Walker-delta constellation. Args: n_spacecraft: Number of spacecraft. n_planes: Number of orbital planes. rel_phasing: [deg] Relative phasing between planes. altitude: [m] Altitude above Earth's surface. inc: [deg] Inclination. clustersize: Number of spacecraft in each cluster. clusterspacing: [deg] Spacing between spacecraft in a cluster. Returns: list: List of orbital elements """ oe_all = [] # Loop through s/c for idx in range(0, n_spacecraft): # Instantiate orbital elements oe = orbitalMotion.ClassicElements() # Define altitude, eccentricity, and inclination oe.a = 6371 * 1000.0 + altitude oe.e = 0.0 oe.i = np.radians(inc) # Define the plane number spacecraft_per_plane = float(n_spacecraft / n_planes) plane_num = float(int(idx / spacecraft_per_plane)) # Compute longitude of ascending node oe.Omega = np.radians(plane_num * 360.0 / n_planes % 360) # Set argument of periapsis using relative phasing dPhi_rel = plane_num * rel_phasing * 360.0 / n_planes oe.omega = np.radians(dPhi_rel) # Define true anomoly using in-plane phasing dPhi_inplane = 360.0 / (spacecraft_per_plane / clustersize) oe.f = np.radians( (int((idx % spacecraft_per_plane) / clustersize) * dPhi_inplane) + clusterspacing * (idx % clustersize) ) # Append to oe_all oe_all.append(oe) return oe_all
[docs] def walker_delta_args( n_planes: int, rel_phasing: float = 0.0, altitude: float = 500, inc: float = 45.0, randomize_true_anomaly=True, randomize_lan=True, **walker_delta_kwargs, ): """Generate a function to generate a randomly phased Walker-delta constellation. The output of this function should be used to set the ``sat_arg_randomizer`` of the environment. Args: n_planes: Number of constellation planes. rel_phasing: [deg] Relative phasing between planes. Defaults to 0.0. altitude: [km] Orbit altitude above Earth's surface. inc: [deg] Orbit inclination randomize_true_anomaly: Add a random true anomaly phase shift that is the same for all constellation members on each call. randomize_lan: Add a random LAN phase shift that is the same for all constellation members on each call. walker_delta_kwargs: Additional arguments to pass to :class:`walker_delta`. """ def walker_delta_arg_setup(satellites): oe_all = walker_delta( n_spacecraft=len(satellites), n_planes=n_planes, altitude=altitude * 1e3, inc=inc, rel_phasing=rel_phasing, **walker_delta_kwargs, ) if randomize_true_anomaly: f_offset = np.random.uniform(0, 2 * np.pi) for oe in oe_all: oe.f = np.mod(oe.f + f_offset, 2 * np.pi) if randomize_lan: Omega_offset = np.random.uniform(0, 2 * np.pi) for oe in oe_all: oe.Omega = np.mod(oe.Omega + Omega_offset, 2 * np.pi) return {satellite: {"oe": oe} for satellite, oe in zip(satellites, oe_all)} return walker_delta_arg_setup
class TrajectorySimulator(SimulationBaseClass.SimBaseClass): """Class for propagating trajectory using a point mass simulation.""" def __init__( self, utc_init: str, rN: Optional[Iterable[float]] = None, vN: Optional[Iterable[float]] = None, oe: Optional[ClassicElements] = None, mu: Optional[float] = None, dt: float = 30.0, ) -> None: """Class for propagating trajectory using a point mass simulation. Simulated under the effect of Earth's gravity. Returns interpolators for position as well as upcoming eclipse predictions. Specify either ``(rN, vN)`` or ``(oe, mu)``. Args: utc_init: Simulation start time. rN: Initial position [m] vN: Initial velocity [m/s] oe: Orbital elements. mu: Gravitational parameter dt: Simulation timestep. """ super().__init__() self.utc_init = utc_init self.dt = dt rv_specified = rN is not None and vN is not None rv_specified_partial = rN is not None or vN is not None oe_specified = oe is not None if rv_specified and not oe_specified: self.rN_init = rN self.vN_init = vN elif oe_specified and not rv_specified_partial: self.rN_init, self.vN_init = elem2rv(mu, oe) else: raise ( ValueError( "Orbit is over or underspecified. " + "Provide either (rN, vN) or (oe, mu)" ) ) self._eclipse_starts: list[float] = [] self._eclipse_ends: list[float] = [] self._eclipse_search_time = 0.0 self._init_simulator() def _init_simulator(self) -> None: simTaskName = "simTask" simProcessName = "simProcess" dynProcess = self.CreateNewProcess(simProcessName) simulationTimeStep = macros.sec2nano(self.dt) dynProcess.addTask(self.CreateNewTask(simTaskName, simulationTimeStep)) scObject = spacecraft.Spacecraft() scObject.ModelTag = "traj-sat" self.AddModelToTask(simTaskName, scObject) scObject.hub.r_CN_NInit = self.rN_init # m scObject.hub.v_CN_NInit = self.vN_init # m/s # Set up gravity body self.gravFactory = simIncludeGravBody.gravBodyFactory() planet = self.gravFactory.createEarth() self.gravFactory.createSun() planet.isCentralBody = True planet.useSphericalHarmonicsGravityModel( bskPath + "/supportData/LocalGravData/GGM03S.txt", 10 ) UTCInit = self.utc_init self.gravFactory.createSpiceInterface( bskPath + "/supportData/EphemerisData/", UTCInit ) self.gravFactory.spiceObject.zeroBase = "earth" self.AddModelToTask(simTaskName, self.gravFactory.spiceObject) scObject.gravField.gravBodies = spacecraft.GravBodyVector( list(self.gravFactory.gravBodies.values()) ) # Set up eclipse self.eclipseObject = eclipse.Eclipse() self.eclipseObject.addPlanetToModel( self.gravFactory.spiceObject.planetStateOutMsgs[0] ) self.eclipseObject.sunInMsg.subscribeTo( self.gravFactory.spiceObject.planetStateOutMsgs[1] ) self.AddModelToTask(simTaskName, self.eclipseObject, ModelPriority=988) self.eclipseObject.addSpacecraftToModel(scObject.scStateOutMsg) # Log outputs self.sc_state_log = scObject.scStateOutMsg.recorder() self.planet_state_log = self.gravFactory.spiceObject.planetStateOutMsgs[ 0 ].recorder() self.eclipse_log = self.eclipseObject.eclipseOutMsgs[0].recorder() self._time_log = [] self.AddModelToTask(simTaskName, self.sc_state_log) self._r_BN_N_log = [] self.AddModelToTask(simTaskName, self.planet_state_log) self._J20002Pfix_log = [] self.AddModelToTask(simTaskName, self.eclipse_log) self._shadowFactor_log = [] self.InitializeSimulation() @property def sim_time(self) -> float: """Current simulator end time.""" return macros.NANO2SEC * self.TotalSim.CurrentNanos @property def times(self) -> np.ndarray: """Recorder times in seconds.""" return macros.NANO2SEC * np.array(self._time_log) def extend_to(self, t: float) -> None: """Compute the trajectory of the satellite up to t. Args: t: Computation end [s] """ if t < self.sim_time: return self.ConfigureStopTime(macros.sec2nano(t)) self.ExecuteSimulation() self._time_log.extend(self.sc_state_log.times()) self._r_BN_N_log.extend(self.sc_state_log.r_BN_N) self._J20002Pfix_log.extend(self.planet_state_log.J20002Pfix) self._shadowFactor_log.extend(self.eclipse_log.shadowFactor) self.sc_state_log.clear() self.planet_state_log.clear() self.eclipse_log.clear() def _generate_eclipses(self, t: float) -> None: self.extend_to(t + self.dt) upcoming_times = self.times[self.times > self._eclipse_search_time] upcoming_eclipse = ( np.array(self._shadowFactor_log)[self.times > self._eclipse_search_time] > 0 ).astype(float) for i in np.where(np.diff(upcoming_eclipse) == -1)[0]: self._eclipse_starts.append(upcoming_times[i]) for i in np.where(np.diff(upcoming_eclipse) == 1)[0]: self._eclipse_ends.append(upcoming_times[i]) self._eclipse_search_time = t def next_eclipse(self, t: float, max_tries: int = 100) -> tuple[float, float]: """Find the soonest eclipse transitions. The returned values are not necessarily from the same eclipse event, such as when the search start time is in eclipse. Args: t: Time to start searching [s] max_tries: Maximum number of times to search Returns: eclipse_start: Nearest upcoming eclipse beginning eclipse_end: Nearest upcoming eclipse end """ for i in range(max_tries): if any([t_start > t for t_start in self._eclipse_starts]) and any( [t_end > t for t_end in self._eclipse_ends] ): eclipse_start = min( [t_start for t_start in self._eclipse_starts if t_start > t] ) eclipse_end = min([t_end for t_end in self._eclipse_ends if t_end > t]) return eclipse_start, eclipse_end self._generate_eclipses(t + i * self.dt * 10) return 1.0, 1.0 @property def r_BN_N(self) -> interp1d: """Interpolator for r_BN_N.""" if self.sim_time < self.dt * 3: self.extend_to(self.dt * 3) return interp1d( self.times, self._r_BN_N_log, kind="cubic", axis=0, fill_value="extrapolate", ) @property def r_BP_P(self) -> interp1d: """Interpolator for r_BP_P.""" if self.sim_time < self.dt * 3: self.extend_to(self.dt * 3) return interp1d( self.times, [ np.matmul(dcm, pos) for dcm, pos in zip(self._J20002Pfix_log, self._r_BN_N_log) ], kind="cubic", axis=0, fill_value="extrapolate", ) def __del__(self) -> None: """Unload spice kernels when object is deleted.""" try: self.gravFactory.unloadSpiceKernels() except AttributeError: pass
[docs] def lla2ecef(lat: float, long: float, radius: float): """Project LLA to Earth Centered, Earth Fixed location. Args: lat: [deg] long: [deg] radius: [any] """ lat = np.radians(lat) long = np.radians(long) return radius * np.array( [np.cos(lat) * np.cos(long), np.cos(lat) * np.sin(long), np.sin(lat)] )
[docs] def rv2HN(r_N: np.ndarray, v_N: np.ndarray): """Find the Hill frame rotation matrix from position and velocity. Args: r_N: Position vector in the inertial frame v_N: Velocity vector in the inertial frame Returns: Hill frame rotation matrix HN """ o_r_N = r_N / np.linalg.norm(r_N) h_N = np.cross(r_N, v_N) o_h_N = h_N / np.linalg.norm(h_N) o_theta_N = np.cross(o_h_N, o_r_N) HN = np.array([o_r_N, o_theta_N, o_h_N]) return HN
[docs] def rv2omega(r_N: np.ndarray, v_N: np.ndarray): """Find the Hill frame rotation rate from position and velocity. Args: r_N: Position vector in the inertial frame v_N: Velocity vector in the inertial frame Returns: omega_HN_N: Angular velocity of the Hill frame in the inertial frame """ o_r_N = r_N / np.linalg.norm(r_N) h_N = np.cross(r_N, v_N) o_h_N = h_N / np.linalg.norm(h_N) o_theta_N = np.cross(o_h_N, o_r_N) omega_HN_N = o_h_N * np.dot(v_N, o_theta_N) / np.linalg.norm(r_N) return omega_HN_N
__doc_title__ = "Orbital" __all__ = [ "random_orbit", "random_epoch", "lla2ecef", "elevation", "walker_delta", "walker_delta_args", "TrajectorySimulator", "rv2HN", "rv2omega", ]