# Cloud Environment with Re-imaging
This tutorial demonstrates the configuration and use of a BSK-RL environment considering cloud coverage and re-imaging capabilities. Two reward functions are presented: a single-picture binary case (where targets are deemed occluded by clouds or not and no re-imaging is allowed) and a re-imaging case where the problem is formulated in terms of the targets' probability of being successfully observed. Still, the satellite cannot observe the true cloud coverage of each target, only its forecast. The satellite has to image targets while keeping a positive battery level. This example script is part of an upcoming publication.

## Loading Modules

In [1]:
from bsk_rl import ConstellationTasking
import numpy as np
from typing import Optional, Callable, Union

from Basilisk.architecture import bskLogging
from Basilisk.utilities import orbitalMotion
from bsk_rl import act, obs, sats
from bsk_rl.sim import dyn, fsw, world
from bsk_rl.scene.targets import UniformTargets
from bsk_rl.data.base import Data, DataStore, GlobalReward
from bsk_rl.data.unique_image_data import (
 UniqueImageData,
 UniqueImageStore,
 UniqueImageReward,
)

bskLogging.setDefaultLogLevel(bskLogging.BSK_WARNING)

## Making a Scenario with Cloud Covered Targets

To account for clouds in the simulation process, we can associate a cloud coverage value to each target that represents the percentage of cloud coverage over that area. Cloud coverage can be randomly generated or derived from real data. Here, we have an example on how to use a stochastic cloud model using UniformTargets as a base and attach the following information to each target:

* `cloud_cover_true` represents the true cloud coverage. Information from external sources, such as historical cloud data, can be used here based on each target's position.

* `cloud_cover_forecast` represents the cloud coverage forecast. Forecast from external sources can be plugged in here.

* `cloud_cover_sigma` represents the standard deviation of the cloud coverage forecast.

* `belief` represents the probability that the target was successfully observed.

* `prev_obs` time at which the last picture of the target was taken.

In [2]:
class CloudTargets(UniformTargets):
 mu_data = 0.6740208166434426 # Average global cloud coverage

 def __init__(
 self,
 n_targets: Union[int, tuple[int, int]],
 priority_distribution: Optional[Callable] = None,
 radius: float = orbitalMotion.REQ_EARTH * 1e3,
 sigma_levels: tuple[float, float] = (0.01, 0.05),
 reward_thresholds: Union[float, tuple[float, float]] = 0.95,
 belief_init: tuple[float, float] = (0.0, 0.94),
 prev_obs_init: tuple[float, float] = (0.0, 5700.0),
 ) -> None:
 super().__init__(n_targets, priority_distribution, radius)
 self.reward_thresholds = reward_thresholds
 self.sigma_levels = sigma_levels
 self.belief_init = belief_init
 self.prev_obs_init = prev_obs_init

 def regenerate_targets(self) -> None:
 super().regenerate_targets()
 for target in self.targets:

 # Initialize true cloud coverage
 cloud_cover_true = np.random.uniform(
 0.0, self.mu_data * 2
 ) # Instead, true cloud coverage can be obtained by historical data based on the target's position
 cloud_cover_true = np.clip(cloud_cover_true, 0.0, 1.0)
 target.cloud_cover_true = cloud_cover_true

 # Initialize cloud coverage forecast
 target.cloud_cover_sigma = np.random.uniform(
 self.sigma_levels[0], self.sigma_levels[1]
 )
 cloud_cover_forecast = np.random.normal(
 target.cloud_cover_true, target.cloud_cover_sigma
 )
 target.cloud_cover_forecast = np.clip(cloud_cover_forecast, 0.0, 1.0)

 # Set reward threshold
 if isinstance(self.reward_thresholds, float):
 target.reward_threshold = self.reward_thresholds
 else:
 target.reward_threshold = np.random.uniform(
 self.reward_thresholds[0], self.reward_thresholds[1]
 )

 # Initialize beliefs and previous observations
 b_S1 = np.random.uniform(self.belief_init[0], self.belief_init[1])
 b_S0 = 1 - b_S1
 target.belief = np.array([b_S0, b_S1])
 target.prev_obs = -np.random.uniform(
 self.prev_obs_init[0], self.prev_obs_init[0]
 )
 target.belief_update_var = 0.0


# Define the randomization interval for the number of targets
n_targets = (1000, 10000)
scenario = CloudTargets(n_targets=n_targets)

