1"""
2MicroPython Remote - Interaction and automation tool for MicroPython
3MIT license; Copyright (c) 2019-2021 Damien P. George
4
5This program provides a set of utilities to interact with and automate a
6MicroPython device over a serial connection.  Commands supported are:
7
8    mpremote                         -- auto-detect, connect and enter REPL
9    mpremote <device-shortcut>       -- connect to given device
10    mpremote connect <device>        -- connect to given device
11    mpremote disconnect              -- disconnect current device
12    mpremote mount <local-dir>       -- mount local directory on device
13    mpremote eval <string>           -- evaluate and print the string
14    mpremote exec <string>           -- execute the string
15    mpremote run <script>            -- run the given local script
16    mpremote fs <command> <args...>  -- execute filesystem commands on the device
17    mpremote repl                    -- enter REPL
18"""
19
20import os, sys
21import serial.tools.list_ports
22
23from . import pyboardextended as pyboard
24from .console import Console, ConsolePosix
25
# Program name; used in user-facing messages and as the config directory name.
_PROG = "mpremote"

# Built-in command shortcuts.  A key is the shortcut name, optionally followed
# by "name=default" parameter declarations.  A value is the replacement
# command line, either as a whitespace-separated string or as a list of
# arguments (use a list when an argument itself contains whitespace).
_BUILTIN_COMMAND_EXPANSIONS = {
    # Device connection shortcuts: a0-a3, u0-u3 and c0-c3.
    "devs": "connect list",
    **{f"a{n}": f"connect /dev/ttyACM{n}" for n in range(4)},
    **{f"u{n}": f"connect /dev/ttyUSB{n}" for n in range(4)},
    **{f"c{n}": f"connect COM{n}" for n in range(4)},
    # Filesystem shortcuts: each one simply forwards to the "fs" command.
    **{name: f"fs {name}" for name in ("cat", "ls", "cp", "rm", "mkdir", "rmdir")},
    # Script executed on the device; one string literal per line of the script.
    "df": [
        "exec",
        "import uos\n"
        "print('mount \\tsize \\tused \\tavail \\tuse%')\n"
        "for _m in [''] + uos.listdir('/'):\n"
        " _s = uos.stat('/' + _m)\n"
        " if not _s[0] & 1 << 14: continue\n"
        " _s = uos.statvfs(_m)\n"
        " if _s[0]:\n"
        "  _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; "
        "print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
    ],
    # Other shortcuts.  "t_ms" is a parameter with a default of 100.
    "reset t_ms=100": [
        "exec",
        "--no-follow",
        "import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()",
    ],
    "bootloader t_ms=100": [
        "exec",
        "--no-follow",
        "import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()",
    ],
    "setrtc": [
        "exec",
        "import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))",
    ],
}
70
71
def load_user_config():
    """Load the user's optional ``config.py`` and return it as a config object.

    The file is looked up in ``$XDG_CONFIG_HOME/<_PROG>/config.py``, or in
    ``$HOME/.config/<_PROG>/config.py`` when ``XDG_CONFIG_HOME`` is not set.
    It is executed as Python code with the config directory as the current
    working directory, and every global it defines becomes an attribute of the
    returned object.

    Returns:
        A config object whose ``commands`` attribute is initialised to an
        empty dict before the config file (if any) is run.

    Raises:
        Any exception raised while reading or executing the config file.
    """

    # Create empty config object.
    class Config:
        pass

    config = Config()
    config.commands = {}

    # Get config file name.
    path = os.getenv("XDG_CONFIG_HOME")
    if path is None:
        path = os.getenv("HOME")
        if path is None:
            return config
        path = os.path.join(path, ".config")
    path = os.path.join(path, _PROG)
    config_file = os.path.join(path, "config.py")

    # Check if config file exists.
    if not os.path.exists(config_file):
        return config

    # Exec the config file in its directory.
    # NOTE: this runs arbitrary code; the config file is trusted user input.
    with open(config_file) as f:
        config_data = f.read()
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        exec(config_data, config.__dict__)
    finally:
        # Always restore the working directory, even if the config file raises.
        os.chdir(prev_cwd)

    return config
