# -*- coding: utf-8 -*-

"""
Classes used both by front-end and back-end
"""
import ast
import logging
import os.path
import platform
import site
import subprocess
import sys
from collections import namedtuple
from typing import List, Optional, Dict, Iterable, Tuple  # @UnusedImport

logger = logging.getLogger(__name__)

# Prefix put in front of every serialized message (see serialize_message / parse_message)
MESSAGE_MARKER = "\x02"
OBJECT_LINK_START = "[object_link_for_thonny=%d]"
OBJECT_LINK_END = "[/object_link_for_thonny]"

IGNORED_FILES_AND_DIRS = [
    "System Volume Information",
    "._.Trashes",
    ".Trashes",
    "__MACOSX",
    ".DS_Store",
]

ValueInfo = namedtuple("ValueInfo", ["id", "repr"])
FrameInfo = namedtuple(
    "FrameInfo",
    [
        "id",
        "filename",
        "module_name",
        "code_name",
        "source",
        "lineno",
        "firstlineno",
        "in_library",
        "locals",
        "globals",
        "freevars",
        "event",
        "focus",
        "node_tags",
        "current_statement",
        "current_root_expression",
        "current_evaluations",
    ],
)

TextRange = namedtuple("TextRange", ["lineno", "col_offset", "end_lineno", "end_col_offset"])


class Record:
    """Simple attribute bag which also supports dict-style access.

    Keyword arguments given to the constructor become instance attributes.
    ``repr`` of a record is a constructor call expression, which is what
    ``serialize_message`` / ``parse_message`` rely on.
    """

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def update(self, e, **kw):
        """Update the fields from mapping/iterable *e* and from keyword arguments."""
        self.__dict__.update(e, **kw)

    def setdefault(self, **kw):
        "updates those fields that are not yet present (similar to dict.setdefault)"
        for key in kw:
            if not hasattr(self, key):
                setattr(self, key, kw[key])

    def get(self, key, default=None):
        """Return the field *key*, or *default* when it is not set."""
        return self.__dict__.get(key, default)

    def __getitem__(self, key):
        return self.__dict__[key]

    def __delitem__(self, key):
        self.__dict__.__delitem__(key)

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __contains__(self, key):
        return key in self.__dict__

    def __repr__(self):
        # Fields are listed in insertion order
        keys = self.__dict__.keys()
        items = ("{}={}".format(k, repr(self.__dict__[k])) for k in keys)
        return "{}({})".format(self.__class__.__name__, ", ".join(items))

    def __str__(self):
        # Same as __repr__, but with the fields sorted by name
        keys = sorted(self.__dict__.keys())
        items = ("{}={}".format(k, repr(self.__dict__[k])) for k in keys)
        return "{}({})".format(self.__class__.__name__, ", ".join(items))

    def __eq__(self, other):
        # pylint: disable=unidiomatic-typecheck

        if type(self) != type(other):
            return False

        if len(self.__dict__) != len(other.__dict__):
            return False

        for key in self.__dict__:
            if not hasattr(other, key):
                return False
            self_value = getattr(self, key)
            other_value = getattr(other, key)

            # Values must agree in type as well (eg. 1 and 1.0 are considered different)
            if type(self_value) != type(other_value) or self_value != other_value:
                return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # __eq__ does not care about the order in which the fields were set,
        # so the hash must not depend on it either (repr() does, therefore
        # the items are sorted here). Values are represented by their repr.
        items = tuple(sorted((key, repr(value)) for key, value in self.__dict__.items()))
        return hash((type(self).__name__, items))


def range_contains_smaller(one: TextRange, other: TextRange) -> bool:
    """Return True if *one* covers *other* and the two ranges are not identical."""
    this_start = (one.lineno, one.col_offset)
    this_end = (one.end_lineno, one.end_col_offset)
    other_start = (other.lineno, other.col_offset)
    other_end = (other.end_lineno, other.end_col_offset)

    # "covers, but at least one of the ends differs"
    return (
        this_start <= other_start
        and this_end >= other_end
        and (this_start, this_end) != (other_start, other_end)
    )


def range_contains_smaller_or_equal(one: TextRange, other: TextRange) -> bool:
    """Return True if *one* covers *other* or equals it."""
    return range_contains_smaller(one, other) or one == other