## Making a Rewarder Considering Cloud Coverage for the Single-picture Case

When considering targets potentially covered by clouds, we can use a binary reward model where the reward is proportional to the target priority if the target's cloud coverage is below its `reward_threshold` (how much cloud coverage is acceptable). Therefore, we create a modified rewarder `CloudImageBinaryRewarder`; it has similar settings as the [UniqueImageReward](../api_reference/data/index.rst) class, but `cloud_covered` and `cloud_free` information is added. Additionally, the `calculate_reward` function is modified for the binary reward model. 

For this case, the reward function is given by

$$
R = \begin{cases}
\rho_i & \text{if } c_{p_i} \leq c_{\text{thr}_i} \\
0 & \text{otherwise.}
\end{cases}
$$

where $\rho_i$ is priority, $c_{p_i}$ is the true cloud coverage, and $c_{\text{thr}_i}$ is the `reward_threshold` for target $i$. For a case where the reward is linearly proportional to the cloud coverage, see [Cloud Environment](../examples/cloud_environment.rst)

In [3]:
from typing import TYPE_CHECKING

if TYPE_CHECKING: # pragma: no cover
 from bsk_rl.scene.targets import (
 Target,
 )


class CloudImageBinaryData(UniqueImageData):
 """DataType for unique images of targets."""

 def __init__(
 self,
 imaged: Optional[list["Target"]] = None,
 duplicates: int = 0,
 known: Optional[list["Target"]] = None,
 cloud_covered: Optional[list["Target"]] = None,
 cloud_free: Optional[list["Target"]] = None,
 ) -> None:
 """Construct unit of data to record unique images.

 Keeps track of ``imaged`` targets, a count of ``duplicates`` (i.e. images that
 were not rewarded due to the target already having been imaged), and all
 ``known`` targets in the environment. It also keeps track of which targets are considered
 ``cloud_covered`` and ``cloud_free`` based on the specified threshold.

 Args:
 imaged: List of targets that are known to be imaged.
 duplicates: Count of target imaging duplication.
 known: List of targets that are known to exist (imaged and unimaged).
 cloud_covered: List of imaged targets that are known to be cloud covered.
 cloud_free: List of imaged targets that are known to be cloud free.
 """
 super().__init__(imaged=imaged, duplicates=duplicates, known=known)
 if cloud_covered is None:
 cloud_covered = []
 if cloud_free is None:
 cloud_free = []
 self.cloud_covered = list(set(cloud_covered))
 self.cloud_free = list(set(cloud_free))

 def __add__(self, other: "CloudImageBinaryData") -> "CloudImageBinaryData":
 """Combine two units of data.

 Args:
 other: Another unit of data to combine with this one.

 Returns:
 Combined unit of data.
 """

 imaged = list(set(self.imaged + other.imaged))
 duplicates = (
 self.duplicates
 + other.duplicates
 + len(self.imaged)
 + len(other.imaged)
 - len(imaged)
 )
 known = list(set(self.known + other.known))
 cloud_covered = list(set(self.cloud_covered + other.cloud_covered))
 cloud_free = list(set(self.cloud_free + other.cloud_free))

 return self.__class__(
 imaged=imaged,
 duplicates=duplicates,
 known=known,
 cloud_covered=cloud_covered,
 cloud_free=cloud_free,
 )


class CloudImageBinaryDataStore(UniqueImageStore):
 """DataStore for unique images of targets."""

 data_type = CloudImageBinaryData

 def compare_log_states(
 self, old_state: np.ndarray, new_state: np.ndarray
 ) -> CloudImageBinaryData:
 """Check for an increase in logged data to identify new images.

 Args:
 old_state: older storedData from satellite storage unit
 new_state: newer storedData from satellite storage unit

 Returns:
 list: Targets imaged at new_state that were unimaged at old_state
 """
 update_idx = np.where(new_state - old_state > 0)[0]
 imaged = []
 for idx in update_idx:
 message = self.satellite.dynamics.storageUnit.storageUnitDataOutMsg
 target_id = message.read().storedDataName[int(idx)]
 imaged.append(
 [target for target in self.data.known if target.id == target_id][0]
 )

 cloud_covered = []
 cloud_free = []
 for target in imaged:
 cloud_coverage = target.cloud_cover_true
 if cloud_coverage > target.reward_threshold:
 cloud_covered.append(target)
 else:
 cloud_free.append(target)

 return CloudImageBinaryData(
 imaged=imaged, cloud_covered=cloud_covered, cloud_free=cloud_free
 )


