# utils.py -- Git compatibility utilities
# Copyright (C) 2010 Google, Inc.
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Utilities for interacting with cgit."""

import errno
import functools
import os
import shutil
import socket
import stat
import subprocess
import sys
import tempfile
import time

from dulwich.repo import Repo
from dulwich.protocol import TCP_GIT_PORT

from dulwich.tests import (
    SkipTest,
    TestCase,
    )

# Name of the git executable looked up on the system path by default.
_DEFAULT_GIT = 'git'
# Number of components in a normalised version tuple
# (major, minor, point, sub-point).
_VERSION_LEN = 4
# Directory holding the fast-export files used to build test repositories.
_REPOS_DATA_DIR = os.path.abspath(os.path.join(
    os.path.dirname(__file__), os.pardir, 'data', 'repos'))


def git_version(git_path=_DEFAULT_GIT):
    """Attempt to determine the version of git currently installed.

    Args:
      git_path: Path to the git executable; defaults to the version in
        the system path.
    Returns: A tuple of ints of the form (major, minor, point, sub-point), or
      None if no git installation was found.
    """
    try:
        output = run_git_or_fail(['--version'], git_path=git_path)
    except OSError:
        return None
    version_prefix = b'git version '
    if not output.startswith(version_prefix):
        return None

    parts = output[len(version_prefix):].split(b'.')
    nums = []
    for part in parts:
        try:
            nums.append(int(part))
        except ValueError:
            # Stop at the first non-numeric component (vendor suffixes etc.).
            break

    # Normalise to exactly _VERSION_LEN components, zero-padded.
    while len(nums) < _VERSION_LEN:
        nums.append(0)
    return tuple(nums[:_VERSION_LEN])


def require_git_version(required_version, git_path=_DEFAULT_GIT):
    """Require git version >= version, or skip the calling test.

    Args:
      required_version: A tuple of ints of the form (major, minor, point,
        sub-point); omitted components default to 0.
      git_path: Path to the git executable; defaults to the version in
        the system path.
    Raises:
      ValueError: if the required version tuple has too many parts.
      SkipTest: if no suitable git version was found at the given path.
    """
    found_version = git_version(git_path=git_path)
    if found_version is None:
        raise SkipTest('Test requires git >= %s, but c git not found' %
                       (required_version, ))

    if len(required_version) > _VERSION_LEN:
        raise ValueError('Invalid version tuple %s, expected %i parts' %
                         (required_version, _VERSION_LEN))

    # Zero-pad the required version so both tuples have the same number of
    # components before they are compared and reported.
    required_version = list(required_version)
    while len(required_version) < len(found_version):
        required_version.append(0)
    required_version = tuple(required_version)

    if found_version < required_version:
        required_version = '.'.join(map(str, required_version))
        found_version = '.'.join(map(str, found_version))
        raise SkipTest('Test requires git >= %s, found %s' %
                       (required_version, found_version))


def run_git(args, git_path=_DEFAULT_GIT, input=None, capture_stdout=False,
            **popen_kwargs):
    """Run a git command.

    Input is piped from the input parameter and output is sent to the standard
    streams, unless capture_stdout is set.

    Args:
      args: A list of args to the git command.
      git_path: Path to the git executable.
      input: Input data to be sent to stdin.
      capture_stdout: Whether to capture and return stdout.
      popen_kwargs: Additional kwargs for subprocess.Popen;
        stdin/stdout args are ignored. An 'env' mapping, if given, is copied
        and has LC_ALL and LANG forced to 'C'.
    Returns: A tuple of (returncode, stdout contents). If capture_stdout is
        False, None will be returned as stdout contents.
    Raises:
      OSError: if the git executable was not found.
    """
    # Work on a copy so the caller's mapping is never modified.
    env = dict(popen_kwargs.pop('env', {}))
    # Force the C locale so git's output is stable and parseable.
    env['LC_ALL'] = env['LANG'] = 'C'

    args = [git_path] + args
    popen_kwargs['stdin'] = subprocess.PIPE
    if capture_stdout:
        popen_kwargs['stdout'] = subprocess.PIPE
    else:
        popen_kwargs.pop('stdout', None)
    p = subprocess.Popen(args, env=env, **popen_kwargs)
    stdout, _ = p.communicate(input=input)
    return (p.returncode, stdout)


def run_git_or_fail(args, git_path=_DEFAULT_GIT, input=None, **popen_kwargs):
    """Run a git command, capture stdout/stderr, and fail if git fails.

    Args:
      args: A list of args to the git command.
      git_path: Path to the git executable.
      input: Input data to be sent to stdin.
      popen_kwargs: Additional kwargs for subprocess.Popen; unless 'stderr'
        is given, stderr is merged into the captured stdout.
    Returns: The captured stdout contents.
    Raises:
      AssertionError: if git exits with a non-zero return code.
      OSError: if the git executable was not found.
    """
    if 'stderr' not in popen_kwargs:
        popen_kwargs['stderr'] = subprocess.STDOUT
    returncode, stdout = run_git(args, git_path=git_path, input=input,
                                 capture_stdout=True, **popen_kwargs)
    if returncode != 0:
        raise AssertionError("git with args %r failed with %d: %r" % (
            args, returncode, stdout))
    return stdout


def import_repo_to_dir(name):
    """Import a repo from a fast-export file in a temporary directory.

    These are used rather than binary repos for compat tests because they are
    more compact and human-editable, and we already depend on git.

    Args:
      name: The name of the repository export file, relative to
          dulwich/tests/data/repos.
    Returns: The path to the imported repository.
    """
    temp_dir = tempfile.mkdtemp()
    export_path = os.path.join(_REPOS_DATA_DIR, name)
    temp_repo_dir = os.path.join(temp_dir, name)
    # Read the export up front with a context manager so the handle is always
    # released, even if one of the git invocations below fails.
    with open(export_path, 'rb') as export_file:
        export_data = export_file.read()
    run_git_or_fail(['init', '--quiet', '--bare', temp_repo_dir])
    run_git_or_fail(['fast-import'], input=export_data,
                    cwd=temp_repo_dir)
    return temp_repo_dir


def check_for_daemon(limit=10, delay=0.1, timeout=0.1, port=TCP_GIT_PORT):
    """Check for a running TCP daemon.

    Defaults to checking 10 times with a delay of 0.1 sec between tries.

    Args:
      limit: Number of attempts before deciding no daemon is running.
      delay: Delay between connection attempts.
      timeout: Socket timeout for connection attempts.
      port: Port on which we expect the daemon to appear.
    Returns: A boolean, true if a daemon is running on the specified port,
        false if not.
    """
    for _ in range(limit):
        time.sleep(delay)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Use the dedicated timeout parameter rather than the retry delay.
        s.settimeout(timeout)
        try:
            s.connect(('localhost', port))
            return True
        except socket.timeout:
            # Nothing answered in time; try again.
            pass
        except socket.error as e:
            # A refused connection just means the daemon is not up yet;
            # anything else is unexpected and is propagated.
            if getattr(e, 'errno', False) and e.errno != errno.ECONNREFUSED:
                raise
            elif e.args[0] != errno.ECONNREFUSED:
                raise
        finally:
            s.close()
    return False


class CompatTestCase(TestCase):
    """Test case that requires git for compatibility checks.

    Subclasses can change the git version required by overriding
    min_git_version.
    """

    # Minimum git version needed; tests are skipped when it is not available.
    min_git_version = (1, 5, 0)

    def setUp(self):
        """Skip the test unless a sufficiently recent git is installed."""
        super(CompatTestCase, self).setUp()
        require_git_version(self.min_git_version)

    def assertObjectStoreEqual(self, store1, store2):
        """Assert that iterating both stores yields the same set of items."""
        self.assertEqual(sorted(set(store1)), sorted(set(store2)))

    def assertReposEqual(self, repo1, repo2):
        """Assert that both repos have equal refs and object stores."""
        self.assertEqual(repo1.get_refs(), repo2.get_refs())
        self.assertObjectStoreEqual(repo1.object_store, repo2.object_store)

    def assertReposNotEqual(self, repo1, repo2):
        """Assert that the repos differ in refs or in object store."""
        refs1 = repo1.get_refs()
        objs1 = set(repo1.object_store)
        refs2 = repo2.get_refs()
        objs2 = set(repo2.object_store)
        self.assertFalse(refs1 == refs2 and objs1 == objs2)

    def import_repo(self, name):
        """Import a repo from a fast-export file in a temporary directory.

        Args:
          name: The name of the repository export file, relative to
              dulwich/tests/data/repos.
        Returns: An initialized Repo object that lives in a temporary
            directory.
        """
        path = import_repo_to_dir(name)
        repo = Repo(path)

        def cleanup():
            repo.close()
            # Remove the enclosing temporary directory, not just the repo.
            rmtree_ro(os.path.dirname(path.rstrip(os.sep)))
        self.addCleanup(cleanup)
        return repo


if sys.platform == 'win32':
    def remove_ro(action, name, exc):
        """shutil.rmtree error hook: make the path writable, then remove it."""
        os.chmod(name, stat.S_IWRITE)
        os.remove(name)

    rmtree_ro = functools.partial(shutil.rmtree, onerror=remove_ro)
else:
    rmtree_ro = shutil.rmtree