Source code for test_stochasticIntegrators

#
#  ISC License
#
#  Copyright (c) 2026, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
Tests the Euler-Mayurama and SRK integrator described in:

Tang, X., Xiao, A. Efficient weak second-order stochastic Runge-Kutta methods
for Itô stochastic differential equations. Bit Numer Math 57, 241-260 (2017).
https://doi.org/10.1007/s10543-016-0618-9
"""
from __future__ import annotations

from typing import Callable, List, Literal, get_args, Any
import tqdm
import itertools

import pytest
import numpy as np
import numpy.typing as npt
import numpy.testing
import matplotlib.pyplot as plt

from Basilisk.utilities import macros
from Basilisk.utilities import SimulationBaseClass
from Basilisk.utilities import pythonVariableLogger
from Basilisk.simulation import svIntegrators
from Basilisk.simulation import dynParamManager

try:
    from Basilisk.simulation import mujoco
    couldImportMujoco = True
except:
    couldImportMujoco = False

SRKMethod = Literal["W2Ito1", "W2Ito2"]
Method = Literal["W2Ito1", "W2Ito2", "EulerMayurama"]
METHODS = get_args(Method)

# Function of the form y = f(t,x) where x and y are vectors of the same size
DynFunc = Callable[[float, npt.NDArray[np.float64]], npt.NDArray[np.float64]]

# Coefficient sets (from Tang & Xiao Table 2, W2-Ito1 & W2-Ito2)
W2Ito1Coefficients = {
    'alpha':  np.array([1/6, 2/3, 1/6]),
    'beta0':  np.array([-1, 1, 1]),
    'beta1':  np.array([2, 0, -2]),

    'A0':     np.array([[0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0],
                        [-1, 2, 0]]),

    'B0':     np.array([[0.0, 0.0, 0.0],
                        [(6-np.sqrt(6))/10, 0.0, 0.0],
                        [(3+2*np.sqrt(6))/5, 0.0, 0.0]]),

    'A1':     np.array([[0.0, 0.0, 0.0],
                        [1/4, 0.0, 0.0],
                        [1/4, 0.0, 0.0]]),

    'B1':     np.array([[0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0],
                        [-1/2, 0.0, 0.0]]),

    'B2':     np.array([[0.0, 0.0, 0.0],
                        [1.0, 0.0, 0.0],
                        [0.0, 0.0, 0.0]]),
}

W2Ito2Coefficients = {
    'alpha':  np.array([1/6, 1/3, 1/3, 1/6]),
    'beta0':  np.array([0, -1, 1, 1]),
    'beta1':  np.array([0, 2, 0, -2]),

    'A0':     np.array([[0.0, 0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0, 0.0],
                        [0.0, 1/2, 0.0, 0.0],
                        [0.0, 0.0, 1.0, 0.0]]),

    'B0':     np.array([[0.0, 0.0, 0.0, 0.0],
                        [0.0, 0.0, 0.0, 0.0],
                        [0.0, 1.0, 0.0, 0.0],
                        [0.0, 1.0, 0.0, 0.0]]),

    'A1':     np.array([[0.0, 0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0, 0.0],
                        [1/2, 0.0, 0.0, 0.0]]),

    'B1':     np.array([[0.0, 0.0,  0.0, 0.0],
                        [0.0, 0.0,  0.0, 0.0],
                        [0.0, 1/2,  0.0, 0.0],
                        [0.0, -1/2, 0.0, 0.0]]),

    'B2':     np.array([[0.0, 0.0, 0.0, 0.0],
                        [0.0, 0.0, 0.0, 0.0],
                        [0.0, 1.0, 0.0, 0.0],
                        [0.0, 0.0, 0.0, 0.0]]),
}

SRK_COEFFICIENTS: dict[SRKMethod, dict[str, npt.NDArray[Any]]] = {
    "W2Ito1": W2Ito1Coefficients,
    "W2Ito2": W2Ito2Coefficients
}

[docs] def srk2Integrate( f: DynFunc, g_list: List[DynFunc], x0: npt.NDArray[np.float64], dt: float, tf: float, rng_seed: int, alpha: npt.NDArray[np.float64], beta0: npt.NDArray[np.float64], beta1: npt.NDArray[np.float64], A0: npt.NDArray[np.float64], B0: npt.NDArray[np.float64], A1: npt.NDArray[np.float64], B1: npt.NDArray[np.float64], B2: npt.NDArray[np.float64], ): """ Generic s-stage SRK integrator for vector SDE: dX = f(t,X) dt + sum_k g_list[k](t,X) dW_k Method described in Tang & Xiao. Args: f: Drift function. g_list: List of diffusion functions. x0: Initial state. dt: Time step. tf: Final time. rng_seed: Random seed. alpha, beta0, beta1, A0, B0, A1, B1, B2: SRK coefficients. Returns: Array of state trajectories, including time as the first column. """ def wrapped_f(full_state: npt.NDArray[Any]): t = full_state[0] x = full_state[1:] return np.concatenate([[1], f(t, x)]) wrapped_g_list = [] for g in g_list: def wrapped_g(full_state: npt.NDArray[Any], g: DynFunc = g): t = full_state[0] x = full_state[1:] return np.concatenate([[0], g(t, x)]) wrapped_g_list.append(wrapped_g) s = alpha.size n = x0.size m = len(g_list) nsteps = int(np.floor(tf / dt)) x = np.zeros([nsteps+1, n+1], dtype=float) x[0,0] = 0 x[0,1:] = x0 rng = svIntegrators.SRKRandomVariableGenerator() rng.setSeed(rng_seed) # throw the first away, similar to how Basilisk # calls the equationsOfMotion once at the beginning rng.generate(m, 0) for step in range(nsteps): X = x[step,:].copy() # generate random variables rvs: svIntegrators.SRKRandomVariables = rng.generate(m, dt) Ik: List[List[float]] = rvs.Ik Ikl: List[List[float]] = rvs.Ikl xi: float = rvs.xi # allocate stage arrays H0 = [X.copy() for _ in range(s)] Hk = [[X.copy() for _ in range(s)] for _ in range(m)] for i in range(s): # compute H0 stages sumA = np.zeros(n+1) sumB = np.zeros(n+1) for j in range(s): sumA += A0[i, j] * wrapped_f(H0[j]) * dt for k in range(m): sumB += B0[i, j] * wrapped_g_list[k](Hk[k][j]) * Ik[k] H0[i] = X + sumA + sumB # compute Hk stages for k in range(m): sumA = np.zeros(n+1) sumB1 = np.zeros(n+1) sumB2 = np.zeros(n+1) for j in range(s): sumA += A1[i, j] * wrapped_f(H0[j]) * dt sumB1 += B1[i, j] * wrapped_g_list[k](Hk[k][j]) * xi for l in range(m): if l != k: sumB2 += B2[i, j] * wrapped_g_list[l](Hk[k][j]) * Ikl[k][l] Hk[k][i] = X + sumA + sumB1 + sumB2 # combine increments drift = np.zeros(n+1) for i in range(s): drift += alpha[i] * wrapped_f(H0[i]) * dt diffusion = np.zeros(n+1) for k in range(m): for i in range(s): diffusion += beta0[i] * wrapped_g_list[k](Hk[k][i]) * Ik[k] diffusion += beta1[i] * wrapped_g_list[k](Hk[k][i]) * Ikl[k][k] x[step+1,:] = X + drift + diffusion return x
[docs] def eulerMayuramaIntegrate( f: DynFunc, g_list: List[DynFunc], x0: npt.NDArray[np.float64], dt: float, tf: float, rng_seed: int ): """ Euler-Mayurama integrator for the vector SDE: dX = f(t,X) dt + sum_k g_list[k](t,X) dW_k Args: f: Drift function. g_list: List of diffusion functions. x0: Initial state. dt: Time step. tf: Final time. rng_seed: Random seed. Returns: Array of state trajectories, including time as the first column. """ n = x0.size m = len(g_list) nsteps = int(np.floor(tf / dt)) t = dt * np.arange(nsteps+1) x = np.zeros([nsteps+1, n], dtype=float) x[0,:] = x0 rng = svIntegrators.EulerMayuramaRandomVariableGenerator() rng.setSeed(rng_seed) # throw the first away, similar to how Basilisk # calls the equationsOfMotion once at the beginning rng.generate(m, 0) for step in range(nsteps): # generate random variables pseudoTimeSteps = rng.generate(m, dt) x[step+1,:] = x[step,:] + f(t[step], x[step,:])*dt for k, g in enumerate(g_list): x[step+1,:] += g(t[step], x[step,:])*pseudoTimeSteps[k] return np.column_stack([t, x])
[docs] class ExponentialSystem: """A simple deterministic system with one state: dx/dt = x*t.""" def __init__(self, x0: float = 1): """Initialize the system with initial state x0.""" self.x0 = np.array([x0])
[docs] def f(self, t: float, x: npt.NDArray[np.float64]): """Drift function for the exponential system.""" return np.array([t*x[0]])
g = []
[docs] class OrnsteinUhlenbeckSystem: """A process defined by dx = theta*(mu - x)*dt + sigma*dW """ def __init__(self, x0: float = .1, mu: float = 0, theta: float = .1, sigma: float = .01): """Initialize the OU process parameters.""" self.x0 = np.array([x0]) self.mu = mu self.theta = theta self.sigma = sigma
[docs] def f(self, t: float, x: npt.NDArray[np.float64]): """Drift function for the OU process.""" return np.array([self.theta*(self.mu - x[0])])
[docs] def g1(self, t: float, x: npt.NDArray[np.float64]): """Diffusion function for the OU process.""" return np.array([self.sigma])
@property def g(self): """Return list of diffusion functions.""" return [self.g1]
[docs] def mean(self, t: float): """E[x(t)]""" return np.array([ self.x0[0]*np.exp(-self.theta*t) + self.mu*(1 - np.exp(-self.theta*t)) ])
[docs] def var(self, t: float): """Var(x(t))""" return np.array([ self.sigma**2/(2*self.theta) * (1- np.exp(-2*self.theta*t)) ])
[docs] class ComplexOrnsteinUhlenbeckSystem: """A process defined by dx = -theta*x*dt + sigma_x1*dW_1 + sigma_x2*dW_2 dy = -theta*y*dt + sigma_y1*dW_1 + sigma_y2*dW_2 """ def __init__( self, x0: float = .1, y0: float = -.1, theta_x: float = .1, theta_y: float = .073, sigma_x1: float = .015, sigma_x2: float = .011, sigma_y1: float = 0, sigma_y2: float = .029 ): """Initialize the complex OU process parameters.""" self.x0 = np.array([x0, y0]) self.theta_x = theta_x self.theta_y = theta_y self.sigma_x1 = sigma_x1 self.sigma_x2 = sigma_x2 self.sigma_y1 = sigma_y1 self.sigma_y2 = sigma_y2
[docs] def f(self, t: float, x: npt.NDArray[np.float64]): """Drift function for the complex OU process.""" return np.array([-self.theta_x*x[0], -self.theta_y*x[1]])
[docs] def g1(self, t: float, x: npt.NDArray[np.float64]): """First diffusion function for the complex OU process.""" return np.array([self.sigma_x1, self.sigma_y1])
[docs] def g2(self, t: float, x: npt.NDArray[np.float64]): """Second diffusion function for the complex OU process.""" return np.array([self.sigma_x2, self.sigma_y2])
@property def g(self): """Return list of diffusion functions.""" return [self.g1, self.g2]
[docs] class Example1System: """Example 1 dynamical system in: Tang, X., Xiao, A. Efficient weak second-order stochastic Runge-Kutta methods for Itô stochastic differential equations. Bit Numer Math 57, 241-260 (2017). https://doi.org/10.1007/s10543-016-0618-9 """ def __init__(self, y1_0: float = 1, y2_0: float = 1): """Initialize the Example 1 system.""" self.x0 = np.array([y1_0, y2_0])
[docs] def f(self, t: float, x: npt.NDArray[np.float64]): """Drift function for Example 1 system.""" y1, y2 = x return np.array([ -273/512*y1, -1/160*y1 - (785/512 - np.sqrt(2)/8)*y2 ])
[docs] def g1(self, t: float, x: npt.NDArray[np.float64]): """First diffusion function for Example 1 system.""" y1, y2 = x return np.array([ 1/4*y1, (1 - 2*np.sqrt(2)/8)/4*y2 ])
[docs] def g2(self, t: float, x: npt.NDArray[np.float64]): """Second diffusion function for Example 1 system.""" y1, y2 = x return np.array([ 1/16*y1, 1/10*y1 + 1/16*y2 ])
@property def g(self): """Return list of diffusion functions.""" return [self.g1, self.g2]
[docs] def getBasiliskSim(method: Method, dt: float, x0: npt.NDArray[np.float64], f: DynFunc, g: List[DynFunc], seed: int | None): """ Set up and return a Basilisk simulation for a given SDE and integrator method. Args: method: Integration method ("W2Ito1", "W2Ito2", or "EulerMayurama"). dt: Time step. x0: Initial state. f: Drift function. g: List of diffusion functions. seed: RNG seed (or None for random). Returns: Tuple of (scSim, stateModel, integratorObject, stateLogger). """ from Basilisk.simulation import StatefulSysModel # Declared inside, since StatefulSysModel may be undefined if not running with mujoco class GenericStochasticStateModel(StatefulSysModel.StatefulSysModel): def __init__(self, x0: npt.NDArray[np.float64], f: DynFunc, g: List[DynFunc]): super().__init__() self.x0 = x0 self.f = f self.g = g def registerStates(self, registerer: StatefulSysModel.DynParamRegisterer): """Called once during InitializeSimulation""" # We get one noise source per diffusion function g: m = len(self.g) n = self.x0.size self.states: List[dynParamManager.StateData] = [] for i in range(n): self.states.append( registerer.registerState(1, 1, f"y{i+1}") ) self.states[-1].setNumNoiseSources(m) self.states[-1].setState([[self.x0[i]]]) # We want every noise source to be shared between states for k in range(m): registerer.registerSharedNoiseSource([ (state, k) for state in self.states ]) def UpdateState(self, CurrentSimNanos: int): """Called at every integrator step""" m = len(self.g) n = self.x0.size t = macros.NANO2SEC * CurrentSimNanos # Collect current state into a numpy array x = np.array([state.getState()[0][0] for state in self.states]) # Compute f and store in the derivative of the states deriv = self.f(t, x) for i in range(n): self.states[i].setDerivative( [[deriv[i]]] ) # Compute g_k and store in the diffusion of the states for k in range(m): diff = self.g[k](t, x) for i in range(n): self.states[i].setDiffusion( [[diff[i]]], index=k ) # Create sim, process, and task scSim = SimulationBaseClass.SimBaseClass() dynProcess = scSim.CreateNewProcess("test") dynProcess.addTask(scSim.CreateNewTask("test", macros.sec2nano(dt))) scene = mujoco.MJScene("<mujoco/>") # empty scene, no multi-body dynamics scSim.AddModelToTask("test", scene) if method == "W2Ito1": integratorClass = svIntegrators.svStochasticIntegratorW2Ito1 elif method == "W2Ito2": integratorClass = svIntegrators.svStochasticIntegratorW2Ito2 elif method == "EulerMayurama": integratorClass = svIntegrators.svStochasticIntegratorMayurama else: raise NotImplementedError(method) integratorObject = integratorClass(scene) if seed is not None: integratorObject.setRNGSeed(seed) scene.setIntegrator(integratorObject) stateModel = GenericStochasticStateModel(x0, f, g) stateModel.ModelTag = "testModel" # The same model computes both the drift and diffusion of the state # so they must be added to both tasks scene.AddModelToDynamicsTask(stateModel) scene.AddModelToDiffusionDynamicsTask(stateModel) stateLogger = pythonVariableLogger.PythonVariableLogger( {"x": lambda _: np.array([state.getState()[0][0] for state in stateModel.states])} ) scSim.AddModelToTask("test", stateLogger) scSim.InitializeSimulation() return scSim, stateModel, integratorObject, stateLogger
[docs] def estimateErrorAndEmpiricalVariance( computeTrajectory: Callable[[], npt.NDArray[np.float64]], G: Callable[[npt.NDArray[np.float64]], float], estimateGOnTrajectory: float, M1: int, M2: int, ): r"""Computes the error and empirical variance according to the equations described in Section 4 of Tang & Xiao. Args: computeTrajectory (Callable[[], npt.NDArray[np.float64]]): A function that, when called, realizes one simulation by forward propagating the system from t0. The result is the state at the end point. In the paper: ``y_N``. G (Callable[[npt.NDArray[np.float64]], float]): A differentiable function that takes in a state vector and outputs a single number. estimateGOnTrajectory (float): The estimate of the application of the function G on the random variable that is the last state of the propagated system. In the paper: ``E[G(y(t_N))]`` M1 (int): Number of batches. M2 (int): Number of trajectories per batch. Returns: tuple[float, float]: The integrator error (``\hat{\mu}`` in the paper), and the empirical variance (``\hat{\sigma}_\mu^2`` in the paper). """ muHat = [0. for _ in range(M1)] for j, _ in tqdm.tqdm(itertools.product(range(M1), range(M2)), total=M1*M2): muHat[j] += (G(computeTrajectory()) - estimateGOnTrajectory)/M2 muHatAvg = sum(muHat) / M1 sigmaMuSquared = 0 for j in range(M1): sigmaMuSquared += (muHat[j] - muHatAvg)**2/(M1-1) return muHatAvg, sigmaMuSquared
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.parametrize("method", METHODS) def test_deterministic(method: Method, plot: bool = False): """ Test deterministic integration (no diffusion) for all integrator methods. Compares Basilisk and Python implementations against the analytical solution for the exponential system dx/dt = x*t. Args: method: Integration method ("W2Ito1", "W2Ito2", or "EulerMayurama"). plot: If True, plot the relative error. """ dt = .01 tf = 1 seed = 10 system = ExponentialSystem() scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, seed) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() tBasilisk = macros.NANO2SEC* stateLogger.times() xBasilisk = stateLogger.x if method == "EulerMayurama": txPython = eulerMayuramaIntegrate(system.f, system.g, system.x0, dt, tf, seed) else: txPython = srk2Integrate(system.f, system.g, system.x0, dt, tf, seed, **SRK_COEFFICIENTS[method]) tPython = txPython[:,0] xPython = txPython[:,1] xExpected = np.exp( tPython **2 / 2 ) if plot: fig, ax = plt.subplots() ax.plot(tBasilisk, 100*(xBasilisk - xExpected)/xExpected, label="Basilisk") ax.plot(tPython, 100*(xPython - xExpected)/xExpected, ls="--", label="Python") ax.legend() ax.set_ylabel("Relative Error [%]") plt.show() # The Python and Basilisk implementation should behave identially numpy.testing.assert_allclose( xBasilisk[-1], xPython[-1], atol=1e-12, rtol=0 ) if method == "EulerMayurama": expectedIntegrationError = .05 elif method == "W2Ito1": expectedIntegrationError = 1e-6 elif method == "W2Ito2": expectedIntegrationError = 1e-8 # The Basilisk should have some integration error w.r.t analytical solution numpy.testing.assert_allclose( xBasilisk[-1], xExpected[-1], atol=expectedIntegrationError, rtol=0 )
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.parametrize("method", METHODS) def test_ou(method: Method, plot: bool = False): """ Test Ornstein-Uhlenbeck process integration for all integrator methods. Compares Basilisk and Python implementations for a single realization. Args: method: Integration method. plot: If True, plot the state trajectories. """ dt = 1 tf = 10 seed = 10 system = OrnsteinUhlenbeckSystem() scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, seed) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() tBasilisk = macros.NANO2SEC* stateLogger.times() xBasilisk = stateLogger.x if method == "EulerMayurama": txPython = eulerMayuramaIntegrate(system.f, system.g, system.x0, dt, tf, seed) else: txPython = srk2Integrate(system.f, system.g, system.x0, dt, tf, seed, **SRK_COEFFICIENTS[method]) tPython = txPython[:,0] xPython = txPython[:,1] if plot: fig, ax = plt.subplots() ax.plot(tBasilisk, xBasilisk, label="Basilisk") ax.plot(tPython, xPython, ls="--", label="Python") ax.legend() ax.set_ylabel("x") ax.set_xlabel("t") plt.show() # The Python and Basilisk implementation should behave identially numpy.testing.assert_allclose( xBasilisk[-1], xPython[-1], atol=1e-12, rtol=0 )
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.parametrize("method", METHODS) def test_ouComplex(method: Method, plot: bool = False): """ Test integration of a two-dimensional coupled Ornstein-Uhlenbeck process. Compares Basilisk and Python implementations for a single realization. Args: method: Integration method. plot: If True, plot the state trajectories. """ dt = 1 tf = 10 seed = 10 system = ComplexOrnsteinUhlenbeckSystem() scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, seed) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() tBasilisk = macros.NANO2SEC* stateLogger.times() xBasilisk = stateLogger.x if method == "EulerMayurama": txPython = eulerMayuramaIntegrate(system.f, system.g, system.x0, dt, tf, seed) else: txPython = srk2Integrate(system.f, system.g, system.x0, dt, tf, seed, **SRK_COEFFICIENTS[method]) tPython = txPython[:,0] xPython = txPython[:,1:] if plot: fig, axs = plt.subplots(2, sharex=True) for i, ax in enumerate(axs): ax.plot(tBasilisk, xBasilisk[:,i], label="Basilisk") ax.plot(tPython, xPython[:,i], ls="--", label="Python") ax.legend() ax.set_ylabel("xy"[i]) ax.set_xlabel("t") plt.show() # The Python and Basilisk implementation should behave identially numpy.testing.assert_allclose( xBasilisk[-1,:], xPython[-1,:], atol=1e-12, rtol=0 )
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.parametrize("method", METHODS) def test_example1(method: Method, plot: bool = False): """ Test Example 1 system from Tang & Xiao (2017) for all integrator methods. Compares Basilisk and Python implementations for a single realization. Args: method: Integration method. plot: If True, plot the state trajectories. """ dt = 1 tf = 10 seed = 10 system = Example1System() scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, seed) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() tBasilisk = macros.NANO2SEC* stateLogger.times() xBasilisk = stateLogger.x if method == "EulerMayurama": txPython = eulerMayuramaIntegrate(system.f, system.g, system.x0, dt, tf, seed) else: txPython = srk2Integrate(system.f, system.g, system.x0, dt, tf, seed, **SRK_COEFFICIENTS[method]) tPython = txPython[:,0] xPython = txPython[:,1:] if plot: fig, axs = plt.subplots(2, sharex=True) for i, ax in enumerate(axs): ax.plot(tBasilisk, xBasilisk[:,i], label="Basilisk") ax.plot(tPython, xPython[:,i], ls="--", label="Python") ax.legend() ax.set_ylabel(f"y{i+1}") ax.set_xlabel("t") plt.show() # The Python and Basilisk implementation should behave identially numpy.testing.assert_allclose( xBasilisk[-1,:], xPython[-1,:], atol=1e-12, rtol=0 )
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.flaky(reruns=6) @pytest.mark.parametrize("method", METHODS) @pytest.mark.parametrize("figureOfMerit", ["mean", "variance"]) def test_validateOu( method: Method, figureOfMerit: Literal["mean", "variance"], tf: float = 0.1 ): """ Validate the weak accuracy of the integrators for the Ornstein-Uhlenbeck process. Compares the empirical mean or variance of the final state to the analytical value using multiple Monte Carlo batches. Args: method: Integration method. figureOfMerit: "mean" or "variance" to validate. """ dt = .05 system = OrnsteinUhlenbeckSystem() def basiliskTrajectory(): scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, None) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() xBasilisk = stateLogger.x return np.array([xBasilisk[-1]]) # G is some differentiable function on the final state after # propagation # estimateGOnTrajectory should be the correct E[G(x(t))] if figureOfMerit == "mean": def G(arr: npt.NDArray[np.float64]) -> float: return arr[0] # E[G(x(t))] = E[x(t)] estimateGOnTrajectory = system.mean(tf)[0] elif figureOfMerit == "variance": def G(arr: npt.NDArray[np.float64]) -> float: return arr[0]**2 # E[G(x(t))] = E[x(t)**2] = Var(x(t)) + E[x(t)]**2 estimateGOnTrajectory = system.var(tf)[0] + system.mean(tf)[0]**2 err, varErr = estimateErrorAndEmpiricalVariance( basiliskTrajectory, G, estimateGOnTrajectory, M1 = 10, M2 = 100 ) twoSigma = 2*np.sqrt(varErr) print(method, figureOfMerit, "error", err, "+-", twoSigma) # We expect the error to be zero, but we allow some tolerance # given that the error is estimated with a certain variance np.testing.assert_allclose( err, 0, atol = twoSigma, rtol = 0 )
# when running in pytest, we use skipAssert=True, because the test # keeps failing for low tf and we can't afford a high tf at CI testing time
[docs] @pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco") @pytest.mark.flaky(reruns=6) @pytest.mark.parametrize("method", METHODS) def test_validateExample1(method: Method, tf: float = 0.1, skipAssert: bool = True): """ Validate the weak accuracy of the integrators for Example 1 from Tang & Xiao (2017). Compares the empirical variance of the final state to the analytical value using multiple Monte Carlo batches. Args: method: Integration method. """ dt = 2.**-3 system = Example1System() def basiliskTrajectory(): scSim, stateModel, integratorObject, stateLogger = getBasiliskSim(method, dt, system.x0, system.f, system.g, None) scSim.ConfigureStopTime( macros.sec2nano(tf) ) scSim.ExecuteSimulation() xBasilisk = stateLogger.x return xBasilisk[-1,:] # Use the same G and E[G(x(t))] as in Example 1 in the paper def G(arr: npt.NDArray[np.float64]) -> float: return arr[1]**2 estimateGOnTrajectory = 149/150*np.exp(-5/2*tf) +1/150*np.exp(-tf) err, varErr = estimateErrorAndEmpiricalVariance( basiliskTrajectory, G, estimateGOnTrajectory, M1 = 10, M2 = 100 ) twoSigma = 2*np.sqrt(varErr) print(method, "variance error", err, "+-", twoSigma) if not skipAssert: # We expect the error to be zero, but we allow some tolerance # given that the error is estimated with a certain variance np.testing.assert_allclose( err, 0, atol = twoSigma, rtol = 0 )
if __name__ == "__main__": pytest.main([__file__]) # run this test with a higher tf, enough to pass for method in METHODS: test_validateExample1(method, tf=5, skipAssert=False)