"""
MicroPython Remote - Interaction and automation tool for MicroPython
MIT license; Copyright (c) 2019-2021 Damien P. George

This program provides a set of utilities to interact with and automate a
MicroPython device over a serial connection.  Commands supported are:

    mpremote                         -- auto-detect, connect and enter REPL
    mpremote <device-shortcut>       -- connect to given device
    mpremote connect <device>        -- connect to given device
    mpremote disconnect              -- disconnect current device
    mpremote mount <local-dir>       -- mount local directory on device
    mpremote eval <string>           -- evaluate and print the string
    mpremote exec <string>           -- execute the string
    mpremote run <script>            -- run the given local script
    mpremote fs <command> <args...>  -- execute filesystem commands on the device
    mpremote repl                    -- enter REPL
"""

import os
import sys

import serial.tools.list_ports

from . import pyboardextended as pyboard
from .console import Console, ConsolePosix

_PROG = "mpremote"

# Maps a shortcut (optionally followed by "name" or "name=default" argument
# specifiers) to the command line it expands to.  A string value is split on
# whitespace; a list value is used verbatim.
_BUILTIN_COMMAND_EXPANSIONS = {
    # Device connection shortcuts.
    "devs": "connect list",
    "a0": "connect /dev/ttyACM0",
    "a1": "connect /dev/ttyACM1",
    "a2": "connect /dev/ttyACM2",
    "a3": "connect /dev/ttyACM3",
    "u0": "connect /dev/ttyUSB0",
    "u1": "connect /dev/ttyUSB1",
    "u2": "connect /dev/ttyUSB2",
    "u3": "connect /dev/ttyUSB3",
    "c0": "connect COM0",
    "c1": "connect COM1",
    "c2": "connect COM2",
    "c3": "connect COM3",
    # Filesystem shortcuts.
    "cat": "fs cat",
    "ls": "fs ls",
    "cp": "fs cp",
    "rm": "fs rm",
    "mkdir": "fs mkdir",
    "rmdir": "fs rmdir",
    "df": [
        "exec",
        # The body of "if _s[0]:" must be indented deeper than the "if" itself,
        # otherwise the snippet is a syntax error on the device.
        "import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n  _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
    ],
    # Other shortcuts.
    "reset t_ms=100": [
        "exec",
        "--no-follow",
        "import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()",
    ],
    "bootloader t_ms=100": [
        "exec",
        "--no-follow",
        "import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()",
    ],
    "setrtc": [
        "exec",
        "import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))",
    ],
}

# Populated by prepare_command_expansions(); maps a shortcut name to a tuple
# (argument specifiers, list of replacement arguments).
_command_expansions = {}


def load_user_config():
    """Load the user's config.py, if any, and return it as a config object.

    The file is looked up as ``$XDG_CONFIG_HOME/mpremote/config.py``, falling
    back to ``$HOME/.config/mpremote/config.py``.  It is executed with the
    config object's ``__dict__`` as its globals, so any top-level name it
    defines (notably ``commands``) becomes an attribute of the result.

    Returns a config object whose ``commands`` attribute is an empty dict when
    no config file is found.
    """
    # Create empty config object (a plain instance with a writable __dict__).
    config = type("Config", (), {})()
    config.commands = {}

    # Get config file name.
    path = os.getenv("XDG_CONFIG_HOME")
    if path is None:
        path = os.getenv("HOME")
        if path is None:
            return config
        path = os.path.join(path, ".config")
    path = os.path.join(path, _PROG)
    config_file = os.path.join(path, "config.py")

    # Check if config file exists.
    if not os.path.exists(config_file):
        return config

    # Exec the config file in its directory.
    with open(config_file) as f:
        config_data = f.read()
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        # NOTE: this runs arbitrary code from the user's own config file.
        exec(config_data, config.__dict__)
    finally:
        # Restore the working directory even if the config file raises.
        os.chdir(prev_cwd)

    return config


def prepare_command_expansions(config):
    """Build the global shortcut table from the built-in and user commands.

    ``config.commands`` is processed after the built-in table, so a user
    command replaces a built-in one with the same name.
    """
    global _command_expansions

    _command_expansions = {}

    for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands):
        for cmd, sub in command_set.items():
            cmd = cmd.split()
            # Each specifier is "name" (required) or "name=default" (optional).
            # Split once only, so that a default value may itself contain "=".
            args = tuple(c.split("=", 1) for c in cmd[1:])
            if isinstance(sub, str):
                sub = sub.split()
            _command_expansions[cmd[0]] = (args, sub)


