1# -*- coding: utf-8 -*- 2"""Prompt formatter for simple version control branches""" 3# pylint:disable=no-member, invalid-name 4 5import os 6import sys 7import queue 8import builtins 9import threading 10import subprocess 11 12import xonsh.tools as xt 13 14 15def _get_git_branch(q): 16 denv = builtins.__xonsh_env__.detype() 17 try: 18 branches = xt.decode_bytes( 19 subprocess.check_output( 20 ["git", "branch"], env=denv, stderr=subprocess.DEVNULL 21 ) 22 ).splitlines() 23 except (subprocess.CalledProcessError, OSError, FileNotFoundError): 24 q.put(None) 25 else: 26 for branch in branches: 27 if not branch.startswith("* "): 28 continue 29 elif branch.endswith(")"): 30 branch = branch.split()[-1][:-1] 31 else: 32 branch = branch.split()[-1] 33 34 q.put(branch) 35 break 36 else: 37 q.put(None) 38 39 40def get_git_branch(): 41 """Attempts to find the current git branch. If this could not 42 be determined (timeout, not in a git repo, etc.) then this returns None. 43 """ 44 branch = None 45 timeout = builtins.__xonsh_env__.get("VC_BRANCH_TIMEOUT") 46 q = queue.Queue() 47 48 t = threading.Thread(target=_get_git_branch, args=(q,)) 49 t.start() 50 t.join(timeout=timeout) 51 try: 52 branch = q.get_nowait() 53 except queue.Empty: 54 branch = None 55 return branch 56 57 58def _get_hg_root(q): 59 _curpwd = builtins.__xonsh_env__["PWD"] 60 while True: 61 if not os.path.isdir(_curpwd): 62 return False 63 if any([b.name == ".hg" for b in xt.scandir(_curpwd)]): 64 q.put(_curpwd) 65 break 66 else: 67 _oldpwd = _curpwd 68 _curpwd = os.path.split(_curpwd)[0] 69 if _oldpwd == _curpwd: 70 return False 71 72 73def get_hg_branch(root=None): 74 """Try to get the mercurial branch of the current directory, 75 return None if not in a repo or subprocess.TimeoutExpired if timed out. 76 """ 77 env = builtins.__xonsh_env__ 78 timeout = env["VC_BRANCH_TIMEOUT"] 79 q = queue.Queue() 80 t = threading.Thread(target=_get_hg_root, args=(q,)) 81 t.start() 82 t.join(timeout=timeout) 83 try: 84 root = q.get_nowait() 85 except queue.Empty: 86 return None 87 if env.get("VC_HG_SHOW_BRANCH"): 88 # get branch name 89 branch_path = os.path.sep.join([root, ".hg", "branch"]) 90 if os.path.exists(branch_path): 91 with open(branch_path, "r") as branch_file: 92 branch = branch_file.read() 93 else: 94 branch = "default" 95 else: 96 branch = "" 97 # add bookmark, if we can 98 bookmark_path = os.path.sep.join([root, ".hg", "bookmarks.current"]) 99 if os.path.exists(bookmark_path): 100 with open(bookmark_path, "r") as bookmark_file: 101 active_bookmark = bookmark_file.read() 102 if env.get("VC_HG_SHOW_BRANCH") is True: 103 branch = "{0}, {1}".format( 104 *(b.strip(os.linesep) for b in (branch, active_bookmark)) 105 ) 106 else: 107 branch = active_bookmark.strip(os.linesep) 108 else: 109 branch = branch.strip(os.linesep) 110 return branch 111 112 113_FIRST_BRANCH_TIMEOUT = True 114 115 116def _first_branch_timeout_message(): 117 global _FIRST_BRANCH_TIMEOUT 118 sbtm = builtins.__xonsh_env__["SUPPRESS_BRANCH_TIMEOUT_MESSAGE"] 119 if not _FIRST_BRANCH_TIMEOUT or sbtm: 120 return 121 _FIRST_BRANCH_TIMEOUT = False 122 print( 123 "xonsh: branch timeout: computing the branch name, color, or both " 124 "timed out while formatting the prompt. You may avoid this by " 125 "increasing the value of $VC_BRANCH_TIMEOUT or by removing branch " 126 "fields, like {curr_branch}, from your $PROMPT. See the FAQ " 127 "for more details. This message will be suppressed for the remainder " 128 "of this session. To suppress this message permanently, set " 129 "$SUPPRESS_BRANCH_TIMEOUT_MESSAGE = True in your xonshrc file.", 130 file=sys.stderr, 131 ) 132 133 134def current_branch(): 135 """Gets the branch for a current working directory. Returns an empty string 136 if the cwd is not a repository. This currently only works for git and hg 137 and should be extended in the future. If a timeout occurred, the string 138 '<branch-timeout>' is returned. 139 """ 140 branch = None 141 cmds = builtins.__xonsh_commands_cache__ 142 # check for binary only once 143 if cmds.is_empty(): 144 has_git = bool(cmds.locate_binary("git", ignore_alias=True)) 145 has_hg = bool(cmds.locate_binary("hg", ignore_alias=True)) 146 else: 147 has_git = bool(cmds.lazy_locate_binary("git", ignore_alias=True)) 148 has_hg = bool(cmds.lazy_locate_binary("hg", ignore_alias=True)) 149 if has_git: 150 branch = get_git_branch() 151 if not branch and has_hg: 152 branch = get_hg_branch() 153 if isinstance(branch, subprocess.TimeoutExpired): 154 branch = "<branch-timeout>" 155 _first_branch_timeout_message() 156 return branch or None 157 158 159def _git_dirty_working_directory(q, include_untracked): 160 status = None 161 denv = builtins.__xonsh_env__.detype() 162 try: 163 cmd = ["git", "status", "--porcelain"] 164 if include_untracked: 165 cmd.append("--untracked-files=normal") 166 else: 167 cmd.append("--untracked-files=no") 168 status = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, env=denv) 169 except (subprocess.CalledProcessError, OSError, FileNotFoundError): 170 q.put(None) 171 if status is not None: 172 return q.put(bool(status)) 173 174 175def git_dirty_working_directory(include_untracked=False): 176 """Returns whether or not the git directory is dirty. If this could not 177 be determined (timeout, file not found, etc.) then this returns None. 178 """ 179 timeout = builtins.__xonsh_env__.get("VC_BRANCH_TIMEOUT") 180 q = queue.Queue() 181 t = threading.Thread( 182 target=_git_dirty_working_directory, args=(q, include_untracked) 183 ) 184 t.start() 185 t.join(timeout=timeout) 186 try: 187 return q.get_nowait() 188 except queue.Empty: 189 return None 190 191 192def hg_dirty_working_directory(): 193 """Computes whether or not the mercurial working directory is dirty or not. 194 If this cannot be determined, None is returned. 195 """ 196 env = builtins.__xonsh_env__ 197 cwd = env["PWD"] 198 denv = env.detype() 199 vcbt = env["VC_BRANCH_TIMEOUT"] 200 # Override user configurations settings and aliases 201 denv["HGRCPATH"] = "" 202 try: 203 s = subprocess.check_output( 204 ["hg", "identify", "--id"], 205 stderr=subprocess.PIPE, 206 cwd=cwd, 207 timeout=vcbt, 208 universal_newlines=True, 209 env=denv, 210 ) 211 return s.strip(os.linesep).endswith("+") 212 except ( 213 subprocess.CalledProcessError, 214 subprocess.TimeoutExpired, 215 FileNotFoundError, 216 ): 217 return None 218 219 220def dirty_working_directory(): 221 """Returns a boolean as to whether there are uncommitted files in version 222 control repository we are inside. If this cannot be determined, returns 223 None. Currently supports git and hg. 224 """ 225 dwd = None 226 cmds = builtins.__xonsh_commands_cache__ 227 if cmds.lazy_locate_binary("git", ignore_alias=True): 228 dwd = git_dirty_working_directory() 229 if cmds.lazy_locate_binary("hg", ignore_alias=True) and dwd is None: 230 dwd = hg_dirty_working_directory() 231 return dwd 232 233 234def branch_color(): 235 """Return red if the current branch is dirty, yellow if the dirtiness can 236 not be determined, and green if it clean. These are bold, intense colors 237 for the foreground. 238 """ 239 dwd = dirty_working_directory() 240 if dwd is None: 241 color = "{BOLD_INTENSE_YELLOW}" 242 elif dwd: 243 color = "{BOLD_INTENSE_RED}" 244 else: 245 color = "{BOLD_INTENSE_GREEN}" 246 return color 247 248 249def branch_bg_color(): 250 """Return red if the current branch is dirty, yellow if the dirtiness can 251 not be determined, and green if it clean. These are background colors. 252 """ 253 dwd = dirty_working_directory() 254 if dwd is None: 255 color = "{BACKGROUND_YELLOW}" 256 elif dwd: 257 color = "{BACKGROUND_RED}" 258 else: 259 color = "{BACKGROUND_GREEN}" 260 return color 261