# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""SCM-specific utility classes."""

import distutils.version
import glob
import io
import os
import platform
import re
import sys

import gclient_utils
import subprocess2


def ValidateEmail(email):
  """Returns True if |email| has the shape local-part@domain.tld.

  The check is purely syntactic: a non-empty local part, an '@', a non-empty
  domain, a literal dot, and a 2 to 6 letter top-level domain.
  """
  # The dot before the TLD is escaped so that it only matches a literal '.'.
  return (
      re.match(r"^[a-zA-Z0-9._%\-+]+@[a-zA-Z0-9._%-]+\.[a-zA-Z]{2,6}$", email)
      is not None)


def GetCasedPath(path):
  """Elcheapos way to get the real path case on Windows.

  When not running on Windows, or when |path| does not exist, |path| is
  returned unchanged. Otherwise the absolute path is returned with every
  component after the drive letter replaced by what glob reports.
  """
  if sys.platform.startswith('win') and os.path.exists(path):
    # Reconstruct the path.
    path = os.path.abspath(path)
    paths = path.split('\\')
    # Start at index 1 to skip the drive letter.
    for i in range(1, len(paths)):
      subpath = '\\'.join(paths[:i+1])
      prev = len('\\'.join(paths[:i]))
      # glob.glob will return the cased path for the last item only. This is why
      # we are calling it in a loop. Extract the data we want and put it back
      # into the list.
      paths[i] = glob.glob(subpath + '*')[0][prev+1:len(subpath)]
    path = '\\'.join(paths)
  return path


def GenFakeDiff(filename):
  """Generates a fake diff from a file.

  Every line of |filename| is emitted as an added line ('+' prefix) in a
  single hunk starting at line 1. Returns the diff as one string.
  """
  file_content = gclient_utils.FileRead(filename, 'rb').splitlines(True)
  # We need to use / since patch on unix will fail otherwise.
  filename = filename.replace(os.sep, '/')
  nb_lines = len(file_content)
  # The context manager guarantees the buffer is closed even if a write fails.
  with io.StringIO() as data:
    data.write("Index: %s\n" % filename)
    data.write('=' * 67 + '\n')
    # Note: Should we use /dev/null instead?
    data.write("--- %s\n" % filename)
    data.write("+++ %s\n" % filename)
    data.write("@@ -0,0 +1,%d @@\n" % nb_lines)
    # Prepend '+' to every line.
    for line in file_content:
      data.write('+')
      data.write(line)
    return data.getvalue()


def determine_scm(root):
  """Similar to upload.py's version but much simpler.

  Returns 'git' or None.
  """
  # Cheap check first: a .git directory directly under |root|.
  if os.path.isdir(os.path.join(root, '.git')):
    return 'git'
  # Otherwise ask git itself; any failure to run it means "not git".
  try:
    subprocess2.check_call(
        ['git', 'rev-parse', '--show-cdup'],
        stdout=subprocess2.VOID,
        stderr=subprocess2.VOID,
        cwd=root)
  except (OSError, subprocess2.CalledProcessError):
    return None
  return 'git'


def only_int(val):
  """Returns int(val) when |val| consists only of digits, otherwise 0."""
  return int(val) if val.isdigit() else 0


