From 787a344a2349d00280f1cc89c2e05e7523b6e5ff Mon Sep 17 00:00:00 2001 From: Andrew Ransom Date: Tue, 24 Jun 2025 20:18:39 +1000 Subject: [PATCH] Updated webtime.py to fix crash caused by omission of seconds by the user in the web console. All time inputs are now in format HH:MM --- webtime.py | 438 ++++++++++++++++++++++++++++------------------------- 1 file changed, 232 insertions(+), 206 deletions(-) diff --git a/webtime.py b/webtime.py index cb08b49..aca7331 100644 --- a/webtime.py +++ b/webtime.py @@ -1,245 +1,271 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -from machine import Pin, RTC, reset import time -from math import ceil, floor import network -import secrets import urequests +import uasyncio as asyncio +import os +import secrets # ← Ensure this is imported + +from machine import Pin, RTC, reset +from microdot import Microdot, Response -worldtimeurl = "https://timeapi.io/api/TimeZone/zone?timeZone=Australia/Canberra" # Time based on timezone -pulsefrequency = 60 # Pulse frequency in seconds -wifi_retry_interval = 600 # 10 minutes between WiFi reconnection attempts +# ————— Configuration ————— +worldtimeurl = "https://timeapi.io/api/TimeZone/zone?timeZone=Australia/Canberra" +pulsefrequency = 60 # seconds per pulse = 1 minute +wifi_retry_interval = 600 # retry Wi-Fi every 10 minutes +PICO_IP_SUBNET = "192.168.50." + +# ————— Startup Delay ————— time.sleep(5) +print("=== main.py loaded ===") + +# ————— Helper Functions ————— -# Helper function to format time as human-readable string def format_time(t): year, month, day, hour, minute, second, *_ = t return f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}" -# Function to print GMT and local time -def print_gmt_and_local_time(worldtimeurl): - # Print GMT time - gmt_time = time.gmtime() # Get the GMT time - print(f"GMT time: {format_time(gmt_time)} (UTC)") - - # Fetch local time from timezone API +def print_gmt_and_local_time(url): + print("[DEBUG] print_gmt_and_local_time()") + print(" GMT time:", format_time(time.gmtime()), "(UTC)") try: - print(f"Fetching local time from: {worldtimeurl}") - response = urequests.get(worldtimeurl) - - if response.status_code != 200: - print(f"Failed to fetch time. HTTP Status Code: {response.status_code}") + resp = urequests.get(url) + if resp.status_code != 200: + print(" Failed to fetch local time (status", resp.status_code, ")") return - - parsed = response.json() - datetime_str = str(parsed["currentLocalTime"]) - timezone_name = parsed["timeZone"] - - # Parse local time string - year = int(datetime_str[0:4]) - month = int(datetime_str[5:7]) - day = int(datetime_str[8:10]) - hour = int(datetime_str[11:13]) - minute = int(datetime_str[14:16]) - second = int(datetime_str[17:19]) - - local_time = (year, month, day, hour, minute, second, 0, 0) - print(f"Local time: {format_time(local_time)} ({timezone_name})") - - except OSError as e: - print(f"Network error occurred: {e}") - except ValueError as e: - print(f"JSON parsing error: {e}") + data = resp.json() + dt = data["currentLocalTime"] + tz = data["timeZone"] + y = int(dt[0:4]); mo = int(dt[5:7]); d = int(dt[8:10]) + h = int(dt[11:13]); mi = int(dt[14:16]); s = int(dt[17:19]) + print(" Local time:", format_time((y,mo,d,h,mi,s,0,0)), f"({tz})") except Exception as e: - print(f"Unexpected error fetching local time: {e}") + print(" Error fetching local time:", e) -# WiFi connection function with retry logic -def set_time(worldtimeurl, wlan): +def set_time(url, wlan): + print("[DEBUG] set_time() start") wlan.connect(secrets.SSID, secrets.PASSWORD) - retry_count = 0 - max_retries = 10 - while not wlan.isconnected() and retry_count < max_retries: + retries = 0 + while not wlan.isconnected() and retries < 10: time.sleep(2) - retry_count += 1 - print(f"Not connecting to WiFi, retry {retry_count}/{max_retries}\n") - + retries += 1 + print(f"[DEBUG] Wi-Fi retry {retries}/10") if not wlan.isconnected(): - print("Failed to connect to WiFi after maximum retries.") - return False # WiFi connection failed - + print("[ERROR] Could not connect to Wi-Fi") + return False ip = wlan.ifconfig()[0] - netw = secrets.SSID - print(f'Connected to {netw} on {ip}') - + print(f"[DEBUG] Connected to Wi-Fi, IP = {ip}") try: - # Fetch and print GMT and local time after connecting to WiFi - print_gmt_and_local_time(worldtimeurl) - - # Fetch time data from API - response = urequests.get(worldtimeurl) - parsed = response.json() - datetime_str = str(parsed["currentLocalTime"]) - print(f"Received time: {datetime_str}") - - # Parse the time string - year = int(datetime_str[0:4]) - month = int(datetime_str[5:7]) - day = int(datetime_str[8:10]) - hour = int(datetime_str[11:13]) - minute = int(datetime_str[14:16]) - second = int(datetime_str[17:19]) - - # Update internal RTC - RTC().datetime((year, month, day, 0, hour, minute, second, 0)) - print("RTC updated\n") + print_gmt_and_local_time(url) + resp = urequests.get(url) + data = resp.json() + dt = data["currentLocalTime"] + y = int(dt[0:4]); mo = int(dt[5:7]); d = int(dt[8:10]) + h = int(dt[11:13]); mi = int(dt[14:16]); s = int(dt[17:19]) + RTC().datetime((y, mo, d, 0, h, mi, s, 0)) + print("[DEBUG] RTC updated") except Exception as e: - print(f"Error fetching or updating time: {e}") + print("[ERROR] Error setting RTC:", e) return False - - # Disconnect from WiFi - wlan.disconnect() - return True - -# Takes a single digit integer and turns it into a two digit string -def twodigits(digit): - digitstring = str(digit) - if len(digitstring) == 1: - digitstring = "0" + digitstring - return digitstring - -def pulsetoclock(lasttime, a, b): - print('PULSE') - # Reverse polarity from the last pulse - a = not a - b = not b - print(f"Polarity: {a}, {b}") - - # Trigger clock pulses - clock1(int(a)) - clock2(int(b)) - - # Pulse the onboard LED during the clock pulse - led = Pin("LED", Pin.OUT) - led.on() # Turn the LED on - time.sleep(1) # 1 second pulse (adjust if necessary) - clock1(0) - clock2(0) - led.off() # Turn the LED off after the pulse - - # Split time into components - lasttimehour, lasttimemin, lasttimesecs = map(int, lasttime.split(':')) - - # Increment time by pulse frequency - delta = lasttimesecs + pulsefrequency - inctimesecs = delta % 60 - inctimemin = (lasttimemin + (delta // 60)) % 60 - inctimehour = (lasttimehour + (lasttimemin + (delta // 60)) // 60) % 12 - - newtime = f"{twodigits(inctimehour)}:{twodigits(inctimemin)}:{twodigits(inctimesecs)}" - print(newtime) - - # Save new time and polarity to the file + return True # keep Wi-Fi up for web server + +def twodigits(n): + s = str(n) + return s if len(s) == 2 else "0" + s + +def pulsessince12(timestr): + parts = timestr.split(":") + if len(parts) < 2: + raise ValueError("Invalid time string") + h = int(parts[0]); m = int(parts[1]) + seconds = (h % 12) * 3600 + m * 60 + return seconds // pulsefrequency + +def pulsetoclock(last_hm, a, b): + print(f"[DEBUG] pulsetoclock() last='{last_hm}' a={a} b={b}") + a = not a; b = not b + clock1(int(a)); clock2(int(b)) + led = Pin("LED", Pin.OUT); led.on() + time.sleep(1) + clock1(0); clock2(0); led.off() + h, m = map(int, last_hm.split(":")) + m += 1 + h = (h + m // 60) % 12 + m %= 60 + new_hm = f"{twodigits(h)}:{twodigits(m)}" try: - with open("lastpulseat.txt", "w+") as file: - strngtofile = f"{newtime}\t{a}\t{b}" - file.write(strngtofile) + with open("lastpulseat.txt", "w") as f: + f.write(f"{new_hm}\t{a}\t{b}") except Exception as e: - print(f"Error writing to file: {e}") - - # Dignified little sleep so we don't upset the clock mechanism + print("[ERROR] writing lastpulseat.txt:", e) time.sleep(0.5) - return + print(f"[DEBUG] pulsetoclock() new='{new_hm}' a={a} b={b}") + return new_hm, a, b -def pulsessince12(timestring): - breakuptime = timestring.split(":") - secondssince12 = (int(breakuptime[0]) % 12) * 3600 + int(breakuptime[1]) * 60 + int(breakuptime[2]) - pulses = int(secondssince12 / pulsefrequency) - return pulses - -def calcoffset(timenow): +def calcoffset(current_hm): + # Only log errors or bootstrap; normal zero-offsets are silent try: - with open('lastpulseat.txt', "r") as f: - string = f.read().split('\t') - a = (string[1] == 'True') - b = (string[2] == 'True') - lastpulseat = string[0] - lastpulse = pulsessince12(lastpulseat) - except FileNotFoundError: - print('File does not exist. Assuming this is the first run') - with open('firstruntime.txt', "r") as f: - initialstring = f.read() - lastpulseat = initialstring - lastpulse = pulsessince12(initialstring) - a = True # Adjust based on clock wiring - b = False - except Exception as e: - print(f"Error reading pulse file: {e}") - return None, None, None, None - - rtcpulsessince12 = pulsessince12(timenow) - offset = rtcpulsessince12 - lastpulse - return offset, lastpulseat, a, b - -# These are the pins where you toggle polarity to advance the clock + data = open("lastpulseat.txt").read().strip().split("\t") + last_hm, a_str, b_str = data + a = a_str.lower() == "true" + b = b_str.lower() == "true" + last_p = pulsessince12(last_hm) + except OSError: + print("[DEBUG] Bootstrapping lastpulseat.txt from firstruntime.txt") + try: + last_hm = open("firstruntime.txt").read().strip() + except OSError: + print("[ERROR] firstruntime.txt missing") + return None, None, None, None + last_p = pulsessince12(last_hm) + a, b = False, True + try: + with open("lastpulseat.txt", "w") as f: + f.write(f"{last_hm}\t{a}\t{b}") + except Exception as e: + print("[ERROR] bootstrapping lastpulseat.txt:", e) + current_p = pulsessince12(current_hm) + offset = current_p - last_p + return offset, last_hm, a, b + +# ————— GPIO Setup ————— clock2 = Pin(14, Pin.OUT, value=0) clock1 = Pin(13, Pin.OUT, value=0) -#---------------- MAIN LOGIC -def main(): - led = Pin("LED", Pin.OUT) - led.on() - time.sleep(1) - led.off() - - print("Startup. RTC reads:") - print(time.gmtime()) - - print('Connecting to internet and getting time') - - wlan = network.WLAN(network.STA_IF) - wlan.active(True) - - wifi_failed = False - - if not set_time(worldtimeurl, wlan): - print("Failed to sync time. Running based on RTC.") - wifi_failed = True - - # Main loop - last_wifi_attempt = time.time() +# ————— Web Server (Microdot) ————— +Response.default_content_type = "text/html" +app = Microdot() + +@app.before_request +def restrict(request): + client_ip = request.client_addr[0] + allowed = client_ip.startswith(PICO_IP_SUBNET) + print(f"[DEBUG] Request from {client_ip}, allowed={allowed}") + if not allowed: + return Response("Forbidden", status_code=403) + +@app.route('/') +def index(request): + print("[DEBUG] GET /") + now = time.localtime() + current_hm = f"{twodigits(now[3])}:{twodigits(now[4])}" + try: + base = open("firstruntime.txt").read().strip() + except OSError: + base = "(not set)" + return f""" + + Clock Control + +