def do_command_expansion(args):
    """Expand any leading shortcut in ``args``, modifying the list in place.

    Expansion repeats while the first argument is a known shortcut.  Values
    collected for the shortcut's declared arguments are turned into a leading
    ``exec "name=value;..."`` command so the expansion can refer to them.

    Prints a usage message and exits with status 1 if a required argument is
    missing or an unexpected ``name=value`` argument follows the expansion.
    """

    def usage_error(cmd, exp_args, msg):
        print(f"Command {cmd} {msg}; signature is:")
        print(" ", cmd, " ".join("=".join(a) for a in exp_args))
        sys.exit(1)

    last_arg_idx = len(args)
    pre = []
    while args and args[0] in _command_expansions:
        cmd = args.pop(0)
        exp_args, exp_sub = _command_expansions[cmd]
        for exp_arg in exp_args:
            exp_arg_name = exp_arg[0]
            if args and "=" not in args[0]:
                # Argument given without a name.
                value = args.pop(0)
            elif args and args[0].startswith(exp_arg_name + "="):
                # Argument given with correct name.
                value = args.pop(0).split("=", 1)[1]
            else:
                # No argument given, or argument given with a different name.
                if len(exp_arg) == 1:
                    # Required argument (it has no default).
                    usage_error(cmd, exp_args, f"missing argument {exp_arg_name}")
                else:
                    # Optional argument with a default.
                    value = exp_arg[1]
            pre.append(f"{exp_arg_name}={value}")

        args[0:0] = exp_sub
        last_arg_idx = len(exp_sub)

    # last_arg_idx only differs from len(args) if an expansion happened, in
    # which case cmd and exp_args are bound.
    if last_arg_idx < len(args) and "=" in args[last_arg_idx]:
        # Extra unknown arguments given.
        arg = args[last_arg_idx].split("=", 1)[0]
        usage_error(cmd, exp_args, f"given unexpected argument {arg}")

    # Insert expansion with optional setting of arguments.
    if pre:
        args[0:0] = ["exec", ";".join(pre)]


def do_connect(args):
    """Handle the ``connect`` command, consuming its device argument.

    The device argument is one of ``list`` (print attached serial ports),
    ``auto`` (first port that can be opened), ``id:<serial-number>``,
    ``port:<path>`` or a bare port path.

    Returns the connected ``pyboard.PyboardExtended`` object, or ``None`` for
    ``list``.  On a ``pyboard.PyboardError`` the message is printed and the
    program exits with status 1.
    """
    dev = args.pop(0)
    try:
        if dev == "list":
            # List attached devices.
            for p in sorted(serial.tools.list_ports.comports()):
                print(
                    "{} {} {:04x}:{:04x} {} {}".format(
                        p.device,
                        p.serial_number,
                        p.vid if isinstance(p.vid, int) else 0,
                        p.pid if isinstance(p.pid, int) else 0,
                        p.manufacturer,
                        p.product,
                    )
                )
            return None
        elif dev == "auto":
            # Auto-detect and auto-connect to the first available device.
            for p in sorted(serial.tools.list_ports.comports()):
                try:
                    return pyboard.PyboardExtended(p.device, baudrate=115200)
                except pyboard.PyboardError as er:
                    # Skip ports that cannot be opened; anything else is fatal.
                    if not er.args[0].startswith("failed to access"):
                        raise
            raise pyboard.PyboardError("no device found")
        elif dev.startswith("id:"):
            # Search for a device with the given serial number.
            serial_number = dev[len("id:") :]
            for p in serial.tools.list_ports.comports():
                if p.serial_number == serial_number:
                    return pyboard.PyboardExtended(p.device, baudrate=115200)
            raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
        else:
            # Connect to the given device.
            if dev.startswith("port:"):
                dev = dev[len("port:") :]
            return pyboard.PyboardExtended(dev, baudrate=115200)
    except pyboard.PyboardError as er:
        msg = er.args[0]
        if msg.startswith("failed to access"):
            msg += " (it may be in use by another program)"
        print(msg)
        sys.exit(1)


def do_disconnect(pyb):
    """Unmount any local mount, leave the raw REPL and close the connection."""
    try:
        if pyb.mounted:
            if not pyb.in_raw_repl:
                pyb.enter_raw_repl(soft_reset=False)
            pyb.umount_local()
        if pyb.in_raw_repl:
            pyb.exit_raw_repl()
    except OSError:
        # Ignore any OSError exceptions when shutting down, eg:
        # - pyboard.filesystem_command will close the connection if it had an error
        # - umounting will fail if serial port disappeared
        pass
    pyb.close()


