#!/usr/bin/env python3 import time import websocket import argparse import zlib from websocket import WebSocketTimeoutException parser = argparse.ArgumentParser(description='Update fiatlux firmware via websocket.') parser.add_argument("binfile") parser.add_argument("address") args = parser.parse_args() bs = 1024 def parse_reply(bytes): cmd = bytes[0:1].decode("utf-8") ret = int.from_bytes(bytes[1:2], "big") val = int.from_bytes(bytes[2:4], "big") return {'cmd': cmd, 'ret': ret, 'val': val} with open(args.binfile, "rb") as f: try: ws = websocket.WebSocket() print("send {}".format(args.binfile)) ws.connect("ws://" + args.address) i = 0 bytes = f.read() rolling = zlib.crc32(bytes) total = len(bytes) while True: chunk = bytes[bs * i:bs * i + bs] msg = b'F\x00\x00\x00' msg += i.to_bytes(2, 'big') msg += len(chunk).to_bytes(2, 'big') msg += (zlib.crc32(chunk) & 0xffffffff).to_bytes(4, 'big') msg += chunk ws.send(msg) print("\r{:6.2f}%".format(100 * i * bs / total), end='') reply = parse_reply(ws.recv()) if reply['cmd'] == 'F' and reply['ret'] == 0: i += 1 elif reply['cmd'] == 'F' and reply['ret'] == 1: print("Error: SEQUENCE_OUT_OF_ORDER") i = reply['val'] elif reply['cmd'] == 'F' and reply['ret'] == 2: print("Error: CHECKSUM_MISMATCH") i = reply['val'] else: print(reply) time.sleep(0.05) if len(chunk) != bs: break print("\rdone ") msg = b'C\x00\x00\x00' msg += total.to_bytes(4, 'big') msg += (rolling).to_bytes(4, 'big') ws.settimeout(5) ws.send(msg) reply = parse_reply(ws.recv()) print(reply) ws.close() except WebSocketTimeoutException: pass except ConnectionResetError: pass except KeyboardInterrupt: pass