100
101
def prepare_command_expansions(config):
    """Build the global shortcut table from the built-in and user commands.

    Each key of a command set has the form ``"name [param[=default] ...]"``.
    The resulting global ``_command_expansions`` maps ``name`` to a pair
    ``(params, sub)`` where ``params`` is a tuple of ``[param]`` or
    ``[param, default]`` lists and ``sub`` is the list of replacement
    arguments.  Entries in ``config.commands`` override built-in entries with
    the same name.

    Args:
        config: object with a ``commands`` dict, in the same format as
            ``_BUILTIN_COMMAND_EXPANSIONS``.
    """
    global _command_expansions

    _command_expansions = {}

    for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands):
        for cmd, sub in command_set.items():
            words = cmd.split()
            # Split on the first "=" only, so a default value may itself
            # contain "=" without being truncated.
            params = tuple(word.split("=", 1) for word in words[1:])
            if isinstance(sub, str):
                sub = sub.split()
            _command_expansions[words[0]] = (params, sub)
117
118
def do_command_expansion(args):
    """Expand a leading shortcut command in ``args``, in place.

    While the first argument names an entry in ``_command_expansions`` it is
    replaced by that entry's substitution, so shortcuts may expand to other
    shortcuts.  Values for the shortcut's declared parameters are taken from
    the following arguments (either positionally or as ``name=value``) or from
    the declared default, and are passed to the device by prepending an
    ``exec "name=value;..."`` command.

    Prints the shortcut's signature and exits with status 1 when a required
    parameter is missing or an unexpected ``name=value`` argument follows.
    """

    def usage_error(cmd, exp_args, msg):
        print(f"Command {cmd} {msg}; signature is:")
        print("   ", cmd, " ".join("=".join(a) for a in exp_args))
        sys.exit(1)

    assignments = []
    # Index of the first argument after the most recent substitution.
    boundary = len(args)

    while args and args[0] in _command_expansions:
        cmd = args.pop(0)
        exp_args, exp_sub = _command_expansions[cmd]

        for spec in exp_args:
            name = spec[0]
            head = args[0] if args else None
            if head is not None and "=" not in head:
                # Positional value.
                value = args.pop(0)
            elif head is not None and head.startswith(name + "="):
                # Value given as name=value.
                value = args.pop(0).split("=", 1)[1]
            elif len(spec) == 1:
                # Nothing usable was given and there is no default.
                usage_error(cmd, exp_args, f"missing argument {name}")
            else:
                # Fall back to the declared default.
                value = spec[1]
            assignments.append(f"{name}={value}")

        args[0:0] = exp_sub
        boundary = len(exp_sub)

    # A name=value argument directly after the substitution was not consumed
    # by any declared parameter, so it is an error.
    if boundary < len(args) and "=" in args[boundary]:
        unexpected = args[boundary].split("=", 1)[0]
        usage_error(cmd, exp_args, f"given unexpected argument {unexpected}")

    # Set the parameter values on the device before the expanded command runs.
    if assignments:
        args[0:0] = ["exec", ";".join(assignments)]
