1# -*- coding: utf-8 -*-
2
3import os.path
4import platform
5import queue
6import shlex
7import subprocess
8import sys
9import threading
10import time
11from typing import Optional, Sequence, Tuple
12
13PASSWORD_METHOD = "password"
14PUBLIC_KEY_NO_PASS_METHOD = "public-key (without passphrase)"
15PUBLIC_KEY_WITH_PASS_METHOD = "public-key (with passphrase)"
16
17
18def delete_dir_try_hard(path: str, hardness: int = 5) -> None:
19    # Deleting the folder on Windows is not so easy task
20    # http://bugs.python.org/issue15496
21    import shutil
22
23    for i in range(hardness):
24        if os.path.exists(path):
25            time.sleep(i * 0.5)
26            shutil.rmtree(path, True)
27        else:
28            break
29
30    if os.path.exists(path):
31        # try once more but now without ignoring errors
32        shutil.rmtree(path, False)
33
34
35def running_on_windows() -> bool:
36    return platform.system() == "Windows"
37
38
39def running_on_mac_os() -> bool:
40    return platform.system() == "Darwin"
41
42
43def running_on_linux() -> bool:
44    return platform.system() == "Linux"
45
46
47def running_on_rpi() -> bool:
48    return running_on_linux() and (
49        platform.uname().machine.lower().startswith("arm")
50        or os.environ.get("DESKTOP_SESSION") == "LXDE-pi"
51    )
52
53
54def list_volumes(skip_letters=set()) -> Sequence[str]:
55    "Adapted from https://github.com/ntoll/uflash/blob/master/uflash.py"
56    if sys.platform == "win32":
57        import ctypes
58
59        #
60        # In certain circumstances, volumes are allocated to USB
61        # storage devices which cause a Windows popup to raise if their
62        # volume contains no media. Wrapping the check in SetErrorMode
63        # with SEM_FAILCRITICALERRORS (1) prevents this popup.
64        #
65        old_mode = ctypes.windll.kernel32.SetErrorMode(1)  # @UndefinedVariable
66        try:
67            volumes = []
68            for disk in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
69                if disk in skip_letters:
70                    continue
71                path = "{}:\\".format(disk)
72                if os.path.exists(path):
73                    volumes.append(path)
74
75            return volumes
76        finally:
77            ctypes.windll.kernel32.SetErrorMode(old_mode)  # @UndefinedVariable
78    else:
79        # 'posix' means we're on Linux or OSX (Mac).
80        # Call the unix "mount" command to list the mounted volumes.
81        mount_output = subprocess.check_output("mount").splitlines()
82        return [x.split()[2].decode("utf-8") for x in mount_output]
83
84
85def get_win_volume_name(path: str) -> str:
86    """
87    Each disk or external device connected to windows has an attribute
88    called "volume name". This function returns the volume name for
89    the given disk/device.
90    Code from http://stackoverflow.com/a/12056414
91    """
92    if sys.platform == "win32":
93        import ctypes
94
95        vol_name_buf = ctypes.create_unicode_buffer(1024)
96        ctypes.windll.kernel32.GetVolumeInformationW(  # @UndefinedVariable
97            ctypes.c_wchar_p(path),
98            vol_name_buf,
99            ctypes.sizeof(vol_name_buf),
100            None,
101            None,
102            None,
103            None,
104            0,
105        )
106        assert isinstance(vol_name_buf.value, str)
107        return vol_name_buf.value
108    else:
109        raise RuntimeError("Only meant for Windows")
110
111
112def find_volumes_by_name(volume_name: str, skip_letters={"A"}) -> Sequence[str]:
113    volumes = list_volumes(skip_letters=skip_letters)
114    if os.name == "nt":
115        return [
116            volume
117            for volume in volumes
118            if get_win_volume_name(volume).upper() == volume_name.upper()
119        ]
120    else:
121        return [volume for volume in volumes if volume.endswith(volume_name)]
122
123
124def find_volume_by_name(
125    volume_name: str,
126    not_found_msg: Optional[str] = None,
127    found_several_msg: Optional[str] = None,
128    parent=None,
129) -> Optional[str]:
130    from thonny.languages import tr
131
132    # Can't translate in the header as _ may not be available at import time
133    if not_found_msg is None:
134        not_found_msg = tr("Could not find disk '%s'. Do you want to locate it yourself?")
135
136    if found_several_msg is None:
137        found_several_msg = tr("Found several '%s' disks. Do you want to choose one yourself?")
138
139    volumes = find_volumes_by_name(volume_name)
140    if len(volumes) == 1:
141        return volumes[0]
142    else:
143        if len(volumes) == 0:
144            msg = not_found_msg % volume_name
145        else:
146            msg = found_several_msg % volume_name
147
148        import tkinter as tk
149        from tkinter.messagebox import askyesno
150
151        from thonny.ui_utils import askdirectory
152
153        if askyesno(tr("Can't find suitable disk"), msg, master=parent):
154            path = askdirectory(parent=parent)
155            if path:
156                return path
157
158    return None
159
160
161def shorten_repr(original_repr: str, max_len: int = 1000) -> str:
162    if len(original_repr) > max_len:
163        return original_repr[: max_len - 1] + "…"
164    else:
165        return original_repr
166
167
168def _win_get_used_memory():
169    # http://code.activestate.com/recipes/578513-get-memory-usage-of-windows-processes-using-getpro/
170    import ctypes
171    from ctypes import wintypes
172
173    GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
174    GetCurrentProcess.argtypes = []
175    GetCurrentProcess.restype = wintypes.HANDLE
176
177    SIZE_T = ctypes.c_size_t
178
179    class PROCESS_MEMORY_COUNTERS_EX(ctypes.Structure):
180        _fields_ = [
181            ("cb", wintypes.DWORD),
182            ("PageFaultCount", wintypes.DWORD),
183            ("PeakWorkingSetSize", SIZE_T),
184            ("WorkingSetSize", SIZE_T),
185            ("QuotaPeakPagedPoolUsage", SIZE_T),
186            ("QuotaPagedPoolUsage", SIZE_T),
187            ("QuotaPeakNonPagedPoolUsage", SIZE_T),
188            ("QuotaNonPagedPoolUsage", SIZE_T),
189            ("PagefileUsage", SIZE_T),
190            ("PeakPagefileUsage", SIZE_T),
191            ("PrivateUsage", SIZE_T),
192        ]
193
194    GetProcessMemoryInfo = ctypes.windll.psapi.GetProcessMemoryInfo
195    GetProcessMemoryInfo.argtypes = [
196        wintypes.HANDLE,
197        ctypes.POINTER(PROCESS_MEMORY_COUNTERS_EX),
198        wintypes.DWORD,
199    ]
200    GetProcessMemoryInfo.restype = wintypes.BOOL
201
202    def get_current_process():
203        """Return handle to current process."""
204        return GetCurrentProcess()
205
206    def get_memory_info(process=None):
207        """Return Win32 process memory counters structure as a dict."""
208        if process is None:
209            process = get_current_process()
210        counters = PROCESS_MEMORY_COUNTERS_EX()
211        ret = GetProcessMemoryInfo(process, ctypes.byref(counters), ctypes.sizeof(counters))
212        if not ret:
213            raise ctypes.WinError()
214        info = dict((name, getattr(counters, name)) for name, _ in counters._fields_)
215        return info
216
217    return get_memory_info()["PrivateUsage"]
218
219
220def _unix_get_used_memory():
221    # http://fa.bianp.net/blog/2013/different-ways-to-get-memory-consumption-or-lessons-learned-from-memory_profiler/
222    "TODO:"
223
224
225def construct_cmd_line(parts, safe_tokens=[]) -> str:
226    def quote(s):
227        if s in safe_tokens:
228            return s
229        else:
230            return shlex.quote(s)
231
232    return " ".join(map(quote, parts))
233
234
235def user_friendly_python_command_line(cmd):
236    if "-m" in cmd:
237        cmd = cmd[cmd.index("-m") + 1 :]
238
239    lines = [""]
240    for item in cmd:
241        if lines[-1] and len(lines[-1] + " " + item) > 60:
242            lines.append("")
243        lines[-1] = (lines[-1] + " " + item).strip()
244
245    return "\n".join(lines)
246
247    return subprocess.list2cmdline(cmd)
248
249
250def parse_cmd_line(s):
251    return shlex.split(s, posix=True)
252
253
254def levenshtein_distance(s1, s2):
255    # https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
256    if len(s1) < len(s2):
257        return levenshtein_distance(s2, s1)  # pylint: disable=arguments-out-of-order
258
259    # len(s1) >= len(s2)
260    if len(s2) == 0:
261        return len(s1)
262
263    previous_row = range(len(s2) + 1)
264    for i, c1 in enumerate(s1):
265        current_row = [i + 1]
266        for j, c2 in enumerate(s2):
267            insertions = (
268                previous_row[j + 1] + 1
269            )  # j+1 instead of j since previous_row and current_row are one character longer
270            deletions = current_row[j] + 1  # than s2
271            substitutions = previous_row[j] + (c1 != c2)
272            current_row.append(min(insertions, deletions, substitutions))
273        previous_row = current_row
274
275    return previous_row[-1]
276
277
278def levenshtein_damerau_distance(s1, s2, maxDistance):
279    # https://gist.github.com/giststhebearbear/4145811
280    #  get smallest string so our rows are minimized
281    s1, s2 = (s1, s2) if len(s1) <= len(s2) else (s2, s1)
282    #  set lengths
283    l1, l2 = len(s1), len(s2)
284
285    #  We are simulatng an NM matrix where n is the longer string
286    #  and m is the shorter string. By doing this we can minimize
287    #  memory usage to O(M).
288    #  Since we are simulating the matrix we only maintain two rows
289    #  at a time the current row and the previous rows.
290    #  A move from the current cell looking at the cell before it indicates
291    #  consideration of an insert operation.
292    #  A move from the current cell looking at the cell above it indicates
293    #  consideration of a deletion
294    #  Both operations are cost 1
295    #  A move from the current cell to the cell up and to the left indicates
296    #  an edit operation of 0 cost for a matching character and a 1 cost for
297    #  a non matching characters
298    #  no row has been previously computed yet, set empty row
299    #  Since this is also a Damerou-Levenshtein calculation transposition
300    #  costs will be taken into account. These look back 2 characters to
301    #  determine optimal cost based on a possible transposition
302    #  example: aei -> aie with levensthein has a cost of 2
303    #  match a, change e->i change i->e => aie
304    #  Damarau-Levenshtein has a cost of 1
305    #  match a, transpose ei to ie => aie
306    transpositionRow = []
307    prevRow = []
308
309    #  build first leven matrix row
310    #  The first row represents transformation from an empty string
311    #  to the shorter string making it static [0-n]
312    #  since this row is static we can set it as
313    #  curRow and start computation at the second row or index 1
314    curRow = list(range(0, l1 + 1))
315
316    # use second length to loop through all the rows being built
317    # we start at row one
318    for rowNum in range(1, l2 + 1):
319        #  set transposition, previous, and current
320        #  because the rowNum always increments by one
321        #  we can use rowNum to set the value representing
322        #  the first column which is indicitive of transforming TO
323        #  the empty string from our longer string
324        #  transposition row maintains an extra row so that it is possible
325        #  for us to apply Damarou's formula
326        transpositionRow, prevRow, curRow = prevRow, curRow, [rowNum] + [0] * l1
327
328        #  consider if we have passed the max distance if all paths through
329        #  the transposition row are larger than the max we can stop calculating
330        #  distance and return the last element in that row and return the max
331        if transpositionRow:
332            if not any(cellValue < maxDistance for cellValue in transpositionRow):
333                return maxDistance
334
335        for colNum in range(1, l1 + 1):
336            insertionCost = curRow[colNum - 1] + 1
337            deletionCost = prevRow[colNum] + 1
338            changeCost = prevRow[colNum - 1] + (0 if s1[colNum - 1] == s2[rowNum - 1] else 1)
339            #  set the cell value - min distance to reach this
340            #  position
341            curRow[colNum] = min(insertionCost, deletionCost, changeCost)
342
343            #  test for a possible transposition optimization
344            #  check to see if we have at least 2 characters
345            if 1 < rowNum <= colNum:
346                #  test for possible transposition
347                if s1[colNum - 1] == s2[colNum - 2] and s2[colNum - 1] == s1[colNum - 2]:
348                    curRow[colNum] = min(curRow[colNum], transpositionRow[colNum - 2] + 1)
349
350    #  the last cell of the matrix is ALWAYS the shortest distance between the two strings
351    return curRow[-1]
352
353
354def get_file_creation_date(path_to_file):
355    """
356    Try to get the date that a file was created, falling back to when it was
357    last modified if that isn't possible.
358    See http://stackoverflow.com/a/39501288/1709587 for explanation.
359    """
360    if platform.system() == "Windows":
361        return os.path.getctime(path_to_file)
362    else:
363        stat = os.stat(path_to_file)
364        try:
365            return stat.st_birthtime
366        except AttributeError:
367            # We're probably on Linux. No easy way to get creation dates here,
368            # so we'll settle for when its content was last modified.
369            return stat.st_mtime
370
371
372_timer_time = 0
373
374
375def start_time(text=""):
376    global _timer_time
377    print("RESET", text)
378    _timer_time = time.time()
379
380
381def lap_time(text=""):
382    global _timer_time
383    new_time = time.time()
384    print("LAP", text, round(new_time - _timer_time, 4))
385    _timer_time = time.time()
386
387
388class TimeHelper:
389    def __init__(self, time_allowed):
390        self.start_time = time.time()
391        self.time_allowed = time_allowed
392
393    @property
394    def time_spent(self):
395        return time.time() - self.start_time
396
397    @property
398    def time_left(self):
399        return max(self.time_allowed - self.time_spent, 0)
400
401
402def copy_to_clipboard(data):
403    if running_on_windows():
404        _copy_to_windows_clipboard(data)
405    elif running_on_mac_os():
406        command = ["pbcopy"]
407    else:
408        command = ["xsel", "-b", "-i"]
409
410    env = dict(os.environ).copy()
411    encoding = "utf-8"
412    env["PYTHONIOENCODING"] = encoding
413
414    if sys.version_info >= (3, 6):
415        extra = {"encoding": encoding}
416    else:
417        extra = {}
418
419    proc = subprocess.Popen(
420        command,
421        stdin=subprocess.PIPE,
422        shell=False,
423        env=env,
424        universal_newlines=True,
425        close_fds=True,
426        **extra
427    )
428    proc.communicate(input=data, timeout=0.1)
429
430
431def _copy_to_windows_clipboard(data):
432    # https://bugs.python.org/file37366/test_clipboard_win.py
433    import ctypes
434
435    wcscpy = ctypes.cdll.msvcrt.wcscpy
436    OpenClipboard = ctypes.windll.user32.OpenClipboard
437    EmptyClipboard = ctypes.windll.user32.EmptyClipboard
438    SetClipboardData = ctypes.windll.user32.SetClipboardData
439    CloseClipboard = ctypes.windll.user32.CloseClipboard
440    CF_UNICODETEXT = 13
441    GlobalAlloc = ctypes.windll.kernel32.GlobalAlloc
442    GlobalLock = ctypes.windll.kernel32.GlobalLock
443    GlobalUnlock = ctypes.windll.kernel32.GlobalUnlock
444    GMEM_DDESHARE = 0x2000
445
446    OpenClipboard(None)
447    EmptyClipboard()
448    hCd = GlobalAlloc(GMEM_DDESHARE, 2 * (len(data) + 1))
449    pchData = GlobalLock(hCd)
450    wcscpy(ctypes.c_wchar_p(pchData), data)
451    GlobalUnlock(hCd)
452    SetClipboardData(CF_UNICODETEXT, hCd)
453    # ctypes.windll.user32.SetClipboardText(CF_UNICODETEXT, hCd)
454    CloseClipboard()
455
456
457def sizeof_fmt(num, suffix="B"):
458    """Readable file size
459    :param num: Bytes value
460    :type num: int
461    :param suffix: Unit suffix (optionnal) default = B
462    :type suffix: str
463    :rtype: str
464    """
465    # https://gist.github.com/cbwar/d2dfbc19b140bd599daccbe0fe925597
466    for unit in ["", "k", "M", "G", "T", "P", "E", "Z"]:
467        if abs(num) < 1024.0:
468            if unit == "":
469                return "%d %s%s" % (num, unit, suffix)
470            return "%.1f %s%s" % (num, unit, suffix)
471        num /= 1024.0
472    return "%.1f%s%s" % (num, "Yi", suffix)
473
474
475def _get_known_folder(ID):
476    # http://stackoverflow.com/a/3859336/261181
477    # http://www.installmate.com/support/im9/using/symbols/functions/csidls.htm
478    import ctypes.wintypes
479
480    SHGFP_TYPE_CURRENT = 0
481    buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
482    ctypes.windll.shell32.SHGetFolderPathW(0, ID, 0, SHGFP_TYPE_CURRENT, buf)
483    assert buf.value
484    return buf.value
485
486
487def get_roaming_appdata_dir():
488    return _get_known_folder(26)
489
490
491def get_local_appdata_dir():
492    return _get_known_folder(28)
493
494
495class PopenWithOutputQueues(subprocess.Popen):
496    def __init__(self, *args, **kw):
497        super().__init__(*args, **kw)
498        self.stdout_queue = queue.Queue()
499        self.stderr_queue = queue.Queue()
500
501        for stream, target_queue in [
502            (self.stdout, self.stdout_queue),
503            (self.stderr, self.stderr_queue),
504        ]:
505            threading.Thread(
506                target=self._listen_thread, args=[stream, target_queue], daemon=True
507            ).start()
508
509    def _listen_thread(self, stream, target_queue: queue.Queue):
510        while True:
511            data = stream.readline()
512            if data == "":
513                break
514            target_queue.put(data)
515
516
517def inside_flatpak():
518    import shutil
519
520    return shutil.which("flatpak-spawn") and os.path.isfile("/app/manifest.json")
521
522
523def show_command_not_available_in_flatpak_message():
524    from tkinter import messagebox
525    from thonny.languages import tr
526
527    messagebox.showinfo(
528        tr("Command not available"),
529        tr("This command is not available if Thonny is run via Flatpak"),
530    )
531