1#
2#
3# Licensed to the Apache Software Foundation (ASF) under one
4# or more contributor license agreements.  See the NOTICE file
5# distributed with this work for additional information
6# regarding copyright ownership.  The ASF licenses this file
7# to you under the Apache License, Version 2.0 (the
8# "License"); you may not use this file except in compliance
9# with the License.  You may obtain a copy of the License at
10#
11#   http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing,
14# software distributed under the License is distributed on an
15# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16# KIND, either express or implied.  See the License for the
17# specific language governing permissions and limitations
18# under the License.
19#
20#
21from sys import version_info # For Python version check
22from io import BytesIO
23import unittest, os, tempfile, setup_path, binascii
24import svn.diff
25from svn import core, repos, wc, client
26from svn import delta, ra
27from svn.core import SubversionException, SVN_INVALID_REVNUM
28import utils
29
30class SubversionWorkingCopyTestCase(unittest.TestCase):
31  """Test cases for the Subversion working copy layer"""
32
33  def setUp(self):
34    """Load a Subversion repository"""
35
36    self.temper = utils.Temper()
37
38    # Isolate each test from the others with a fresh repository.
39    (self.repos, _, self.repos_uri) = self.temper.alloc_known_repo(
40      'trac/versioncontrol/tests/svnrepos.dump', suffix='-wc-repo')
41    self.fs = repos.fs(self.repos)
42
43    self.path = self.temper.alloc_empty_dir(suffix='-wc-wc')
44
45    client_ctx = client.create_context()
46
47    rev = core.svn_opt_revision_t()
48    rev.kind = core.svn_opt_revision_head
49
50    client.checkout2(self.repos_uri, self.path, rev, rev, True, True,
51            client_ctx)
52
53    self.wc = wc.adm_open3(None, self.path, True, -1, None)
54
55  def test_entry(self):
56      wc.entry(self.path, self.wc, True)
57
58  def test_lock(self):
59      readme_path = b'%s/trunk/README.txt' % self.path
60
61      lock = core.svn_lock_create(core.Pool())
62      lock.token = b'http://svnbook.org/nightly/en/svn.advanced.locking.html'
63
64      wc.add_lock(readme_path, lock, self.wc)
65      self.assertEqual(True, wc.adm_locked(self.wc))
66      self.assertEqual(True, wc.locked(self.path))
67      wc.remove_lock(readme_path, self.wc)
68
69  def test_version(self):
70      wc.version()
71
72  def test_access_path(self):
73      self.assertEqual(self.path, wc.adm_access_path(self.wc))
74
75  def test_is_adm_dir(self):
76      self.assertTrue(wc.is_adm_dir(b".svn"))
77      self.assertFalse(wc.is_adm_dir(b".foosvn"))
78
79  def test_get_adm_dir(self):
80      self.assertTrue(isinstance(wc.get_adm_dir(), bytes))
81
82  def test_set_adm_dir(self):
83      self.assertRaises(SubversionException, wc.set_adm_dir, b".foobar")
84      self.assertTrue(wc.is_adm_dir(b".svn"))
85      self.assertFalse(wc.is_adm_dir(b"_svn"))
86      self.assertFalse(wc.is_adm_dir(b".foobar"))
87      wc.set_adm_dir(b"_svn")
88      self.assertTrue(wc.is_adm_dir(b"_svn"))
89      self.assertEqual(b"_svn", wc.get_adm_dir())
90      wc.set_adm_dir(b".svn")
91      self.assertFalse(wc.is_adm_dir(b"_svn"))
92      self.assertEqual(b".svn", wc.get_adm_dir())
93
94  def test_init_traversal_info(self):
95      wc.init_traversal_info()
96
97  def test_crawl_revisions2(self):
98      infos = []
99      set_paths = []
100
101      def notify(info, pool):
102          infos.append(info)
103
104      class MyReporter:
105          def __init__(self):
106              self.finished_report = False
107
108          def abort_report(self, pool):
109              pass
110
111          def finish_report(self, pool):
112              self.finished_report = True
113
114          def set_path(self, path, revision, start_empty, lock_token, pool):
115              set_paths.append(path)
116
117          def link_path(self, path, url, revision, start_empty, lock_token,
118                        pool):
119              pass
120
121          def delete_path(self, path, pool):
122              pass
123
124      # Remove trunk/README.txt
125      readme_path = b'%s/trunk/README.txt' % self.path
126      self.assertTrue(os.path.exists(readme_path))
127      os.remove(readme_path)
128
129      # Restore trunk/README.txt using crawl_revision2
130      info = wc.init_traversal_info()
131      reporter = MyReporter()
132      wc.crawl_revisions2(self.path, self.wc, reporter,
133                          True, True, False, notify, info)
134
135      # Check that the report finished
136      self.assertTrue(reporter.finished_report)
137      self.assertEqual([b''], set_paths)
138      self.assertEqual(1, len(infos))
139
140      # Check content of infos object
141      [info] = infos
142      self.assertEqual(readme_path, info.path)
143      self.assertEqual(core.svn_node_file, info.kind)
144      self.assertEqual(core.SVN_INVALID_REVNUM, info.revision)
145
146  def test_create_notify(self):
147      wc.create_notify(self.path, wc.notify_add)
148
149  def test_check_wc(self):
150      self.assertTrue(wc.check_wc(self.path) > 0)
151
152  def test_get_ancestry(self):
153      self.assertEqual([self.repos_uri, 12],
154                       wc.get_ancestry(self.path, self.wc))
155
156  def test_status(self):
157      wc.status2(self.path, self.wc)
158
159  def test_status_editor(self):
160      paths = []
161      def status_func(target, status):
162        paths.append(target)
163
164      (anchor_access, target_access,
165       target) = wc.adm_open_anchor(self.path, False, -1, None)
166      (editor, edit_baton, set_locks_baton,
167       edit_revision) = wc.get_status_editor2(anchor_access,
168                                              target,
169                                              None,  # SvnConfig
170                                              True,  # recursive
171                                              True, # get_all
172                                              False, # no_ignore
173                                              status_func,
174                                              None,  # cancel_func
175                                              None,  # traversal_info
176                                              )
177      editor.close_edit(edit_baton)
178      self.assertTrue(len(paths) > 0)
179      for target in paths:
180        self.assertTrue(target.startswith(self.path))
181
182  def test_status_editor_callback_exception(self):
183      """test case for status_editor call back not to be crashed by Python exception"""
184      def status_func(target, status):
185        # Note: exception with in this call back doesn't propagate to
186        # the caller
187        raise AssertionError('intentional exception')
188
189      (anchor_access, target_access,
190       target) = wc.adm_open_anchor(self.path, False, -1, None)
191      (editor, edit_baton, set_locks_baton,
192       edit_revision) = wc.get_status_editor2(anchor_access,
193                                              target,
194                                              None,  # SvnConfig
195                                              True,  # recursive
196                                              True, # get_all
197                                              False, # no_ignore
198                                              status_func,
199                                              None,  # cancel_func
200                                              None,  # traversal_info
201                                              )
202      editor.close_edit(edit_baton)
203
204  def test_is_normal_prop(self):
205      self.assertFalse(wc.is_normal_prop(b'svn:wc:foo:bar'))
206      self.assertFalse(wc.is_normal_prop(b'svn:entry:foo:bar'))
207      self.assertTrue(wc.is_normal_prop(b'svn:foo:bar'))
208      self.assertTrue(wc.is_normal_prop(b'foreign:foo:bar'))
209
210  def test_is_wc_prop(self):
211      self.assertTrue(wc.is_wc_prop(b'svn:wc:foo:bar'))
212      self.assertFalse(wc.is_wc_prop(b'svn:entry:foo:bar'))
213      self.assertFalse(wc.is_wc_prop(b'svn:foo:bar'))
214      self.assertFalse(wc.is_wc_prop(b'foreign:foo:bar'))
215
216  def test_is_entry_prop(self):
217      self.assertTrue(wc.is_entry_prop(b'svn:entry:foo:bar'))
218      self.assertFalse(wc.is_entry_prop(b'svn:wc:foo:bar'))
219      self.assertFalse(wc.is_entry_prop(b'svn:foo:bar'))
220      self.assertFalse(wc.is_entry_prop(b'foreign:foo:bar'))
221
222  def test_get_prop_diffs(self):
223      wc.prop_set(b"foreign:foo", b"bla", self.path, self.wc)
224      self.assertEqual([{b"foreign:foo": b"bla"}, {}],
225              wc.get_prop_diffs(self.path, self.wc))
226
227  def test_get_pristine_copy_path(self):
228      path_to_file = b'%s/trunk/README.txt' % self.path
229      path_to_text_base = wc.get_pristine_copy_path(path_to_file)
230      with open(path_to_text_base, 'rb') as fp:
231          text_base = fp.read()
232      # TODO: This test should modify the working file first, to ensure the
233      # path isn't just the path to the working file.
234      self.assertEqual(text_base, b'A test.\n')
235
236  def test_entries_read(self):
237      entries = wc.entries_read(self.wc, True)
238      keys = core._as_list(entries.keys())
239      keys.sort()
240      self.assertEqual([b'', b'branches', b'tags', b'trunk'], keys)
241
242  def test_get_ignores(self):
243      self.assertTrue(isinstance(wc.get_ignores(None, self.wc), list))
244
245  def test_commit(self):
246    # Replace README.txt's contents, using binary mode so we know the
247    # exact contents even on Windows, and therefore the MD5 checksum.
248    readme_path = b'%s/trunk/README.txt' % self.path
249    fp = open(readme_path, 'wb')
250    fp.write(b'hello\n')
251    fp.close()
252
253    # Setup ra_ctx.
254    ra.initialize()
255    callbacks = ra.Callbacks()
256    ra_ctx = ra.open2(self.repos_uri, callbacks, None, None)
257
258    # Get commit editor.
259    commit_info = [None]
260    def commit_cb(_commit_info, pool):
261      commit_info[0] = _commit_info
262    (editor, edit_baton) = ra.get_commit_editor2(ra_ctx, b'log message',
263                                                 commit_cb,
264                                                 None,
265                                                 False)
266
267    # Drive the commit.
268    checksum = [None]
269    def driver_cb(parent, path, pool):
270      baton = editor.open_file(path, parent, -1, pool)
271      adm_access = wc.adm_probe_retrieve(self.wc, readme_path, pool)
272      (_, checksum[0]) = wc.transmit_text_deltas2(readme_path, adm_access,
273                                                  False, editor, baton, pool)
274      return baton
275    try:
276      delta.path_driver(editor, edit_baton, -1, [b'trunk/README.txt'],
277                        driver_cb)
278      editor.close_edit(edit_baton)
279    except:
280      try:
281        editor.abort_edit(edit_baton)
282      except:
283        # We already have an exception in progress, not much we can do
284        # about this.
285        pass
286      raise
287    (checksum,) = checksum
288    (commit_info,) = commit_info
289
290    # Assert the commit.
291    self.assertEqual(binascii.b2a_hex(checksum),
292                      b'b1946ac92492d2347c6235b4d2611184')
293    self.assertEqual(commit_info.revision, 13)
294
295    # Bump working copy state.
296    wc.process_committed4(readme_path,
297                          wc.adm_retrieve(self.wc,
298                                          os.path.dirname(readme_path)),
299                          False, commit_info.revision, commit_info.date,
300                          commit_info.author, None, False, False, checksum)
301
302    # Assert bumped state.
303    entry = wc.entry(readme_path, self.wc, False)
304    self.assertEqual(entry.revision, commit_info.revision)
305    self.assertEqual(entry.schedule, wc.schedule_normal)
306    self.assertEqual(entry.cmt_rev, commit_info.revision)
307    self.assertEqual(entry.cmt_date,
308                      core.svn_time_from_cstring(commit_info.date))
309
310  def test_diff_editor4(self):
311    pool = None
312    depth = core.svn_depth_infinity
313    url = self.repos_uri
314
315    # cause file_changed: Replace README.txt's contents.
316    readme_path = b'%s/trunk/README.txt' % self.path
317    fp = open(readme_path, 'wb')
318    fp.write(b'hello\n')
319    fp.close()
320    # cause file_added: Create readme3.
321    readme3_path = b'%s/trunk/readme3' % self.path
322    fp = open(readme3_path, 'wb')
323    fp.write(b'hello\n')
324    fp.close()
325    wc.add2(readme3_path,
326            wc.adm_probe_retrieve(self.wc,
327                                  os.path.dirname(readme3_path), pool),
328            None, SVN_INVALID_REVNUM, # copyfrom
329            None,                     # cancel_func
330            None,                     # notify_func
331            pool)
332    # cause file_deleted: Delete README2.txt.
333    readme2_path = b'%s/trunk/README2.txt' % self.path
334    wc.delete3(readme2_path,
335               wc.adm_probe_retrieve(self.wc,
336                                     os.path.dirname(readme2_path), pool),
337               None,                  # cancel_func
338               None,                  # notify_func
339               False,                 # keep_local
340               pool)
341    # cause dir_props_changed: ps testprop testval dir1/dir2
342    dir2_path = b'%s/trunk/dir1/dir2' % self.path
343    wc.prop_set2(b'testprop', b'testval', dir2_path,
344                 wc.adm_probe_retrieve(self.wc,
345                                       os.path.dirname(dir2_path), pool),
346                 False,               # skip_checks
347                 pool)
348    # TODO: cause dir_added/deleted
349
350    # Save prop changes.
351    got_prop_changes = []
352    def props_changed(path, propchanges):
353      for (name, value) in core._as_list(propchanges.items()):
354        (kind, _) = core.svn_property_kind(name)
355        if kind != core.svn_prop_regular_kind:
356          continue
357        got_prop_changes.append((path[len(self.path) + 1:], name, value))
358
359    # Save diffs.
360    got_diffs = {}
361    def write_diff(path, left, right):
362      options = svn.diff.file_options_create()
363      diff = svn.diff.file_diff_2(left, right, options, pool)
364      original_header = modified_header = b''
365      encoding = b'utf8'
366      relative_to_dir = None
367      bio = BytesIO()
368      svn.diff.file_output_unified3(bio, diff,
369                                    left, right,
370                                    original_header, modified_header,
371                                    encoding, relative_to_dir,
372                                    options.show_c_function, pool)
373      got_diffs[path[len(self.path) + 1:]] = bio.getvalue().splitlines()
374
375    # Diff callbacks that call props_changed and write_diff.
376    contentstate = propstate = state = wc.notify_state_unknown
377    class Callbacks(wc.DiffCallbacks2):
378      def file_changed(self, adm_access, path,
379                       tmpfile1, tmpfile2, rev1, rev2,
380                       mimetype1, mimetype2,
381                       propchanges, originalprops):
382        write_diff(path, tmpfile1, tmpfile2)
383        return (contentstate, propstate)
384
385      def file_added(self, adm_access, path,
386                     tmpfile1, tmpfile2, rev1, rev2,
387                     mimetype1, mimetype2,
388                     propchanges, originalprops):
389        write_diff(path, tmpfile1, tmpfile2)
390        return (contentstate, propstate)
391
392      def file_deleted(self, adm_access, path, tmpfile1, tmpfile2,
393                       mimetype1, mimetype2, originalprops):
394        write_diff(path, tmpfile1, tmpfile2)
395        return state
396
397      def dir_props_changed(self, adm_access, path,
398                            propchanges, original_props):
399        props_changed(path, propchanges)
400        return state
401    diff_callbacks = Callbacks()
402
403    # Setup wc diff editor.
404    (editor, edit_baton) = wc.get_diff_editor4(
405      self.wc, b'', diff_callbacks, depth,
406      False,                    # ignore_ancestry
407      False,                    # use_text_base
408      False,                    # reverse_order
409      None,                     # cancel_func
410      None,                     # changelists
411      pool)
412    # Setup ra_ctx.
413    ra.initialize()
414    ra_callbacks = ra.Callbacks()
415    ra_ctx = ra.open2(url, ra_callbacks, None, None)
416    # Use head rev for do_diff3 and set_path.
417    head = ra.get_latest_revnum(ra_ctx)
418    # Get diff reporter.
419    (reporter, report_baton) = ra.do_diff3(
420      ra_ctx,
421      head,                     # versus_url revision
422      b'',                       # diff_target
423      depth,
424      False,                    # ignore_ancestry
425      True,                     # text_deltas
426      url,                      # versus_url
427      editor, edit_baton, pool)
428    # Report wc state (pretty plain).
429    reporter.set_path(report_baton, b'', head, depth,
430                      False,    # start_empty
431                      None,     # lock_token
432                      pool)
433    reporter.finish_report(report_baton, pool)
434
435    # Assert we got the right diff.
436    expected_prop_changes = [(b'trunk/dir1/dir2',
437                              b'testprop', b'testval')]
438    expected_diffs = {
439      b'trunk/readme3':
440        [b'--- ',
441         b'+++ ',
442         b'@@ -0,0 +1 @@',
443         b'+hello'],
444      b'trunk/README.txt':
445        [b'--- ',
446         b'+++ ',
447         b'@@ -1 +1 @@',
448         b'-A test.',
449         b'+hello'],
450      b'trunk/README2.txt':
451        [b'--- ',
452         b'+++ ',
453         b'@@ -1 +0,0 @@',
454         b'-A test.'],
455      }
456    self.assertEqual(got_prop_changes, expected_prop_changes)
457    self.assertEqual(got_diffs, expected_diffs)
458
459  def tearDown(self):
460      wc.adm_close(self.wc)
461      self.fs = None
462      self.repos = None
463      self.temper.cleanup()
464
def suite():
    """Return a test suite with every test of SubversionWorkingCopyTestCase."""
    loader = unittest.defaultTestLoader
    return loader.loadTestsFromTestCase(SubversionWorkingCopyTestCase)
468
if __name__ == '__main__':
    # Run the working copy suite with the plain text runner when this
    # file is executed directly.
    unittest.TextTestRunner().run(suite())
472