160
161
def do_connect(args):
    """Handle the ``connect`` command, consuming its device argument.

    The first element of ``args`` is removed and interpreted as:

    - ``list``: print the attached serial ports and return ``None``;
    - ``auto``: connect to the first port that can be opened;
    - ``id:<serial>``: connect to the port with that serial number;
    - ``port:<path>`` or a bare path: connect to that port.

    Returns the connected ``pyboard.PyboardExtended`` object, or ``None`` for
    ``list``.  On ``pyboard.PyboardError`` the message is printed and the
    program exits with status 1.
    """
    dev = args.pop(0)
    try:
        if dev == "list":
            # List attached devices.
            for port in sorted(serial.tools.list_ports.comports()):
                # Ports without USB ids report a non-int vid/pid; show 0 then.
                vid = port.vid if isinstance(port.vid, int) else 0
                pid = port.pid if isinstance(port.pid, int) else 0
                line = "{} {} {:04x}:{:04x} {} {}".format(
                    port.device,
                    port.serial_number,
                    vid,
                    pid,
                    port.manufacturer,
                    port.product,
                )
                print(line)
            return None

        if dev == "auto":
            # Try every port in sorted order; skip those that cannot be
            # accessed and propagate any other failure.
            for port in sorted(serial.tools.list_ports.comports()):
                try:
                    return pyboard.PyboardExtended(port.device, baudrate=115200)
                except pyboard.PyboardError as er:
                    if er.args[0].startswith("failed to access"):
                        continue
                    raise
            raise pyboard.PyboardError("no device found")

        if dev.startswith("id:"):
            # Search for a device with the given serial number.
            serial_number = dev[len("id:") :]
            for port in serial.tools.list_ports.comports():
                if port.serial_number == serial_number:
                    return pyboard.PyboardExtended(port.device, baudrate=115200)
            raise pyboard.PyboardError("no device with serial number {}".format(serial_number))

        # Connect to the given device, with or without the "port:" prefix.
        if dev.startswith("port:"):
            dev = dev[len("port:") :]
        return pyboard.PyboardExtended(dev, baudrate=115200)
    except pyboard.PyboardError as er:
        msg = er.args[0]
        if msg.startswith("failed to access"):
            msg += " (it may be in use by another program)"
        print(msg)
        sys.exit(1)
207
208
def do_disconnect(pyb):
    """Cleanly shut down and close the connection ``pyb``.

    Unmounts a mounted local directory (entering the raw REPL first if
    needed), leaves the raw REPL if it is active, and finally closes the
    connection.  ``OSError`` raised during the shutdown steps is ignored.
    """

    def shut_down():
        if pyb.mounted:
            if not pyb.in_raw_repl:
                pyb.enter_raw_repl(soft_reset=False)
            pyb.umount_local()
        if pyb.in_raw_repl:
            pyb.exit_raw_repl()

    try:
        shut_down()
    except OSError:
        # Best effort only.  OSError is expected here, for example:
        # - pyboard.filesystem_command closes the connection if it had an error
        # - unmounting fails if the serial port has disappeared
        pass
    pyb.close()
223
224
def do_filesystem(pyb, args):
    """Run the ``fs`` command described by ``args`` and consume all of ``args``.

    ``cp -r <src>... :`` is handled here: every file below the given local
    paths is copied to the same relative path on the device, creating the
    intermediate directories first.  The last argument must be ``:``;
    otherwise a message is printed and the program exits with status 1.
    Every other command is passed unchanged to ``pyboard.filesystem_command``.

    Args:
        pyb: the connected device.
        args: list of ``fs`` arguments; it is emptied before returning.
    """

    def _list_recursive(files, path):
        # Collect (directory, filename) pairs for every file at or below path.
        if os.path.isdir(path):
            for entry in os.listdir(path):
                _list_recursive(files, os.path.join(path, entry))
        else:
            files.append(os.path.split(path))

    # Slice comparison so that a bare "cp" (no further arguments) falls
    # through to filesystem_command instead of raising IndexError.
    if args[:2] == ["cp", "-r"]:
        del args[:2]
        # Explicit check rather than assert, which is stripped under -O.
        if not args or args[-1] != ":":
            print("cp -r: the destination (last argument) must be ':'")
            sys.exit(1)
        args.pop()
        src_files = []
        for path in args:
            _list_recursive(src_files, path)
        # Directories already created on the device; "" needs no mkdir.
        known_dirs = {""}
        pyb.exec_("import uos")
        for dirname, filename in src_files:
            dir_parts = dirname.split("/")
            for i in range(len(dir_parts)):
                d = "/".join(dir_parts[: i + 1])
                if d not in known_dirs:
                    # %r produces a properly quoted and escaped string
                    # literal, so quotes or backslashes in d are safe.
                    pyb.exec_("try:\n uos.mkdir(%r)\nexcept OSError as e:\n print(e)" % d)
                    known_dirs.add(d)
            pyboard.filesystem_command(
                pyb, ["cp", os.path.join(dirname, filename), ":" + dirname + "/"]
            )
    else:
        pyboard.filesystem_command(pyb, args)
    args.clear()