Clock Control Panel

+

Current Time: {current_hm}

+

Baseline: {base}

+
+ + +
+
+ +
+
+ +
+ + +""" + +@app.post('/sync') +def sync_clock(request): + new = request.form.get('initial_time') + print(f"[DEBUG] POST /sync initial_time={new}") + if not new: + return Response("No time provided.", 400) + parts = new.split(":") + try: + h, m = map(int, parts) + assert 0 <= h < 24 and 0 <= m < 60 + except: + return Response("Invalid time. Use HH:MM.", 400) + try: + with open("firstruntime.txt", "w") as f: + f.write(new) + except Exception as e: + return Response(f"Error writing baseline: {e}", 500) + try: + os.remove("lastpulseat.txt") + except OSError: + pass + now = time.localtime() + now_hm = f"{twodigits(now[3])}:{twodigits(now[4])}" + offset, last_hm, a, b = calcoffset(now_hm) + if offset is None: + return Response("Synchronization failed.", 500) + if offset != 0: + print(f"[DEBUG] sync_clock: offset={offset}, pulsing...") + for _ in range(max(0, offset)): + last_hm, a, b = pulsetoclock(last_hm, a, b) + return index(request) + +@app.post('/advance1') +def advance_one(request): + print("[DEBUG] POST /advance1") + try: + last_hm, a_str, b_str = open("lastpulseat.txt").read().strip().split("\t") + a = a_str.lower() == "true" + b = b_str.lower() == "true" + except: + try: + last_hm = open("firstruntime.txt").read().strip() + except: + return Response("No baseline to advance.", 500) + a, b = False, True + pulsetoclock(last_hm, a, b) + return index(request) + +@app.post('/advance5') +def advance_five(request): + print("[DEBUG] POST /advance5") + for _ in range(5): + advance_one(request) + return index(request) + +async def clock_loop(): + print("[DEBUG] clock_loop starting") + global wlan, wifi_failed, last_wifi_attempt while True: - # Get RTC time - rtctimestring = f"{twodigits(time.localtime()[3])}:{twodigits(time.localtime()[4])}:{twodigits(time.localtime()[5])}" - - # Reset daily at 03:00:00 - if rtctimestring == "03:00:00": - print("Daily reset triggered.") - machine.reset() - - # Calculate offset by comparing last pulse time with current RTC - offset, lasttime, a, b = calcoffset(rtctimestring) - if offset is None: - print("Error in calculating offset, skipping this cycle.") - continue - - # If clock needs adjustment - if offset < -60 * 60 / pulsefrequency or offset > 0: - pulsetoclock(lasttime, a, b) - - # Periodically retry WiFi connection if it previously failed + now = time.localtime() + rtc_hm = f"{twodigits(now[3])}:{twodigits(now[4])}" + offset, last_hm, a, b = calcoffset(rtc_hm) + if offset is not None and offset != 0: + print(f"[DEBUG] clock_loop: offset={offset}, pulsing...") + pulsetoclock(last_hm, a, b) if wifi_failed and (time.time() - last_wifi_attempt) > wifi_retry_interval: - print("Attempting to reconnect to WiFi...") - last_wifi_attempt = time.time() + print("[DEBUG] retrying Wi-Fi sync") if set_time(worldtimeurl, wlan): wifi_failed = False - print("WiFi reconnected and time synced.") - else: - print("WiFi reconnection failed, continuing with RTC.") - - time.sleep(0.1) + await asyncio.sleep(0.1) -if __name__ == '__main__': +def main(): + print("[DEBUG] Entering main()") + led = Pin("LED", Pin.OUT) + led.on(); time.sleep(1); led.off() + print("[DEBUG] Startup RTC:", time.gmtime()) + print("[DEBUG] Connecting to Wi-Fi…") + global wlan, wifi_failed, last_wifi_attempt + wlan = network.WLAN(network.STA_IF); wlan.active(True) + wifi_failed = not set_time(worldtimeurl, wlan) + last_wifi_attempt = time.time() + print("[DEBUG] Launching asyncio runner") + async def runner(): + print("[DEBUG] Creating server & clock tasks") + server = asyncio.create_task(app.start_server(host="0.0.0.0", port=80)) + clock = asyncio.create_task(clock_loop()) + print("[DEBUG] Tasks created") + await asyncio.gather(server, clock) + asyncio.run(runner()) + +if __name__ == "__main__": main()