class GIT(object):
  """Collection of static helpers that shell out to the git binary."""

  # Cached git version, populated on the first AssertVersion() call.
  current_version = None

  @staticmethod
  def ApplyEnvVars(kwargs):
    """Builds the environment to run git with.

    Pops 'env' out of |kwargs|; when it is missing or empty, a copy of
    os.environ is used instead. Defaults are then filled in with setdefault,
    so a dict supplied by the caller is modified in place but its existing
    values are never overwritten.

    Returns the resulting environment dict.
    """
    env = kwargs.pop('env', None) or os.environ.copy()
    # Don't prompt for passwords; just fail quickly and noisily.
    # By default, git will use an interactive terminal prompt when a username/
    # password is needed.  That shouldn't happen in the chromium workflow,
    # and if it does, then gclient may hide the prompt in the midst of a flood
    # of terminal spew.  The only indication that something has gone wrong
    # will be when gclient hangs unresponsively.  Instead, we disable the
    # password prompt and simply allow git to fail noisily.  The error
    # message produced by git will be copied to gclient's output.
    env.setdefault('GIT_ASKPASS', 'true')
    env.setdefault('SSH_ASKPASS', 'true')
    # 'cat' is a magical git string that disables pagers on all platforms.
    env.setdefault('GIT_PAGER', 'cat')
    return env

  @staticmethod
  def Capture(args, cwd, strip_out=True, **kwargs):
    """Runs `git <args>` in |cwd| and returns its stdout as text.

    The output is decoded as UTF-8 with undecodable bytes replaced, and is
    stripped of surrounding whitespace unless |strip_out| is False. Extra
    keyword arguments are forwarded to subprocess2.check_output.
    """
    env = GIT.ApplyEnvVars(kwargs)
    output = subprocess2.check_output(
        ['git'] + args, cwd=cwd, stderr=subprocess2.PIPE, env=env, **kwargs)
    output = output.decode('utf-8', 'replace')
    return output.strip() if strip_out else output

  @staticmethod
  def CaptureStatus(cwd, upstream_branch):
    """Returns git status.

    Returns an array of (status, file) tuples.

    Raises gclient_utils.Error when no upstream branch can be determined or
    when a status line cannot be parsed.
    """
    if upstream_branch is None:
      upstream_branch = GIT.GetUpstreamBranch(cwd)
      if upstream_branch is None:
        raise gclient_utils.Error('Cannot determine upstream branch')
    command = ['-c', 'core.quotePath=false', 'diff',
               '--name-status', '--no-renames', '-r', '%s...' % upstream_branch]
    status = GIT.Capture(command, cwd)
    results = []
    for statusline in status.splitlines():
      # 3-way merges can cause the status to be 'MMM' instead of 'M'. This
      # can happen when the user has 2 local branches and diffs between
      # these 2 branches instead of diffing to upstream.
      # The whole run of status letters is captured as one group; a repeated
      # single-character group would only retain the last letter.
      m = re.match(r'^(\w+)\t(.+)$', statusline)
      if not m:
        raise gclient_utils.Error(
            'status currently unsupported: %s' % statusline)
      # Only grab the first letter.
      # NOTE(review): the padding inside this literal is reproduced as seen in
      # the reviewed copy, whose whitespace may have been collapsed - confirm.
      results.append(('%s ' % m.group(1)[0], m.group(2)))
    return results

  @staticmethod
  def GetConfig(cwd, key, default=None):
    """Returns the value of git config |key|, or |default| if git fails."""
    try:
      return GIT.Capture(['config', key], cwd=cwd)
    except subprocess2.CalledProcessError:
      return default

  @staticmethod
  def GetBranchConfig(cwd, branch, key, default=None):
    """Returns the value of 'branch.<branch>.<key>', or |default|."""
    assert branch, 'A branch must be given'
    key = 'branch.%s.%s' % (branch, key)
    return GIT.GetConfig(cwd, key, default)

  @staticmethod
  def SetConfig(cwd, key, value=None):
    """Sets git config |key| to |value|; unsets the key when value is None."""
    if value is None:
      args = ['config', '--unset', key]
    else:
      args = ['config', key, value]
    GIT.Capture(args, cwd=cwd)

  @staticmethod
  def SetBranchConfig(cwd, branch, key, value=None):
    """Sets (or unsets, when |value| is None) 'branch.<branch>.<key>'."""
    assert branch, 'A branch must be given'
    key = 'branch.%s.%s' % (branch, key)
    GIT.SetConfig(cwd, key, value)

  @staticmethod
  def IsWorkTreeDirty(cwd):
    """Returns True when `git status -s` prints anything for |cwd|."""
    return GIT.Capture(['status', '-s'], cwd=cwd) != ''

  @staticmethod
  def GetEmail(cwd):
    """Retrieves the user email address if known."""
    return GIT.GetConfig(cwd, 'user.email', '')

  @staticmethod
  def ShortBranchName(branch):
    """Converts a name like 'refs/heads/foo' to just 'foo'.

    Every occurrence of 'refs/heads/' is removed, not only a leading one.
    """
    return branch.replace('refs/heads/', '')

  @staticmethod
  def GetBranchRef(cwd):
    """Returns the full branch reference, e.g. 'refs/heads/master'.

    Returns None when `git symbolic-ref HEAD` fails.
    """
    try:
      return GIT.Capture(['symbolic-ref', 'HEAD'], cwd=cwd)
    except subprocess2.CalledProcessError:
      return None

  @staticmethod
  def GetBranch(cwd):
    """Returns the short branch name, e.g. 'master'.

    Returns None when the branch reference cannot be determined.
    """
    branchref = GIT.GetBranchRef(cwd)
    if branchref:
      return GIT.ShortBranchName(branchref)
    return None

  @staticmethod
  def GetRemoteBranches(cwd):
    """Returns the whitespace-separated tokens printed by `git branch -r`."""
    return GIT.Capture(['branch', '-r'], cwd=cwd).split()

  @staticmethod
  def FetchUpstreamTuple(cwd, branch=None):
    """Returns a tuple containing remote and remote ref,
       e.g. 'origin', 'refs/heads/master'

    Lookup order: the branch's own 'merge'/'remote' config, then the
    'rietveld.upstream-branch'/'rietveld.upstream-remote' config, then a guess
    among the remote branches. Returns (None, None) when nothing is found.
    """
    try:
      branch = branch or GIT.GetBranch(cwd)
    except subprocess2.CalledProcessError:
      pass
    if branch:
      upstream_branch = GIT.GetBranchConfig(cwd, branch, 'merge')
      if upstream_branch:
        remote = GIT.GetBranchConfig(cwd, branch, 'remote', '.')
        return remote, upstream_branch

    upstream_branch = GIT.GetConfig(cwd, 'rietveld.upstream-branch')
    if upstream_branch:
      remote = GIT.GetConfig(cwd, 'rietveld.upstream-remote', '.')
      return remote, upstream_branch

    # Else, try to guess the origin remote.
    remote_branches = GIT.GetRemoteBranches(cwd)
    if 'origin/main' in remote_branches:
      # Fall back on origin/main if it exists.
      return 'origin', 'refs/heads/main'
    if 'origin/master' in remote_branches:
      # Fall back on origin/master if it exists.
      return 'origin', 'refs/heads/master'

    return None, None

  @staticmethod
  def RefToRemoteRef(ref, remote):
    """Convert a checkout ref to the equivalent remote ref.

    Returns:
      A tuple of the remote ref's (common prefix, unique suffix), or None if it
      doesn't appear to refer to a remote ref (e.g. it's a commit hash).
    """
    # TODO(mmoss): This is just a brute-force mapping based off the expected git
    # config. It's a bit better than the even more brute-force replace('heads',
    # ...), but could still be smarter (like maybe actually using values gleaned
    # from the git config).
    ref = ref or ''
    m = re.match('^(refs/(remotes/)?)?branch-heads/', ref)
    if m:
      # Strip only the matched leading prefix; the same text may legitimately
      # occur again later in the ref.
      return ('refs/remotes/branch-heads/', ref[m.end():])

    # The remote name is matched literally, so regex metacharacters in it
    # (e.g. '.') must be escaped. str() keeps non-string values formattable,
    # as the '%s' interpolation always did.
    m = re.match(
        '^((refs/)?remotes/)?%s/|(refs/)?heads/' % re.escape(str(remote)), ref)
    if m:
      return ('refs/remotes/%s/' % remote, ref[m.end():])

    return None

  @staticmethod
  def RemoteRefToRef(ref, remote):
    """Convert a remote ref back to the ref it tracks.

    Returns None when |ref| is empty, does not start with 'refs/', or is a
    'refs/remotes/' ref belonging to neither branch-heads nor |remote|. Refs
    outside 'refs/remotes/' are returned unchanged.
    """
    assert remote, 'A remote must be given'
    if not ref or not ref.startswith('refs/'):
      return None
    if not ref.startswith('refs/remotes/'):
      return ref
    if ref.startswith('refs/remotes/branch-heads/'):
      return 'refs' + ref[len('refs/remotes'):]
    if ref.startswith('refs/remotes/%s/' % remote):
      return 'refs/heads' + ref[len('refs/remotes/%s' % remote):]
    return None

  @staticmethod
  def GetUpstreamBranch(cwd):
    """Gets the current branch's upstream branch."""
    remote, upstream_branch = GIT.FetchUpstreamTuple(cwd)
    # A remote of '.' is left untouched; anything else is mapped to its
    # remote-tracking ref when RefToRemoteRef recognizes it.
    if remote != '.' and upstream_branch:
      remote_ref = GIT.RefToRemoteRef(upstream_branch, remote)
      if remote_ref:
        upstream_branch = ''.join(remote_ref)
    return upstream_branch

  @staticmethod
  def IsAncestor(cwd, maybe_ancestor, ref):
    """Verifies if |maybe_ancestor| is an ancestor of |ref|."""
    try:
      GIT.Capture(['merge-base', '--is-ancestor', maybe_ancestor, ref], cwd=cwd)
      return True
    except subprocess2.CalledProcessError:
      return False

  @staticmethod
  def GetOldContents(cwd, filename, branch=None):
    """Returns the content of |filename| as of |branch|.

    |branch| defaults to the upstream branch. Returns '' when `git show`
    fails.
    """
    if not branch:
      branch = GIT.GetUpstreamBranch(cwd)
    if platform.system() == 'Windows':
      # git show <sha>:<path> wants a posix path.
      filename = filename.replace('\\', '/')
    command = ['show', '%s:%s' % (branch, filename)]
    try:
      return GIT.Capture(command, cwd=cwd, strip_out=False)
    except subprocess2.CalledProcessError:
      return ''

  @staticmethod
  def GenerateDiff(cwd, branch=None, branch_head='HEAD', full_move=False,
                   files=None):
    """Diffs against the upstream branch or optionally another branch.

    full_move means that move or copy operations should completely recreate the
    files, usually in the prospect to apply the patch for a try job."""
    if not branch:
      branch = GIT.GetUpstreamBranch(cwd)
    command = ['-c', 'core.quotePath=false', 'diff',
               '-p', '--no-color', '--no-prefix', '--no-ext-diff',
               branch + "..." + branch_head]
    if full_move:
      command.append('--no-renames')
    else:
      command.append('-C')
    # TODO(maruel): --binary support.
    if files:
      command.append('--')
      command.extend(files)
    diff = GIT.Capture(command, cwd=cwd, strip_out=False).splitlines(True)
    for i, line in enumerate(diff):
      # In the case of added files, replace /dev/null with the path to the
      # file being added. The path is taken from the following line with its
      # first 4 characters dropped; the bounds check keeps a trailing
      # '--- /dev/null' line from raising IndexError.
      if line.startswith('--- /dev/null') and i + 1 < len(diff):
        diff[i] = '--- %s' % diff[i+1][4:]
    return ''.join(diff)

  @staticmethod
  def GetDifferentFiles(cwd, branch=None, branch_head='HEAD'):
    """Returns the list of modified files between two branches."""
    if not branch:
      branch = GIT.GetUpstreamBranch(cwd)
    command = ['-c', 'core.quotePath=false', 'diff',
               '--name-only', branch + "..." + branch_head]
    return GIT.Capture(command, cwd=cwd).splitlines(False)

  @staticmethod
  def GetAllFiles(cwd):
    """Returns the list of all files under revision control."""
    command = ['-c', 'core.quotePath=false', 'ls-files', '--', '.']
    return GIT.Capture(command, cwd=cwd).splitlines(False)

  @staticmethod
  def GetPatchName(cwd):
    """Constructs a name for this patch."""
    short_sha = GIT.Capture(['rev-parse', '--short=4', 'HEAD'], cwd=cwd)
    return "%s#%s" % (GIT.GetBranch(cwd), short_sha)

  @staticmethod
  def GetCheckoutRoot(cwd):
    """Returns the top level directory of a git checkout as an absolute path.
    """
    root = GIT.Capture(['rev-parse', '--show-cdup'], cwd=cwd)
    return os.path.abspath(os.path.join(cwd, root))

  @staticmethod
  def GetGitDir(cwd):
    """Returns the absolute form of what `git rev-parse --git-dir` prints.

    os.path.abspath resolves the printed path against the process's current
    directory, not against |cwd|.
    """
    return os.path.abspath(GIT.Capture(['rev-parse', '--git-dir'], cwd=cwd))

  @staticmethod
  def IsInsideWorkTree(cwd):
    """Returns git's answer to `rev-parse --is-inside-work-tree`, or False.

    False is returned when git cannot be run or exits with an error.
    NOTE(review): on success this is git's output string, not a bool, so an
    output of 'false' would still be truthy - confirm callers expect that.
    """
    try:
      return GIT.Capture(['rev-parse', '--is-inside-work-tree'], cwd=cwd)
    except (OSError, subprocess2.CalledProcessError):
      return False

  @staticmethod
  def IsDirectoryVersioned(cwd, relative_dir):
    """Checks whether the given |relative_dir| is part of cwd's repo."""
    return bool(GIT.Capture(['ls-tree', 'HEAD', relative_dir], cwd=cwd))

  @staticmethod
  def CleanupDir(cwd, relative_dir):
    """Cleans up untracked file inside |relative_dir|.

    Returns True when `git clean -df` printed any output.
    """
    return bool(GIT.Capture(['clean', '-df', relative_dir], cwd=cwd))

  @staticmethod
  def ResolveCommit(cwd, rev):
    """Returns what `git rev-parse --verify` prints for |rev|, or None."""
    # We do this instead of rev-parse --verify rev^{commit}, since on Windows
    # git can be either an executable or batch script, each of which requires
    # escaping the caret (^) a different way.
    if gclient_utils.IsFullGitSha(rev):
      # git-rev parse --verify FULL_GIT_SHA always succeeds, even if we don't
      # have FULL_GIT_SHA locally. Removing the last character forces git to
      # check if FULL_GIT_SHA refers to an object in the local database.
      rev = rev[:-1]
    try:
      return GIT.Capture(['rev-parse', '--quiet', '--verify', rev], cwd=cwd)
    except subprocess2.CalledProcessError:
      return None

  @staticmethod
  def IsValidRevision(cwd, rev, sha_only=False):
    """Verifies the revision is a proper git revision.

    sha_only: Fail unless rev is a sha hash.
    """
    sha = GIT.ResolveCommit(cwd, rev)
    if sha is None:
      return False
    if sha_only:
      return sha == rev.lower()
    return True

  @classmethod
  def AssertVersion(cls, min_version):
    """Asserts git's version is at least min_version.

    Returns a (bool, version) tuple: whether the installed git satisfies
    |min_version|, and the installed version. The installed version is
    queried once and cached on the class.

    Raises gclient_utils.Error when the `git --version` output cannot be
    parsed.
    """
    if cls.current_version is None:
      current_version = cls.Capture(['--version'], '.')
      matched = re.search(r'git version (.+)', current_version)
      if not matched:
        # Fail with the offending output rather than an AttributeError on None.
        raise gclient_utils.Error(
            'Unable to parse git version from: %r' % current_version)
      cls.current_version = distutils.version.LooseVersion(matched.group(1))
    min_version = distutils.version.LooseVersion(min_version)
    return (min_version <= cls.current_version, cls.current_version)