class InputSubmission(Record):
    """For sending data to backend's stdin"""

    def __init__(self, data: str, **kw) -> None:
        super().__init__(**kw)
        self.data = data


class CommandToBackend(Record):
    """Command meant for the back-end"""

    def __init__(self, name: str, **kw) -> None:
        super().__init__(**kw)
        self.name = name


class ImmediateCommand(CommandToBackend):
    pass


class EOFCommand(CommandToBackend):
    """Command whose name is always "eof"; a ``name`` keyword argument is ignored."""

    def __init__(self, **kw) -> None:
        # name may be present when the command gets re-created from its repr
        if "name" in kw:
            del kw["name"]
        super().__init__("eof", **kw)


class ToplevelCommand(CommandToBackend):
    """Command carrying an argument vector (empty list by default)."""

    def __init__(self, name: str, argv: Optional[List[str]] = None, **kw) -> None:
        super().__init__(name, **kw)
        # Each command gets its own list; a literal [] default would be shared
        # between all instances created without argv.
        self.argv = [] if argv is None else argv


class DebuggerCommand(CommandToBackend):
    pass


class InlineCommand(CommandToBackend):
    """
    Can be used both during debugging and in waiting_toplevel_command state
    (eg. for sending variable and heap info requests)
    """

    pass


class MessageFromBackend(Record):
    """Base class of messages coming from the back-end.

    ``event_type`` defaults to the class name and ``sequence`` defaults
    to ``event_type``; both can be given as keyword arguments.
    """

    def __init__(self, **kw) -> None:
        self.event_type = type(self).__name__  # allow event_type to be overridden by kw
        super().__init__(**kw)
        if not hasattr(self, "sequence"):
            self.sequence = self.event_type


class ToplevelResponse(MessageFromBackend):
    pass


class DebuggerResponse(MessageFromBackend):
    pass


class BackendEvent(MessageFromBackend):
    """Message with an explicitly given ``event_type``."""

    def __init__(self, event_type: str, **kw) -> None:
        super().__init__(**kw)
        self.event_type = event_type


class InlineResponse(MessageFromBackend):
    """Response to an inline command; ``event_type`` is ``<command_name>_response``."""

    def __init__(self, command_name: str, **kw) -> None:
        super().__init__(**kw)
        self.command_name = command_name
        self.event_type = self.command_name + "_response"


def serialize_message(msg: Record) -> str:
    """Return MESSAGE_MARKER followed by the ASCII-only repr of *msg*."""
    # I want to transfer only ASCII chars because encodings are not reliable
    # (eg. can't find a way to specify PYTHONIOENCODING for cx_freeze'd program)
    return MESSAGE_MARKER + ascii(msg)


def parse_message(msg_string: str) -> Record:
    """Re-create the record from a string produced by ``serialize_message``.

    SECURITY NOTE: the payload is evaluated with ``eval``, ie. it can execute
    arbitrary code. Use this only for text coming from a trusted peer.
    """
    # DataFrames may have nan
    # pylint: disable=unused-variable
    nan = float("nan")  # @UnusedVariable
    assert msg_string[0] == MESSAGE_MARKER
    # eval sees the classes of this module and the local name nan
    return eval(msg_string[1:])


def normpath_with_actual_case(name: str) -> str:
    """In Windows return the path with the case it is stored in the filesystem"""
    if not os.path.exists(name):
        return os.path.normpath(name)

    assert os.path.isabs(name) or os.path.ismount(name), "Not abs nor mount: " + name
    assert os.path.exists(name), "Not exists: " + name

    if os.name == "nt":
        # https://stackoverflow.com/questions/2113822/python-getting-filename-case-as-stored-in-windows/2114975
        name = os.path.normpath(name)

        from ctypes import create_unicode_buffer, windll

        buf = create_unicode_buffer(512)
        # GetLongPathNameW alone doesn't fix filename part
        windll.kernel32.GetShortPathNameW(name, buf, 512)  # @UndefinedVariable
        windll.kernel32.GetLongPathNameW(buf.value, buf, 512)  # @UndefinedVariable
        result = buf.value

        if result.casefold() != name.casefold():
            # Sometimes GetShortPathNameW + GetLongPathNameW doesn't work
            # see eg. https://github.com/thonny/thonny/issues/925
            windll.kernel32.GetLongPathNameW(name, buf, 512)  # @UndefinedVariable
            result = buf.value

            if result.casefold() != name.casefold():
                # give up and use the normalized input
                result = name

        if result[1] == ":":
            # ensure drive letter is capital
            return result[0].upper() + result[1:]
        else:
            return result
    else:
        # easy on Linux
        # too difficult on mac
        # https://stackoverflow.com/questions/14515073/in-python-on-osx-with-hfs-how-can-i-get-the-correct-case-of-an-existing-filenam
        # Hopefully only correct case comes into Thonny (eg. via open dialog)
        return os.path.normpath(name)


