1# -*- coding: utf-8 -*-
2
3"""
4Classes used both by front-end and back-end
5"""
6import ast
7import logging
8import os.path
9import platform
10import site
11import subprocess
12import sys
13from collections import namedtuple
14from typing import List, Optional, Dict, Iterable, Tuple  # @UnusedImport
15
16logger = logging.getLogger(__name__)
17
18MESSAGE_MARKER = "\x02"
19OBJECT_LINK_START = "[object_link_for_thonny=%d]"
20OBJECT_LINK_END = "[/object_link_for_thonny]"
21
22IGNORED_FILES_AND_DIRS = [
23    "System Volume Information",
24    "._.Trashes",
25    ".Trashes",
26    "__MACOSX",
27    ".DS_Store",
28]
29
30ValueInfo = namedtuple("ValueInfo", ["id", "repr"])
31FrameInfo = namedtuple(
32    "FrameInfo",
33    [
34        "id",
35        "filename",
36        "module_name",
37        "code_name",
38        "source",
39        "lineno",
40        "firstlineno",
41        "in_library",
42        "locals",
43        "globals",
44        "freevars",
45        "event",
46        "focus",
47        "node_tags",
48        "current_statement",
49        "current_root_expression",
50        "current_evaluations",
51    ],
52)
53
54TextRange = namedtuple("TextRange", ["lineno", "col_offset", "end_lineno", "end_col_offset"])
55
56
57class Record:
58    def __init__(self, **kw):
59        self.__dict__.update(kw)
60
61    def update(self, e, **kw):
62        self.__dict__.update(e, **kw)
63
64    def setdefault(self, **kw):
65        "updates those fields that are not yet present (similar to dict.setdefault)"
66        for key in kw:
67            if not hasattr(self, key):
68                setattr(self, key, kw[key])
69
70    def get(self, key, default=None):
71        return self.__dict__.get(key, default)
72
73    def __getitem__(self, key):
74        return self.__dict__[key]
75
76    def __delitem__(self, key):
77        self.__dict__.__delitem__(key)
78
79    def __setitem__(self, key, value):
80        self.__dict__[key] = value
81
82    def __contains__(self, key):
83        return key in self.__dict__
84
85    def __repr__(self):
86        keys = self.__dict__.keys()
87        items = ("{}={}".format(k, repr(self.__dict__[k])) for k in keys)
88        return "{}({})".format(self.__class__.__name__, ", ".join(items))
89
90    def __str__(self):
91        keys = sorted(self.__dict__.keys())
92        items = ("{}={}".format(k, repr(self.__dict__[k])) for k in keys)
93        return "{}({})".format(self.__class__.__name__, ", ".join(items))
94
95    def __eq__(self, other):
96        # pylint: disable=unidiomatic-typecheck
97
98        if type(self) != type(other):
99            return False
100
101        if len(self.__dict__) != len(other.__dict__):
102            return False
103
104        for key in self.__dict__:
105            if not hasattr(other, key):
106                return False
107            self_value = getattr(self, key)
108            other_value = getattr(other, key)
109
110            if type(self_value) != type(other_value) or self_value != other_value:
111                return False
112
113        return True
114
115    def __ne__(self, other):
116        return not self.__eq__(other)
117
118    def __hash__(self):
119        return hash(repr(self))
120
121
122def range_contains_smaller(one: TextRange, other: TextRange) -> bool:
123    this_start = (one.lineno, one.col_offset)
124    this_end = (one.end_lineno, one.end_col_offset)
125    other_start = (other.lineno, other.col_offset)
126    other_end = (other.end_lineno, other.end_col_offset)
127
128    return (
129        this_start < other_start
130        and this_end > other_end
131        or this_start == other_start
132        and this_end > other_end
133        or this_start < other_start
134        and this_end == other_end
135    )
136
137
138def range_contains_smaller_or_equal(one: TextRange, other: TextRange) -> bool:
139    return range_contains_smaller(one, other) or one == other
140
141
142class InputSubmission(Record):
143    """For sending data to backend's stdin"""
144
145    def __init__(self, data: str, **kw) -> None:
146        super().__init__(**kw)
147        self.data = data
148
149
150class CommandToBackend(Record):
151    """Command meant for the back-end"""
152
153    def __init__(self, name: str, **kw) -> None:
154        super().__init__(**kw)
155        self.name = name
156
157
158class ImmediateCommand(CommandToBackend):
159    pass
160
161
162class EOFCommand(CommandToBackend):
163    def __init__(self, **kw) -> None:
164        if "name" in kw:
165            del kw["name"]
166        super().__init__("eof", **kw)
167
168
169class ToplevelCommand(CommandToBackend):
170    def __init__(self, name: str, argv: List[str] = [], **kw) -> None:
171        super().__init__(name, **kw)
172        self.argv = argv
173
174
175class DebuggerCommand(CommandToBackend):
176    pass
177
178
179class InlineCommand(CommandToBackend):
180    """
181    Can be used both during debugging and in waiting_toplevel_command state
182    (eg. for sending variable and heap info requests)
183    """
184
185    pass
186
187
188class MessageFromBackend(Record):
189    def __init__(self, **kw) -> None:
190        self.event_type = type(self).__name__  # allow event_type to be overridden by kw
191        super().__init__(**kw)
192        if not hasattr(self, "sequence"):
193            self.sequence = self.event_type
194
195
196class ToplevelResponse(MessageFromBackend):
197    pass
198
199
200class DebuggerResponse(MessageFromBackend):
201    pass
202
203
204class BackendEvent(MessageFromBackend):
205    def __init__(self, event_type: str, **kw) -> None:
206        super().__init__(**kw)
207        self.event_type = event_type
208
209
210class InlineResponse(MessageFromBackend):
211    def __init__(self, command_name: str, **kw) -> None:
212        super().__init__(**kw)
213        self.command_name = command_name
214        self.event_type = self.command_name + "_response"
215
216
217def serialize_message(msg: Record) -> str:
218    # I want to transfer only ASCII chars because encodings are not reliable
219    # (eg. can't find a way to specify PYTHONIOENCODING for cx_freeze'd program)
220    return MESSAGE_MARKER + ascii(msg)
221
222
223def parse_message(msg_string: str) -> Record:
224    # DataFrames may have nan
225    # pylint: disable=unused-variable
226    nan = float("nan")  # @UnusedVariable
227    assert msg_string[0] == MESSAGE_MARKER
228    return eval(msg_string[1:])
229
230
231def normpath_with_actual_case(name: str) -> str:
232    """In Windows return the path with the case it is stored in the filesystem"""
233    if not os.path.exists(name):
234        return os.path.normpath(name)
235
236    assert os.path.isabs(name) or os.path.ismount(name), "Not abs nor mount: " + name
237    assert os.path.exists(name), "Not exists: " + name
238
239    if os.name == "nt":
240        # https://stackoverflow.com/questions/2113822/python-getting-filename-case-as-stored-in-windows/2114975
241        name = os.path.normpath(name)
242
243        from ctypes import create_unicode_buffer, windll
244
245        buf = create_unicode_buffer(512)
246        # GetLongPathNameW alone doesn't fix filename part
247        windll.kernel32.GetShortPathNameW(name, buf, 512)  # @UndefinedVariable
248        windll.kernel32.GetLongPathNameW(buf.value, buf, 512)  # @UndefinedVariable
249        result = buf.value
250
251        if result.casefold() != name.casefold():
252            # Sometimes GetShortPathNameW + GetLongPathNameW doesn't work
253            # see eg. https://github.com/thonny/thonny/issues/925
254            windll.kernel32.GetLongPathNameW(name, buf, 512)  # @UndefinedVariable
255            result = buf.value
256
257            if result.casefold() != name.casefold():
258                result = name
259
260        if result[1] == ":":
261            # ensure drive letter is capital
262            return result[0].upper() + result[1:]
263        else:
264            return result
265    else:
266        # easy on Linux
267        # too difficult on mac
268        # https://stackoverflow.com/questions/14515073/in-python-on-osx-with-hfs-how-can-i-get-the-correct-case-of-an-existing-filenam
269        # Hopefully only correct case comes into Thonny (eg. via open dialog)
270        return os.path.normpath(name)
271
272
273def is_same_path(name1: str, name2: str) -> bool:
274    return os.path.normpath(os.path.normcase(name1)) == os.path.normpath(os.path.normcase(name2))
275
276
277def path_startswith(child_name: str, dir_name: str) -> bool:
278    normchild = os.path.normpath(os.path.normcase(child_name))
279    normdir = os.path.normpath(os.path.normcase(dir_name))
280    return normdir == normchild or normchild.startswith(normdir.rstrip(os.path.sep) + os.path.sep)
281
282
283def read_source(filename):
284    import tokenize
285
286    with tokenize.open(filename) as fp:
287        return fp.read()
288
289
290def get_exe_dirs():
291    result = []
292    if site.ENABLE_USER_SITE:
293        if platform.system() == "Windows":
294            if site.getusersitepackages():
295                result.append(site.getusersitepackages().replace("site-packages", "Scripts"))
296        else:
297            if site.getuserbase():
298                result.append(site.getuserbase() + "/bin")
299
300    main_scripts = os.path.join(sys.prefix, "Scripts")
301    if os.path.isdir(main_scripts) and main_scripts not in result:
302        result.append(main_scripts)
303
304    if os.path.dirname(sys.executable) not in result:
305        result.append(os.path.dirname(sys.executable))
306
307    # These entries are used by Anaconda
308    for part in [
309        "Library/mingw-w64/bin",
310        "Library/usr/bin",
311        "Library/bin",
312        "Scripts",
313        "bin",
314        "condabin",
315    ]:
316        dirpath = os.path.join(sys.prefix, part.replace("/", os.sep))
317        if os.path.isdir(dirpath) and dirpath not in result:
318            result.append(dirpath)
319
320    if platform.system() != "Windows" and "/usr/local/bin" not in result:
321        # May be missing on macOS, when started as bundle
322        # (yes, more may be missing, but this one is most useful)
323        result.append("/usr/local/bin")
324
325    return result
326
327
328def get_site_dir(symbolic_name, executable=None):
329    if not executable or executable == sys.executable:
330        result = getattr(site, symbolic_name, "")
331    else:
332        result = (
333            subprocess.check_output(
334                [executable, "-m", "site", "--" + symbolic_name.lower().replace("_", "-")],
335                universal_newlines=True,
336            )
337            .decode()
338            .strip()
339        )
340
341    return result if result else None
342
343
344def get_base_executable():
345    if sys.exec_prefix == sys.base_exec_prefix:
346        return sys.executable
347
348    if platform.system() == "Windows":
349        result = sys.base_exec_prefix + "\\" + os.path.basename(sys.executable)
350        result = normpath_with_actual_case(result)
351    else:
352        result = sys.executable.replace(sys.exec_prefix, sys.base_exec_prefix)
353
354    if not os.path.isfile(result):
355        raise RuntimeError("Can't locate base executable")
356
357    return result
358
359
360def get_augmented_system_path(extra_dirs):
361    path_items = os.environ.get("PATH", "").split(os.pathsep)
362
363    for d in reversed(extra_dirs):
364        if d not in path_items:
365            path_items.insert(0, d)
366
367    return os.pathsep.join(path_items)
368
369
370def update_system_path(env, value):
371    # in Windows, env keys are not case sensitive
372    # this is important if env is a dict (not os.environ)
373    if platform.system() == "Windows":
374        found = False
375        for key in env:
376            if key.upper() == "PATH":
377                found = True
378                env[key] = value
379
380        if not found:
381            env["PATH"] = value
382    else:
383        env["PATH"] = value
384
385
386class UserError(RuntimeError):
387    """Errors of this class are meant to be presented without stacktrace"""
388
389    pass
390
391
392def is_hidden_or_system_file(path: str) -> bool:
393    if os.path.basename(path).startswith("."):
394        return True
395    elif platform.system() == "Windows":
396        from ctypes import windll
397
398        FILE_ATTRIBUTE_HIDDEN = 0x2
399        FILE_ATTRIBUTE_SYSTEM = 0x4
400        return bool(
401            windll.kernel32.GetFileAttributesW(path)  # @UndefinedVariable
402            & (FILE_ATTRIBUTE_HIDDEN | FILE_ATTRIBUTE_SYSTEM)
403        )
404    else:
405        return False
406
407
408def get_dirs_children_info(
409    paths: List[str], include_hidden: bool = False
410) -> Dict[str, Optional[Dict[str, Dict]]]:
411    return {path: get_single_dir_child_data(path, include_hidden) for path in paths}
412
413
414def get_single_dir_child_data(path: str, include_hidden: bool = False) -> Optional[Dict[str, Dict]]:
415    if path == "":
416        if platform.system() == "Windows":
417            return {**get_windows_volumes_info(), **get_windows_network_locations()}
418        else:
419            return get_single_dir_child_data("/", include_hidden)
420
421    elif os.path.isdir(path) or os.path.ismount(path):
422        result = {}
423
424        try:
425            for child in os.listdir(path):
426                full_child_path = os.path.join(path, child)
427                if not os.path.exists(full_child_path):
428                    # must be broken link
429                    continue
430                full_child_path = normpath_with_actual_case(full_child_path)
431                hidden = is_hidden_or_system_file(full_child_path)
432                if not hidden or include_hidden:
433                    name = os.path.basename(full_child_path)
434                    st = os.stat(full_child_path, dir_fd=None, follow_symlinks=True)
435                    result[name] = {
436                        "size": None if os.path.isdir(full_child_path) else st.st_size,
437                        "modified": st.st_mtime,
438                        "hidden": hidden,
439                    }
440        except PermissionError:
441            result["<not accessible>"] = {
442                "kind": "error",
443                "size": -1,
444                "modified": None,
445                "hidden": None,
446            }
447
448        return result
449    else:
450        return None
451
452
453def get_windows_volumes_info():
454    # http://stackoverflow.com/a/2288225/261181
455    # http://msdn.microsoft.com/en-us/library/windows/desktop/aa364939%28v=vs.85%29.aspx
456    import string
457    from ctypes import windll
458
459    all_drive_types = [
460        "DRIVE_UNKNOWN",
461        "DRIVE_NO_ROOT_DIR",
462        "DRIVE_REMOVABLE",
463        "DRIVE_FIXED",
464        "DRIVE_REMOTE",
465        "DRIVE_CDROM",
466        "DRIVE_RAMDISK",
467    ]
468
469    required_drive_types = ["DRIVE_REMOVABLE", "DRIVE_FIXED", "DRIVE_REMOTE", "DRIVE_RAMDISK"]
470
471    result = {}
472
473    bitmask = windll.kernel32.GetLogicalDrives()  # @UndefinedVariable
474    for letter in string.ascii_uppercase:
475        if not bitmask & 1:
476            pass
477        else:
478            drive_type = all_drive_types[
479                windll.kernel32.GetDriveTypeW("%s:\\" % letter)
480            ]  # @UndefinedVariable
481
482            # NB! Drive A can be present in bitmask but actually missing.
483            # In this case querying information about it would freeze the UI
484            # for several seconds.
485            # One solution is to uninstall the device in device manager,
486            # but OS may restore the drive later.
487            # Therefore it is safest to skip A drive (user can access it via Open dialog)
488
489            if drive_type in required_drive_types and (
490                letter != "A" or drive_type != "DRIVE_REMOVABLE"
491            ):
492                drive = letter + ":"
493                path = drive + "\\"
494
495                try:
496                    st = os.stat(path)
497                    volume_name = get_windows_volume_name(path)
498                    if not volume_name:
499                        volume_name = "Disk"
500
501                    label = volume_name + " (" + drive + ")"
502                    result[path] = {
503                        "label": label,
504                        "size": None,
505                        "modified": max(st.st_mtime, st.st_ctime),
506                    }
507                except OSError as e:
508                    logger.warning("Could not get information for %s", path, exc_info=e)
509
510        bitmask >>= 1
511
512    return result
513
514
515def get_windows_volume_name(path):
516    # https://stackoverflow.com/a/12056414/261181
517    import ctypes
518
519    kernel32 = ctypes.windll.kernel32
520    volume_name_buffer = ctypes.create_unicode_buffer(1024)
521    file_system_name_buffer = ctypes.create_unicode_buffer(1024)
522    serial_number = None
523    max_component_length = None
524    file_system_flags = None
525
526    result = kernel32.GetVolumeInformationW(
527        ctypes.c_wchar_p(path),
528        volume_name_buffer,
529        ctypes.sizeof(volume_name_buffer),
530        serial_number,
531        max_component_length,
532        file_system_flags,
533        file_system_name_buffer,
534        ctypes.sizeof(file_system_name_buffer),
535    )
536
537    if result:
538        return volume_name_buffer.value
539    else:
540        return None
541
542
543def get_windows_network_locations():
544    import ctypes.wintypes
545
546    CSIDL_NETHOOD = 0x13
547    SHGFP_TYPE_CURRENT = 0
548    buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
549    ctypes.windll.shell32.SHGetFolderPathW(0, CSIDL_NETHOOD, 0, SHGFP_TYPE_CURRENT, buf)
550    shortcuts_dir = buf.value
551
552    result = {}
553    for entry in os.scandir(shortcuts_dir):
554        # full_path = normpath_with_actual_case(entry.path)
555        lnk_path = os.path.join(entry.path, "target.lnk")
556        if os.path.exists(lnk_path):
557            try:
558                target = get_windows_lnk_target(lnk_path)
559                result[target] = {
560                    "label": entry.name + " (" + target + ")",
561                    "size": None,
562                    "modified": None,
563                }
564            except Exception:
565                logger.error("Can't get target from %s", lnk_path, exc_info=True)
566
567    return result
568
569
570def get_windows_lnk_target(lnk_file_path):
571    import thonny
572
573    script_path = os.path.join(os.path.dirname(thonny.__file__), "res", "PrintLnkTarget.vbs")
574    cmd = ["cscript", "/NoLogo", script_path, lnk_file_path]
575    result = subprocess.check_output(cmd, universal_newlines=True, timeout=3)
576
577    return result.strip()
578
579
580def execute_system_command(cmd, cwd=None, disconnect_stdin=False):
581    logger.debug("execute_system_command, cmd=%r, cwd=%s", cmd, cwd)
582    env = dict(os.environ).copy()
583    encoding = "utf-8"
584    env["PYTHONIOENCODING"] = encoding
585    # Make sure this python interpreter and its scripts are available
586    # in PATH
587    update_system_path(env, get_augmented_system_path(get_exe_dirs()))
588    popen_kw = dict(
589        env=env,
590        universal_newlines=True,
591        bufsize=0,
592    )
593
594    if cwd and os.path.isdir(cwd):
595        popen_kw["cwd"] = cwd
596
597    if disconnect_stdin:
598        popen_kw["stdin"] = subprocess.DEVNULL
599
600    if sys.version_info >= (3, 6):
601        popen_kw["errors"] = "replace"
602        popen_kw["encoding"] = encoding
603
604    if isinstance(cmd.cmd_line, str) and cmd.cmd_line.startswith("!"):
605        cmd_line = cmd.cmd_line[1:]
606        popen_kw["shell"] = True
607    else:
608        assert isinstance(cmd.cmd_line, list)
609        cmd_line = cmd.cmd_line
610    logger.debug("Popen(%r, ...)", cmd_line)
611    proc = subprocess.Popen(cmd_line, **popen_kw)
612    proc.communicate()
613    return proc.wait()
614
615
616def universal_dirname(path: str) -> str:
617    if "/" in path:
618        sep = "/"
619    elif "\\" in path:
620        sep = "\\"
621    else:
622        # micro:bit
623        return ""
624
625    path = path.rstrip(sep)
626    result = path[: path.rindex(sep)]
627    if not result:
628        return sep
629    else:
630        return result
631
632
633def universal_relpath(path: str, context: str) -> str:
634    """Tries to give relative path"""
635    if "/" in path:
636        import pathlib
637
638        p = pathlib.PurePosixPath(path)
639        try:
640            return str(p.relative_to(context))
641        except ValueError:
642            return path
643    else:
644        return os.path.relpath(path, context)
645
646
647def get_python_version_string(version_info: Optional[Tuple] = None, maxsize=None):
648    result = ".".join(map(str, sys.version_info[:3]))
649    if sys.version_info[3] != "final":
650        result += "-" + sys.version_info[3]
651
652    if maxsize is not None:
653        result += " (" + ("64" if sys.maxsize > 2 ** 32 else "32") + " bit)"
654
655    return result
656
657
658def try_load_modules_with_frontend_sys_path(module_names):
659    try:
660        frontend_sys_path = ast.literal_eval(os.environ["THONNY_FRONTEND_SYS_PATH"])
661        assert isinstance(frontend_sys_path, list)
662    except Exception as e:
663        logger.warning("Could not get THONNY_FRONTEND_SYS_PATH", exc_info=e)
664        return
665
666    from importlib import import_module
667
668    old_sys_path = sys.path.copy()
669    sys.path = sys.path + frontend_sys_path
670    try:
671        for name in module_names:
672            try:
673                import_module(name)
674            except ImportError:
675                pass
676    finally:
677        sys.path = old_sys_path
678
679
680class ConnectionFailedException(Exception):
681    pass
682
683
684class ConnectionClosedException(Exception):
685    pass
686