class CloudImageBinaryRewarder(UniqueImageReward):
 """DataManager for rewarding unique images."""

 datastore_type = CloudImageBinaryDataStore

 def calculate_reward(
 self, new_data_dict: dict[str, CloudImageBinaryData]
 ) -> dict[str, float]:
 """Reward new each unique image once using self.reward_fn().

 Args:
 new_data_dict: Record of new images for each satellite

 Returns:
 reward: Cumulative reward across satellites for one step
 """
 reward = {}

 for sat_id, new_data in new_data_dict.items():
 reward[sat_id] = 0.0
 for target in new_data.cloud_free:
 reward[sat_id] += self.reward_fn(target.priority)

 for new_data in new_data_dict.values():
 self.data += new_data
 return reward


# Define the reward function as a function of the priority of the target and the cloud cover
def reward_function_binary(priority):
 return priority


# Uncomment this line and comment the reward in the cell below to use the binary reward function
# rewarder = CloudImageBinaryRewarder(reward_fn=reward_function_binary)

## Making a Rewarder Considering Cloud Coverage for the Re-imaging Case

If the target is deemed occluded by clouds, it won't be tasked again in the single-picture case. However, the problem can be formulated in terms of the probability of observing the target ($\text{P}(S=1)$, represented by the variable `belief` in the code) given the number of pictures and time difference between pictures ($\delta t_i$). Thus, a new rewarder named `CloudImageProbabilityRewarder` is created to accommodate this new formulation, as well as a new reward function. 

The reward function accounts for the desired success probability threshold for each target ($\theta_{\text{thr}_i}$, represented by `reward_threshold` in the code) and has a tunable parameter $\alpha\in[0,1]$:

$$
R = \begin{cases}
\rho_i\alpha_i\Delta \text{P}(S=1) + \rho_i(1 - \alpha) & \text{ if } \text{P}_i(S=1) \geq \theta_{\text{thr}_i} \\
\rho_i\alpha_i\Delta \text{P}(S=1) & \text{ otherwise.}
\end{cases}
$$


In [4]:
class CloudImageProbabilityData(Data):
 """DataType for unique images of targets."""

 def __init__(
 self,
 imaged: Optional[list["Target"]] = None,
 imaged_complete: Optional[list["Target"]] = None,
 list_belief_update_var: Optional[list[float]] = None,
 known: Optional[list["Target"]] = None,
 ) -> None:
 """Construct unit of data to record unique images.

 Keeps track of ``imaged`` targets and completely imaged targets (those with a success probability
 higher than the ``reward_threshold``).

 Args:
 imaged: List of targets that are known to be imaged.
 imaged_complete: List of targets that are known to be completely imaged (P(S=1) >= reward_threshold).
 list_belief_update_var: List of belief update variations for each target after each picture.
 known: List of targets that are known to exist (imaged and not imaged)
 """
 if imaged is None:
 imaged = []
 if imaged_complete is None:
 imaged_complete = []
 if list_belief_update_var is None:
 list_belief_update_var = []
 if known is None:
 known = []
 self.known = list(set(known))

 self.imaged = list(imaged)
 self.imaged_complete = list(set(imaged_complete))
 self.list_belief_update_var = list(list_belief_update_var)

 def __add__(
 self, other: "CloudImageProbabilityData"
 ) -> "CloudImageProbabilityData":
 """Combine two units of data.

 Args:
 other: Another unit of data to combine with this one.

 Returns:
 Combined unit of data.
 """

 imaged = list(self.imaged + other.imaged)
 imaged_complete = list(set(self.imaged_complete + other.imaged_complete))
 list_belief_update_var = list(
 self.list_belief_update_var + other.list_belief_update_var
 )

 known = list(set(self.known + other.known))
 return self.__class__(
 imaged=imaged,
 imaged_complete=imaged_complete,
 list_belief_update_var=list_belief_update_var,
 known=known,
 )


