From 24e8364c49b7cacdf100976a39f92c49e1974449 Mon Sep 17 00:00:00 2001 From: khsiao-google Date: Thu, 6 Nov 2025 09:28:18 +0000 Subject: [PATCH] Add classic authentication --- bumble/controller.py | 53 +++++++++++++++++++++++++++++++++++++++++++- tests/device_test.py | 9 ++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/bumble/controller.py b/bumble/controller.py index 1153ec00..f0c29f64 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -50,6 +50,7 @@ HCI_Connection_Complete_Event, HCI_Connection_Request_Event, HCI_Disconnection_Complete_Event, + HCI_Link_Key_Request_Event, HCI_Encryption_Change_Event, HCI_LE_Advertising_Report_Event, HCI_LE_CIS_Established_Event, @@ -696,7 +697,7 @@ def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None: ############################################################ # Classic link connections - ############################################################ + ############################################################ def on_classic_connection_request( self, peer_address: Address, link_type: int @@ -964,6 +965,56 @@ def on_hci_accept_connection_request_command( self.link.classic_accept_connection(self, command.bd_addr, command.role) return None + def on_classic_link_key_request_reply_command(self, command: hci.HCI_Link_Key_Request_Reply_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.10 Link Key Request Reply command + ''' + bd_addr = ( + bytes(self._public_address) + if self._public_address is not None + else bytes(6) + ) + return bytes([HCI_SUCCESS]) + bd_addr + + def on_classic_link_key_request_negative_reply_command(self, command: hci.HCI_Link_Key_Request_Negative_Reply_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.11 Link Key Request Negative Reply command + ''' + bd_addr = ( + bytes(self._public_address) + if self._public_address is not None + else bytes(6) + ) + return bytes([HCI_SUCCESS]) + bd_addr + + def on_classic_authentication_requested_command(self, command: hci.HCI_Authentication_Requested_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.15 Authentication Requested command + ''' + if connection := self.find_classic_connection_by_handle( + command.connection_handle + ): + self.send_hci_packet( + HCI_Command_Status_Event( + status=HCI_SUCCESS, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + + self.send_hci_packet( + HCI_Link_Key_Request_Event( + bd_addr=connection.peer_address + ) + ) + else: + logger.warning( + f'!!! no connection for handle 0x{command.connection_handle:04X}' + ) + return None + + + def on_hci_enhanced_setup_synchronous_connection_command( self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command ) -> Optional[bytes]: diff --git a/tests/device_test.py b/tests/device_test.py index d7717651..384d275e 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -41,6 +41,7 @@ HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS, Address, + HCI_Authentication_Requested_Command, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, @@ -789,6 +790,14 @@ async def test_accept_classic_connection(roles: tuple[hci.Role, hci.Role]): assert devices.connections[1].role == roles[1] + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_secure_simple_pairing(): + devices = await TwoDevices.create_with_connection() + await devices[0].authenticate(devices.connections[0]) + + # ----------------------------------------------------------------------------- async def run_test_device(): await test_device_connect_parallel()