# -*- coding: utf-8 -*-
"""Miscellaneous helpers: platform detection, volume lookup, command-line
formatting, edit distances, timing, clipboard access and subprocess output
queues."""

import os.path
import platform
import queue
import shlex
import subprocess
import sys
import threading
import time
from typing import Optional, Sequence, Tuple

PASSWORD_METHOD = "password"
PUBLIC_KEY_NO_PASS_METHOD = "public-key (without passphrase)"
PUBLIC_KEY_WITH_PASS_METHOD = "public-key (with passphrase)"


def delete_dir_try_hard(path: str, hardness: int = 5) -> None:
    """Remove the directory tree at *path*, retrying up to *hardness* times.

    Each retry waits a little longer (0, 0.5, 1.0, ... seconds) and ignores
    errors. If the directory still exists afterwards, one final attempt is
    made that lets the error propagate.
    """
    # Deleting the folder on Windows is not so easy task
    # http://bugs.python.org/issue15496
    import shutil

    for attempt in range(hardness):
        if not os.path.exists(path):
            break
        time.sleep(attempt * 0.5)
        shutil.rmtree(path, True)

    if os.path.exists(path):
        # try once more but now without ignoring errors
        shutil.rmtree(path, False)


def running_on_windows() -> bool:
    """Return True if platform.system() reports Windows."""
    return platform.system() == "Windows"


def running_on_mac_os() -> bool:
    """Return True if platform.system() reports Darwin (macOS)."""
    return platform.system() == "Darwin"


def running_on_linux() -> bool:
    """Return True if platform.system() reports Linux."""
    return platform.system() == "Linux"


def running_on_rpi() -> bool:
    """Return True on Linux when the machine name starts with "arm" or the
    desktop session is "LXDE-pi"."""
    return running_on_linux() and (
        platform.uname().machine.lower().startswith("arm")
        or os.environ.get("DESKTOP_SESSION") == "LXDE-pi"
    )


def list_volumes(skip_letters=frozenset()) -> Sequence[str]:
    """Return the paths of the currently available volumes.

    On Windows these are the existing drive roots ("C:\\" etc.), excluding
    the letters in *skip_letters*. Elsewhere the third whitespace-separated
    field of each line printed by the "mount" command is returned.

    Adapted from https://github.com/ntoll/uflash/blob/master/uflash.py
    """
    if sys.platform == "win32":
        import ctypes

        #
        # In certain circumstances, volumes are allocated to USB
        # storage devices which cause a Windows popup to raise if their
        # volume contains no media. Wrapping the check in SetErrorMode
        # with SEM_FAILCRITICALERRORS (1) prevents this popup.
        #
        old_mode = ctypes.windll.kernel32.SetErrorMode(1)  # @UndefinedVariable
        try:
            volumes = []
            for disk in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                if disk in skip_letters:
                    continue
                path = "{}:\\".format(disk)
                if os.path.exists(path):
                    volumes.append(path)

            return volumes
        finally:
            # Always restore the previous error mode.
            ctypes.windll.kernel32.SetErrorMode(old_mode)  # @UndefinedVariable
    else:
        # 'posix' means we're on Linux or OSX (Mac).
        # Call the unix "mount" command to list the mounted volumes.
        mount_output = subprocess.check_output("mount").splitlines()
        return [x.split()[2].decode("utf-8") for x in mount_output]


def get_win_volume_name(path: str) -> str:
    """
    Each disk or external device connected to windows has an attribute
    called "volume name". This function returns the volume name for
    the given disk/device.
    Code from http://stackoverflow.com/a/12056414

    Raises RuntimeError when not running on Windows.
    """
    if sys.platform == "win32":
        import ctypes

        vol_name_buf = ctypes.create_unicode_buffer(1024)
        ctypes.windll.kernel32.GetVolumeInformationW(  # @UndefinedVariable
            ctypes.c_wchar_p(path),
            vol_name_buf,
            # The API expects the buffer length in characters, not bytes;
            # ctypes.sizeof() would overstate the buffer by a factor of two.
            len(vol_name_buf),
            None,
            None,
            None,
            None,
            0,
        )
        assert isinstance(vol_name_buf.value, str)
        return vol_name_buf.value
    else:
        raise RuntimeError("Only meant for Windows")


