1# -*- coding: utf-8 -*-
2# test_repository.py -- tests for repository.py
3# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
4#
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as public by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
21
22"""Tests for the repository."""
23
24import locale
25import os
26import stat
27import shutil
28import sys
29import tempfile
30import warnings
31
32from dulwich import errors
33from dulwich.object_store import (
34    tree_lookup_path,
35    )
36from dulwich import objects
37from dulwich.config import Config
38from dulwich.errors import NotGitRepository
39from dulwich.repo import (
40    InvalidUserIdentity,
41    Repo,
42    MemoryRepo,
43    check_user_identity,
44    )
45from dulwich.tests import (
46    TestCase,
47    skipIf,
48    )
49from dulwich.tests.utils import (
50    open_repo,
51    tear_down_repo,
52    setup_warning_catcher,
53    )
54
55missing_sha = b'b91fa4d900e17e99b433218e988c4eb4a3e9a097'
56
57
58class CreateRepositoryTests(TestCase):
59
60    def assertFileContentsEqual(self, expected, repo, path):
61        f = repo.get_named_file(path)
62        if not f:
63            self.assertEqual(expected, None)
64        else:
65            with f:
66                self.assertEqual(expected, f.read())
67
68    def _check_repo_contents(self, repo, expect_bare):
69        self.assertEqual(expect_bare, repo.bare)
70        self.assertFileContentsEqual(
71            b'Unnamed repository', repo, 'description')
72        self.assertFileContentsEqual(
73            b'', repo, os.path.join('info', 'exclude'))
74        self.assertFileContentsEqual(None, repo, 'nonexistent file')
75        barestr = b'bare = ' + str(expect_bare).lower().encode('ascii')
76        with repo.get_named_file('config') as f:
77            config_text = f.read()
78            self.assertTrue(barestr in config_text, "%r" % config_text)
79        expect_filemode = sys.platform != 'win32'
80        barestr = b'filemode = ' + str(expect_filemode).lower().encode('ascii')
81        with repo.get_named_file('config') as f:
82            config_text = f.read()
83            self.assertTrue(barestr in config_text, "%r" % config_text)
84
85    def test_create_memory(self):
86        repo = MemoryRepo.init_bare([], {})
87        self._check_repo_contents(repo, True)
88
89    def test_create_disk_bare(self):
90        tmp_dir = tempfile.mkdtemp()
91        self.addCleanup(shutil.rmtree, tmp_dir)
92        repo = Repo.init_bare(tmp_dir)
93        self.assertEqual(tmp_dir, repo._controldir)
94        self._check_repo_contents(repo, True)
95
96    def test_create_disk_non_bare(self):
97        tmp_dir = tempfile.mkdtemp()
98        self.addCleanup(shutil.rmtree, tmp_dir)
99        repo = Repo.init(tmp_dir)
100        self.assertEqual(os.path.join(tmp_dir, '.git'), repo._controldir)
101        self._check_repo_contents(repo, False)
102
103    def test_create_disk_non_bare_mkdir(self):
104        tmp_dir = tempfile.mkdtemp()
105        target_dir = os.path.join(tmp_dir, "target")
106        self.addCleanup(shutil.rmtree, tmp_dir)
107        repo = Repo.init(target_dir, mkdir=True)
108        self.assertEqual(os.path.join(target_dir, '.git'), repo._controldir)
109        self._check_repo_contents(repo, False)
110
111    def test_create_disk_bare_mkdir(self):
112        tmp_dir = tempfile.mkdtemp()
113        target_dir = os.path.join(tmp_dir, "target")
114        self.addCleanup(shutil.rmtree, tmp_dir)
115        repo = Repo.init_bare(target_dir, mkdir=True)
116        self.assertEqual(target_dir, repo._controldir)
117        self._check_repo_contents(repo, True)
118
119
120class MemoryRepoTests(TestCase):
121
122    def test_set_description(self):
123        r = MemoryRepo.init_bare([], {})
124        description = b"Some description"
125        r.set_description(description)
126        self.assertEqual(description, r.get_description())
127
128
129class RepositoryRootTests(TestCase):
130
131    def mkdtemp(self):
132        return tempfile.mkdtemp()
133
134    def open_repo(self, name):
135        temp_dir = self.mkdtemp()
136        repo = open_repo(name, temp_dir)
137        self.addCleanup(tear_down_repo, repo)
138        return repo
139
140    def test_simple_props(self):
141        r = self.open_repo('a.git')
142        self.assertEqual(r.controldir(), r.path)
143
144    def test_setitem(self):
145        r = self.open_repo('a.git')
146        r[b"refs/tags/foo"] = b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'
147        self.assertEqual(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
148                         r[b"refs/tags/foo"].id)
149
150    def test_getitem_unicode(self):
151        r = self.open_repo('a.git')
152
153        test_keys = [
154            (b'refs/heads/master', True),
155            (b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', True),
156            (b'11' * 19 + b'--', False),
157        ]
158
159        for k, contained in test_keys:
160            self.assertEqual(k in r, contained)
161
162        # Avoid deprecation warning under Py3.2+
163        if getattr(self, 'assertRaisesRegex', None):
164            assertRaisesRegexp = self.assertRaisesRegex
165        else:
166            assertRaisesRegexp = self.assertRaisesRegexp
167        for k, _ in test_keys:
168            assertRaisesRegexp(
169                TypeError, "'name' must be bytestring, not int",
170                r.__getitem__, 12
171            )
172
173    def test_delitem(self):
174        r = self.open_repo('a.git')
175
176        del r[b'refs/heads/master']
177        self.assertRaises(KeyError, lambda: r[b'refs/heads/master'])
178
179        del r[b'HEAD']
180        self.assertRaises(KeyError, lambda: r[b'HEAD'])
181
182        self.assertRaises(ValueError, r.__delitem__, b'notrefs/foo')
183
184    def test_get_refs(self):
185        r = self.open_repo('a.git')
186        self.assertEqual({
187            b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
188            b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
189            b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
190            b'refs/tags/mytag-packed':
191                b'b0931cadc54336e78a1d980420e3268903b57a50',
192            }, r.get_refs())
193
194    def test_head(self):
195        r = self.open_repo('a.git')
196        self.assertEqual(r.head(), b'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
197
198    def test_get_object(self):
199        r = self.open_repo('a.git')
200        obj = r.get_object(r.head())
201        self.assertEqual(obj.type_name, b'commit')
202
203    def test_get_object_non_existant(self):
204        r = self.open_repo('a.git')
205        self.assertRaises(KeyError, r.get_object, missing_sha)
206
207    def test_contains_object(self):
208        r = self.open_repo('a.git')
209        self.assertTrue(r.head() in r)
210
211    def test_contains_ref(self):
212        r = self.open_repo('a.git')
213        self.assertTrue(b"HEAD" in r)
214
215    def test_get_no_description(self):
216        r = self.open_repo('a.git')
217        self.assertIs(None, r.get_description())
218
219    def test_get_description(self):
220        r = self.open_repo('a.git')
221        with open(os.path.join(r.path, 'description'), 'wb') as f:
222            f.write(b"Some description")
223        self.assertEqual(b"Some description", r.get_description())
224
225    def test_set_description(self):
226        r = self.open_repo('a.git')
227        description = b"Some description"
228        r.set_description(description)
229        self.assertEqual(description, r.get_description())
230
231    def test_contains_missing(self):
232        r = self.open_repo('a.git')
233        self.assertFalse(b"bar" in r)
234
235    def test_get_peeled(self):
236        # unpacked ref
237        r = self.open_repo('a.git')
238        tag_sha = b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
239        self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
240        self.assertEqual(r.get_peeled(b'refs/tags/mytag'), r.head())
241
242        # packed ref with cached peeled value
243        packed_tag_sha = b'b0931cadc54336e78a1d980420e3268903b57a50'
244        parent_sha = r[r.head()].parents[0]
245        self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
246        self.assertEqual(r.get_peeled(b'refs/tags/mytag-packed'), parent_sha)
247
248        # TODO: add more corner cases to test repo
249
250    def test_get_peeled_not_tag(self):
251        r = self.open_repo('a.git')
252        self.assertEqual(r.get_peeled(b'HEAD'), r.head())
253
254    def test_get_walker(self):
255        r = self.open_repo('a.git')
256        # include defaults to [r.head()]
257        self.assertEqual(
258            [e.commit.id for e in r.get_walker()],
259            [r.head(), b'2a72d929692c41d8554c07f6301757ba18a65d91'])
260        self.assertEqual(
261            [e.commit.id for e in
262                r.get_walker([b'2a72d929692c41d8554c07f6301757ba18a65d91'])],
263            [b'2a72d929692c41d8554c07f6301757ba18a65d91'])
264        self.assertEqual(
265            [e.commit.id for e in
266                r.get_walker(b'2a72d929692c41d8554c07f6301757ba18a65d91')],
267            [b'2a72d929692c41d8554c07f6301757ba18a65d91'])
268
269    def assertFilesystemHidden(self, path):
270        if sys.platform != 'win32':
271            return
272        import ctypes
273        from ctypes.wintypes import DWORD, LPCWSTR
274        GetFileAttributesW = ctypes.WINFUNCTYPE(DWORD, LPCWSTR)(
275            ('GetFileAttributesW', ctypes.windll.kernel32))
276        self.assertTrue(2 & GetFileAttributesW(path))
277
278    def test_init_existing(self):
279        tmp_dir = self.mkdtemp()
280        self.addCleanup(shutil.rmtree, tmp_dir)
281        t = Repo.init(tmp_dir)
282        self.addCleanup(t.close)
283        self.assertEqual(os.listdir(tmp_dir), ['.git'])
284        self.assertFilesystemHidden(os.path.join(tmp_dir, '.git'))
285
286    def test_init_mkdir(self):
287        tmp_dir = self.mkdtemp()
288        self.addCleanup(shutil.rmtree, tmp_dir)
289        repo_dir = os.path.join(tmp_dir, 'a-repo')
290
291        t = Repo.init(repo_dir, mkdir=True)
292        self.addCleanup(t.close)
293        self.assertEqual(os.listdir(repo_dir), ['.git'])
294        self.assertFilesystemHidden(os.path.join(repo_dir, '.git'))
295
296    def test_init_mkdir_unicode(self):
297        repo_name = u'\xa7'
298        try:
299            repo_name.encode(sys.getfilesystemencoding())
300        except UnicodeEncodeError:
301            self.skipTest('filesystem lacks unicode support')
302        tmp_dir = self.mkdtemp()
303        self.addCleanup(shutil.rmtree, tmp_dir)
304        repo_dir = os.path.join(tmp_dir, repo_name)
305
306        t = Repo.init(repo_dir, mkdir=True)
307        self.addCleanup(t.close)
308        self.assertEqual(os.listdir(repo_dir), ['.git'])
309        self.assertFilesystemHidden(os.path.join(repo_dir, '.git'))
310
311    @skipIf(sys.platform == 'win32', 'fails on Windows')
312    def test_fetch(self):
313        r = self.open_repo('a.git')
314        tmp_dir = self.mkdtemp()
315        self.addCleanup(shutil.rmtree, tmp_dir)
316        t = Repo.init(tmp_dir)
317        self.addCleanup(t.close)
318        r.fetch(t)
319        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
320        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
321        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
322        self.assertIn(b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', t)
323        self.assertIn(b'b0931cadc54336e78a1d980420e3268903b57a50', t)
324
325    @skipIf(sys.platform == 'win32', 'fails on Windows')
326    def test_fetch_ignores_missing_refs(self):
327        r = self.open_repo('a.git')
328        missing = b'1234566789123456789123567891234657373833'
329        r.refs[b'refs/heads/blah'] = missing
330        tmp_dir = self.mkdtemp()
331        self.addCleanup(shutil.rmtree, tmp_dir)
332        t = Repo.init(tmp_dir)
333        self.addCleanup(t.close)
334        r.fetch(t)
335        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
336        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
337        self.assertIn(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', t)
338        self.assertIn(b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', t)
339        self.assertIn(b'b0931cadc54336e78a1d980420e3268903b57a50', t)
340        self.assertNotIn(missing, t)
341
342    def test_clone(self):
343        r = self.open_repo('a.git')
344        tmp_dir = self.mkdtemp()
345        self.addCleanup(shutil.rmtree, tmp_dir)
346        with r.clone(tmp_dir, mkdir=False) as t:
347            self.assertEqual({
348                b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
349                b'refs/remotes/origin/master':
350                    b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
351                b'refs/heads/master':
352                    b'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
353                b'refs/tags/mytag':
354                    b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
355                b'refs/tags/mytag-packed':
356                    b'b0931cadc54336e78a1d980420e3268903b57a50',
357                }, t.refs.as_dict())
358            shas = [e.commit.id for e in r.get_walker()]
359            self.assertEqual(shas, [t.head(),
360                             b'2a72d929692c41d8554c07f6301757ba18a65d91'])
361            c = t.get_config()
362            encoded_path = r.path
363            if not isinstance(encoded_path, bytes):
364                encoded_path = encoded_path.encode(sys.getfilesystemencoding())
365            self.assertEqual(encoded_path,
366                             c.get((b'remote', b'origin'), b'url'))
367            self.assertEqual(
368                b'+refs/heads/*:refs/remotes/origin/*',
369                c.get((b'remote', b'origin'), b'fetch'))
370
371    def test_clone_no_head(self):
372        temp_dir = self.mkdtemp()
373        self.addCleanup(shutil.rmtree, temp_dir)
374        repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
375        dest_dir = os.path.join(temp_dir, 'a.git')
376        shutil.copytree(os.path.join(repo_dir, 'a.git'),
377                        dest_dir, symlinks=True)
378        r = Repo(dest_dir)
379        del r.refs[b"refs/heads/master"]
380        del r.refs[b"HEAD"]
381        t = r.clone(os.path.join(temp_dir, 'b.git'), mkdir=True)
382        self.assertEqual({
383            b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
384            b'refs/tags/mytag-packed':
385                b'b0931cadc54336e78a1d980420e3268903b57a50',
386            }, t.refs.as_dict())
387
388    def test_clone_empty(self):
389        """Test clone() doesn't crash if HEAD points to a non-existing ref.
390
391        This simulates cloning server-side bare repository either when it is
392        still empty or if user renames master branch and pushes private repo
393        to the server.
394        Non-bare repo HEAD always points to an existing ref.
395        """
396        r = self.open_repo('empty.git')
397        tmp_dir = self.mkdtemp()
398        self.addCleanup(shutil.rmtree, tmp_dir)
399        r.clone(tmp_dir, mkdir=False, bare=True)
400
401    def test_clone_bare(self):
402        r = self.open_repo('a.git')
403        tmp_dir = self.mkdtemp()
404        self.addCleanup(shutil.rmtree, tmp_dir)
405        t = r.clone(tmp_dir, mkdir=False)
406        t.close()
407
408    def test_clone_checkout_and_bare(self):
409        r = self.open_repo('a.git')
410        tmp_dir = self.mkdtemp()
411        self.addCleanup(shutil.rmtree, tmp_dir)
412        self.assertRaises(ValueError, r.clone, tmp_dir, mkdir=False,
413                          checkout=True, bare=True)
414
415    def test_merge_history(self):
416        r = self.open_repo('simple_merge.git')
417        shas = [e.commit.id for e in r.get_walker()]
418        self.assertEqual(shas, [b'5dac377bdded4c9aeb8dff595f0faeebcc8498cc',
419                                b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
420                                b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',
421                                b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
422                                b'0d89f20333fbb1d2f3a94da77f4981373d8f4310'])
423
424    def test_out_of_order_merge(self):
425        """Test that revision history is ordered by date, not parent order."""
426        r = self.open_repo('ooo_merge.git')
427        shas = [e.commit.id for e in r.get_walker()]
428        self.assertEqual(shas, [b'7601d7f6231db6a57f7bbb79ee52e4d462fd44d1',
429                                b'f507291b64138b875c28e03469025b1ea20bc614',
430                                b'fb5b0425c7ce46959bec94d54b9a157645e114f5',
431                                b'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
432
433    def test_get_tags_empty(self):
434        r = self.open_repo('ooo_merge.git')
435        self.assertEqual({}, r.refs.as_dict(b'refs/tags'))
436
437    def test_get_config(self):
438        r = self.open_repo('ooo_merge.git')
439        self.assertIsInstance(r.get_config(), Config)
440
441    def test_get_config_stack(self):
442        r = self.open_repo('ooo_merge.git')
443        self.assertIsInstance(r.get_config_stack(), Config)
444
445    @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
446    def test_submodule(self):
447        temp_dir = self.mkdtemp()
448        self.addCleanup(shutil.rmtree, temp_dir)
449        repo_dir = os.path.join(os.path.dirname(__file__), 'data', 'repos')
450        shutil.copytree(os.path.join(repo_dir, 'a.git'),
451                        os.path.join(temp_dir, 'a.git'), symlinks=True)
452        rel = os.path.relpath(os.path.join(repo_dir, 'submodule'), temp_dir)
453        os.symlink(os.path.join(rel, 'dotgit'), os.path.join(temp_dir, '.git'))
454        with Repo(temp_dir) as r:
455            self.assertEqual(r.head(),
456                             b'a90fa2d900a17e99b433217e988c4eb4a2e9a097')
457
458    def test_common_revisions(self):
459        """
460        This test demonstrates that ``find_common_revisions()`` actually
461        returns common heads, not revisions; dulwich already uses
462        ``find_common_revisions()`` in such a manner (see
463        ``Repo.fetch_objects()``).
464        """
465
466        expected_shas = set([b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e'])
467
468        # Source for objects.
469        r_base = self.open_repo('simple_merge.git')
470
471        # Re-create each-side of the merge in simple_merge.git.
472        #
473        # Since the trees and blobs are missing, the repository created is
474        # corrupted, but we're only checking for commits for the purpose of
475        # this test, so it's immaterial.
476        r1_dir = self.mkdtemp()
477        self.addCleanup(shutil.rmtree, r1_dir)
478        r1_commits = [b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',  # HEAD
479                      b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
480                      b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']
481
482        r2_dir = self.mkdtemp()
483        self.addCleanup(shutil.rmtree, r2_dir)
484        r2_commits = [b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',  # HEAD
485                      b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
486                      b'0d89f20333fbb1d2f3a94da77f4981373d8f4310']
487
488        r1 = Repo.init_bare(r1_dir)
489        for c in r1_commits:
490            r1.object_store.add_object(r_base.get_object(c))
491        r1.refs[b'HEAD'] = r1_commits[0]
492
493        r2 = Repo.init_bare(r2_dir)
494        for c in r2_commits:
495            r2.object_store.add_object(r_base.get_object(c))
496        r2.refs[b'HEAD'] = r2_commits[0]
497
498        # Finally, the 'real' testing!
499        shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
500        self.assertEqual(set(shas), expected_shas)
501
502        shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
503        self.assertEqual(set(shas), expected_shas)
504
505    def test_shell_hook_pre_commit(self):
506        if os.name != 'posix':
507            self.skipTest('shell hook tests requires POSIX shell')
508
509        pre_commit_fail = """#!/bin/sh
510exit 1
511"""
512
513        pre_commit_success = """#!/bin/sh
514exit 0
515"""
516
517        repo_dir = os.path.join(self.mkdtemp())
518        self.addCleanup(shutil.rmtree, repo_dir)
519        r = Repo.init(repo_dir)
520        self.addCleanup(r.close)
521
522        pre_commit = os.path.join(r.controldir(), 'hooks', 'pre-commit')
523
524        with open(pre_commit, 'w') as f:
525            f.write(pre_commit_fail)
526        os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
527
528        self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
529                          committer='Test Committer <test@nodomain.com>',
530                          author='Test Author <test@nodomain.com>',
531                          commit_timestamp=12345, commit_timezone=0,
532                          author_timestamp=12345, author_timezone=0)
533
534        with open(pre_commit, 'w') as f:
535            f.write(pre_commit_success)
536        os.chmod(pre_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
537
538        commit_sha = r.do_commit(
539            b'empty commit',
540            committer=b'Test Committer <test@nodomain.com>',
541            author=b'Test Author <test@nodomain.com>',
542            commit_timestamp=12395, commit_timezone=0,
543            author_timestamp=12395, author_timezone=0)
544        self.assertEqual([], r[commit_sha].parents)
545
546    def test_shell_hook_commit_msg(self):
547        if os.name != 'posix':
548            self.skipTest('shell hook tests requires POSIX shell')
549
550        commit_msg_fail = """#!/bin/sh
551exit 1
552"""
553
554        commit_msg_success = """#!/bin/sh
555exit 0
556"""
557
558        repo_dir = self.mkdtemp()
559        self.addCleanup(shutil.rmtree, repo_dir)
560        r = Repo.init(repo_dir)
561        self.addCleanup(r.close)
562
563        commit_msg = os.path.join(r.controldir(), 'hooks', 'commit-msg')
564
565        with open(commit_msg, 'w') as f:
566            f.write(commit_msg_fail)
567        os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
568
569        self.assertRaises(errors.CommitError, r.do_commit, b'failed commit',
570                          committer=b'Test Committer <test@nodomain.com>',
571                          author=b'Test Author <test@nodomain.com>',
572                          commit_timestamp=12345, commit_timezone=0,
573                          author_timestamp=12345, author_timezone=0)
574
575        with open(commit_msg, 'w') as f:
576            f.write(commit_msg_success)
577        os.chmod(commit_msg, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
578
579        commit_sha = r.do_commit(
580            b'empty commit',
581            committer=b'Test Committer <test@nodomain.com>',
582            author=b'Test Author <test@nodomain.com>',
583            commit_timestamp=12395, commit_timezone=0,
584            author_timestamp=12395, author_timezone=0)
585        self.assertEqual([], r[commit_sha].parents)
586
587    def test_shell_hook_post_commit(self):
588        if os.name != 'posix':
589            self.skipTest('shell hook tests requires POSIX shell')
590
591        repo_dir = self.mkdtemp()
592        self.addCleanup(shutil.rmtree, repo_dir)
593
594        r = Repo.init(repo_dir)
595        self.addCleanup(r.close)
596
597        (fd, path) = tempfile.mkstemp(dir=repo_dir)
598        os.close(fd)
599        post_commit_msg = """#!/bin/sh
600rm """ + path + """
601"""
602
603        root_sha = r.do_commit(
604            b'empty commit',
605            committer=b'Test Committer <test@nodomain.com>',
606            author=b'Test Author <test@nodomain.com>',
607            commit_timestamp=12345, commit_timezone=0,
608            author_timestamp=12345, author_timezone=0)
609        self.assertEqual([], r[root_sha].parents)
610
611        post_commit = os.path.join(r.controldir(), 'hooks', 'post-commit')
612
613        with open(post_commit, 'wb') as f:
614            f.write(post_commit_msg.encode(locale.getpreferredencoding()))
615        os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
616
617        commit_sha = r.do_commit(
618            b'empty commit',
619            committer=b'Test Committer <test@nodomain.com>',
620            author=b'Test Author <test@nodomain.com>',
621            commit_timestamp=12345, commit_timezone=0,
622            author_timestamp=12345, author_timezone=0)
623        self.assertEqual([root_sha], r[commit_sha].parents)
624
625        self.assertFalse(os.path.exists(path))
626
627        post_commit_msg_fail = """#!/bin/sh
628exit 1
629"""
630        with open(post_commit, 'w') as f:
631            f.write(post_commit_msg_fail)
632        os.chmod(post_commit, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
633
634        warnings.simplefilter("always", UserWarning)
635        self.addCleanup(warnings.resetwarnings)
636        warnings_list, restore_warnings = setup_warning_catcher()
637        self.addCleanup(restore_warnings)
638
639        commit_sha2 = r.do_commit(
640            b'empty commit',
641            committer=b'Test Committer <test@nodomain.com>',
642            author=b'Test Author <test@nodomain.com>',
643            commit_timestamp=12345, commit_timezone=0,
644            author_timestamp=12345, author_timezone=0)
645        expected_warning = UserWarning(
646            'post-commit hook failed: Hook post-commit exited with '
647            'non-zero status',)
648        for w in warnings_list:
649            if (type(w) == type(expected_warning) and
650                    w.args == expected_warning.args):
651                break
652        else:
653            raise AssertionError(
654                'Expected warning %r not in %r' %
655                (expected_warning, warnings_list))
656        self.assertEqual([commit_sha], r[commit_sha2].parents)
657
658    def test_as_dict(self):
659        def check(repo):
660            self.assertEqual(
661                repo.refs.subkeys(b'refs/tags'),
662                repo.refs.subkeys(b'refs/tags/'))
663            self.assertEqual(
664                repo.refs.as_dict(b'refs/tags'),
665                repo.refs.as_dict(b'refs/tags/'))
666            self.assertEqual(
667                repo.refs.as_dict(b'refs/heads'),
668                repo.refs.as_dict(b'refs/heads/'))
669
670        bare = self.open_repo('a.git')
671        tmp_dir = self.mkdtemp()
672        self.addCleanup(shutil.rmtree, tmp_dir)
673        with bare.clone(tmp_dir, mkdir=False) as nonbare:
674            check(nonbare)
675            check(bare)
676
677    def test_working_tree(self):
678        temp_dir = tempfile.mkdtemp()
679        self.addCleanup(shutil.rmtree, temp_dir)
680        worktree_temp_dir = tempfile.mkdtemp()
681        self.addCleanup(shutil.rmtree, worktree_temp_dir)
682        r = Repo.init(temp_dir)
683        self.addCleanup(r.close)
684        root_sha = r.do_commit(
685                b'empty commit',
686                committer=b'Test Committer <test@nodomain.com>',
687                author=b'Test Author <test@nodomain.com>',
688                commit_timestamp=12345, commit_timezone=0,
689                author_timestamp=12345, author_timezone=0)
690        r.refs[b'refs/heads/master'] = root_sha
691        w = Repo._init_new_working_directory(worktree_temp_dir, r)
692        self.addCleanup(w.close)
693        new_sha = w.do_commit(
694                b'new commit',
695                committer=b'Test Committer <test@nodomain.com>',
696                author=b'Test Author <test@nodomain.com>',
697                commit_timestamp=12345, commit_timezone=0,
698                author_timestamp=12345, author_timezone=0)
699        w.refs[b'HEAD'] = new_sha
700        self.assertEqual(os.path.abspath(r.controldir()),
701                         os.path.abspath(w.commondir()))
702        self.assertEqual(r.refs.keys(), w.refs.keys())
703        self.assertNotEqual(r.head(), w.head())
704
705
706class BuildRepoRootTests(TestCase):
707    """Tests that build on-disk repos from scratch.
708
709    Repos live in a temp dir and are torn down after each test. They start with
710    a single commit in master having single file named 'a'.
711    """
712
713    def get_repo_dir(self):
714        return os.path.join(tempfile.mkdtemp(), 'test')
715
716    def setUp(self):
717        super(BuildRepoRootTests, self).setUp()
718        self._repo_dir = self.get_repo_dir()
719        os.makedirs(self._repo_dir)
720        r = self._repo = Repo.init(self._repo_dir)
721        self.addCleanup(tear_down_repo, r)
722        self.assertFalse(r.bare)
723        self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD'))
724        self.assertRaises(KeyError, lambda: r.refs[b'refs/heads/master'])
725
726        with open(os.path.join(r.path, 'a'), 'wb') as f:
727            f.write(b'file contents')
728        r.stage(['a'])
729        commit_sha = r.do_commit(
730                b'msg',
731                committer=b'Test Committer <test@nodomain.com>',
732                author=b'Test Author <test@nodomain.com>',
733                commit_timestamp=12345, commit_timezone=0,
734                author_timestamp=12345, author_timezone=0)
735        self.assertEqual([], r[commit_sha].parents)
736        self._root_commit = commit_sha
737
738    def test_get_shallow(self):
739        self.assertEqual(set(), self._repo.get_shallow())
740        with open(os.path.join(self._repo.path, '.git', 'shallow'), 'wb') as f:
741            f.write(b'a90fa2d900a17e99b433217e988c4eb4a2e9a097\n')
742        self.assertEqual({b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'},
743                         self._repo.get_shallow())
744
745    def test_update_shallow(self):
746        self._repo.update_shallow(None, None)  # no op
747        self.assertEqual(set(), self._repo.get_shallow())
748        self._repo.update_shallow(
749                [b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'],
750                None)
751        self.assertEqual(
752                {b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'},
753                self._repo.get_shallow())
754        self._repo.update_shallow(
755                [b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'],
756                [b'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
757        self.assertEqual({b'a90fa2d900a17e99b433217e988c4eb4a2e9a097'},
758                         self._repo.get_shallow())
759
760    def test_build_repo(self):
761        r = self._repo
762        self.assertEqual(b'ref: refs/heads/master', r.refs.read_ref(b'HEAD'))
763        self.assertEqual(self._root_commit, r.refs[b'refs/heads/master'])
764        expected_blob = objects.Blob.from_string(b'file contents')
765        self.assertEqual(expected_blob.data, r[expected_blob.id].data)
766        actual_commit = r[self._root_commit]
767        self.assertEqual(b'msg', actual_commit.message)
768
769    def test_commit_modified(self):
770        r = self._repo
771        with open(os.path.join(r.path, 'a'), 'wb') as f:
772            f.write(b'new contents')
773        r.stage(['a'])
774        commit_sha = r.do_commit(
775            b'modified a',
776            committer=b'Test Committer <test@nodomain.com>',
777            author=b'Test Author <test@nodomain.com>',
778            commit_timestamp=12395, commit_timezone=0,
779            author_timestamp=12395, author_timezone=0)
780        self.assertEqual([self._root_commit], r[commit_sha].parents)
781        a_mode, a_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'a')
782        self.assertEqual(stat.S_IFREG | 0o644, a_mode)
783        self.assertEqual(b'new contents', r[a_id].data)
784
785    @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
786    def test_commit_symlink(self):
787        r = self._repo
788        os.symlink('a', os.path.join(r.path, 'b'))
789        r.stage(['a', 'b'])
790        commit_sha = r.do_commit(
791            b'Symlink b',
792            committer=b'Test Committer <test@nodomain.com>',
793            author=b'Test Author <test@nodomain.com>',
794            commit_timestamp=12395, commit_timezone=0,
795            author_timestamp=12395, author_timezone=0)
796        self.assertEqual([self._root_commit], r[commit_sha].parents)
797        b_mode, b_id = tree_lookup_path(r.get_object, r[commit_sha].tree, b'b')
798        self.assertTrue(stat.S_ISLNK(b_mode))
799        self.assertEqual(b'a', r[b_id].data)
800
801    def test_commit_merge_heads_file(self):
802        tmp_dir = tempfile.mkdtemp()
803        self.addCleanup(shutil.rmtree, tmp_dir)
804        r = Repo.init(tmp_dir)
805        with open(os.path.join(r.path, 'a'), 'w') as f:
806            f.write('initial text')
807        c1 = r.do_commit(
808            b'initial commit',
809            committer=b'Test Committer <test@nodomain.com>',
810            author=b'Test Author <test@nodomain.com>',
811            commit_timestamp=12395, commit_timezone=0,
812            author_timestamp=12395, author_timezone=0)
813        with open(os.path.join(r.path, 'a'), 'w') as f:
814            f.write('merged text')
815        with open(os.path.join(r.path, '.git', 'MERGE_HEADS'), 'w') as f:
816            f.write('c27a2d21dd136312d7fa9e8baabb82561a1727d0\n')
817        r.stage(['a'])
818        commit_sha = r.do_commit(
819            b'deleted a',
820            committer=b'Test Committer <test@nodomain.com>',
821            author=b'Test Author <test@nodomain.com>',
822            commit_timestamp=12395, commit_timezone=0,
823            author_timestamp=12395, author_timezone=0)
824        self.assertEqual([
825            c1,
826            b'c27a2d21dd136312d7fa9e8baabb82561a1727d0'],
827            r[commit_sha].parents)
828
829    def test_commit_deleted(self):
830        r = self._repo
831        os.remove(os.path.join(r.path, 'a'))
832        r.stage(['a'])
833        commit_sha = r.do_commit(
834            b'deleted a',
835            committer=b'Test Committer <test@nodomain.com>',
836            author=b'Test Author <test@nodomain.com>',
837            commit_timestamp=12395, commit_timezone=0,
838            author_timestamp=12395, author_timezone=0)
839        self.assertEqual([self._root_commit], r[commit_sha].parents)
840        self.assertEqual([], list(r.open_index()))
841        tree = r[r[commit_sha].tree]
842        self.assertEqual([], list(tree.iteritems()))
843
844    def test_commit_follows(self):
845        r = self._repo
846        r.refs.set_symbolic_ref(b'HEAD', b'refs/heads/bla')
847        commit_sha = r.do_commit(
848            b'commit with strange character',
849            committer=b'Test Committer <test@nodomain.com>',
850            author=b'Test Author <test@nodomain.com>',
851            commit_timestamp=12395, commit_timezone=0,
852            author_timestamp=12395, author_timezone=0,
853            ref=b'HEAD')
854        self.assertEqual(commit_sha, r[b'refs/heads/bla'].id)
855
856    def test_commit_encoding(self):
857        r = self._repo
858        commit_sha = r.do_commit(
859            b'commit with strange character \xee',
860            committer=b'Test Committer <test@nodomain.com>',
861            author=b'Test Author <test@nodomain.com>',
862            commit_timestamp=12395, commit_timezone=0,
863            author_timestamp=12395, author_timezone=0,
864            encoding=b"iso8859-1")
865        self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
866
867    def test_compression_level(self):
868        r = self._repo
869        c = r.get_config()
870        c.set(('core',), 'compression', '3')
871        c.set(('core',), 'looseCompression', '4')
872        c.write_to_path()
873        r = Repo(self._repo_dir)
874        self.assertEqual(r.object_store.loose_compression_level, 4)
875
876    def test_commit_encoding_from_config(self):
877        r = self._repo
878        c = r.get_config()
879        c.set(('i18n',), 'commitEncoding', 'iso8859-1')
880        c.write_to_path()
881        commit_sha = r.do_commit(
882            b'commit with strange character \xee',
883            committer=b'Test Committer <test@nodomain.com>',
884            author=b'Test Author <test@nodomain.com>',
885            commit_timestamp=12395, commit_timezone=0,
886            author_timestamp=12395, author_timezone=0)
887        self.assertEqual(b"iso8859-1", r[commit_sha].encoding)
888
889    def test_commit_config_identity(self):
890        # commit falls back to the users' identity if it wasn't specified
891        r = self._repo
892        c = r.get_config()
893        c.set((b"user", ), b"name", b"Jelmer")
894        c.set((b"user", ), b"email", b"jelmer@apache.org")
895        c.write_to_path()
896        commit_sha = r.do_commit(b'message')
897        self.assertEqual(
898            b"Jelmer <jelmer@apache.org>",
899            r[commit_sha].author)
900        self.assertEqual(
901            b"Jelmer <jelmer@apache.org>",
902            r[commit_sha].committer)
903
904    def test_commit_config_identity_strips_than(self):
905        # commit falls back to the users' identity if it wasn't specified,
906        # and strips superfluous <>
907        r = self._repo
908        c = r.get_config()
909        c.set((b"user", ), b"name", b"Jelmer")
910        c.set((b"user", ), b"email", b"<jelmer@apache.org>")
911        c.write_to_path()
912        commit_sha = r.do_commit(b'message')
913        self.assertEqual(
914            b"Jelmer <jelmer@apache.org>",
915            r[commit_sha].author)
916        self.assertEqual(
917            b"Jelmer <jelmer@apache.org>",
918            r[commit_sha].committer)
919
920    def test_commit_config_identity_in_memoryrepo(self):
921        # commit falls back to the users' identity if it wasn't specified
922        r = MemoryRepo.init_bare([], {})
923        c = r.get_config()
924        c.set((b"user", ), b"name", b"Jelmer")
925        c.set((b"user", ), b"email", b"jelmer@apache.org")
926
927        commit_sha = r.do_commit(b'message', tree=objects.Tree().id)
928        self.assertEqual(
929            b"Jelmer <jelmer@apache.org>",
930            r[commit_sha].author)
931        self.assertEqual(
932            b"Jelmer <jelmer@apache.org>",
933            r[commit_sha].committer)
934
935    def overrideEnv(self, name, value):
936        def restore():
937            if oldval is not None:
938                os.environ[name] = oldval
939            else:
940                del os.environ[name]
941        oldval = os.environ.get(name)
942        os.environ[name] = value
943        self.addCleanup(restore)
944
945    def test_commit_config_identity_from_env(self):
946        # commit falls back to the users' identity if it wasn't specified
947        self.overrideEnv('GIT_COMMITTER_NAME', 'joe')
948        self.overrideEnv('GIT_COMMITTER_EMAIL', 'joe@example.com')
949        r = self._repo
950        c = r.get_config()
951        c.set((b"user", ), b"name", b"Jelmer")
952        c.set((b"user", ), b"email", b"jelmer@apache.org")
953        c.write_to_path()
954        commit_sha = r.do_commit(b'message')
955        self.assertEqual(
956            b"Jelmer <jelmer@apache.org>",
957            r[commit_sha].author)
958        self.assertEqual(
959            b"joe <joe@example.com>",
960            r[commit_sha].committer)
961
962    def test_commit_fail_ref(self):
963        r = self._repo
964
965        def set_if_equals(name, old_ref, new_ref, **kwargs):
966            return False
967        r.refs.set_if_equals = set_if_equals
968
969        def add_if_new(name, new_ref, **kwargs):
970            self.fail('Unexpected call to add_if_new')
971        r.refs.add_if_new = add_if_new
972
973        old_shas = set(r.object_store)
974        self.assertRaises(errors.CommitError, r.do_commit, b'failed commit',
975                          committer=b'Test Committer <test@nodomain.com>',
976                          author=b'Test Author <test@nodomain.com>',
977                          commit_timestamp=12345, commit_timezone=0,
978                          author_timestamp=12345, author_timezone=0)
979        new_shas = set(r.object_store) - old_shas
980        self.assertEqual(1, len(new_shas))
981        # Check that the new commit (now garbage) was added.
982        new_commit = r[new_shas.pop()]
983        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
984        self.assertEqual(b'failed commit', new_commit.message)
985
986    def test_commit_branch(self):
987        r = self._repo
988
989        commit_sha = r.do_commit(
990            b'commit to branch',
991            committer=b'Test Committer <test@nodomain.com>',
992            author=b'Test Author <test@nodomain.com>',
993            commit_timestamp=12395, commit_timezone=0,
994            author_timestamp=12395, author_timezone=0,
995            ref=b"refs/heads/new_branch")
996        self.assertEqual(self._root_commit, r[b"HEAD"].id)
997        self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
998        self.assertEqual([], r[commit_sha].parents)
999        self.assertTrue(b"refs/heads/new_branch" in r)
1000
1001        new_branch_head = commit_sha
1002
1003        commit_sha = r.do_commit(
1004            b'commit to branch 2',
1005            committer=b'Test Committer <test@nodomain.com>',
1006            author=b'Test Author <test@nodomain.com>',
1007            commit_timestamp=12395, commit_timezone=0,
1008            author_timestamp=12395, author_timezone=0,
1009            ref=b"refs/heads/new_branch")
1010        self.assertEqual(self._root_commit, r[b"HEAD"].id)
1011        self.assertEqual(commit_sha, r[b"refs/heads/new_branch"].id)
1012        self.assertEqual([new_branch_head], r[commit_sha].parents)
1013
1014    def test_commit_merge_heads(self):
1015        r = self._repo
1016        merge_1 = r.do_commit(
1017            b'commit to branch 2',
1018            committer=b'Test Committer <test@nodomain.com>',
1019            author=b'Test Author <test@nodomain.com>',
1020            commit_timestamp=12395, commit_timezone=0,
1021            author_timestamp=12395, author_timezone=0,
1022            ref=b"refs/heads/new_branch")
1023        commit_sha = r.do_commit(
1024            b'commit with merge',
1025            committer=b'Test Committer <test@nodomain.com>',
1026            author=b'Test Author <test@nodomain.com>',
1027            commit_timestamp=12395, commit_timezone=0,
1028            author_timestamp=12395, author_timezone=0,
1029            merge_heads=[merge_1])
1030        self.assertEqual(
1031            [self._root_commit, merge_1],
1032            r[commit_sha].parents)
1033
1034    def test_commit_dangling_commit(self):
1035        r = self._repo
1036
1037        old_shas = set(r.object_store)
1038        old_refs = r.get_refs()
1039        commit_sha = r.do_commit(
1040            b'commit with no ref',
1041            committer=b'Test Committer <test@nodomain.com>',
1042            author=b'Test Author <test@nodomain.com>',
1043            commit_timestamp=12395, commit_timezone=0,
1044            author_timestamp=12395, author_timezone=0,
1045            ref=None)
1046        new_shas = set(r.object_store) - old_shas
1047
1048        # New sha is added, but no new refs
1049        self.assertEqual(1, len(new_shas))
1050        new_commit = r[new_shas.pop()]
1051        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
1052        self.assertEqual([], r[commit_sha].parents)
1053        self.assertEqual(old_refs, r.get_refs())
1054
1055    def test_commit_dangling_commit_with_parents(self):
1056        r = self._repo
1057
1058        old_shas = set(r.object_store)
1059        old_refs = r.get_refs()
1060        commit_sha = r.do_commit(
1061            b'commit with no ref',
1062            committer=b'Test Committer <test@nodomain.com>',
1063            author=b'Test Author <test@nodomain.com>',
1064            commit_timestamp=12395, commit_timezone=0,
1065            author_timestamp=12395, author_timezone=0,
1066            ref=None, merge_heads=[self._root_commit])
1067        new_shas = set(r.object_store) - old_shas
1068
1069        # New sha is added, but no new refs
1070        self.assertEqual(1, len(new_shas))
1071        new_commit = r[new_shas.pop()]
1072        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
1073        self.assertEqual([self._root_commit], r[commit_sha].parents)
1074        self.assertEqual(old_refs, r.get_refs())
1075
1076    def test_stage_absolute(self):
1077        r = self._repo
1078        os.remove(os.path.join(r.path, 'a'))
1079        self.assertRaises(ValueError, r.stage, [os.path.join(r.path, 'a')])
1080
1081    def test_stage_deleted(self):
1082        r = self._repo
1083        os.remove(os.path.join(r.path, 'a'))
1084        r.stage(['a'])
1085        r.stage(['a'])  # double-stage a deleted path
1086
1087    def test_stage_directory(self):
1088        r = self._repo
1089        os.mkdir(os.path.join(r.path, 'c'))
1090        r.stage(['c'])
1091        self.assertEqual([b'a'], list(r.open_index()))
1092
1093    @skipIf(sys.platform == 'win32' and sys.version_info[:2] >= (3, 6),
1094            'tries to implicitly decode as utf8')
1095    def test_commit_no_encode_decode(self):
1096        r = self._repo
1097        repo_path_bytes = r.path.encode(sys.getfilesystemencoding())
1098        encodings = ('utf8', 'latin1')
1099        names = [u'À'.encode(encoding) for encoding in encodings]
1100        for name, encoding in zip(names, encodings):
1101            full_path = os.path.join(repo_path_bytes, name)
1102            with open(full_path, 'wb') as f:
1103                f.write(encoding.encode('ascii'))
1104            # These files are break tear_down_repo, so cleanup these files
1105            # ourselves.
1106            self.addCleanup(os.remove, full_path)
1107
1108        r.stage(names)
1109        commit_sha = r.do_commit(
1110            b'Files with different encodings',
1111            committer=b'Test Committer <test@nodomain.com>',
1112            author=b'Test Author <test@nodomain.com>',
1113            commit_timestamp=12395, commit_timezone=0,
1114            author_timestamp=12395, author_timezone=0,
1115            ref=None, merge_heads=[self._root_commit])
1116
1117        for name, encoding in zip(names, encodings):
1118            mode, id = tree_lookup_path(r.get_object, r[commit_sha].tree, name)
1119            self.assertEqual(stat.S_IFREG | 0o644, mode)
1120            self.assertEqual(encoding.encode('ascii'), r[id].data)
1121
1122    def test_discover_intended(self):
1123        path = os.path.join(self._repo_dir, 'b/c')
1124        r = Repo.discover(path)
1125        self.assertEqual(r.head(), self._repo.head())
1126
1127    def test_discover_isrepo(self):
1128        r = Repo.discover(self._repo_dir)
1129        self.assertEqual(r.head(), self._repo.head())
1130
1131    def test_discover_notrepo(self):
1132        with self.assertRaises(NotGitRepository):
1133            Repo.discover('/')
1134
1135
1136class CheckUserIdentityTests(TestCase):
1137
1138    def test_valid(self):
1139        check_user_identity(b'Me <me@example.com>')
1140
1141    def test_invalid(self):
1142        self.assertRaises(InvalidUserIdentity,
1143                          check_user_identity, b'No Email')
1144        self.assertRaises(InvalidUserIdentity,
1145                          check_user_identity, b'Fullname <missing')
1146        self.assertRaises(InvalidUserIdentity,
1147                          check_user_identity, b'Fullname missing>')
1148        self.assertRaises(InvalidUserIdentity,
1149                          check_user_identity, b'Fullname >order<>')
1150