def do_filesystem(pyb, args):
    """Handle the ``fs`` command; ``args`` is consumed (left empty).

    ``cp -r <src>... :`` is implemented here: each source is walked on the
    host, the needed directories are created on the device, then every file is
    copied individually.  Every other command is passed unchanged to
    ``pyboard.filesystem_command``.

    Exits with status 1 if ``cp -r`` is not given ``:`` as its destination.
    """

    def _list_recursive(files, path):
        # Collect (directory, filename) pairs for every file under path.
        if os.path.isdir(path):
            for entry in os.listdir(path):
                _list_recursive(files, os.path.join(path, entry))
        else:
            files.append(os.path.split(path))

    # Slice comparison so a lone "cp" does not raise IndexError here.
    if args[:2] == ["cp", "-r"]:
        del args[:2]
        # Validate explicitly: an assert would be stripped under "python -O",
        # after which the pop below would silently discard a real source path.
        if not args or args[-1] != ":":
            print(f"{_PROG}: 'cp -r' destination must be ':'")
            sys.exit(1)
        args.pop()
        src_files = []
        for path in args:
            _list_recursive(src_files, path)
        known_dirs = {""}
        pyb.exec_("import uos")
        for dir_name, file_name in src_files:
            dir_parts = dir_name.split("/")
            # Create each ancestor directory once, shallowest first.
            for depth in range(1, len(dir_parts) + 1):
                d = "/".join(dir_parts[:depth])
                if d not in known_dirs:
                    # %r gives a correctly quoted and escaped string literal,
                    # so names containing quotes or backslashes are safe.
                    pyb.exec_("try:\n uos.mkdir(%r)\nexcept OSError as e:\n print(e)" % d)
                    known_dirs.add(d)
            pyboard.filesystem_command(
                pyb, ["cp", os.path.join(dir_name, file_name), ":" + dir_name + "/"]
            )
    else:
        pyboard.filesystem_command(pyb, args)
    args.clear()


def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
    """Shuttle characters between the console and the device until ctrl-].

    Console keys handled specially: ctrl-] quits, ctrl-D soft-resets the
    device, ctrl-J sends ``code_to_inject`` (if set) and ctrl-K runs
    ``file_to_inject`` (if set) through the raw REPL.  Device output is
    written via ``console_out_write``; bytes other than 8, 9, 10, 13, 27 and
    printable ASCII are shown as ``[xx]`` in hex.

    Returns when the user quits or the device disappears (OSError with
    errno 5).  Any other OSError from the serial port is propagated.
    """
    while True:
        console_in.waitchar(pyb.serial)
        c = console_in.readchar()
        if c:
            if c == b"\x1d":  # ctrl-], quit
                break
            elif c == b"\x04":  # ctrl-D
                # do a soft reset and reload the filesystem hook
                pyb.soft_reset_with_mount(console_out_write)
            elif c == b"\x0a" and code_to_inject is not None:  # ctrl-j, inject code
                pyb.serial.write(code_to_inject)
            elif c == b"\x0b" and file_to_inject is not None:  # ctrl-k, inject script
                console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
                pyb.enter_raw_repl(soft_reset=False)
                with open(file_to_inject, "rb") as f:
                    pyfile = f.read()
                try:
                    pyb.exec_raw_no_follow(pyfile)
                except pyboard.PyboardError as er:
                    console_out_write(b"Error:\r\n")
                    # The console and capture file take bytes, not exceptions.
                    console_out_write(bytes(str(er), "utf8"))
                pyb.exit_raw_repl()
            else:
                pyb.serial.write(c)

        try:
            n = pyb.serial.inWaiting()
        except OSError as er:
            if er.args and er.args[0] == 5:  # IO error, device disappeared
                print("device disconnected")
                break
            # Any other error would leave n unbound (or stale), so propagate.
            raise

        if n > 0:
            c = pyb.serial.read(1)
            # Skip an empty read as well as None; ord() needs exactly one byte.
            if c:
                # pass character through to the console
                oc = ord(c)
                if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
                    console_out_write(c)
                else:
                    console_out_write(b"[%02x]" % oc)


def do_repl(pyb, args):
    """Handle the ``repl`` command: parse its options and run the REPL.

    Leading options consumed from ``args``:

    - ``--capture <file>``: also write all device output to ``<file>``.
    - ``--inject-code <string>``: bytes sent on ctrl-J; a literal ``\\n`` in
      the string is replaced by CR LF.
    - ``--inject-file <file>``: script run on ctrl-K.

    Exits with status 1 if an option is given without its value.
    """
    capture_file = None
    code_to_inject = None
    file_to_inject = None

    while args:
        option = args[0]
        if option not in ("--capture", "--inject-code", "--inject-file"):
            break
        if len(args) < 2:
            print(f"{_PROG}: 'repl' option {option} needs an argument")
            sys.exit(1)
        args.pop(0)
        value = args.pop(0)
        if option == "--capture":
            capture_file = value
        elif option == "--inject-code":
            code_to_inject = bytes(value.replace("\\n", "\r\n"), "utf8")
        else:
            file_to_inject = value

    print("Connected to MicroPython at %s" % pyb.device_name)
    print("Use Ctrl-] to exit this shell")
    if capture_file is not None:
        print('Capturing session to file "%s"' % capture_file)
        capture_file = open(capture_file, "wb")
    try:
        if code_to_inject is not None:
            print("Use Ctrl-J to inject", code_to_inject)
        if file_to_inject is not None:
            print('Use Ctrl-K to inject file "%s"' % file_to_inject)

        console = Console()
        console.enter()

        def console_out_write(b):
            console.write(b)
            if capture_file is not None:
                capture_file.write(b)
                capture_file.flush()

        try:
            do_repl_main_loop(
                pyb,
                console,
                console_out_write,
                code_to_inject=code_to_inject,
                file_to_inject=file_to_inject,
            )
        finally:
            console.exit()
    finally:
        # Close the capture file even if setting up the console failed.
        if capture_file is not None:
            capture_file.close()