254
255
def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
    """Relay data between the console and the device until the user quits.

    Keys read from ``console_in`` are forwarded to ``pyb.serial``, except:

    - Ctrl-] ends the loop;
    - Ctrl-D performs a soft reset via ``pyb.soft_reset_with_mount``;
    - Ctrl-J sends ``code_to_inject`` (when it is not ``None``);
    - Ctrl-K runs the file ``file_to_inject`` on the device in raw REPL mode
      (when it is not ``None``).

    Bytes read from the device are passed to ``console_out_write``; bytes
    other than backspace, tab, LF, CR, ESC and printable ASCII are written as
    ``[xx]`` (two hex digits).

    The loop also ends, after printing a message, when polling the serial port
    raises ``OSError`` with error number 5.  Any other ``OSError`` from the
    poll is propagated.
    """
    while True:
        console_in.waitchar(pyb.serial)
        c = console_in.readchar()
        if c:
            if c == b"\x1d":  # ctrl-], quit
                break
            elif c == b"\x04":  # ctrl-D
                # do a soft reset and reload the filesystem hook
                pyb.soft_reset_with_mount(console_out_write)
            elif c == b"\x0a" and code_to_inject is not None:  # ctrl-j, inject code
                pyb.serial.write(code_to_inject)
            elif c == b"\x0b" and file_to_inject is not None:  # ctrl-k, inject script
                console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
                pyb.enter_raw_repl(soft_reset=False)
                with open(file_to_inject, "rb") as f:
                    pyfile = f.read()
                try:
                    pyb.exec_raw_no_follow(pyfile)
                except pyboard.PyboardError as er:
                    console_out_write(b"Error:\r\n")
                    # The console sink takes bytes, not an exception object.
                    console_out_write(bytes("%s\r\n" % er, "utf8"))
                pyb.exit_raw_repl()
            else:
                pyb.serial.write(c)

        try:
            n = pyb.serial.inWaiting()
        except OSError as er:
            if er.args and er.args[0] == 5:  # IO error, device disappeared
                print("device disconnected")
                break
            # Anything else is unexpected; do not continue with n unset/stale.
            raise

        if n > 0:
            c = pyb.serial.read(1)
            if c:
                # pass character through to the console
                oc = ord(c)
                if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
                    console_out_write(c)
                else:
                    console_out_write(b"[%02x]" % oc)
298
299
def do_repl(pyb, args):
    """Run an interactive REPL session on ``pyb``.

    Leading options are consumed from ``args``:

    - ``--capture <file>``: also write all console output to ``<file>``;
    - ``--inject-code <string>``: code sent on Ctrl-J (a literal backslash-n
      in the string is replaced by CR LF);
    - ``--inject-file <file>``: local script run on the device on Ctrl-K.

    Parsing stops at the first argument that is not one of these options.
    The console is restored and the capture file closed when the session ends.
    """
    capture_name = None
    code_to_inject = None
    file_to_inject = None

    # Each recognised option takes exactly one value.
    while args and args[0] in ("--capture", "--inject-code", "--inject-file"):
        option = args.pop(0)
        value = args.pop(0)
        if option == "--capture":
            capture_name = value
        elif option == "--inject-code":
            code_to_inject = value.replace("\\n", "\r\n").encode("utf8")
        else:
            file_to_inject = value

    print("Connected to MicroPython at %s" % pyb.device_name)
    print("Use Ctrl-] to exit this shell")
    capture = None
    if capture_name is not None:
        print('Capturing session to file "%s"' % capture_name)
        capture = open(capture_name, "wb")
    if code_to_inject is not None:
        print("Use Ctrl-J to inject", code_to_inject)
    if file_to_inject is not None:
        print('Use Ctrl-K to inject file "%s"' % file_to_inject)

    console = Console()
    console.enter()

    def console_out_write(b):
        # Mirror everything shown on the console into the capture file.
        console.write(b)
        if capture is not None:
            capture.write(b)
            capture.flush()

    try:
        do_repl_main_loop(
            pyb,
            console,
            console_out_write,
            code_to_inject=code_to_inject,
            file_to_inject=file_to_inject,
        )
    finally:
        console.exit()
        if capture is not None:
            capture.close()