class CloudImageProbabilityDataStore(DataStore):
 """DataStore for unique images of targets."""

 data_type = CloudImageProbabilityData

 def __init__(self, *args, **kwargs) -> None:
 """DataStore for unique images.

 Detects new images by watching for an increase in data in each target's corresponding
 buffer.
 """
 super().__init__(*args, **kwargs)

 def get_log_state(self) -> np.ndarray:
 """Log the instantaneous storage unit state at the end of each step.

 Returns:
 array: storedData from satellite storage unit
 """
 return np.array(
 self.satellite.dynamics.storageUnit.storageUnitDataOutMsg.read().storedData
 )

 def compare_log_states(
 self, old_state: np.ndarray, new_state: np.ndarray
 ) -> CloudImageProbabilityData:
 """Check for an increase in logged data to identify new images.

 This method also performs the belief update (new probability of success) for each target
 based on the cloud coverage forecast and the time difference between the current time and
 the previous observation time. It also keeps track of the variation in the belief update.

 Args:
 old_state: older storedData from satellite storage unit
 new_state: newer storedData from satellite storage unit

 Returns:
 list: Targets imaged at new_state that were unimaged at old_state
 """
 update_idx = np.where(new_state - old_state > 0)[0]
 imaged = []
 for idx in update_idx:
 message = self.satellite.dynamics.storageUnit.storageUnitDataOutMsg
 target_id = message.read().storedDataName[int(idx)]
 imaged.append(
 [target for target in self.data.known if target.id == target_id][0]
 )

 list_imaged_complete = []
 list_belief_update_var = []

 current_sim_time = self.satellite.simulator.sim_time
 belief_update_func = self.satellite.belief_update_func

 for target in imaged:
 target_prev_obs = (
 target.prev_obs
 ) # Time at which the target was previously observed
 target_time_diff = (
 current_sim_time - target_prev_obs
 ) # Time difference between the current time and the previous observation time
 target_belief = (
 target.belief
 ) # Belief of the target before the current picture

 target_cloud_cover_forecast = target.cloud_cover_forecast
 updated_belief = belief_update_func(
 target_belief, target_cloud_cover_forecast, target_time_diff
 )

 target.belief = updated_belief # Update the belief of the target
 target.belief_update_var = updated_belief[1] - target_belief[1]
 target.prev_obs = current_sim_time # Update the previous observation time

 if updated_belief[1] > target.reward_threshold:
 list_imaged_complete.append(target)
 list_belief_update_var.append(target.belief_update_var)

 return CloudImageProbabilityData(
 imaged=imaged,
 imaged_complete=list_imaged_complete,
 list_belief_update_var=list_belief_update_var,
 )


class CloudImageProbabilityRewarder(GlobalReward):

 datastore_type = CloudImageProbabilityDataStore

 def __init__(
 self,
 reward_fn: Callable,
 alpha: float = 0.5,
 ) -> None:
 """

 Modifies the constructor to include the alpha parameter to tune the reward function and
 the reward function.
 Args:
 reward_fn: Reward as function of priority, targets belief, and alpha.
 """
 super().__init__()
 self.reward_fn = reward_fn
 self.alpha = alpha

 def initial_data(self, satellite: "sats.Satellite") -> "UniqueImageData":
 """Furnish data to the scenario.

 Currently, it is assumed that all targets are known a priori, so the initial data
 given to the data store is the list of all targets.
 """
 return self.data_type(known=self.scenario.targets)

 def calculate_reward(
 self, new_data_dict: dict[str, CloudImageProbabilityData]
 ) -> dict[str, float]:
 """Reward new each unique image once using self.reward_fn().

 Args:
 new_data_dict: Record of new images for each satellite

 Returns:
 reward: Cumulative reward across satellites for one step
 """
 reward = {}

 for sat_id, new_data in new_data_dict.items():
 reward[sat_id] = 0.0
 for target, belief_variation in zip(
 new_data.imaged, new_data.list_belief_update_var
 ):
 reward[sat_id] += self.reward_fn(
 target.priority, belief_variation, self.alpha, reach_threshold=False
 )
 for target in new_data.imaged_complete:
 reward[sat_id] += self.reward_fn(
 target.priority, None, self.alpha, reach_threshold=True
 )

 return reward


