# Source code for test_unsubscribe

#
#  Copyright (c) 2025, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
#
#  Permission to use, copy, modify, and/or distribute this software for any
#  purpose with or without fee is hereby granted, provided that the above
#  copyright notice and this permission notice appear in all copies.
#
#  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
#  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
#  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
#  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
#  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
#  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
#  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
import time
from typing import Union

from Basilisk.moduleTemplates import cModuleTemplate
from Basilisk.moduleTemplates import cppModuleTemplate
from Basilisk.architecture import messaging
from Basilisk.architecture import bskLogging

def assertConnectionWithValues(
    inMsg: Union[messaging.CModuleTemplateMsg_C, messaging.CModuleTemplateMsgReader],
    outMsg: Union[messaging.CModuleTemplateMsg_C, messaging.CModuleTemplateMsg]
):
    """Write a marker value to ``outMsg`` and assert that ``inMsg`` reads it back.

    A new ``CModuleTemplateMsgPayload`` whose ``dataVector`` starts with the
    current ``time.time()`` value is written through ``outMsg``.  The payload
    is then read through ``inMsg`` and the first ``dataVector`` entry is
    compared against the value that was written.

    :param inMsg: input message to read from; a ``CModuleTemplateMsg_C`` is
        read with ``read()``, anything else is invoked as a callable reader
    :param outMsg: output message the marker payload is written to
    :raises AssertionError: if the value read back differs from the one written
    """
    # the current time is used as the marker so that each call writes a
    # different value (assumes successive calls see different clock readings)
    marker = time.time()

    sent = messaging.CModuleTemplateMsgPayload()
    sent.dataVector = [marker, 0, 0]
    outMsg.write(sent)

    # the two supported input message kinds expose different read interfaces
    isCMsg = isinstance(inMsg, messaging.CModuleTemplateMsg_C)
    received: messaging.CModuleTemplateMsgPayload = inMsg.read() if isCMsg else inMsg()

    assert received.dataVector[0] == marker

def test_unsubscribe():
    """
    testing the unsubscribe functions of C and C++ messages

    The test creates one C and one C++ template module and then checks, in order:

    #. ``unsubscribe()`` can be called on input messages that were never
       subscribed, and on the C output message.
    #. With the inputs cross-connected (C -> Cpp, Cpp -> C) both inputs report
       being linked and data written to each output is read back on the
       subscribed input.
    #. ``unsubscribe()`` on the inputs leaves them reporting not linked.
    #. The same link / data / unsubscribe checks pass with the inputs
       connected to the outputs of their own module kind (C -> C, Cpp -> Cpp).
    """
    # create modules
    cMod = cModuleTemplate.cModuleTemplate()
    cppMod = cppModuleTemplate.CppModuleTemplate()

    # check that unsubscribe is a noop for unlinked InMsgs
    cMod.dataInMsg.unsubscribe()
    cppMod.dataInMsg.unsubscribe()

    # check that unsubscribe is noop for (C) OutMsg
    cMod.dataOutMsg.unsubscribe()

    # connect the input messages:
    #   C -> Cpp
    #   Cpp -> C
    cMod.dataInMsg.subscribeTo(cppMod.dataOutMsg)
    cppMod.dataInMsg.subscribeTo(cMod.dataOutMsg)

    # check that unsubscribe is noop for (C) OutMsg
    # even when connected to an InMsg
    cMod.dataOutMsg.unsubscribe()

    assert messaging.CModuleTemplateMsg_C_isLinked(cMod.dataInMsg)
    assert cppMod.dataInMsg.isLinked()
    assert messaging.CModuleTemplateMsg_C_isLinked(cMod.dataOutMsg)
    assertConnectionWithValues(cMod.dataInMsg, cppMod.dataOutMsg)
    assertConnectionWithValues(cppMod.dataInMsg, cMod.dataOutMsg)

    # check that unsubscribe works
    cMod.dataInMsg.unsubscribe()
    cppMod.dataInMsg.unsubscribe()
    assert not messaging.CModuleTemplateMsg_C_isLinked(cMod.dataInMsg)
    assert not cppMod.dataInMsg.isLinked()

    # try connecting differently:
    #   C -> C
    #   Cpp -> Cpp
    cMod.dataInMsg.subscribeTo(cMod.dataOutMsg)
    cppMod.dataInMsg.subscribeTo(cppMod.dataOutMsg)

    # check that unsubscribe is noop for (C) OutMsg
    # even when connected to an InMsg
    cMod.dataOutMsg.unsubscribe()

    assert messaging.CModuleTemplateMsg_C_isLinked(cMod.dataInMsg)
    assert cppMod.dataInMsg.isLinked()
    assertConnectionWithValues(cMod.dataInMsg, cMod.dataOutMsg)
    assertConnectionWithValues(cppMod.dataInMsg, cppMod.dataOutMsg)

    # check that unsubscribe works
    cMod.dataInMsg.unsubscribe()
    cppMod.dataInMsg.unsubscribe()
    assert not messaging.CModuleTemplateMsg_C_isLinked(cMod.dataInMsg)
    assert not cppMod.dataInMsg.isLinked()


# allow the test to be run directly as a script, without a test runner
if __name__ == "__main__":
    test_unsubscribe()