def execbuffer(pyb, buf, follow):
    """Run ``buf`` on the device through the raw REPL.

    When ``follow`` is true, device output is streamed to stdout until the
    code finishes and any error output is written to stdout as well.

    Returns 0 on success, or 1 if the device reported an error, a
    ``pyboard.PyboardError`` was raised, or the user pressed ctrl-C.
    """
    ret_val = 0
    try:
        pyb.exec_raw_no_follow(buf)
        if follow:
            _, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
            if ret_err:
                pyboard.stdout_write_bytes(ret_err)
                ret_val = 1
    except pyboard.PyboardError as er:
        print(er)
        ret_val = 1
    except KeyboardInterrupt:
        ret_val = 1
    return ret_val


def main():
    """Run the commands given on the command line, one after another.

    A device is auto-connected the first time a command needs one.  If no
    command counted as an action, a REPL is entered at the end.  The device is
    always disconnected on the way out.

    Returns a non-zero exit status on a usage or execution error, otherwise
    ``None``.
    """
    config = load_user_config()
    prepare_command_expansions(config)

    args = sys.argv[1:]
    pyb = None
    did_action = False

    # command -> (needs raw REPL, counts as an action, minimum argument count)
    cmds = {
        "connect": (False, False, 1),
        "disconnect": (False, False, 0),
        "mount": (True, False, 1),
        "repl": (False, True, 0),
        "eval": (True, True, 1),
        "exec": (True, True, 1),
        "run": (True, True, 1),
        "fs": (True, True, 1),
    }

    try:
        while args:
            do_command_expansion(args)

            cmd = args.pop(0)
            try:
                need_raw_repl, is_action, num_args_min = cmds[cmd]
            except KeyError:
                print(f"{_PROG}: '{cmd}' is not a command")
                return 1

            if len(args) < num_args_min:
                print(f"{_PROG}: '{cmd}' needs at least {num_args_min} argument(s)")
                return 1

            if cmd == "connect":
                if pyb is not None:
                    do_disconnect(pyb)
                pyb = do_connect(args)
                if pyb is None:
                    # "connect list" only prints; don't fall into the REPL.
                    did_action = True
                continue

            if pyb is None:
                pyb = do_connect(["auto"])

            if need_raw_repl:
                if not pyb.in_raw_repl:
                    pyb.enter_raw_repl()
            else:
                if pyb.in_raw_repl:
                    pyb.exit_raw_repl()
            if is_action:
                did_action = True

            if cmd == "disconnect":
                do_disconnect(pyb)
                pyb = None
            elif cmd == "mount":
                path = args.pop(0)
                pyb.mount_local(path)
                print(f"Local directory {path} is mounted at /remote")
            elif cmd in ("exec", "eval", "run"):
                follow = True
                if args[0] == "--no-follow":
                    args.pop(0)
                    follow = False
                    # The option does not count as the command's own argument.
                    if not args:
                        print(f"{_PROG}: '{cmd}' needs at least {num_args_min} argument(s)")
                        return 1
                if cmd == "exec":
                    buf = args.pop(0)
                elif cmd == "eval":
                    buf = "print(" + args.pop(0) + ")"
                else:
                    filename = args.pop(0)
                    try:
                        with open(filename, "rb") as f:
                            buf = f.read()
                    except OSError:
                        print(f"{_PROG}: could not read file '{filename}'")
                        return 1
                ret = execbuffer(pyb, buf, follow)
                if ret:
                    return ret
            elif cmd == "fs":
                do_filesystem(pyb, args)
            elif cmd == "repl":
                do_repl(pyb, args)

        if not did_action:
            if pyb is None:
                pyb = do_connect(["auto"])
            if pyb.in_raw_repl:
                pyb.exit_raw_repl()
            do_repl(pyb, args)
    finally:
        if pyb is not None:
            do_disconnect(pyb)