def is_same_path(name1: str, name2: str) -> bool:
    """Compare two paths after ``normcase`` and ``normpath``."""
    return os.path.normpath(os.path.normcase(name1)) == os.path.normpath(os.path.normcase(name2))


def path_startswith(child_name: str, dir_name: str) -> bool:
    """Return True if *child_name* is *dir_name* itself or is located under it.

    Both paths are compared after ``normcase`` and ``normpath``.
    """
    normchild = os.path.normpath(os.path.normcase(child_name))
    normdir = os.path.normpath(os.path.normcase(dir_name))
    return normdir == normchild or normchild.startswith(normdir.rstrip(os.path.sep) + os.path.sep)


def read_source(filename):
    """Return the content of a Python source file, decoded by ``tokenize.open``."""
    import tokenize

    with tokenize.open(filename) as fp:
        return fp.read()


def get_exe_dirs():
    """Return the list of directories where the executables and scripts
    belonging to the current interpreter may be located."""
    result = []
    if site.ENABLE_USER_SITE:
        if platform.system() == "Windows":
            if site.getusersitepackages():
                result.append(site.getusersitepackages().replace("site-packages", "Scripts"))
        else:
            if site.getuserbase():
                result.append(site.getuserbase() + "/bin")

    main_scripts = os.path.join(sys.prefix, "Scripts")
    if os.path.isdir(main_scripts) and main_scripts not in result:
        result.append(main_scripts)

    if os.path.dirname(sys.executable) not in result:
        result.append(os.path.dirname(sys.executable))

    # These entries are used by Anaconda
    for part in [
        "Library/mingw-w64/bin",
        "Library/usr/bin",
        "Library/bin",
        "Scripts",
        "bin",
        "condabin",
    ]:
        dirpath = os.path.join(sys.prefix, part.replace("/", os.sep))
        if os.path.isdir(dirpath) and dirpath not in result:
            result.append(dirpath)

    if platform.system() != "Windows" and "/usr/local/bin" not in result:
        # May be missing on macOS, when started as bundle
        # (yes, more may be missing, but this one is most useful)
        result.append("/usr/local/bin")

    return result


def get_site_dir(symbolic_name, executable=None):
    """Return the value of ``site.<symbolic_name>`` (eg. "USER_SITE") or None.

    For the current interpreter the value is read from the ``site`` module.
    For another *executable* it is queried with ``<executable> -m site --<option>``,
    where the option is the lowercased name with underscores replaced by dashes.
    """
    if not executable or executable == sys.executable:
        result = getattr(site, symbolic_name, "")
    else:
        # universal_newlines=True makes check_output return str,
        # so the output must not be decoded
        result = subprocess.check_output(
            [executable, "-m", "site", "--" + symbolic_name.lower().replace("_", "-")],
            universal_newlines=True,
        ).strip()

    return result if result else None


def get_base_executable():
    """Return the interpreter under ``sys.base_exec_prefix``
    (``sys.executable`` itself, if the prefixes do not differ).

    Raises RuntimeError if the computed path is not an existing file.
    """
    if sys.exec_prefix == sys.base_exec_prefix:
        return sys.executable

    if platform.system() == "Windows":
        result = sys.base_exec_prefix + "\\" + os.path.basename(sys.executable)
        result = normpath_with_actual_case(result)
    else:
        result = sys.executable.replace(sys.exec_prefix, sys.base_exec_prefix)

    if not os.path.isfile(result):
        raise RuntimeError("Can't locate base executable")

    return result


