1# -*- coding: utf-8 -*-
2"""Prompt formatter for simple version control branches"""
3# pylint:disable=no-member, invalid-name
4
5import os
6import sys
7import queue
8import builtins
9import threading
10import subprocess
11
12import xonsh.tools as xt
13
14
def _get_git_branch(q):
    """Thread worker: put the current git branch name on ``q``.

    Runs ``git branch`` with the detyped xonsh environment and looks for the
    line marked with ``* ``.  Exactly one item is put on ``q``: the last
    whitespace-separated word of that line (minus a trailing ``)`` when the
    line ends with one, as in ``* (HEAD detached at abc123)``), or None when
    the command fails or no line is marked.
    """
    git_env = builtins.__xonsh_env__.detype()
    try:
        raw = subprocess.check_output(
            ["git", "branch"], env=git_env, stderr=subprocess.DEVNULL
        )
        lines = xt.decode_bytes(raw).splitlines()
    except (subprocess.CalledProcessError, OSError, FileNotFoundError):
        q.put(None)
        return

    # Only the first line flagged with "* " is considered.
    marked = next((line for line in lines if line.startswith("* ")), None)
    if marked is None:
        q.put(None)
        return

    name = marked.split()[-1]
    if marked.endswith(")"):
        # Drop the closing parenthesis of a "(... <ref>)" description.
        name = name[:-1]
    q.put(name)
38
39
def get_git_branch():
    """Return the current git branch name, or None if it is unavailable.

    The lookup runs in a helper thread that is waited on for at most
    $VC_BRANCH_TIMEOUT seconds.  None is returned when the helper reported
    None (git failed or no current branch line was found) or when it had not
    produced a result by the time the wait ended.
    """
    wait = builtins.__xonsh_env__.get("VC_BRANCH_TIMEOUT")
    results = queue.Queue()

    worker = threading.Thread(target=_get_git_branch, args=(results,))
    worker.start()
    worker.join(timeout=wait)
    try:
        return results.get_nowait()
    except queue.Empty:
        # The worker has not answered yet; treat it as "unknown".
        return None
56
57
def _get_hg_root(q):
    """Thread worker: find the closest directory holding a ``.hg`` entry.

    Starts at $PWD and walks towards the filesystem root.  The first
    directory whose listing contains an entry named ``.hg`` is put on ``q``.
    Nothing is put on ``q`` when the walk hits a path that is not a
    directory or reaches the top without a match; False is returned in
    those cases.
    """
    here = builtins.__xonsh_env__["PWD"]
    while os.path.isdir(here):
        entry_names = [entry.name for entry in xt.scandir(here)]
        if ".hg" in entry_names:
            q.put(here)
            return None
        parent = os.path.split(here)[0]
        if parent == here:
            # os.path.split() no longer shortens the path: we are at the top.
            return False
        here = parent
    return False
71
72
def get_hg_branch(root=None):
    """Try to get the mercurial branch and/or active bookmark for $PWD.

    The repository root is searched for in a helper thread that is waited on
    for at most $VC_BRANCH_TIMEOUT seconds.

    Parameters
    ----------
    root : ignored
        Kept for backward compatibility.  The value passed in is never
        used; the root is always recomputed from $PWD.

    Returns
    -------
    None if no repository root was found before the wait ended (not inside
    a repository, or the search timed out).  Otherwise a string:

    * ``"<branch>, <bookmark>"`` if $VC_HG_SHOW_BRANCH is set and a bookmark
      is active,
    * ``"<branch>"`` if $VC_HG_SHOW_BRANCH is set and no bookmark is active
      (``"default"`` when ``.hg/branch`` does not exist),
    * ``"<bookmark>"`` if $VC_HG_SHOW_BRANCH is unset and a bookmark is
      active,
    * ``""`` otherwise.
    """
    env = builtins.__xonsh_env__
    timeout = env["VC_BRANCH_TIMEOUT"]
    # Evaluate the flag once so that reading the branch and formatting the
    # result can never disagree about whether the branch is shown.
    show_branch = bool(env.get("VC_HG_SHOW_BRANCH"))

    q = queue.Queue()
    t = threading.Thread(target=_get_hg_root, args=(q,))
    t.start()
    t.join(timeout=timeout)
    try:
        root = q.get_nowait()
    except queue.Empty:
        return None

    hg_dir = os.path.join(root, ".hg")

    branch = ""
    if show_branch:
        branch_path = os.path.join(hg_dir, "branch")
        if os.path.exists(branch_path):
            with open(branch_path, "r") as branch_file:
                branch = branch_file.read()
        else:
            # No branch file present: report the "default" branch.
            branch = "default"
    branch = branch.strip(os.linesep)

    # Add the active bookmark, if there is one.
    bookmark_path = os.path.join(hg_dir, "bookmarks.current")
    if os.path.exists(bookmark_path):
        with open(bookmark_path, "r") as bookmark_file:
            active_bookmark = bookmark_file.read().strip(os.linesep)
        if show_branch:
            branch = "{0}, {1}".format(branch, active_bookmark)
        else:
            branch = active_bookmark
    return branch