def find_volumes_by_name(volume_name: str, skip_letters=frozenset({"A"})) -> Sequence[str]:
    """Return all volumes matching *volume_name*.

    On Windows the volume label is compared case-insensitively; elsewhere
    a volume matches when its mount path ends with *volume_name*.
    """
    volumes = list_volumes(skip_letters=skip_letters)
    if os.name == "nt":
        wanted = volume_name.upper()
        return [volume for volume in volumes if get_win_volume_name(volume).upper() == wanted]
    else:
        return [volume for volume in volumes if volume.endswith(volume_name)]


def find_volume_by_name(
    volume_name: str,
    not_found_msg: Optional[str] = None,
    found_several_msg: Optional[str] = None,
    parent=None,
) -> Optional[str]:
    """Return the single volume named *volume_name*.

    If none or several are found, the user is asked (via Tk dialogs) whether
    to pick a directory manually. Returns the chosen path, or None if the
    user declines or cancels. Both message templates must contain one "%s"
    placeholder for the volume name.
    """
    from thonny.languages import tr

    # Can't translate in the header as _ may not be available at import time
    if not_found_msg is None:
        not_found_msg = tr("Could not find disk '%s'. Do you want to locate it yourself?")

    if found_several_msg is None:
        found_several_msg = tr("Found several '%s' disks. Do you want to choose one yourself?")

    volumes = find_volumes_by_name(volume_name)
    if len(volumes) == 1:
        return volumes[0]

    if len(volumes) == 0:
        msg = not_found_msg % volume_name
    else:
        msg = found_several_msg % volume_name

    from tkinter.messagebox import askyesno

    from thonny.ui_utils import askdirectory

    if askyesno(tr("Can't find suitable disk"), msg, master=parent):
        path = askdirectory(parent=parent)
        if path:
            return path

    return None


def shorten_repr(original_repr: str, max_len: int = 1000) -> str:
    """Truncate *original_repr* to at most *max_len* characters, marking
    the cut with a trailing ellipsis character."""
    if len(original_repr) > max_len:
        return original_repr[: max_len - 1] + "…"
    else:
        return original_repr


def _win_get_used_memory():
    """Return the "PrivateUsage" counter of the current process (Windows only)."""
    # http://code.activestate.com/recipes/578513-get-memory-usage-of-windows-processes-using-getpro/
    import ctypes
    from ctypes import wintypes

    GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
    GetCurrentProcess.argtypes = []
    GetCurrentProcess.restype = wintypes.HANDLE

    SIZE_T = ctypes.c_size_t

    class PROCESS_MEMORY_COUNTERS_EX(ctypes.Structure):
        _fields_ = [
            ("cb", wintypes.DWORD),
            ("PageFaultCount", wintypes.DWORD),
            ("PeakWorkingSetSize", SIZE_T),
            ("WorkingSetSize", SIZE_T),
            ("QuotaPeakPagedPoolUsage", SIZE_T),
            ("QuotaPagedPoolUsage", SIZE_T),
            ("QuotaPeakNonPagedPoolUsage", SIZE_T),
            ("QuotaNonPagedPoolUsage", SIZE_T),
            ("PagefileUsage", SIZE_T),
            ("PeakPagefileUsage", SIZE_T),
            ("PrivateUsage", SIZE_T),
        ]

    GetProcessMemoryInfo = ctypes.windll.psapi.GetProcessMemoryInfo
    GetProcessMemoryInfo.argtypes = [
        wintypes.HANDLE,
        ctypes.POINTER(PROCESS_MEMORY_COUNTERS_EX),
        wintypes.DWORD,
    ]
    GetProcessMemoryInfo.restype = wintypes.BOOL

    def get_current_process():
        """Return handle to current process."""
        return GetCurrentProcess()

    def get_memory_info(process=None):
        """Return Win32 process memory counters structure as a dict."""
        if process is None:
            process = get_current_process()
        counters = PROCESS_MEMORY_COUNTERS_EX()
        ret = GetProcessMemoryInfo(process, ctypes.byref(counters), ctypes.sizeof(counters))
        if not ret:
            raise ctypes.WinError()
        return {name: getattr(counters, name) for name, _ in counters._fields_}

    return get_memory_info()["PrivateUsage"]