def get_augmented_system_path(extra_dirs):
    """Return the value of PATH with the missing *extra_dirs* put in front
    (the order of *extra_dirs* is preserved)."""
    path_items = os.environ.get("PATH", "").split(os.pathsep)

    for d in reversed(extra_dirs):
        if d not in path_items:
            path_items.insert(0, d)

    return os.pathsep.join(path_items)


def update_system_path(env, value):
    """Set the PATH entry of mapping *env* to *value*."""
    # in Windows, env keys are not case sensitive
    # this is important if env is a dict (not os.environ)
    if platform.system() == "Windows":
        found = False
        for key in env:
            if key.upper() == "PATH":
                found = True
                env[key] = value

        if not found:
            env["PATH"] = value
    else:
        env["PATH"] = value


class UserError(RuntimeError):
    """Errors of this class are meant to be presented without stacktrace"""

    pass


def is_hidden_or_system_file(path: str) -> bool:
    """Return True for dot-files and, in Windows, for the files
    having the hidden or system attribute."""
    if os.path.basename(path).startswith("."):
        return True
    elif platform.system() == "Windows":
        from ctypes import windll

        FILE_ATTRIBUTE_HIDDEN = 0x2
        FILE_ATTRIBUTE_SYSTEM = 0x4
        return bool(
            windll.kernel32.GetFileAttributesW(path)  # @UndefinedVariable
            & (FILE_ATTRIBUTE_HIDDEN | FILE_ATTRIBUTE_SYSTEM)
        )
    else:
        return False


def get_dirs_children_info(
    paths: List[str], include_hidden: bool = False
) -> Dict[str, Optional[Dict[str, Dict]]]:
    """Return ``get_single_dir_child_data`` of each path, keyed by the path."""
    return {path: get_single_dir_child_data(path, include_hidden) for path in paths}


def get_single_dir_child_data(path: str, include_hidden: bool = False) -> Optional[Dict[str, Dict]]:
    """Describe the children of directory *path*.

    Returns a dict mapping child name to a dict with the keys "size"
    (None for directories), "modified" and "hidden", or None if *path* is
    neither a directory nor a mount point. The empty path stands for the
    volumes and network locations in Windows and for "/" elsewhere.
    When a PermissionError occurs, the result gets an "<not accessible>" entry.
    """
    if path == "":
        if platform.system() == "Windows":
            return {**get_windows_volumes_info(), **get_windows_network_locations()}
        else:
            return get_single_dir_child_data("/", include_hidden)

    elif os.path.isdir(path) or os.path.ismount(path):
        result = {}

        try:
            for child in os.listdir(path):
                full_child_path = os.path.join(path, child)
                if not os.path.exists(full_child_path):
                    # must be broken link
                    continue
                full_child_path = normpath_with_actual_case(full_child_path)
                hidden = is_hidden_or_system_file(full_child_path)
                if not hidden or include_hidden:
                    name = os.path.basename(full_child_path)
                    st = os.stat(full_child_path, dir_fd=None, follow_symlinks=True)
                    result[name] = {
                        "size": None if os.path.isdir(full_child_path) else st.st_size,
                        "modified": st.st_mtime,
                        "hidden": hidden,
                    }
        except PermissionError:
            result["<not accessible>"] = {
                "kind": "error",
                "size": -1,
                "modified": None,
                "hidden": None,
            }

        return result
    else:
        return None


