diff --git a/can/bridge.py b/can/bridge.py new file mode 100644 index 000000000..57ebb368d --- /dev/null +++ b/can/bridge.py @@ -0,0 +1,66 @@ +""" +Creates a bridge between two CAN buses. + +This will connect to two CAN buses. Messages received on one +bus will be sent to the other bus and vice versa. +""" + +import argparse +import errno +import sys +import time +from datetime import datetime +from typing import Final + +from can.cli import add_bus_arguments, create_bus_from_namespace +from can.listener import RedirectReader +from can.notifier import Notifier + +BRIDGE_DESCRIPTION: Final = """\ +Bridge two CAN buses. + +Both can buses will be connected so that messages from bus1 will be sent on +bus2 and messages from bus2 will be sent to bus1. +""" +BUS_1_PREFIX: Final = "bus1" +BUS_2_PREFIX: Final = "bus2" + + +def _parse_bridge_args(args: list[str]) -> argparse.Namespace: + """Parse command line arguments for bridge script.""" + + parser = argparse.ArgumentParser(description=BRIDGE_DESCRIPTION) + add_bus_arguments(parser, prefix=BUS_1_PREFIX, group_title="Bus 1 arguments") + add_bus_arguments(parser, prefix=BUS_2_PREFIX, group_title="Bus 2 arguments") + + # print help message when no arguments were given + if not args: + parser.print_help(sys.stderr) + raise SystemExit(errno.EINVAL) + + results, _unknown_args = parser.parse_known_args(args) + return results + + +def main() -> None: + results = _parse_bridge_args(sys.argv[1:]) + + with ( + create_bus_from_namespace(results, prefix=BUS_1_PREFIX) as bus1, + create_bus_from_namespace(results, prefix=BUS_2_PREFIX) as bus2, + ): + reader1_to_2 = RedirectReader(bus2) + reader2_to_1 = RedirectReader(bus1) + with Notifier(bus1, [reader1_to_2]), Notifier(bus2, [reader2_to_1]): + print(f"CAN Bridge (Started on {datetime.now()})") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + + print(f"CAN Bridge (Stopped on {datetime.now()})") + + +if __name__ == "__main__": + main() diff --git a/doc/scripts.rst b/doc/scripts.rst index 2d59b7528..1d730a74b 100644 --- a/doc/scripts.rst +++ b/doc/scripts.rst @@ -57,6 +57,15 @@ The full usage page can be seen below: :shell: +can.bridge +---------- + +A small application that can be used to connect two can buses: + +.. command-output:: python -m can.bridge -h + :shell: + + can.logconvert -------------- diff --git a/pyproject.toml b/pyproject.toml index a6a7f38c4..8fd77de21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ can_logconvert = "can.logconvert:main" can_logger = "can.logger:main" can_player = "can.player:main" can_viewer = "can.viewer:main" +can_bridge = "can.bridge:main" [project.urls] homepage = "https://github.com/hardbyte/python-can" @@ -186,6 +187,7 @@ ignore = [ "can/cli.py" = ["T20"] # flake8-print "can/logger.py" = ["T20"] # flake8-print "can/player.py" = ["T20"] # flake8-print +"can/bridge.py" = ["T20"] # flake8-print "can/viewer.py" = ["T20"] # flake8-print "examples/*" = ["T20"] # flake8-print diff --git a/test/test_bridge.py b/test/test_bridge.py new file mode 100644 index 000000000..ee41bd949 --- /dev/null +++ b/test/test_bridge.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +""" +This module tests the functions inside of bridge.py +""" + +import random +import string +import sys +import threading +import time +from time import sleep as real_sleep +import unittest.mock + +import can +import can.bridge +from can.interfaces import virtual + +from .message_helper import ComparingMessagesTestCase + + +class TestBridgeScriptModule(unittest.TestCase, ComparingMessagesTestCase): + + TIMEOUT = 3.0 + + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + ComparingMessagesTestCase.__init__( + self, + allowed_timestamp_delta=None, + preserves_channel=False, + ) + + def setUp(self) -> None: + self.stop_event = threading.Event() + + self.channel1 = "".join(random.choices(string.ascii_letters, k=8)) + self.channel2 = "".join(random.choices(string.ascii_letters, k=8)) + + self.cli_args = [ + "--bus1-interface", + "virtual", + "--bus1-channel", + self.channel1, + "--bus2-interface", + "virtual", + "--bus2-channel", + self.channel2, + ] + + self.testmsg = can.Message( + arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True + ) + + def fake_sleep(self, duration): + """A fake replacement for time.sleep that checks periodically + whether self.stop_event is set, and raises KeyboardInterrupt + if so. + + This allows tests to simulate an interrupt (like Ctrl+C) + during long sleeps, in a controlled and responsive way. + """ + interval = 0.05 # Small interval for responsiveness + t_wakeup = time.perf_counter() + duration + while time.perf_counter() < t_wakeup: + if self.stop_event.is_set(): + raise KeyboardInterrupt("Simulated interrupt from fake_sleep") + real_sleep(interval) + + def test_bridge(self): + with ( + unittest.mock.patch("can.bridge.time.sleep", new=self.fake_sleep), + unittest.mock.patch("can.bridge.sys.argv", [sys.argv[0], *self.cli_args]), + ): + # start script + thread = threading.Thread(target=can.bridge.main) + thread.start() + + # wait until script instantiates virtual buses + t0 = time.perf_counter() + while True: + with virtual.channels_lock: + if ( + self.channel1 in virtual.channels + and self.channel2 in virtual.channels + ): + break + if time.perf_counter() > t0 + 2.0: + raise TimeoutError("Bridge script did not create virtual buses") + real_sleep(0.2) + + # create buses with the same channels as in scripts + with ( + can.interfaces.virtual.VirtualBus(self.channel1) as bus1, + can.interfaces.virtual.VirtualBus(self.channel2) as bus2, + ): + # send test message to bus1, it should be received on bus2 + bus1.send(self.testmsg) + recv_msg = bus2.recv(self.TIMEOUT) + self.assertMessageEqual(self.testmsg, recv_msg) + + # assert that both buses are empty + self.assertIsNone(bus1.recv(0)) + self.assertIsNone(bus2.recv(0)) + + # send test message to bus2, it should be received on bus1 + bus2.send(self.testmsg) + recv_msg = bus1.recv(self.TIMEOUT) + self.assertMessageEqual(self.testmsg, recv_msg) + + # assert that both buses are empty + self.assertIsNone(bus1.recv(0)) + self.assertIsNone(bus2.recv(0)) + + # stop the bridge script + self.stop_event.set() + thread.join() + + # assert that the virtual buses were closed + with virtual.channels_lock: + self.assertNotIn(self.channel1, virtual.channels) + self.assertNotIn(self.channel2, virtual.channels) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_scripts.py b/test/test_scripts.py index 9d8c059cf..c1a6c082d 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -98,6 +98,20 @@ def _import(self): return module +class TestBridgeScript(CanScriptTest): + def _commands(self): + commands = [ + "python -m can.bridge --help", + "can_bridge --help", + ] + return commands + + def _import(self): + import can.bridge as module + + return module + + class TestLogconvertScript(CanScriptTest): def _commands(self): commands = [