def _unix_get_used_memory():
    # http://fa.bianp.net/blog/2013/different-ways-to-get-memory-consumption-or-lessons-learned-from-memory_profiler/
    "TODO:"


def construct_cmd_line(parts, safe_tokens=()) -> str:
    """Join *parts* into one shell command line.

    Every part is quoted with shlex.quote, except those contained in
    *safe_tokens*, which are inserted verbatim.
    """

    def quote(s):
        if s in safe_tokens:
            return s
        else:
            return shlex.quote(s)

    return " ".join(map(quote, parts))


def user_friendly_python_command_line(cmd):
    """Format the argument list *cmd* for display.

    If "-m" is present, everything up to and including the first "-m" is
    dropped. The remaining items are joined with spaces and wrapped onto a
    new line whenever a line would exceed 60 characters.
    """
    if "-m" in cmd:
        cmd = cmd[cmd.index("-m") + 1 :]

    lines = [""]
    for item in cmd:
        if lines[-1] and len(lines[-1] + " " + item) > 60:
            lines.append("")
        lines[-1] = (lines[-1] + " " + item).strip()

    return "\n".join(lines)


def parse_cmd_line(s):
    """Split the command line string *s* using POSIX shell rules."""
    return shlex.split(s, posix=True)


def levenshtein_distance(s1, s2):
    """Return the Levenshtein (insert/delete/substitute) distance between
    *s1* and *s2*."""
    # https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)  # pylint: disable=arguments-out-of-order

    # len(s1) >= len(s2)
    if len(s2) == 0:
        return len(s1)

    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            # j+1 instead of j since previous_row and current_row are one
            # character longer than s2
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return previous_row[-1]


def levenshtein_damerau_distance(s1, s2, maxDistance):
    """Return an edit distance between *s1* and *s2* that also considers
    transpositions of adjacent characters.

    The computation stops early and returns *maxDistance* once no cell of
    the row two steps back is below *maxDistance*.
    """
    # https://gist.github.com/giststhebearbear/4145811
    # get smallest string so our rows are minimized
    s1, s2 = (s1, s2) if len(s1) <= len(s2) else (s2, s1)
    # set lengths
    l1, l2 = len(s1), len(s2)

    # We are simulating an NM matrix where n is the longer string
    # and m is the shorter string. By doing this we can minimize
    # memory usage to O(M).
    # Since we are simulating the matrix we only maintain two rows
    # at a time the current row and the previous rows.
    # A move from the current cell looking at the cell before it indicates
    # consideration of an insert operation.
    # A move from the current cell looking at the cell above it indicates
    # consideration of a deletion
    # Both operations are cost 1
    # A move from the current cell to the cell up and to the left indicates
    # an edit operation of 0 cost for a matching character and a 1 cost for
    # a non matching characters
    # no row has been previously computed yet, set empty row
    # Since this is also a Damerau-Levenshtein calculation transposition
    # costs will be taken into account. These look back 2 characters to
    # determine optimal cost based on a possible transposition
    # example: aei -> aie with Levenshtein has a cost of 2
    # match a, change e->i change i->e => aie
    # Damerau-Levenshtein has a cost of 1
    # match a, transpose ei to ie => aie
    transpositionRow = []
    prevRow = []

    # build first leven matrix row
    # The first row represents transformation from an empty string
    # to the shorter string making it static [0-n]
    # since this row is static we can set it as
    # curRow and start computation at the second row or index 1
    curRow = list(range(0, l1 + 1))

    # use second length to loop through all the rows being built
    # we start at row one
    for rowNum in range(1, l2 + 1):
        # set transposition, previous, and current
        # because the rowNum always increments by one
        # we can use rowNum to set the value representing
        # the first column which is indicative of transforming TO
        # the empty string from our longer string
        # transposition row maintains an extra row so that it is possible
        # for us to apply Damerau's formula
        transpositionRow, prevRow, curRow = prevRow, curRow, [rowNum] + [0] * l1

        # consider if we have passed the max distance if all paths through
        # the transposition row are larger than the max we can stop calculating
        # distance and return the last element in that row and return the max
        if transpositionRow:
            if not any(cellValue < maxDistance for cellValue in transpositionRow):
                return maxDistance

        for colNum in range(1, l1 + 1):
            insertionCost = curRow[colNum - 1] + 1
            deletionCost = prevRow[colNum] + 1
            changeCost = prevRow[colNum - 1] + (0 if s1[colNum - 1] == s2[rowNum - 1] else 1)
            # set the cell value - min distance to reach this
            # position
            curRow[colNum] = min(insertionCost, deletionCost, changeCost)

            # test for a possible transposition optimization
            # check to see if we have at least 2 characters
            if 1 < rowNum <= colNum:
                # test for possible transposition
                # NOTE(review): s2 is indexed by colNum here rather than by
                # rowNum, so this matches the textbook test only when
                # rowNum == colNum -- confirm whether that is intended.
                if s1[colNum - 1] == s2[colNum - 2] and s2[colNum - 1] == s1[colNum - 2]:
                    curRow[colNum] = min(curRow[colNum], transpositionRow[colNum - 2] + 1)

    # the last cell of the matrix is ALWAYS the shortest distance between the two strings
    return curRow[-1]


