Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 56 additions & 42 deletions homekit/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def finish_pairing(pin):

return finish_pairing

def remove_pairing(self, alias, pairingId=None):
def remove_pairing(self, alias, pairingId=None, force=False):
"""
Remove a pairing between the controller and the accessory. The pairing data is delete on both ends, on the
accessory and the controller.
Expand All @@ -632,6 +632,7 @@ def remove_pairing(self, alias, pairingId=None):

:param alias: the controller's alias for the accessory
:param pairingId: the pairing id to be removed
:param force: remove the pairing even if the accessory cannot be reached
:raises AuthenticationError: if the controller isn't authenticated to the accessory.
:raises AccessoryNotFoundError: if the device can not be found via zeroconf
:raises UnknownError: on unknown errors
Expand All @@ -651,47 +652,60 @@ def remove_pairing(self, alias, pairingId=None):
tlv8.Entry(TlvTypes.Identifier, pairingIdToDelete.encode())
])

if connection_type == 'IP':
if not IP_TRANSPORT_SUPPORTED:
raise TransportNotSupportedError('IP')
session = IpSession(pairing_data)
response = session.post('/pairings', request_tlv, content_type='application/pairing+tlv8')
session.close()
data = response.read()
data = tlv8.decode(data, {
TlvTypes.State: tlv8.DataType.INTEGER,
TlvTypes.Error: tlv8.DataType.INTEGER

})
elif connection_type == 'BLE':
if not BLE_TRANSPORT_SUPPORTED:
raise TransportNotSupportedError('BLE')
inner = tlv8.encode([
tlv8.Entry(AdditionalParameterTypes.ParamReturnResponse, bytearray(b'\x01')),
tlv8.Entry(AdditionalParameterTypes.Value, request_tlv)
])

body = len(inner).to_bytes(length=2, byteorder='little') + inner

from .ble_impl.device import DeviceManager
manager = DeviceManager(self.ble_adapter)
device = manager.make_device(pairing_data['AccessoryMAC'])
device.connect()

logging.debug('resolved %d services', len(device.services))
pair_remove_char, pair_remove_char_id = find_characteristic_by_uuid(device, ServicesTypes.PAIRING_SERVICE,
CharacteristicsTypes.PAIRING_PAIRINGS)
logging.debug('setup char: %s %s', pair_remove_char, pair_remove_char.service.device)

session = BleSession(pairing_data, self.ble_adapter)
response = session.request(pair_remove_char, pair_remove_char_id, HapBleOpCodes.CHAR_WRITE, body)
data = tlv8.decode(response.first_by_id(AdditionalParameterTypes.Value).data, {
TlvTypes.State: tlv8.DataType.INTEGER,
TlvTypes.Error: tlv8.DataType.INTEGER

})
else:
raise Exception('not implemented (neither IP nor BLE)')
try:
if connection_type == 'IP':
if not IP_TRANSPORT_SUPPORTED:
raise TransportNotSupportedError('IP')
session = IpSession(pairing_data)
response = session.post('/pairings', request_tlv, content_type='application/pairing+tlv8')
session.close()
data = response.read()
data = tlv8.decode(data, {
TlvTypes.State: tlv8.DataType.INTEGER,
TlvTypes.Error: tlv8.DataType.INTEGER

})
elif connection_type == 'BLE':
if not BLE_TRANSPORT_SUPPORTED:
raise TransportNotSupportedError('BLE')
inner = tlv8.encode([
tlv8.Entry(AdditionalParameterTypes.ParamReturnResponse, bytearray(b'\x01')),
tlv8.Entry(AdditionalParameterTypes.Value, request_tlv)
])

body = len(inner).to_bytes(length=2, byteorder='little') + inner

from .ble_impl.device import DeviceManager
manager = DeviceManager(self.ble_adapter)
device = manager.make_device(pairing_data['AccessoryMAC'])
device.connect()

logging.debug('resolved %d services', len(device.services))
pair_remove_char, pair_remove_char_id = \
find_characteristic_by_uuid(device,
ServicesTypes.PAIRING_SERVICE,
CharacteristicsTypes.PAIRING_PAIRINGS)
logging.debug('setup char: %s %s', pair_remove_char, pair_remove_char.service.device)

session = BleSession(pairing_data, self.ble_adapter)
response = session.request(pair_remove_char, pair_remove_char_id, HapBleOpCodes.CHAR_WRITE, body)
data = tlv8.decode(response.first_by_id(AdditionalParameterTypes.Value).data, {
TlvTypes.State: tlv8.DataType.INTEGER,
TlvTypes.Error: tlv8.DataType.INTEGER
})
else:
raise NotImplementedError('not implemented (neither IP nor BLE)')
except NotImplementedError:
raise
except Exception:
if not force:
raise

logging.debug('error sending unpair request, remove without accessory notification')
data = tlv8.EntryList([
tlv8.Entry(TlvTypes.State, States.M2),
tlv8.Entry(TlvTypes.Error, 0)
])

# act upon the response (the same is returned for IP and BLE accessories)
# handle the result, spec says, if it has only one entry with state == M2 we unpaired, else its an error.
Expand Down
5 changes: 5 additions & 0 deletions homekit/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,8 @@ def __init__(self, transport):
class DisconnectedControllerError(HomeKitException):
def __init__(self):
Exception.__init__(self, 'Controller has passed away')


class NotImplementedError(HomeKitException):
def __init__(self, message):
Exception.__init__(self, message)
4 changes: 3 additions & 1 deletion homekit/remove_pairing.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def setup_args_parser():
help='this pairing ID identifies the controller who should be removed from accessory')
parser.add_argument('--adapter', action='store', dest='adapter', default='hci0',
help='the bluetooth adapter to be used (defaults to hci0)')
parser.add_argument('--force', action='store_true', dest='force',
help='force removing of pairing in pairing data even if accessory is not reachable')
add_log_arguments(parser)
return parser.parse_args()

Expand All @@ -46,6 +48,6 @@ def setup_args_parser():
print('"{a}" is no known alias'.format(a=args.alias))
exit(-1)

controller.remove_pairing(args.alias, args.controllerPairingId)
controller.remove_pairing(args.alias, args.controllerPairingId, args.force)
controller.save_data(args.file)
print('Pairing for "{a}" was removed.'.format(a=args.alias))