# Define the reward function as a function of the priority of the target, the cloud cover, and the number of times the target has been imaged
def reward_function_probability(
 priority: float, belief_variation: float, alpha: float, reach_threshold: bool
) -> float:
 """

 Rewards based on the priority of the target, the belief variation, and the alpha parameter.

 Args:
 priority: Priority of the target.
 belief_variation: Variation in the belief of the target after the picture.
 alpha: Tuning parameter between 0 and 1.
 reach_threshold: Boolean indicating whether the target has reached the reward threshold.

 Returns:
 float: Reward for the target.
 """
 if reach_threshold:
 return priority * (1 - alpha)
 else:
 return priority * belief_variation * alpha


rewarder = CloudImageProbabilityRewarder(
 reward_fn=reward_function_probability, alpha=1.0
)

`CloudImageProbabilityDataStore` requires a function `belief_update_func` that returns the updated success probability for target $i$ ($\text{P}^{(k+1)}_i(S=1)$) given its current success probability ($\text{P}^{(k)}_i(S=1)$), cloud coverage forecast ($c_{f_i}$), and the time different between the current and previous image ($\delta t_i$).

The update in the success probability is given by:

$$
\text{P}^{(k+1)}(S=1) = 1 - \text{P}^{(k)}(S=1)\bar{c}_{f_i}
$$