def get_file_creation_date(path_to_file):
    """
    Try to get the date that a file was created, falling back to when it was
    last modified if that isn't possible.
    See http://stackoverflow.com/a/39501288/1709587 for explanation.
    """
    if platform.system() == "Windows":
        return os.path.getctime(path_to_file)
    else:
        stat = os.stat(path_to_file)
        try:
            return stat.st_birthtime
        except AttributeError:
            # We're probably on Linux. No easy way to get creation dates here,
            # so we'll settle for when its content was last modified.
            return stat.st_mtime


_timer_time = 0


def start_time(text=""):
    """Reset the module-level debug timer and print a RESET marker."""
    global _timer_time
    print("RESET", text)
    _timer_time = time.time()


def lap_time(text=""):
    """Print the seconds elapsed since the last start_time/lap_time call
    and restart the module-level debug timer."""
    global _timer_time
    new_time = time.time()
    print("LAP", text, round(new_time - _timer_time, 4))
    _timer_time = time.time()


class TimeHelper:
    """Tracks how much of a time budget (in seconds) has been used."""

    def __init__(self, time_allowed):
        self.start_time = time.time()
        self.time_allowed = time_allowed

    @property
    def time_spent(self):
        """Seconds elapsed since construction."""
        return time.time() - self.start_time

    @property
    def time_left(self):
        """Seconds remaining of the budget, never negative."""
        return max(self.time_allowed - self.time_spent, 0)


def copy_to_clipboard(data):
    """Copy the text *data* to the system clipboard.

    Uses the Win32 clipboard API on Windows, "pbcopy" on macOS and "xsel"
    elsewhere. The external command is given 0.1 seconds to consume the
    input; subprocess.TimeoutExpired propagates if it takes longer.
    """
    if running_on_windows():
        # Return here: there is no external command to run on Windows, and
        # falling through would reference an unassigned ``command``.
        _copy_to_windows_clipboard(data)
        return

    if running_on_mac_os():
        command = ["pbcopy"]
    else:
        command = ["xsel", "-b", "-i"]

    env = dict(os.environ)
    encoding = "utf-8"
    env["PYTHONIOENCODING"] = encoding

    # Popen accepts the ``encoding`` argument only since Python 3.6
    if sys.version_info >= (3, 6):
        extra = {"encoding": encoding}
    else:
        extra = {}

    proc = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        shell=False,
        env=env,
        universal_newlines=True,
        close_fds=True,
        **extra
    )
    proc.communicate(input=data, timeout=0.1)