349
350
def execbuffer(pyb, buf, follow):
    """Execute ``buf`` on the device and return a process-style status code.

    When ``follow`` is true the device's output is streamed to stdout through
    ``pyboard.stdout_write_bytes`` until execution finishes, and any error
    output is written out as well.

    Returns:
        0 on success; 1 if the device produced error output, a
        ``pyboard.PyboardError`` occurred (it is printed), or the user
        interrupted with Ctrl-C.
    """
    try:
        pyb.exec_raw_no_follow(buf)
        if not follow:
            return 0
        _, err_output = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
        if err_output:
            pyboard.stdout_write_bytes(err_output)
            return 1
    except pyboard.PyboardError as er:
        print(er)
        return 1
    except KeyboardInterrupt:
        return 1
    return 0
366
367
def main():
    """Program entry point: run the commands given on the command line in order.

    Shortcuts are expanded before each command is parsed.  A device is
    connected automatically (``connect auto``) when a command needs one and
    none is connected yet, and the raw REPL is entered or left as each command
    requires.  If no command counted as an action, an interactive REPL is
    started at the end.  Any open connection is closed before returning.

    Returns:
        1 on a usage error or unreadable script file, the non-zero status of a
        failed ``exec``/``eval``/``run``, and ``None`` otherwise.
    """
    # Command table: name -> (need_raw_repl, is_action, num_args_min).
    cmds = {
        "connect": (False, False, 1),
        "disconnect": (False, False, 0),
        "mount": (True, False, 1),
        "repl": (False, True, 0),
        "eval": (True, True, 1),
        "exec": (True, True, 1),
        "run": (True, True, 1),
        "fs": (True, True, 1),
    }

    config = load_user_config()
    prepare_command_expansions(config)

    args = sys.argv[1:]
    pyb = None
    did_action = False

    try:
        while args:
            do_command_expansion(args)

            cmd = args.pop(0)
            try:
                need_raw_repl, is_action, num_args_min = cmds[cmd]
            except KeyError:
                print(f"{_PROG}: '{cmd}' is not a command")
                return 1

            if len(args) < num_args_min:
                print(f"{_PROG}: '{cmd}' needs at least {num_args_min} argument(s)")
                return 1

            if cmd == "connect":
                if pyb is not None:
                    do_disconnect(pyb)
                pyb = do_connect(args)
                if pyb is None:
                    # "connect list" only printed the ports; that counts as
                    # the action, so no REPL is started afterwards.
                    did_action = True
                continue

            if pyb is None:
                pyb = do_connect(["auto"])

            if need_raw_repl:
                if not pyb.in_raw_repl:
                    pyb.enter_raw_repl()
            else:
                if pyb.in_raw_repl:
                    pyb.exit_raw_repl()
            if is_action:
                did_action = True

            if cmd == "disconnect":
                do_disconnect(pyb)
                pyb = None
            elif cmd == "mount":
                path = args.pop(0)
                pyb.mount_local(path)
                print(f"Local directory {path} is mounted at /remote")
            elif cmd in ("exec", "eval", "run"):
                follow = True
                if args[0] == "--no-follow":
                    args.pop(0)
                    follow = False
                    # The option satisfied the minimum-argument check above,
                    # so make sure the actual code/script argument is present.
                    if not args:
                        print(f"{_PROG}: '{cmd}' needs an argument after --no-follow")
                        return 1
                if cmd == "exec":
                    buf = args.pop(0)
                elif cmd == "eval":
                    buf = "print(" + args.pop(0) + ")"
                else:
                    filename = args.pop(0)
                    try:
                        with open(filename, "rb") as f:
                            buf = f.read()
                    except OSError:
                        print(f"{_PROG}: could not read file '{filename}'")
                        return 1
                ret = execbuffer(pyb, buf, follow)
                if ret:
                    return ret
            elif cmd == "fs":
                do_filesystem(pyb, args)
            elif cmd == "repl":
                do_repl(pyb, args)

        if not did_action:
            # Nothing but connection/mount set-up was requested: drop into
            # an interactive REPL.
            if pyb is None:
                pyb = do_connect(["auto"])
            if pyb.in_raw_repl:
                pyb.exit_raw_repl()
            do_repl(pyb, args)
    finally:
        if pyb is not None:
            do_disconnect(pyb)
462