To penalize two consecutive pictures without enough elapsed time (and not enough shift in clouds' position), a new cloud-free probability variable $g_{f_i}$ is introduced such that

$$
g^{(k)}_{f_i} = (1-c^{(k)}_{f_i})\beta(\delta t_i)
$$

where $\beta$ is given by a sigmoid

$$
\beta(\delta t) = \frac{1}{\eta_3+e^{-\eta_1(\frac{\delta t}{\tau}-\eta_2)}}
$$

and

$$
\bar{c}_{f_i} = 1 - g_{f_i}^{(k)}
$$

leading to:

$$
\text{P}^{(k+1)}(S=1) = \text{P}^{(k)}(S=1) + (1-\text{P}^{(k)}(S=1))(1-c^{(k)}_{f_i})\beta(\delta t_i)
$$

In [5]:
def time_variation(
 delta_t: float, t_const: float, k_1: float = 2.5, k_2: float = 2.5, k_3: float = 1.0
) -> float:
 """
 Time variation function based on sigmoid function.

 Args:
 delta_t (float): Time difference between the current time and the previous observation time.
 t_const (float): Time constant for the sigmoid function.
 k_1 (float): Sigmoid function parameter.
 k_2 (float): Sigmoid function parameter.
 k_3 (float): Sigmoid function parameter.

 Returns:
 float: Time variation value.
 """
 if delta_t <= 0:
 return 0
 else:
 return 1 / (k_3 + np.exp(-k_1 * (delta_t / t_const - k_2)))


def belief_update(
 b: list[float], cloud_cover_forecast: float, delta_t: float, t_const: float
) -> np.array:
 """
 Update the belief based on the cloud forecast and the time variation.

 Args:
 b (np.array): Belief array (b(S=0), b(S=1)).
 cloud_forecast (float): Cloud coverage forecast.
 delta_t (float): Time difference between the current time and the previous observation time.
 t_const (float): Time constant for the sigmoid function.

 Returns:
 np.array: Updated belief array
 """

 cloud_time_variation = time_variation(delta_t, t_const)
 cloud_free = (1 - cloud_cover_forecast) * cloud_time_variation
 cloud_cover_bar = 1 - cloud_free
 b_0 = b[0] * cloud_cover_bar
 b_1 = 1 - b_0
 return np.array([b_0, b_1])


def belief_update_func(
 b: list[float], cloud_cover_forecast: float, delta_t: float
) -> np.array:
 """
 Belief update function for the satellite.

 Args:
 b (np.array): Belief array (b(S=0), b(S=1)).
 cloud_forecast (float): Cloud coverage forecast.
 delta_t (float): Time difference between the current time and the previous observation time.

 Returns:
 np.array: Updated belief array
 """
 time_constant = 30 * 60 / 5 # 30 minutes
 return belief_update(b, cloud_cover_forecast, delta_t, time_constant)

## Configuring the Satellite to Have Access to Cloud Information

The satellite has observations and actions associated with it that are relevant to the decision-making process. The observation space can be modified to include information about the targets and the weather (cloud coverage forecast, reward threshold, success probability, etc) which allows better informed decision-making.

* [Observations](../api_reference/obs/index.rst): 
 - SatProperties: Body angular velocity, instrument pointing direction, body position, body velocity, battery charge (properties in [flight software model](../api_reference/sim/fsw.rst) or [dynamics model](../api_reference/sim/dyn.rst)). Also, customized dynamics property in CustomDynModel below: Angle between the sun and the solar panel.
 - OpportunityProperties: Target's priority, cloud coverage forecast, standard deviation of cloud coverage forecast, probability of being successfully imaged, and last time it was imaged (upcoming 32 targets). 
 - Time: Simulation time.
 - Eclipse: Next eclipse start and end times. 
* [Actions](../api_reference/act/index.rst):
 - Charge: Enter a sun-pointing charging mode for 60 seconds.
 - Image: Image target from upcoming 32 targets
* [Dynamics model](../api_reference/sim/dyn.rst): FullFeaturedDynModel is used and a property, angle between sun and solar panel, is added.
* [Flight software model](../api_reference/sim/fsw.rst): SteeringImagerFSWModel is used.

In [6]:
class CustomSatComposed(sats.ImagingSatellite):
 observation_spec = [
 obs.SatProperties(
 dict(prop="omega_BP_P", norm=0.03),
 dict(prop="c_hat_P"),
 dict(prop="r_BN_P", norm=orbitalMotion.REQ_EARTH * 1e3),
 dict(prop="v_BN_P", norm=7616.5),
 dict(prop="battery_charge_fraction"),
 dict(prop="solar_angle_norm"),
 ),
 obs.Eclipse(),
 obs.OpportunityProperties(
 dict(prop="priority"),
 dict(
 fn=lambda sat, opp: opp["object"].cloud_cover_forecast
 ), # Cloud coverage forecast (percentage of the area covered by clouds)
 dict(
 fn=lambda sat, opp: opp["object"].cloud_cover_sigma
 ), # Confidence on the cloud coverage forecast
 # dict(fn=lambda sat, opp: opp["object"].reward_threshold), #Reward threshold for each target. Uncomment if using variable threshold
 dict(
 fn=lambda sat, opp: opp["object"].belief[1]
 ), # Probability of successfully imaging the target. Used only in the re-imaging case
 dict(
 fn=lambda sat, opp: opp["object"].prev_obs, norm=5700
 ), # Previous observation time. Used only in the re-imaging case
 type="target",
 n_ahead_observe=32,
 ),
 obs.Time(),
 ]

 action_spec = [
 act.Charge(duration=60.0),
 act.Image(n_ahead_image=32),
 ]

 # Modified the constructor to include the belief update function
 def __init__(self, *args, belief_update_func=None, **kwargs) -> None:
 super().__init__(*args, **kwargs)
 self.belief_update_func = belief_update_func

 class CustomDynModel(dyn.FullFeaturedDynModel):

 @property
 def solar_angle_norm(self) -> float:
 sun_vec_N = (
 self.world.gravFactory.spiceObject.planetStateOutMsgs[
 self.world.sun_index
 ]
 .read()
 .PositionVector
 )
 sun_vec_N_hat = sun_vec_N / np.linalg.norm(sun_vec_N)
 solar_panel_vec_B = np.array([0, 0, -1]) # Not default configuration
 mat = np.transpose(self.BN)
 solar_panel_vec_N = np.matmul(mat, solar_panel_vec_B)
 error_angle = np.arccos(np.dot(solar_panel_vec_N, sun_vec_N_hat))

 return error_angle / np.pi

 dyn_type = CustomDynModel
 fsw_type = fsw.SteeringImagerFSWModel

It is necessary to add a filter to remove targets that reached the success threshold from the targets list when re-imaging is allowed such that:

In [7]:
def belief_threshold_filter(opportunity):
 if opportunity["type"] == "target":
 return (
 True
 if opportunity["object"].belief[1] < opportunity["object"].reward_threshold
 else False
 )
 return True

When instantiating a satellite, these parameters can be overriden with a constant or 
rerandomized every time the environment is reset using the ``sat_args`` dictionary.

In [8]:
dataStorageCapacity = 20 * 8e6 * 100
sat_args = CustomSatComposed.default_sat_args(
 imageAttErrorRequirement=0.01,
 imageRateErrorRequirement=0.01,
 batteryStorageCapacity=80.0 * 3600 * 2,
 storedCharge_Init=lambda: np.random.uniform(0.4, 1.0) * 80.0 * 3600 * 2,
 u_max=0.2,
 K1=0.5,
 nHat_B=np.array([0, 0, -1]),
 imageTargetMinimumElevation=np.radians(45),
 rwBasePower=20,
 maxWheelSpeed=1500,
 storageInit=lambda: np.random.randint(
 0 * dataStorageCapacity,
 0.01 * dataStorageCapacity,
 ), # Initialize storage use close to zero
 wheelSpeeds=lambda: np.random.uniform(
 -1, 1, 3
 ), # Initialize reaction wheel speeds close to zero
 dataStorageCapacity=dataStorageCapacity, # Large storage to avoid filling up in three orbits
)

## Initializing and Interacting with the Environment
For this example, we will be using the multi-agent [ConstellationTasking](../api_reference/index.rst) 
environment. Along with passing the satellite that we configured, the environment takes
a [scenario](../api_reference/scene/index.rst), which defines the environment the
satellite is acting in, and a [rewarder](../api_reference/data/index.rst), which defines
how data collected from the scenario is rewarded.

In [None]:
from bsk_rl.utils.orbital import walker_delta_args

sat_arg_randomizer = walker_delta_args(
 altitude=500.0, n_planes=1, inc=45, clustersize=5, clusterspacing=72
)

satellites = [
 CustomSatComposed(f"EO-{i}", sat_args, belief_update_func=belief_update_func)
 for i in range(5)
]

# Add filter to satellites to remove targets that have already reached the belief threshold
for sat in satellites:
 sat.add_access_filter(belief_threshold_filter)

env = ConstellationTasking(
 satellites=satellites,
 world_type=world.GroundStationWorldModel,
 world_args=world.GroundStationWorldModel.default_world_args(),
 scenario=scenario,
 rewarder=rewarder,
 sat_arg_randomizer=sat_arg_randomizer,
 sim_rate=0.5,
 max_step_duration=300.0,
 time_limit=95 * 60 / 2, # half orbit
 log_level="INFO",
 failure_penalty=0.0,
 # disable_env_checker=True, # For debugging
)

First, reset the environment. It is possible to specify the seed when resetting the environment.

In [10]:
observation, info = env.reset(seed=1)

[90;3m2025-03-26 15:49:51,992 [0m[mgym [0m[mINFO [0m[mResetting environment with seed=1[0m
[90;3m2025-03-26 15:49:51,996 [0m[mscene.targets [0m[mINFO [0m[mGenerating 1020 targets[0m
[90;3m2025-03-26 15:49:52,279 [0m[36msats.satellite.EO-0 [0m[mINFO [0m[33m<0.00> [0m[36mEO-0: [0m[mFinding opportunity windows from 0.00 to 2850.00 seconds[0m
[90;3m2025-03-26 15:49:52,317 [0m[92msats.satellite.EO-1 [0m[mINFO [0m[33m<0.00> [0m[92mEO-1: [0m[mFinding opportunity windows from 0.00 to 2850.00 seconds[0m
[90;3m2025-03-26 15:49:52,349 [0m[34msats.satellite.EO-2 [0m[mINFO [0m[33m<0.00> [0m[34mEO-2: [0m[mFinding opportunity windows from 0.00 to 2850.00 seconds[0m
[90;3m2025-03-26 15:49:52,379 [0m[95msats.satellite.EO-3 [0m[mINFO [0m[33m<0.00> [0m[95mEO-3: [0m[mFinding opportunity windows from 0.00 to 2850.00 seconds[0m
[90;3m2025-03-26 15:49:52,413 [0m[96msats.satellite.EO-4 [0m[mINFO [0m[33m<0.00> [0m[96mEO-4: [0m[mFinding 

It is possible to print out the actions and observations. The composed satellite [action_description](../api_reference/sats/index.rst) returns a human-readable action map each satellite has the same action space and similar observation space.

In [11]:
print("Actions:", env.satellites[0].action_description, "\n")
print("States:", env.unwrapped.satellites[0].observation_description, "\n")

# Using the composed satellite features also provides a human-readable state:
for satellite in env.unwrapped.satellites:
 for k, v in satellite.observation_builder.obs_dict().items():
 print(f"{k}: {v}")

Actions: ['action_charge', 'action_image_0', 'action_image_1', 'action_image_2', 'action_image_3', 'action_image_4', 'action_image_5', 'action_image_6', 'action_image_7', 'action_image_8', 'action_image_9', 'action_image_10', 'action_image_11', 'action_image_12', 'action_image_13', 'action_image_14', 'action_image_15', 'action_image_16', 'action_image_17', 'action_image_18', 'action_image_19', 'action_image_20', 'action_image_21', 'action_image_22', 'action_image_23', 'action_image_24', 'action_image_25', 'action_image_26', 'action_image_27', 'action_image_28', 'action_image_29', 'action_image_30', 'action_image_31'] 

States: [np.str_('sat_props.omega_BP_P_normd[0]'), np.str_('sat_props.omega_BP_P_normd[1]'), np.str_('sat_props.omega_BP_P_normd[2]'), np.str_('sat_props.c_hat_P[0]'), np.str_('sat_props.c_hat_P[1]'), np.str_('sat_props.c_hat_P[2]'), np.str_('sat_props.r_BN_P_normd[0]'), np.str_('sat_props.r_BN_P_normd[1]'), np.str_('sat_props.r_BN_P_normd[2]'), np.str_('sat_props.v_BN_P

Then, run the simulation until timeout or agent failure.

In [12]:
count = 0
while True:

 if count == 0:
 # Vector with an action for each satellite (we can pass different actions for each satellite)
 # Tasking all satellites to charge (tasking None as the first action will raise a warning)
 action_dict = {sat_i.name: 0 for sat_i in env.satellites}
 else:
 # Tasking random actions
 action_dict = {sat_i.name: np.random.randint(0, 32) for sat_i in env.satellites}
 count += 1

 observation, reward, terminated, truncated, info = env.step(action_dict)

 if all(terminated.values()) or all(truncated.values()):
 print("Episode complete.")
 break

[90;3m2025-03-26 15:49:52,485 [0m[mgym [0m[mINFO [0m[33m<0.00> [0m[93;1m=== STARTING STEP ===[0m
[90;3m2025-03-26 15:49:52,486 [0m[36msats.satellite.EO-0 [0m[mINFO [0m[33m<0.00> [0m[36mEO-0: [0m[maction_charge tasked for 60.0 seconds[0m
[90;3m2025-03-26 15:49:52,486 [0m[36msats.satellite.EO-0 [0m[mINFO [0m[33m<0.00> [0m[36mEO-0: [0m[msetting timed terminal event at 60.0[0m
[90;3m2025-03-26 15:49:52,487 [0m[92msats.satellite.EO-1 [0m[mINFO [0m[33m<0.00> [0m[92mEO-1: [0m[maction_charge tasked for 60.0 seconds[0m
[90;3m2025-03-26 15:49:52,487 [0m[92msats.satellite.EO-1 [0m[mINFO [0m[33m<0.00> [0m[92mEO-1: [0m[msetting timed terminal event at 60.0[0m
[90;3m2025-03-26 15:49:52,488 [0m[34msats.satellite.EO-2 [0m[mINFO [0m[33m<0.00> [0m[34mEO-2: [0m[maction_charge tasked for 60.0 seconds[0m
[90;3m2025-03-26 15:49:52,488 [0m[34msats.satellite.EO-2 [0m[mINFO [0m[33m<0.00> [0m[34mEO-2: [0m[msetting timed terminal ev

Episode complete.


After the running the simulation, we can check the reward, number of imaged targets that were covered by clouds and that were not covered by clouds (according to the threshold set in the rewarder).

In [13]:
print("Total reward:", env.unwrapped.rewarder.cum_reward)
print("Number of total images taken:", len(env.unwrapped.rewarder.data.imaged))
print(
 "Number of imaged targets (once or more):",
 len(set(env.unwrapped.rewarder.data.imaged)),
)
print(
 "Number of re-images:",
 len(env.unwrapped.rewarder.data.imaged)
 - len(set(env.unwrapped.rewarder.data.imaged)),
)
print(
 "Number of completely imaged targets:",
 len(env.unwrapped.rewarder.data.imaged_complete),
)

Total reward: {'EO-0': np.float64(0.0), 'EO-1': np.float64(0.004597230086921116), 'EO-2': np.float64(0.1849686437643978), 'EO-3': 0.0, 'EO-4': 0.0}
Number of total images taken: 5
Number of imaged targets (once or more): 5
Number of re-images: 0
Number of completely imaged targets: 0


Check [Training with RLlib PPO](../examples/rllib_training.ipynb) for an example on how to train the agent in this environment.