def get_windows_volumes_info():
    """Return info about the removable, fixed, remote and RAM-disk drives (Windows only).

    The result maps the root path (eg. "C:\\") to a dict with the keys
    "label", "size" and "modified".
    """
    # http://stackoverflow.com/a/2288225/261181
    # http://msdn.microsoft.com/en-us/library/windows/desktop/aa364939%28v=vs.85%29.aspx
    import string
    from ctypes import windll

    # indexed by the return value of GetDriveTypeW
    all_drive_types = [
        "DRIVE_UNKNOWN",
        "DRIVE_NO_ROOT_DIR",
        "DRIVE_REMOVABLE",
        "DRIVE_FIXED",
        "DRIVE_REMOTE",
        "DRIVE_CDROM",
        "DRIVE_RAMDISK",
    ]

    required_drive_types = ["DRIVE_REMOVABLE", "DRIVE_FIXED", "DRIVE_REMOTE", "DRIVE_RAMDISK"]

    result = {}

    bitmask = windll.kernel32.GetLogicalDrives()  # @UndefinedVariable
    for letter in string.ascii_uppercase:
        # the lowest bit tells whether the drive with current letter is present
        if bitmask & 1:
            drive_type = all_drive_types[
                windll.kernel32.GetDriveTypeW("%s:\\" % letter)  # @UndefinedVariable
            ]

            # NB! Drive A can be present in bitmask but actually missing.
            # In this case querying information about it would freeze the UI
            # for several seconds.
            # One solution is to uninstall the device in device manager,
            # but OS may restore the drive later.
            # Therefore it is safest to skip A drive (user can access it via Open dialog)

            if drive_type in required_drive_types and (
                letter != "A" or drive_type != "DRIVE_REMOVABLE"
            ):
                drive = letter + ":"
                path = drive + "\\"

                try:
                    st = os.stat(path)
                    volume_name = get_windows_volume_name(path)
                    if not volume_name:
                        volume_name = "Disk"

                    label = volume_name + " (" + drive + ")"
                    result[path] = {
                        "label": label,
                        "size": None,
                        "modified": max(st.st_mtime, st.st_ctime),
                    }
                except OSError as e:
                    logger.warning("Could not get information for %s", path, exc_info=e)

        bitmask >>= 1

    return result


def get_windows_volume_name(path):
    """Return the volume name of the drive with root *path*,
    or None if GetVolumeInformationW fails (Windows only)."""
    # https://stackoverflow.com/a/12056414/261181
    import ctypes

    kernel32 = ctypes.windll.kernel32
    volume_name_buffer = ctypes.create_unicode_buffer(1024)
    file_system_name_buffer = ctypes.create_unicode_buffer(1024)
    # output parameters we are not interested in
    serial_number = None
    max_component_length = None
    file_system_flags = None

    result = kernel32.GetVolumeInformationW(
        ctypes.c_wchar_p(path),
        volume_name_buffer,
        ctypes.sizeof(volume_name_buffer),
        serial_number,
        max_component_length,
        file_system_flags,
        file_system_name_buffer,
        ctypes.sizeof(file_system_name_buffer),
    )

    if result:
        return volume_name_buffer.value
    else:
        return None


def get_windows_network_locations():
    """Return info about the shortcuts in the CSIDL_NETHOOD folder (Windows only).

    The result maps the target of each shortcut to a dict with the keys
    "label", "size" and "modified". Shortcuts whose target can't be
    resolved are logged and skipped.
    """
    import ctypes.wintypes

    CSIDL_NETHOOD = 0x13
    SHGFP_TYPE_CURRENT = 0
    buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
    ctypes.windll.shell32.SHGetFolderPathW(0, CSIDL_NETHOOD, 0, SHGFP_TYPE_CURRENT, buf)
    shortcuts_dir = buf.value

    result = {}
    for entry in os.scandir(shortcuts_dir):
        # full_path = normpath_with_actual_case(entry.path)
        lnk_path = os.path.join(entry.path, "target.lnk")
        if os.path.exists(lnk_path):
            try:
                target = get_windows_lnk_target(lnk_path)
                result[target] = {
                    "label": entry.name + " (" + target + ")",
                    "size": None,
                    "modified": None,
                }
            except Exception:
                logger.error("Can't get target from %s", lnk_path, exc_info=True)

    return result


def get_windows_lnk_target(lnk_file_path):
    """Return the stripped output of the PrintLnkTarget.vbs script (located in the
    "res" directory of the thonny package) run with cscript on *lnk_file_path*."""
    import thonny

    script_path = os.path.join(os.path.dirname(thonny.__file__), "res", "PrintLnkTarget.vbs")
    cmd = ["cscript", "/NoLogo", script_path, lnk_file_path]
    result = subprocess.check_output(cmd, universal_newlines=True, timeout=3)

    return result.strip()


