From 7e22cf2cddb4f6f6bb25c523e606043ce694691c Mon Sep 17 00:00:00 2001 From: Mr Date: Sat, 15 Mar 2025 18:32:18 +0000 Subject: [PATCH 1/3] added cli params --- .../LocateTracker/decrypt_locations.py | 88 ++++++++++++------- .../LocateTracker/location_request.py | 6 +- NovaApi/ListDevices/nbe_list_devices.py | 68 +++++++++----- SpotApi/CreateBleDevice/create_ble_device.py | 28 +++--- requirements.txt | 3 +- 5 files changed, 125 insertions(+), 68 deletions(-) diff --git a/NovaApi/ExecuteAction/LocateTracker/decrypt_locations.py b/NovaApi/ExecuteAction/LocateTracker/decrypt_locations.py index c0e7033..b4e4fd8 100644 --- a/NovaApi/ExecuteAction/LocateTracker/decrypt_locations.py +++ b/NovaApi/ExecuteAction/LocateTracker/decrypt_locations.py @@ -20,7 +20,7 @@ def create_google_maps_link(latitude, longitude): - try: + try: latitude = float(latitude) longitude = float(longitude) if not (-90 <= latitude <= 90 and -180 <= longitude <= 180): @@ -28,7 +28,7 @@ def create_google_maps_link(latitude, longitude): except ValueError as e: return f"Error: {e}" #more descriptive error message for the user base_url = "https://www.google.com/maps/search/?api=1" - query_params = f"query={latitude},{longitude}" + query_params = f"query={latitude},{longitude}" return f"{base_url}&{query_params}" @@ -67,7 +67,7 @@ def retrieve_identity_key(device_registration: DeviceRegistration) -> bytes: exit(1) -def decrypt_location_response_locations(device_update_protobuf): +def decrypt_location_response_locations(device_update_protobuf, batch_mode=False): device_registration = device_update_protobuf.deviceMetadata.information.deviceRegistration @@ -124,39 +124,67 @@ def decrypt_location_response_locations(device_update_protobuf): ) location_time_array.append(wrapped_location) - print("-" * 40) - print("[DecryptLocations] Decrypted Locations:") + if batch_mode is False: + print("-" * 40) + print("[DecryptLocations] Decrypted Locations:") - if not location_time_array: - print("No locations found.") - return + if not location_time_array: + print("No locations found.") + return - for loc in location_time_array: + for loc in location_time_array: - if loc.status == Common_pb2.Status.SEMANTIC: - print(f"Semantic Location: {loc.name}") + if loc.status == Common_pb2.Status.SEMANTIC: + print(f"Semantic Location: {loc.name}") - else: - proto_loc = DeviceUpdate_pb2.Location() - proto_loc.ParseFromString(loc.decrypted_location) - - latitude = proto_loc.latitude / 1e7 - longitude = proto_loc.longitude / 1e7 - altitude = proto_loc.altitude - - print(f"Latitude: {latitude}") - print(f"Longitude: {longitude}") - print(f"Altitude: {altitude}") - print(f"Google Maps Link: {create_google_maps_link(latitude, longitude)}") - - print(f"Time: {datetime.datetime.fromtimestamp(loc.time).strftime('%Y-%m-%d %H:%M:%S')}") - print(f"Status: {loc.status}") - print(f"Is Own Report: {loc.is_own_report}") - print("-" * 40) + else: + proto_loc = DeviceUpdate_pb2.Location() + proto_loc.ParseFromString(loc.decrypted_location) + + latitude = proto_loc.latitude / 1e7 + longitude = proto_loc.longitude / 1e7 + altitude = proto_loc.altitude + + print(f"Latitude: {latitude}") + print(f"Longitude: {longitude}") + print(f"Altitude: {altitude}") + print(f"Google Maps Link: {create_google_maps_link(latitude, longitude)}") + + print(f"Time: {datetime.datetime.fromtimestamp(loc.time).strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Status: {loc.status}") + print(f"Is Own Report: {loc.is_own_report}") + print("-" * 40) + + else: + # batch mode + if not location_time_array: + print("JSON {}") + return + + for loc in location_time_array: + ts=datetime.datetime.fromtimestamp(loc.time).strftime('%Y-%m-%d %H:%M:%S') + + if loc.status == Common_pb2.Status.SEMANTIC: + name=loc.name # Semantic Location + lat=None + lon=None + alt=None + + else: + proto_loc = DeviceUpdate_pb2.Location() + proto_loc.ParseFromString(loc.decrypted_location) - pass + latitude = proto_loc.latitude / 1e7 + longitude = proto_loc.longitude / 1e7 + altitude = proto_loc.altitude + name='' + lat=latitude + lon=longitude + alt=altitude + # format https://www.traccar.org/osmand/ + print(f'JSON {{"timestamp":"{ts}", "lat":{lat}, "lon":{lon}, "altitude":{alt}, "posname":"{name}"}}') if __name__ == '__main__': res = parse_device_update_protobuf("") - decrypt_location_response_locations(res) \ No newline at end of file + decrypt_location_response_locations(res) diff --git a/NovaApi/ExecuteAction/LocateTracker/location_request.py b/NovaApi/ExecuteAction/LocateTracker/location_request.py index 9dec8a5..a0a4639 100644 --- a/NovaApi/ExecuteAction/LocateTracker/location_request.py +++ b/NovaApi/ExecuteAction/LocateTracker/location_request.py @@ -29,7 +29,7 @@ def create_location_request(canonic_device_id, fcm_registration_id, request_uuid return hex_payload -def get_location_data_for_device(canonic_device_id, name): +def get_location_data_for_device(canonic_device_id, name, batch_mode=False): print(f"[LocationRequest] Requesting location data for {name}...") @@ -53,7 +53,7 @@ def handle_location_response(response): while result is None: asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.1)) - decrypt_location_response_locations(result) + decrypt_location_response_locations(result, batch_mode=batch_mode) if __name__ == '__main__': - get_location_data_for_device(get_example_data("sample_canonic_device_id"), "Test") \ No newline at end of file + get_location_data_for_device(get_example_data("sample_canonic_device_id"), "Test") diff --git a/NovaApi/ListDevices/nbe_list_devices.py b/NovaApi/ListDevices/nbe_list_devices.py index 7590f31..0e22aa3 100644 --- a/NovaApi/ListDevices/nbe_list_devices.py +++ b/NovaApi/ListDevices/nbe_list_devices.py @@ -3,6 +3,8 @@ # Copyright © 2024 Leon Böttger. All rights reserved. # +import sys +import argparse import binascii from NovaApi.ExecuteAction.LocateTracker.location_request import get_location_data_for_device from NovaApi.nova_request import nova_request @@ -42,34 +44,56 @@ def create_device_list_request(): def list_devices(): print("Loading...") - result_hex = request_device_list() - device_list = parse_device_list_protobuf(result_hex) + got_args = len(sys.argv) > 1 + parser = argparse.ArgumentParser() + parser.add_argument('-add', '-a', dest='add', default=None, type=int, required=False, help='how many items to add') + parser.add_argument('-query','-q', dest='query', default=None, type=str, required=False, help='query location by name, comma separated list') + args = parser.parse_args() + # talk to server + result_hex = request_device_list() + device_list = parse_device_list_protobuf(result_hex) refresh_custom_trackers(device_list) canonic_ids = get_canonic_ids(device_list) - print("") - print("-" * 50) - print("Welcome to GoogleFindMyTools!") - print("-" * 50) - print("") - print("The following trackers are available:") - - for idx, (device_name, canonic_id) in enumerate(canonic_ids, start=1): - print(f"{idx}. {device_name}: {canonic_id}") - - selected_value = input("\nIf you want to see locations of a tracker, type the number of the tracker and press 'Enter'.\nIf you want to register a new ESP32- or Zephyr-based tracker, type 'r' and press 'Enter': ") - - if selected_value == 'r': - print("Loading...") - register_esp32() + if got_args is False: + # interactive mode + print("") + print("-" * 50) + print("Welcome to GoogleFindMyTools!") + print("-" * 50) + print("") + print("The following trackers are available:") + + for idx, (device_name, canonic_id) in enumerate(canonic_ids, start=1): + print(f"{idx}. {device_name}: {canonic_id}") + + selected_value = input("\nIf you want to see locations of a tracker, type the number of the tracker and press 'Enter'.\nIf you want to register a new ESP32- or Zephyr-based tracker, type 'r' and press 'Enter': ") + + if selected_value == 'r': + print("Loading...") + register_esp32() + else: + selected_idx = int(selected_value) - 1 + selected_device_name = canonic_ids[selected_idx][0] + selected_canonic_id = canonic_ids[selected_idx][1] + + get_location_data_for_device(selected_canonic_id, selected_device_name) else: - selected_idx = int(selected_value) - 1 - selected_device_name = canonic_ids[selected_idx][0] - selected_canonic_id = canonic_ids[selected_idx][1] - - get_location_data_for_device(selected_canonic_id, selected_device_name) + # batch mode + if args.add is not None: + assert (args.add>0) and (args.add<1000), 'add argument out of range' + for _ in range(args.add): + register_esp32(batch_mode=True) + + if args.query is not None: + # filter devices by name + assert len(args.query)>0, 'name argument out of range' + for nm in args.query.split(','): + for device_name, canonic_id in canonic_ids: + if nm == device_name: + get_location_data_for_device(canonic_id, device_name, batch_mode=True) if __name__ == '__main__': diff --git a/SpotApi/CreateBleDevice/create_ble_device.py b/SpotApi/CreateBleDevice/create_ble_device.py index 9466319..11ac834 100644 --- a/SpotApi/CreateBleDevice/create_ble_device.py +++ b/SpotApi/CreateBleDevice/create_ble_device.py @@ -16,19 +16,20 @@ from SpotApi.spot_request import spot_request -def register_esp32(): +def register_esp32(batch_mode=False): owner_key = get_owner_key() eik = secrets.token_bytes(32) eid = generate_eid(eik, 0) pair_date = int(time.time()) + name = "GFMT-" + secrets.token_hex(5) # phone app uses huge fonts, need to fit screen.. register_request = RegisterBleDeviceRequest() register_request.fastPairModelId = mcu_fast_pair_model_id # Description - register_request.description.userDefinedName = "GoogleFindMyTools µC" + register_request.description.userDefinedName = name register_request.description.deviceType = SpotDeviceType.DEVICE_TYPE_BEACON # Device Components Information @@ -42,7 +43,7 @@ def register_esp32(): register_request.capabilities.capableComponents = 1 # E2EE Registration - register_request.e2eePublicKeyRegistration.rotationExponent = 10 + register_request.e2eePublicKeyRegistration.rotationExponent = 10 # dictated by protocol register_request.e2eePublicKeyRegistration.pairingDate = pair_date # Encrypted User Secrets @@ -80,12 +81,15 @@ def register_esp32(): register_request.unwantedTrackingKey = ownerKeys.tracking_key bytes_data = register_request.SerializeToString() - spot_request("CreateBleDevice", bytes_data) - - print("Registered device successfully. Copy the Advertisement Key below. It will not be shown again.") - print("Afterward, go to the folder 'GoogleFindMyTools/ESP32Firmware' or 'GoogleFindMyTools/ZephyrFirmware' and follow the instructions in the README.md file.") - - print("+" + "-" * 78 + "+") - print("|" + " " * 19 + eid.hex() + " " * 19 + "|") - print("|" + " " * 30 + "Advertisement Key" + " " * 31 + "|") - print("+" + "-" * 78 + "+") \ No newline at end of file + resp = spot_request("CreateBleDevice", bytes_data) + + if batch_mode is False: + print("Registered device successfully. Copy the Advertisement Key below. It will not be shown again.") + print("Afterward, go to the folder 'GoogleFindMyTools/ESP32Firmware' or 'GoogleFindMyTools/ZephyrFirmware' and follow the instructions in the README.md file.") + + print("+" + "-" * 78 + "+") + print("|" + " " * 19 + eid.hex() + " " * 19 + "|") + print("|" + " " * 30 + "Advertisement Key" + " " * 31 + "|") + print("+" + "-" * 78 + "+") + else: + print('JSON {name:"%s", eid:"%s"}' % (name, eid.hex())) diff --git a/requirements.txt b/requirements.txt index 8ddccc4..e066cda 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,4 +14,5 @@ httpx>=0.28.0 h2>=4.1.0 setuptools>=75.6.0 aiohttp>=3.11.8 -http_ece>=1.1.0 \ No newline at end of file +http_ece>=1.1.0 +argparse>=1.4.0 From 68f8465ed6a8e3a49fde4642328f3501a7d9b3aa Mon Sep 17 00:00:00 2001 From: Mr Date: Sun, 16 Mar 2025 13:41:51 +0000 Subject: [PATCH 2/3] fixed json for new device --- SpotApi/CreateBleDevice/create_ble_device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SpotApi/CreateBleDevice/create_ble_device.py b/SpotApi/CreateBleDevice/create_ble_device.py index 11ac834..e1d2d4d 100644 --- a/SpotApi/CreateBleDevice/create_ble_device.py +++ b/SpotApi/CreateBleDevice/create_ble_device.py @@ -92,4 +92,4 @@ def register_esp32(batch_mode=False): print("|" + " " * 30 + "Advertisement Key" + " " * 31 + "|") print("+" + "-" * 78 + "+") else: - print('JSON {name:"%s", eid:"%s"}' % (name, eid.hex())) + print('JSON {"name":"%s", "eid":"%s"}' % (name, eid.hex())) From 14428ed9ce67bdfce62e87a99ccee1bd4b7cd2d3 Mon Sep 17 00:00:00 2001 From: Mr Date: Sun, 16 Mar 2025 13:44:10 +0000 Subject: [PATCH 3/3] added web (REST) interface --- requirements.txt | 3 ++ webserver.py | 99 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 webserver.py diff --git a/requirements.txt b/requirements.txt index e066cda..1c09c65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,6 @@ setuptools>=75.6.0 aiohttp>=3.11.8 http_ece>=1.1.0 argparse>=1.4.0 +Flask>=0.12.5 +Flask>=0.12.5 +requests>=2.0.0 diff --git a/webserver.py b/webserver.py new file mode 100644 index 0000000..372a27d --- /dev/null +++ b/webserver.py @@ -0,0 +1,99 @@ +import subprocess as sub +import json +from flask import Flask, request, jsonify +import requests + +PROG = 'timeout 10m python3 main.py'.split() + +app = Flask(__name__) + +def call_exe(scmd, cwd='.'): + """ calls external sw with all kind of runtime error suppression """ + assert isinstance(scmd, list) and len(scmd)>0 + print(" run", scmd) + p = sub.Popen(scmd, stdin=sub.PIPE, stdout=sub.PIPE, stderr=sub.STDOUT, shell=False, cwd=cwd) + out,err = p.communicate() + output = ((out or bytes()) + (err or bytes())).decode('ascii', 'ignore').replace('\r', '').strip() + return p.returncode, output.splitlines() + +def is_online(tries=3): + oUrl = 'https://google.com' + for retry in range(tries): + try: + resp = requests.get(oUrl, timeout=3*(1+retry)) + if resp.status_code == 200: + return True + except: + pass + return False + +@app.route('/get') +def get_coords(): + # comma sep device names + try: + names = request.args.get('names').strip() + assert len(names)>0 + if is_online() is False: + print('offline') + assert False + except: + return jsonify({'result':'error1'}) + + # run + retval, txt = call_exe(PROG+['-q', names]) + + # eval + if retval == 0: + points=[] + wasError = False + + for line in txt: + if 'error' in line.lower(): + print(line) + + if line.strip().startswith('JSON '): + try: + parsed = json.loads( line.replace('JSON ', '') ) + if 'lat' in parsed: + points.append(parsed) + except: + print("error parsing line", line) + wasError = True + + # no points because of error + if len(points)==0 and wasError: + return jsonify({'result':'error2', 'points':points}) + + # accept point with error + return jsonify({'result':'ok', 'points':points}) + + return jsonify({'result':'error3'}) + + +@app.route('/newkey') +def new_device(): + if is_online() is True: + # run + retval, txt = call_exe(PROG+['-a 1']) + + # eval + if retval == 0: + for line in txt: + if 'error' in line.lower(): + print(line) + + if line.strip().startswith('JSON '): + try: + parsed = json.loads( line.replace('JSON ', '') ) + parsed['result'] = 'ok' + return jsonify(parsed) # first is enough + except: + print("error parsing line", line) + + return jsonify({'result':'error'}) + +# entry point +if __name__ == "__main__": + PORT=10678 # port <1024 needs root + print(f"http://127.0.0.1:{PORT}/") + app.run(host='0.0.0.0', port=PORT, debug = True, use_reloader=False)