diff --git a/bumble/controller.py b/bumble/controller.py index 4f687956..494a6437 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -682,7 +682,7 @@ def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None: ############################################################ # Classic link connections - ############################################################ + ############################################################ def send_lmp_packet( self, receiver_address: hci.Address, packet: lmp.Packet @@ -1174,6 +1174,56 @@ def on_hci_remote_name_request_command( return None + def on_classic_link_key_request_reply_command(self, command: hci.HCI_Link_Key_Request_Reply_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.10 Link Key Request Reply command + ''' + bd_addr = ( + bytes(self._public_address) + if self._public_address is not None + else bytes(6) + ) + return bytes([HCI_SUCCESS]) + bd_addr + + def on_classic_link_key_request_negative_reply_command(self, command: hci.HCI_Link_Key_Request_Negative_Reply_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.11 Link Key Request Negative Reply command + ''' + bd_addr = ( + bytes(self._public_address) + if self._public_address is not None + else bytes(6) + ) + return bytes([HCI_SUCCESS]) + bd_addr + + def on_classic_authentication_requested_command(self, command: hci.HCI_Authentication_Requested_Command) -> Optional[bytes]: + ''' + See Bluetooth spec Vol 4, Part E - 7.1.15 Authentication Requested command + ''' + if connection := self.find_classic_connection_by_handle( + command.connection_handle + ): + self.send_hci_packet( + HCI_Command_Status_Event( + status=HCI_SUCCESS, + num_hci_command_packets=1, + command_opcode=command.op_code, + ) + ) + + self.send_hci_packet( + HCI_Link_Key_Request_Event( + bd_addr=connection.peer_address + ) + ) + else: + logger.warning( + f'!!! no connection for handle 0x{command.connection_handle:04X}' + ) + return None + + + def on_hci_enhanced_setup_synchronous_connection_command( self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command ) -> Optional[bytes]: diff --git a/tests/device_test.py b/tests/device_test.py index 3fd3d20a..5ec4e0b4 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -41,6 +41,7 @@ HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS, Address, + HCI_Authentication_Requested_Command, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, @@ -789,6 +790,14 @@ async def test_accept_classic_connection(roles: tuple[hci.Role, hci.Role]): assert devices.connections[1].role == roles[1] + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_secure_simple_pairing(): + devices = await TwoDevices.create_with_connection() + await devices[0].authenticate(devices.connections[0]) + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_remote_name_request():