1# utils.py -- Git compatibility utilities
2# Copyright (C) 2010 Google, Inc.
3#
4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5# General Public License as public by the Free Software Foundation; version 2.0
6# or (at your option) any later version. You can redistribute it and/or
7# modify it under the terms of either of these two licenses.
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# You should have received a copy of the licenses; if not, see
16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18# License, Version 2.0.
19#
20
21"""Utilities for interacting with cgit."""
22
23import errno
24import functools
25import os
26import shutil
27import socket
28import stat
29import subprocess
30import sys
31import tempfile
32import time
33
34from dulwich.repo import Repo
35from dulwich.protocol import TCP_GIT_PORT
36
37from dulwich.tests import (
38    SkipTest,
39    TestCase,
40    )
41
42_DEFAULT_GIT = 'git'
43_VERSION_LEN = 4
44_REPOS_DATA_DIR = os.path.abspath(os.path.join(
45    os.path.dirname(__file__), os.pardir, 'data', 'repos'))
46
47
48def git_version(git_path=_DEFAULT_GIT):
49    """Attempt to determine the version of git currently installed.
50
51    Args:
52      git_path: Path to the git executable; defaults to the version in
53        the system path.
54    Returns: A tuple of ints of the form (major, minor, point, sub-point), or
55        None if no git installation was found.
56    """
57    try:
58        output = run_git_or_fail(['--version'], git_path=git_path)
59    except OSError:
60        return None
61    version_prefix = b'git version '
62    if not output.startswith(version_prefix):
63        return None
64
65    parts = output[len(version_prefix):].split(b'.')
66    nums = []
67    for part in parts:
68        try:
69            nums.append(int(part))
70        except ValueError:
71            break
72
73    while len(nums) < _VERSION_LEN:
74        nums.append(0)
75    return tuple(nums[:_VERSION_LEN])
76
77
78def require_git_version(required_version, git_path=_DEFAULT_GIT):
79    """Require git version >= version, or skip the calling test.
80
81    Args:
82      required_version: A tuple of ints of the form (major, minor, point,
83        sub-point); ommitted components default to 0.
84      git_path: Path to the git executable; defaults to the version in
85        the system path.
86    Raises:
87      ValueError: if the required version tuple has too many parts.
88      SkipTest: if no suitable git version was found at the given path.
89    """
90    found_version = git_version(git_path=git_path)
91    if found_version is None:
92        raise SkipTest('Test requires git >= %s, but c git not found' %
93                       (required_version, ))
94
95    if len(required_version) > _VERSION_LEN:
96        raise ValueError('Invalid version tuple %s, expected %i parts' %
97                         (required_version, _VERSION_LEN))
98
99    required_version = list(required_version)
100    while len(found_version) < len(required_version):
101        required_version.append(0)
102    required_version = tuple(required_version)
103
104    if found_version < required_version:
105        required_version = '.'.join(map(str, required_version))
106        found_version = '.'.join(map(str, found_version))
107        raise SkipTest('Test requires git >= %s, found %s' %
108                       (required_version, found_version))
109
110
111def run_git(args, git_path=_DEFAULT_GIT, input=None, capture_stdout=False,
112            **popen_kwargs):
113    """Run a git command.
114
115    Input is piped from the input parameter and output is sent to the standard
116    streams, unless capture_stdout is set.
117
118    Args:
119      args: A list of args to the git command.
120      git_path: Path to to the git executable.
121      input: Input data to be sent to stdin.
122      capture_stdout: Whether to capture and return stdout.
123      popen_kwargs: Additional kwargs for subprocess.Popen;
124        stdin/stdout args are ignored.
125    Returns: A tuple of (returncode, stdout contents). If capture_stdout is
126        False, None will be returned as stdout contents.
127    Raises:
128      OSError: if the git executable was not found.
129    """
130
131    env = popen_kwargs.pop('env', {})
132    env['LC_ALL'] = env['LANG'] = 'C'
133
134    args = [git_path] + args
135    popen_kwargs['stdin'] = subprocess.PIPE
136    if capture_stdout:
137        popen_kwargs['stdout'] = subprocess.PIPE
138    else:
139        popen_kwargs.pop('stdout', None)
140    p = subprocess.Popen(args, env=env, **popen_kwargs)
141    stdout, stderr = p.communicate(input=input)
142    return (p.returncode, stdout)
143
144
145def run_git_or_fail(args, git_path=_DEFAULT_GIT, input=None, **popen_kwargs):
146    """Run a git command, capture stdout/stderr, and fail if git fails."""
147    if 'stderr' not in popen_kwargs:
148        popen_kwargs['stderr'] = subprocess.STDOUT
149    returncode, stdout = run_git(args, git_path=git_path, input=input,
150                                 capture_stdout=True, **popen_kwargs)
151    if returncode != 0:
152        raise AssertionError("git with args %r failed with %d: %r" % (
153            args, returncode, stdout))
154    return stdout
155
156
157def import_repo_to_dir(name):
158    """Import a repo from a fast-export file in a temporary directory.
159
160    These are used rather than binary repos for compat tests because they are
161    more compact and human-editable, and we already depend on git.
162
163    Args:
164      name: The name of the repository export file, relative to
165        dulwich/tests/data/repos.
166    Returns: The path to the imported repository.
167    """
168    temp_dir = tempfile.mkdtemp()
169    export_path = os.path.join(_REPOS_DATA_DIR, name)
170    temp_repo_dir = os.path.join(temp_dir, name)
171    export_file = open(export_path, 'rb')
172    run_git_or_fail(['init', '--quiet', '--bare', temp_repo_dir])
173    run_git_or_fail(['fast-import'], input=export_file.read(),
174                    cwd=temp_repo_dir)
175    export_file.close()
176    return temp_repo_dir
177
178
179def check_for_daemon(limit=10, delay=0.1, timeout=0.1, port=TCP_GIT_PORT):
180    """Check for a running TCP daemon.
181
182    Defaults to checking 10 times with a delay of 0.1 sec between tries.
183
184    Args:
185      limit: Number of attempts before deciding no daemon is running.
186      delay: Delay between connection attempts.
187      timeout: Socket timeout for connection attempts.
188      port: Port on which we expect the daemon to appear.
189    Returns: A boolean, true if a daemon is running on the specified port,
190        false if not.
191    """
192    for _ in range(limit):
193        time.sleep(delay)
194        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
195        s.settimeout(delay)
196        try:
197            s.connect(('localhost', port))
198            return True
199        except socket.timeout:
200            pass
201        except socket.error as e:
202            if getattr(e, 'errno', False) and e.errno != errno.ECONNREFUSED:
203                raise
204            elif e.args[0] != errno.ECONNREFUSED:
205                raise
206        finally:
207            s.close()
208    return False
209
210
211class CompatTestCase(TestCase):
212    """Test case that requires git for compatibility checks.
213
214    Subclasses can change the git version required by overriding
215    min_git_version.
216    """
217
218    min_git_version = (1, 5, 0)
219
220    def setUp(self):
221        super(CompatTestCase, self).setUp()
222        require_git_version(self.min_git_version)
223
224    def assertObjectStoreEqual(self, store1, store2):
225        self.assertEqual(sorted(set(store1)), sorted(set(store2)))
226
227    def assertReposEqual(self, repo1, repo2):
228        self.assertEqual(repo1.get_refs(), repo2.get_refs())
229        self.assertObjectStoreEqual(repo1.object_store, repo2.object_store)
230
231    def assertReposNotEqual(self, repo1, repo2):
232        refs1 = repo1.get_refs()
233        objs1 = set(repo1.object_store)
234        refs2 = repo2.get_refs()
235        objs2 = set(repo2.object_store)
236        self.assertFalse(refs1 == refs2 and objs1 == objs2)
237
238    def import_repo(self, name):
239        """Import a repo from a fast-export file in a temporary directory.
240
241        Args:
242          name: The name of the repository export file, relative to
243            dulwich/tests/data/repos.
244        Returns: An initialized Repo object that lives in a temporary
245            directory.
246        """
247        path = import_repo_to_dir(name)
248        repo = Repo(path)
249
250        def cleanup():
251            repo.close()
252            rmtree_ro(os.path.dirname(path.rstrip(os.sep)))
253        self.addCleanup(cleanup)
254        return repo
255
256
257if sys.platform == 'win32':
258    def remove_ro(action, name, exc):
259        os.chmod(name, stat.S_IWRITE)
260        os.remove(name)
261
262    rmtree_ro = functools.partial(shutil.rmtree, onerror=remove_ro)
263else:
264    rmtree_ro = shutil.rmtree
265