def execute_system_command(cmd, cwd=None, disconnect_stdin=False):
    """Run the command described by ``cmd.cmd_line`` and return its exit code.

    ``cmd.cmd_line`` must be either a list of arguments or a string starting
    with "!", in which case the rest of the string is executed via the shell.
    *cwd* is used only if it is an existing directory. The output streams
    of the process are not captured.
    """
    logger.debug("execute_system_command, cmd=%r, cwd=%s", cmd, cwd)
    env = dict(os.environ)
    encoding = "utf-8"
    env["PYTHONIOENCODING"] = encoding
    # Make sure this python interpreter and its scripts are available
    # in PATH
    update_system_path(env, get_augmented_system_path(get_exe_dirs()))
    popen_kw = dict(
        env=env,
        universal_newlines=True,
        bufsize=0,
    )

    if cwd and os.path.isdir(cwd):
        popen_kw["cwd"] = cwd

    if disconnect_stdin:
        popen_kw["stdin"] = subprocess.DEVNULL

    if sys.version_info >= (3, 6):
        popen_kw["errors"] = "replace"
        popen_kw["encoding"] = encoding

    if isinstance(cmd.cmd_line, str) and cmd.cmd_line.startswith("!"):
        cmd_line = cmd.cmd_line[1:]
        popen_kw["shell"] = True
    else:
        assert isinstance(cmd.cmd_line, list)
        cmd_line = cmd.cmd_line
    logger.debug("Popen(%r, ...)", cmd_line)
    proc = subprocess.Popen(cmd_line, **popen_kw)
    proc.communicate()
    return proc.wait()


def universal_dirname(path: str) -> str:
    """Return the parent directory of *path*, which may use either "/" or "\\"
    as separator (the forward slash wins if both are present).

    Returns "" if the path has no parent (eg. a bare file name) and the
    separator itself if the parent is the root.
    """
    if "/" in path:
        sep = "/"
    elif "\\" in path:
        sep = "\\"
    else:
        # micro:bit
        return ""

    head, found, _ = path.rstrip(sep).rpartition(sep)
    if not found:
        # Nothing was left besides the trailing separators:
        # either the path is the root ("/") or a bare name with trailing separator ("name/")
        return sep if path.startswith(sep) else ""

    return head if head else sep


def universal_relpath(path: str, context: str) -> str:
    """Tries to give relative path"""
    if "/" in path:
        import pathlib

        p = pathlib.PurePosixPath(path)
        try:
            return str(p.relative_to(context))
        except ValueError:
            # not located under context
            return path
    else:
        return os.path.relpath(path, context)


def get_python_version_string(version_info: Optional[Tuple] = None, maxsize=None):
    """Format a version tuple, eg. "3.7.1" or "3.9.0-beta".

    *version_info* is a ``sys.version_info``-like sequence (the version of the
    current interpreter is used when it is omitted). If *maxsize* is not None,
    then " (64 bit)" or " (32 bit)" is appended.
    """
    if version_info is None:
        version_info = sys.version_info

    result = ".".join(map(str, version_info[:3]))
    # release level may be missing from plain tuples
    if len(version_info) > 3 and version_info[3] != "final":
        result += "-" + str(version_info[3])

    if maxsize is not None:
        # NOTE(review): the value of maxsize acts only as a flag, the bitness is
        # taken from the current interpreter -- confirm whether this is intended
        result += " (" + ("64" if sys.maxsize > 2 ** 32 else "32") + " bit)"

    return result


def try_load_modules_with_frontend_sys_path(module_names):
    """Try to import *module_names* with the entries of the THONNY_FRONTEND_SYS_PATH
    environment variable (repr of a list) temporarily appended to ``sys.path``.

    Modules which fail with ImportError are silently skipped.
    """
    try:
        frontend_sys_path = ast.literal_eval(os.environ["THONNY_FRONTEND_SYS_PATH"])
        assert isinstance(frontend_sys_path, list)
    except Exception as e:
        logger.warning("Could not get THONNY_FRONTEND_SYS_PATH", exc_info=e)
        return

    from importlib import import_module

    old_sys_path = sys.path.copy()
    sys.path = sys.path + frontend_sys_path
    try:
        for name in module_names:
            try:
                import_module(name)
            except ImportError:
                pass
    finally:
        sys.path = old_sys_path


class ConnectionFailedException(Exception):
    pass


class ConnectionClosedException(Exception):
    pass