Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion bumble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None:

############################################################
# Classic link connections
############################################################
############################################################

def send_lmp_packet(
self, receiver_address: hci.Address, packet: lmp.Packet
Expand Down Expand Up @@ -1174,6 +1174,56 @@ def on_hci_remote_name_request_command(

return None

def on_classic_link_key_request_reply_command(self, command: hci.HCI_Link_Key_Request_Reply_Command) -> Optional[bytes]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_hci_link_key_requestr_reply_command

Handler is gotten from f"on_{command.name}".

'''
See Bluetooth spec Vol 4, Part E - 7.1.10 Link Key Request Reply command
'''
bd_addr = (
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
return bytes([HCI_SUCCESS]) + bd_addr

def on_classic_link_key_request_negative_reply_command(self, command: hci.HCI_Link_Key_Request_Negative_Reply_Command) -> Optional[bytes]:
'''
See Bluetooth spec Vol 4, Part E - 7.1.11 Link Key Request Negative Reply command
'''
bd_addr = (
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
return bytes([HCI_SUCCESS]) + bd_addr

def on_classic_authentication_requested_command(self, command: hci.HCI_Authentication_Requested_Command) -> Optional[bytes]:
'''
See Bluetooth spec Vol 4, Part E - 7.1.15 Authentication Requested command
'''
if connection := self.find_classic_connection_by_handle(
command.connection_handle
):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)

self.send_hci_packet(
HCI_Link_Key_Request_Event(
bd_addr=connection.peer_address
)
)
else:
logger.warning(
f'!!! no connection for handle 0x{command.connection_handle:04X}'
)
return None



def on_hci_enhanced_setup_synchronous_connection_command(
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
) -> Optional[bytes]:
Expand Down
9 changes: 9 additions & 0 deletions tests/device_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS,
Address,
HCI_Authentication_Requested_Command,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused

HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
Expand Down Expand Up @@ -789,6 +790,14 @@ async def test_accept_classic_connection(roles: tuple[hci.Role, hci.Role]):
assert devices.connections[1].role == roles[1]



# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_secure_simple_pairing():
devices = await TwoDevices.create_with_connection()
await devices[0].authenticate(devices.connections[0])


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_remote_name_request():
Expand Down
Loading