def _copy_to_windows_clipboard(data):
    """Place the text *data* on the Windows clipboard as CF_UNICODETEXT."""
    # https://bugs.python.org/file37366/test_clipboard_win.py
    # NOTE(review): the Win32 functions are called without declared
    # argtypes/restype, so ctypes applies its default int conversions;
    # confirm this is safe for handles on 64-bit Windows.
    import ctypes

    wcscpy = ctypes.cdll.msvcrt.wcscpy
    OpenClipboard = ctypes.windll.user32.OpenClipboard
    EmptyClipboard = ctypes.windll.user32.EmptyClipboard
    SetClipboardData = ctypes.windll.user32.SetClipboardData
    CloseClipboard = ctypes.windll.user32.CloseClipboard
    CF_UNICODETEXT = 13
    GlobalAlloc = ctypes.windll.kernel32.GlobalAlloc
    GlobalLock = ctypes.windll.kernel32.GlobalLock
    GlobalUnlock = ctypes.windll.kernel32.GlobalUnlock
    GMEM_DDESHARE = 0x2000

    # Size the buffer from the UTF-16 encoding (plus a terminating NUL):
    # characters outside the BMP occupy two 16-bit units each, so
    # len(data) alone would under-allocate.
    byte_count = len(data.encode("utf-16-le", "surrogatepass")) + 2

    OpenClipboard(None)
    try:
        EmptyClipboard()
        hCd = GlobalAlloc(GMEM_DDESHARE, byte_count)
        pchData = GlobalLock(hCd)
        wcscpy(ctypes.c_wchar_p(pchData), data)
        GlobalUnlock(hCd)
        SetClipboardData(CF_UNICODETEXT, hCd)
    finally:
        # Never leave the clipboard open, even if a step above fails.
        CloseClipboard()


def sizeof_fmt(num, suffix="B"):
    """Readable file size

    Values below 1024 are shown as whole numbers ("512 B"); larger values
    are scaled by powers of 1024 and shown with one decimal ("1.5 kB").

    :param num: Bytes value
    :type num: int
    :param suffix: Unit suffix (optional) default = B
    :type suffix: str
    :rtype: str
    """
    # https://gist.github.com/cbwar/d2dfbc19b140bd599daccbe0fe925597
    for unit in ["", "k", "M", "G", "T", "P", "E", "Z"]:
        if abs(num) < 1024.0:
            if unit == "":
                return "%d %s%s" % (num, unit, suffix)
            return "%.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    # Same "<number> <unit><suffix>" layout and prefix style as above.
    return "%.1f %s%s" % (num, "Y", suffix)


def _get_known_folder(ID):
    """Return the path of the Windows special folder with CSIDL value *ID*."""
    # http://stackoverflow.com/a/3859336/261181
    # http://www.installmate.com/support/im9/using/symbols/functions/csidls.htm
    import ctypes.wintypes

    SHGFP_TYPE_CURRENT = 0
    buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
    ctypes.windll.shell32.SHGetFolderPathW(0, ID, 0, SHGFP_TYPE_CURRENT, buf)
    assert buf.value
    return buf.value


def get_roaming_appdata_dir():
    """Return the known folder with ID 26 (Windows only)."""
    return _get_known_folder(26)


def get_local_appdata_dir():
    """Return the known folder with ID 28 (Windows only)."""
    return _get_known_folder(28)


class PopenWithOutputQueues(subprocess.Popen):
    """A Popen whose stdout and stderr lines are collected into queues.

    Daemon threads read each piped stream line by line and put the lines
    into ``stdout_queue`` / ``stderr_queue``. A stream that was not piped
    (i.e. is None) gets no reader thread and its queue stays empty.
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        self.stdout_queue = queue.Queue()
        self.stderr_queue = queue.Queue()

        for stream, target_queue in [
            (self.stdout, self.stdout_queue),
            (self.stderr, self.stderr_queue),
        ]:
            if stream is None:
                # Nothing to read when the stream was not redirected to a pipe.
                continue
            threading.Thread(
                target=self._listen_thread, args=[stream, target_queue], daemon=True
            ).start()

    def _listen_thread(self, stream, target_queue: queue.Queue):
        """Forward lines from *stream* to *target_queue* until end of stream."""
        while True:
            data = stream.readline()
            # readline returns "" in text mode and b"" in binary mode at EOF;
            # comparing with "" only would never match b"" and loop forever.
            if not data:
                break
            target_queue.put(data)


def inside_flatpak():
    """Return True if "flatpak-spawn" is on PATH and /app/manifest.json exists."""
    import shutil

    return bool(shutil.which("flatpak-spawn") and os.path.isfile("/app/manifest.json"))


def show_command_not_available_in_flatpak_message():
    """Show an info dialog saying the command is unavailable under Flatpak."""
    from tkinter import messagebox
    from thonny.languages import tr

    messagebox.showinfo(
        tr("Command not available"),
        tr("This command is not available if Thonny is run via Flatpak"),
    )