111
112
# True until the branch-timeout notice has been printed once in this process.
_FIRST_BRANCH_TIMEOUT = True


def _first_branch_timeout_message():
    """Print the branch-timeout notice to stderr, at most once.

    Nothing is printed (and the one-shot flag is left untouched) when the
    notice was already shown or when $SUPPRESS_BRANCH_TIMEOUT_MESSAGE is
    truthy.
    """
    global _FIRST_BRANCH_TIMEOUT
    suppressed = builtins.__xonsh_env__["SUPPRESS_BRANCH_TIMEOUT_MESSAGE"]
    if suppressed or not _FIRST_BRANCH_TIMEOUT:
        return
    _FIRST_BRANCH_TIMEOUT = False
    message = (
        "xonsh: branch timeout: computing the branch name, color, or both "
        "timed out while formatting the prompt. You may avoid this by "
        "increasing the value of $VC_BRANCH_TIMEOUT or by removing branch "
        "fields, like {curr_branch}, from your $PROMPT. See the FAQ "
        "for more details. This message will be suppressed for the remainder "
        "of this session. To suppress this message permanently, set "
        "$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file."
    )
    print(message, file=sys.stderr)
132
133
def current_branch():
    """Get the branch for the current working directory.

    git is asked first (when a ``git`` binary is found); hg is asked only if
    git produced no branch and an ``hg`` binary is found.  Returns the branch
    string, or None when no branch could be determined (an empty result is
    also turned into None).  Should a helper ever hand back a
    ``subprocess.TimeoutExpired`` instance, the string '<branch-timeout>' is
    returned instead and the one-time timeout notice is printed.
    """
    cache = builtins.__xonsh_commands_cache__
    # With an empty commands cache do a full lookup; afterwards the lazy
    # variant is used.
    if cache.is_empty():
        locate = cache.locate_binary
    else:
        locate = cache.lazy_locate_binary
    has_git = bool(locate("git", ignore_alias=True))
    has_hg = bool(locate("hg", ignore_alias=True))

    branch = get_git_branch() if has_git else None
    if has_hg and not branch:
        branch = get_hg_branch()
    if isinstance(branch, subprocess.TimeoutExpired):
        branch = "<branch-timeout>"
        _first_branch_timeout_message()
    return branch or None
157
158
def _git_dirty_working_directory(q, include_untracked):
    """Thread worker: put the git dirtiness of the working tree on ``q``.

    Runs ``git status --porcelain`` with the detyped xonsh environment;
    untracked files are taken into account only when ``include_untracked``
    is truthy.  Puts True on ``q`` if the command printed anything, False if
    it printed nothing, and None if the command could not be run or failed.
    """
    git_env = builtins.__xonsh_env__.detype()
    untracked_flag = (
        "--untracked-files=normal" if include_untracked else "--untracked-files=no"
    )
    command = ["git", "status", "--porcelain", untracked_flag]
    try:
        output = subprocess.check_output(
            command, stderr=subprocess.DEVNULL, env=git_env
        )
    except (subprocess.CalledProcessError, OSError, FileNotFoundError):
        q.put(None)
    else:
        # Any porcelain output at all means there are changes.
        q.put(bool(output))
173
174
def git_dirty_working_directory(include_untracked=False):
    """Return whether the git working directory is dirty.

    The check runs in a helper thread that is waited on for at most
    $VC_BRANCH_TIMEOUT seconds.  Returns True or False when the helper
    answered, and None when it reported a failure or had not answered by
    the time the wait ended.  ``include_untracked`` is forwarded to the
    helper and decides whether untracked files are considered.
    """
    wait = builtins.__xonsh_env__.get("VC_BRANCH_TIMEOUT")
    results = queue.Queue()
    worker = threading.Thread(
        target=_git_dirty_working_directory,
        args=(results, include_untracked),
    )
    worker.start()
    worker.join(timeout=wait)
    try:
        dirty = results.get_nowait()
    except queue.Empty:
        dirty = None
    return dirty
190
191
def hg_dirty_working_directory():
    """Compute whether the mercurial working directory is dirty.

    Runs ``hg identify --id`` in $PWD with a timeout of $VC_BRANCH_TIMEOUT
    seconds and with HGRCPATH emptied in the child environment.

    Returns True if the reported id ends with ``+``, False if it does not,
    and None if this cannot be determined: hg exits with an error, the call
    times out, or the process cannot be started at all (missing binary,
    missing or unreadable working directory, permission problems, ...).
    """
    env = builtins.__xonsh_env__
    cwd = env["PWD"]
    denv = env.detype()
    vcbt = env["VC_BRANCH_TIMEOUT"]
    # Override user configurations settings and aliases
    denv["HGRCPATH"] = ""
    try:
        s = subprocess.check_output(
            ["hg", "identify", "--id"],
            stderr=subprocess.PIPE,
            cwd=cwd,
            timeout=vcbt,
            universal_newlines=True,
            env=denv,
        )
    except (
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        # OSError covers FileNotFoundError as well as PermissionError,
        # NotADirectoryError, etc., matching what the git helpers catch, so
        # a failure to spawn hg never propagates into prompt rendering.
        OSError,
    ):
        return None
    return s.strip(os.linesep).endswith("+")
218
219
def dirty_working_directory():
    """Return whether the enclosing version control repository is dirty.

    git is consulted first when a ``git`` binary is located; hg is consulted
    only when git gave no answer (None) and an ``hg`` binary is located.
    Returns True/False, or None when neither tool could determine it.
    """
    cache = builtins.__xonsh_commands_cache__
    dirty = None
    if cache.lazy_locate_binary("git", ignore_alias=True):
        dirty = git_dirty_working_directory()
    # The hg lookup is performed regardless of the git result.
    has_hg = cache.lazy_locate_binary("hg", ignore_alias=True)
    if has_hg and dirty is None:
        dirty = hg_dirty_working_directory()
    return dirty
232
233
def branch_color():
    """Return the foreground color field matching the repository state.

    Gives '{BOLD_INTENSE_YELLOW}' when dirtiness cannot be determined,
    '{BOLD_INTENSE_RED}' when the working directory is dirty and
    '{BOLD_INTENSE_GREEN}' when it is clean.
    """
    dirty = dirty_working_directory()
    if dirty is None:
        return "{BOLD_INTENSE_YELLOW}"
    return "{BOLD_INTENSE_RED}" if dirty else "{BOLD_INTENSE_GREEN}"
247
248
def branch_bg_color():
    """Return the background color field matching the repository state.

    Gives '{BACKGROUND_YELLOW}' when dirtiness cannot be determined,
    '{BACKGROUND_RED}' when the working directory is dirty and
    '{BACKGROUND_GREEN}' when it is clean.
    """
    dirty = dirty_working_directory()
    if dirty is None:
        return "{BACKGROUND_YELLOW}"
    return "{BACKGROUND_RED}" if dirty else "{BACKGROUND_GREEN}"
261