1# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""Tests for Transport implementations.
18
19Transport implementations tested here are supplied by
20TransportTestProviderAdapter.
21"""
22
23from io import BytesIO
24import os
25import stat
26import sys
27
28from .. import (
29    errors,
30    osutils,
31    pyutils,
32    tests,
33    transport as _mod_transport,
34    urlutils,
35    )
36from ..errors import (ConnectionError,
37                      FileExists,
38                      NoSuchFile,
39                      PathError,
40                      TransportNotPossible,
41                      )
42from ..osutils import getcwd
43from . import (
44    TestSkipped,
45    TestNotApplicable,
46    multiply_tests,
47    )
48from . import test_server
49from .test_transport import TestTransportImplementation
50from ..transport import (
51    ConnectedTransport,
52    Transport,
53    _get_transport_modules,
54    )
55from ..transport.memory import MemoryTransport
56from ..transport.remote import RemoteTransport
57
58
def get_transport_test_permutations(module):
    """Get the permutations module wants to have tested.

    :param module: A transport module object that is expected to expose a
        ``get_test_permutations()`` callable.
    :return: Whatever ``module.get_test_permutations()`` returns.
    :raises AssertionError: If ``module`` has no ``get_test_permutations``
        attribute, or the attribute is None.
    """
    if getattr(module, 'get_test_permutations', None) is None:
        raise AssertionError(
            "transport module %s doesn't provide get_test_permutations()"
            % module.__name__)
    return module.get_test_permutations()
67
68
def transport_test_permutations():
    """Return a list of the klass, server_factory pairs to test.

    Each entry is a ``(name, parameters)`` scenario tuple where ``name`` is
    ``"<klass name>,<server factory name>"`` and ``parameters`` carries the
    ``transport_class`` and ``transport_server`` to use.
    """
    scenarios = []
    for module_name in _get_transport_modules():
        try:
            transport_module = pyutils.get_named_object(module_name)
            pairs = get_transport_test_permutations(transport_module)
            for klass, server_factory in pairs:
                scenario_name = '%s,%s' % (klass.__name__,
                                           server_factory.__name__)
                parameters = {"transport_class": klass,
                              "transport_server": server_factory}
                scenarios.append((scenario_name, parameters))
        except errors.DependencyNotPresent:
            # A missing dependency only means this module's scenarios are
            # left out; carry on with the remaining modules.
            continue
    return scenarios
86
87
def load_tests(loader, standard_tests, pattern):
    """Multiply tests for transport implementations.

    :param loader: Test loader; its ``suiteClass`` builds the result suite.
    :param standard_tests: The tests to multiply by the transport scenarios.
    :param pattern: Unused here.
    :return: The value returned by ``multiply_tests``.
    """
    suite = loader.suiteClass()
    return multiply_tests(standard_tests, transport_test_permutations(), suite)
93
94
95class TransportTests(TestTransportImplementation):
96
    def setUp(self):
        """Run the base class setUp, then override BRZ_NO_SMART_VFS."""
        super(TransportTests, self).setUp()
        # NOTE(review): passing None presumably unsets the variable for the
        # duration of the test -- confirm against overrideEnv.
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
100
101    def check_transport_contents(self, content, transport, relpath):
102        """Check that transport.get_bytes(relpath) == content."""
103        self.assertEqualDiff(content, transport.get_bytes(relpath))
104
105    def test_ensure_base_missing(self):
106        """.ensure_base() should create the directory if it doesn't exist"""
107        t = self.get_transport()
108        t_a = t.clone('a')
109        if t_a.is_readonly():
110            self.assertRaises(TransportNotPossible,
111                              t_a.ensure_base)
112            return
113        self.assertTrue(t_a.ensure_base())
114        self.assertTrue(t.has('a'))
115
116    def test_ensure_base_exists(self):
117        """.ensure_base() should just be happy if it already exists"""
118        t = self.get_transport()
119        if t.is_readonly():
120            return
121
122        t.mkdir('a')
123        t_a = t.clone('a')
124        # ensure_base returns False if it didn't create the base
125        self.assertFalse(t_a.ensure_base())
126
127    def test_ensure_base_missing_parent(self):
128        """.ensure_base() will fail if the parent dir doesn't exist"""
129        t = self.get_transport()
130        if t.is_readonly():
131            return
132
133        t_a = t.clone('a')
134        t_b = t_a.clone('b')
135        self.assertRaises(NoSuchFile, t_b.ensure_base)
136
137    def test_external_url(self):
138        """.external_url either works or raises InProcessTransport."""
139        t = self.get_transport()
140        try:
141            t.external_url()
142        except errors.InProcessTransport:
143            pass
144
145    def test_has(self):
146        t = self.get_transport()
147
148        files = ['a', 'b', 'e', 'g', '%']
149        self.build_tree(files, transport=t)
150        self.assertEqual(True, t.has('a'))
151        self.assertEqual(False, t.has('c'))
152        self.assertEqual(True, t.has(urlutils.escape('%')))
153        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
154        self.assertEqual(False, t.has_any(['c', 'd', 'f',
155                                           urlutils.escape('%%')]))
156        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
157        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
158
159    def test_has_root_works(self):
160        if self.transport_server is test_server.SmartTCPServer_for_testing:
161            raise TestNotApplicable(
162                "SmartTCPServer_for_testing intentionally does not allow "
163                "access to /.")
164        current_transport = self.get_transport()
165        self.assertTrue(current_transport.has('/'))
166        root = current_transport.clone('/')
167        self.assertTrue(root.has(''))
168
169    def test_get(self):
170        t = self.get_transport()
171
172        files = ['a']
173        content = b'contents of a\n'
174        self.build_tree(['a'], transport=t, line_endings='binary')
175        self.check_transport_contents(b'contents of a\n', t, 'a')
176        f = t.get('a')
177        self.assertEqual(content, f.read())
178
179    def test_get_unknown_file(self):
180        t = self.get_transport()
181        files = ['a', 'b']
182        contents = [b'contents of a\n',
183                    b'contents of b\n',
184                    ]
185        self.build_tree(files, transport=t, line_endings='binary')
186        self.assertRaises(NoSuchFile, t.get, 'c')
187
188        def iterate_and_close(func, *args):
189            for f in func(*args):
190                # We call f.read() here because things like paramiko actually
191                # spawn a thread to prefetch the content, which we want to
192                # consume before we close the handle.
193                content = f.read()
194                f.close()
195
196    def test_get_directory_read_gives_ReadError(self):
197        """consistent errors for read() on a file returned by get()."""
198        t = self.get_transport()
199        if t.is_readonly():
200            self.build_tree(['a directory/'])
201        else:
202            t.mkdir('a%20directory')
203        # getting the file must either work or fail with a PathError
204        try:
205            a_file = t.get('a%20directory')
206        except (errors.PathError, errors.RedirectRequested):
207            # early failure return immediately.
208            return
209        # having got a file, read() must either work (i.e. http reading a dir
210        # listing) or fail with ReadError
211        try:
212            a_file.read()
213        except errors.ReadError:
214            pass
215
216    def test_get_bytes(self):
217        t = self.get_transport()
218
219        files = ['a', 'b', 'e', 'g']
220        contents = [b'contents of a\n',
221                    b'contents of b\n',
222                    b'contents of e\n',
223                    b'contents of g\n',
224                    ]
225        self.build_tree(files, transport=t, line_endings='binary')
226        self.check_transport_contents(b'contents of a\n', t, 'a')
227
228        for content, fname in zip(contents, files):
229            self.assertEqual(content, t.get_bytes(fname))
230
231    def test_get_bytes_unknown_file(self):
232        t = self.get_transport()
233        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
234
235    def test_get_with_open_write_stream_sees_all_content(self):
236        t = self.get_transport()
237        if t.is_readonly():
238            return
239        with t.open_write_stream('foo') as handle:
240            handle.write(b'b')
241            self.assertEqual(b'b', t.get_bytes('foo'))
242
243    def test_get_bytes_with_open_write_stream_sees_all_content(self):
244        t = self.get_transport()
245        if t.is_readonly():
246            return
247        with t.open_write_stream('foo') as handle:
248            handle.write(b'b')
249            self.assertEqual(b'b', t.get_bytes('foo'))
250            with t.get('foo') as f:
251                self.assertEqual(b'b', f.read())
252
253    def test_put_bytes(self):
254        t = self.get_transport()
255
256        if t.is_readonly():
257            self.assertRaises(TransportNotPossible,
258                              t.put_bytes, 'a', b'some text for a\n')
259            return
260
261        t.put_bytes('a', b'some text for a\n')
262        self.assertTrue(t.has('a'))
263        self.check_transport_contents(b'some text for a\n', t, 'a')
264
265        # The contents should be overwritten
266        t.put_bytes('a', b'new text for a\n')
267        self.check_transport_contents(b'new text for a\n', t, 'a')
268
269        self.assertRaises(NoSuchFile,
270                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
271
272    def test_put_bytes_non_atomic(self):
273        t = self.get_transport()
274
275        if t.is_readonly():
276            self.assertRaises(TransportNotPossible,
277                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
278            return
279
280        self.assertFalse(t.has('a'))
281        t.put_bytes_non_atomic('a', b'some text for a\n')
282        self.assertTrue(t.has('a'))
283        self.check_transport_contents(b'some text for a\n', t, 'a')
284        # Put also replaces contents
285        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
286        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
287
288        # Make sure we can create another file
289        t.put_bytes_non_atomic('d', b'contents for\nd\n')
290        # And overwrite 'a' with empty contents
291        t.put_bytes_non_atomic('a', b'')
292        self.check_transport_contents(b'contents for\nd\n', t, 'd')
293        self.check_transport_contents(b'', t, 'a')
294
295        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
296                          b'contents\n')
297        # Now test the create_parent flag
298        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
299                          b'contents\n')
300        self.assertFalse(t.has('dir/a'))
301        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
302                               create_parent_dir=True)
303        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
304
305        # But we still get NoSuchFile if we can't make the parent dir
306        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
307                          b'contents\n',
308                          create_parent_dir=True)
309
310    def test_put_bytes_permissions(self):
311        t = self.get_transport()
312
313        if t.is_readonly():
314            return
315        if not t._can_roundtrip_unix_modebits():
316            # Can't roundtrip, so no need to run this test
317            return
318        t.put_bytes('mode644', b'test text\n', mode=0o644)
319        self.assertTransportMode(t, 'mode644', 0o644)
320        t.put_bytes('mode666', b'test text\n', mode=0o666)
321        self.assertTransportMode(t, 'mode666', 0o666)
322        t.put_bytes('mode600', b'test text\n', mode=0o600)
323        self.assertTransportMode(t, 'mode600', 0o600)
324        # Yes, you can put_bytes a file such that it becomes readonly
325        t.put_bytes('mode400', b'test text\n', mode=0o400)
326        self.assertTransportMode(t, 'mode400', 0o400)
327
328        # The default permissions should be based on the current umask
329        umask = osutils.get_umask()
330        t.put_bytes('nomode', b'test text\n', mode=None)
331        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
332
333    def test_put_bytes_non_atomic_permissions(self):
334        t = self.get_transport()
335
336        if t.is_readonly():
337            return
338        if not t._can_roundtrip_unix_modebits():
339            # Can't roundtrip, so no need to run this test
340            return
341        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
342        self.assertTransportMode(t, 'mode644', 0o644)
343        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
344        self.assertTransportMode(t, 'mode666', 0o666)
345        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
346        self.assertTransportMode(t, 'mode600', 0o600)
347        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
348        self.assertTransportMode(t, 'mode400', 0o400)
349
350        # The default permissions should be based on the current umask
351        umask = osutils.get_umask()
352        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
353        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
354
355        # We should also be able to set the mode for a parent directory
356        # when it is created
357        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
358                               dir_mode=0o700, create_parent_dir=True)
359        self.assertTransportMode(t, 'dir700', 0o700)
360        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
361                               dir_mode=0o770, create_parent_dir=True)
362        self.assertTransportMode(t, 'dir770', 0o770)
363        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
364                               dir_mode=0o777, create_parent_dir=True)
365        self.assertTransportMode(t, 'dir777', 0o777)
366
367    def test_put_file(self):
368        t = self.get_transport()
369
370        if t.is_readonly():
371            self.assertRaises(TransportNotPossible,
372                              t.put_file, 'a', BytesIO(b'some text for a\n'))
373            return
374
375        result = t.put_file('a', BytesIO(b'some text for a\n'))
376        # put_file returns the length of the data written
377        self.assertEqual(16, result)
378        self.assertTrue(t.has('a'))
379        self.check_transport_contents(b'some text for a\n', t, 'a')
380        # Put also replaces contents
381        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
382        self.assertEqual(19, result)
383        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
384        self.assertRaises(NoSuchFile,
385                          t.put_file, 'path/doesnt/exist/c',
386                          BytesIO(b'contents'))
387
388    def test_put_file_non_atomic(self):
389        t = self.get_transport()
390
391        if t.is_readonly():
392            self.assertRaises(TransportNotPossible,
393                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
394            return
395
396        self.assertFalse(t.has('a'))
397        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
398        self.assertTrue(t.has('a'))
399        self.check_transport_contents(b'some text for a\n', t, 'a')
400        # Put also replaces contents
401        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
402        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
403
404        # Make sure we can create another file
405        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
406        # And overwrite 'a' with empty contents
407        t.put_file_non_atomic('a', BytesIO(b''))
408        self.check_transport_contents(b'contents for\nd\n', t, 'd')
409        self.check_transport_contents(b'', t, 'a')
410
411        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
412                          BytesIO(b'contents\n'))
413        # Now test the create_parent flag
414        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
415                          BytesIO(b'contents\n'))
416        self.assertFalse(t.has('dir/a'))
417        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
418                              create_parent_dir=True)
419        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
420
421        # But we still get NoSuchFile if we can't make the parent dir
422        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
423                          BytesIO(b'contents\n'),
424                          create_parent_dir=True)
425
426    def test_put_file_permissions(self):
427
428        t = self.get_transport()
429
430        if t.is_readonly():
431            return
432        if not t._can_roundtrip_unix_modebits():
433            # Can't roundtrip, so no need to run this test
434            return
435        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
436        self.assertTransportMode(t, 'mode644', 0o644)
437        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
438        self.assertTransportMode(t, 'mode666', 0o666)
439        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
440        self.assertTransportMode(t, 'mode600', 0o600)
441        # Yes, you can put a file such that it becomes readonly
442        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
443        self.assertTransportMode(t, 'mode400', 0o400)
444        # The default permissions should be based on the current umask
445        umask = osutils.get_umask()
446        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
447        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
448
449    def test_put_file_non_atomic_permissions(self):
450        t = self.get_transport()
451
452        if t.is_readonly():
453            return
454        if not t._can_roundtrip_unix_modebits():
455            # Can't roundtrip, so no need to run this test
456            return
457        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
458        self.assertTransportMode(t, 'mode644', 0o644)
459        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
460        self.assertTransportMode(t, 'mode666', 0o666)
461        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
462        self.assertTransportMode(t, 'mode600', 0o600)
463        # Yes, you can put_file_non_atomic a file such that it becomes readonly
464        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
465        self.assertTransportMode(t, 'mode400', 0o400)
466
467        # The default permissions should be based on the current umask
468        umask = osutils.get_umask()
469        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
470        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
471
472        # We should also be able to set the mode for a parent directory
473        # when it is created
474        sio = BytesIO()
475        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
476                              dir_mode=0o700, create_parent_dir=True)
477        self.assertTransportMode(t, 'dir700', 0o700)
478        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
479                              dir_mode=0o770, create_parent_dir=True)
480        self.assertTransportMode(t, 'dir770', 0o770)
481        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
482                              dir_mode=0o777, create_parent_dir=True)
483        self.assertTransportMode(t, 'dir777', 0o777)
484
485    def test_put_bytes_unicode(self):
486        t = self.get_transport()
487        if t.is_readonly():
488            return
489        unicode_string = u'\u1234'
490        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
491
492    def test_mkdir(self):
493        t = self.get_transport()
494
495        if t.is_readonly():
496            # cannot mkdir on readonly transports. We're not testing for
497            # cache coherency because cache behaviour is not currently
498            # defined for the transport interface.
499            self.assertRaises(TransportNotPossible, t.mkdir, '.')
500            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
501            self.assertRaises(TransportNotPossible,
502                              t.mkdir, 'path/doesnt/exist')
503            return
504        # Test mkdir
505        t.mkdir('dir_a')
506        self.assertEqual(t.has('dir_a'), True)
507        self.assertEqual(t.has('dir_b'), False)
508
509        t.mkdir('dir_b')
510        self.assertEqual(t.has('dir_b'), True)
511
512        self.assertEqual([t.has(n) for n in
513                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
514                         [True, True, False, True])
515
516        # we were testing that a local mkdir followed by a transport
517        # mkdir failed thusly, but given that we * in one process * do not
518        # concurrently fiddle with disk dirs and then use transport to do
519        # things, the win here seems marginal compared to the constraint on
520        # the interface. RBC 20051227
521        t.mkdir('dir_g')
522        self.assertRaises(FileExists, t.mkdir, 'dir_g')
523
524        # Test get/put in sub-directories
525        t.put_bytes('dir_a/a', b'contents of dir_a/a')
526        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
527        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
528        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
529
530        # mkdir of a dir with an absent parent
531        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
532
533    def test_mkdir_permissions(self):
534        t = self.get_transport()
535        if t.is_readonly():
536            return
537        if not t._can_roundtrip_unix_modebits():
538            # no sense testing on this transport
539            return
540        # Test mkdir with a mode
541        t.mkdir('dmode755', mode=0o755)
542        self.assertTransportMode(t, 'dmode755', 0o755)
543        t.mkdir('dmode555', mode=0o555)
544        self.assertTransportMode(t, 'dmode555', 0o555)
545        t.mkdir('dmode777', mode=0o777)
546        self.assertTransportMode(t, 'dmode777', 0o777)
547        t.mkdir('dmode700', mode=0o700)
548        self.assertTransportMode(t, 'dmode700', 0o700)
549        t.mkdir('mdmode755', mode=0o755)
550        self.assertTransportMode(t, 'mdmode755', 0o755)
551
552        # Default mode should be based on umask
553        umask = osutils.get_umask()
554        t.mkdir('dnomode', mode=None)
555        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
556
557    def test_opening_a_file_stream_creates_file(self):
558        t = self.get_transport()
559        if t.is_readonly():
560            return
561        handle = t.open_write_stream('foo')
562        try:
563            self.assertEqual(b'', t.get_bytes('foo'))
564        finally:
565            handle.close()
566
567    def test_opening_a_file_stream_can_set_mode(self):
568        t = self.get_transport()
569        if t.is_readonly():
570            self.assertRaises((TransportNotPossible, NotImplementedError),
571                              t.open_write_stream, 'foo')
572            return
573        if not t._can_roundtrip_unix_modebits():
574            # Can't roundtrip, so no need to run this test
575            return
576
577        def check_mode(name, mode, expected):
578            handle = t.open_write_stream(name, mode=mode)
579            handle.close()
580            self.assertTransportMode(t, name, expected)
581        check_mode('mode644', 0o644, 0o644)
582        check_mode('mode666', 0o666, 0o666)
583        check_mode('mode600', 0o600, 0o600)
584        # The default permissions should be based on the current umask
585        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
586
    def test_copy_to(self):
        """copy_to() copies files to another transport.

        Covers copying to a MemoryTransport, copying into a clone of the
        transport under test (writable transports only), the NoSuchFile
        raised when the target directory is missing, passing an iterator of
        paths, and the ``mode`` argument.
        """
        # FIXME: test:   same server to same server (partly done)
        # same protocol two servers
        # and    different protocols (done for now except for
        # MemoryTransport).
        # - RBC 20060122

        def simple_copy_files(transport_from, transport_to):
            # Build four files on the source, copy them all, and check that
            # copy_to reports 4 and both sides hold the same bytes.
            files = ['a', 'b', 'c', 'd']
            self.build_tree(files, transport=transport_from)
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
            for f in files:
                self.check_transport_contents(transport_to.get_bytes(f),
                                              transport_from, f)

        t = self.get_transport()
        if t.__class__.__name__ == "SFTPTransport":
            self.skipTest("SFTP copy_to currently too flakey to use")
        temp_transport = MemoryTransport('memory:///')
        simple_copy_files(t, temp_transport)
        if not t.is_readonly():
            # Also copy into a subdirectory of the same transport.
            t.mkdir('copy_to_simple')
            t2 = t.clone('copy_to_simple')
            simple_copy_files(t, t2)

        # Test that copying into a missing directory raises
        # NoSuchFile
        if t.is_readonly():
            self.build_tree(['e/', 'e/f'])
        else:
            t.mkdir('e')
            t.put_bytes('e/f', b'contents of e')
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
        # Once the target directory exists the same copy must go through.
        temp_transport.mkdir('e')
        t.copy_to(['e/f'], temp_transport)

        # Start again from a fresh, empty target.
        del temp_transport
        temp_transport = MemoryTransport('memory:///')

        # copy_to is given an iterator here rather than a list.
        files = ['a', 'b', 'c', 'd']
        t.copy_to(iter(files), temp_transport)
        for f in files:
            self.check_transport_contents(temp_transport.get_bytes(f),
                                          t, f)
        del temp_transport

        # Each mode is checked against its own fresh target transport.
        for mode in (0o666, 0o644, 0o600, 0o400):
            temp_transport = MemoryTransport("memory:///")
            t.copy_to(files, temp_transport, mode=mode)
            for f in files:
                self.assertTransportMode(temp_transport, f, mode)
637
638    def test_create_prefix(self):
639        t = self.get_transport()
640        sub = t.clone('foo').clone('bar')
641        try:
642            sub.create_prefix()
643        except TransportNotPossible:
644            self.assertTrue(t.is_readonly())
645        else:
646            self.assertTrue(t.has('foo/bar'))
647
648    def test_append_file(self):
649        t = self.get_transport()
650
651        if t.is_readonly():
652            self.assertRaises(TransportNotPossible,
653                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
654            return
655        t.put_bytes('a', b'diff\ncontents for\na\n')
656        t.put_bytes('b', b'contents\nfor b\n')
657
658        self.assertEqual(20,
659                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
660
661        self.check_transport_contents(
662            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
663            t, 'a')
664
665        # a file with no parent should fail..
666        self.assertRaises(NoSuchFile,
667                          t.append_file, 'missing/path', BytesIO(b'content'))
668
669        # And we can create new files, too
670        self.assertEqual(0,
671                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
672        self.check_transport_contents(b'some text\nfor a missing file\n',
673                                      t, 'c')
674
675    def test_append_bytes(self):
676        t = self.get_transport()
677
678        if t.is_readonly():
679            self.assertRaises(TransportNotPossible,
680                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
681            return
682
683        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
684        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
685
686        self.assertEqual(20,
687                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
688
689        self.check_transport_contents(
690            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
691            t, 'a')
692
693        # a file with no parent should fail..
694        self.assertRaises(NoSuchFile,
695                          t.append_bytes, 'missing/path', b'content')
696
697    def test_append_file_mode(self):
698        """Check that append accepts a mode parameter"""
699        # check append accepts a mode
700        t = self.get_transport()
701        if t.is_readonly():
702            self.assertRaises(TransportNotPossible,
703                              t.append_file, 'f', BytesIO(b'f'), mode=None)
704            return
705        t.append_file('f', BytesIO(b'f'), mode=None)
706
707    def test_append_bytes_mode(self):
708        # check append_bytes accepts a mode
709        t = self.get_transport()
710        if t.is_readonly():
711            self.assertRaises(TransportNotPossible,
712                              t.append_bytes, 'f', b'f', mode=None)
713            return
714        t.append_bytes('f', b'f', mode=None)
715
716    def test_delete(self):
717        # TODO: Test Transport.delete
718        t = self.get_transport()
719
720        # Not much to do with a readonly transport
721        if t.is_readonly():
722            self.assertRaises(TransportNotPossible, t.delete, 'missing')
723            return
724
725        t.put_bytes('a', b'a little bit of text\n')
726        self.assertTrue(t.has('a'))
727        t.delete('a')
728        self.assertFalse(t.has('a'))
729
730        self.assertRaises(NoSuchFile, t.delete, 'a')
731
732        t.put_bytes('a', b'a text\n')
733        t.put_bytes('b', b'b text\n')
734        t.put_bytes('c', b'c text\n')
735        self.assertEqual([True, True, True],
736                         [t.has(n) for n in ['a', 'b', 'c']])
737        t.delete('a')
738        t.delete('c')
739        self.assertEqual([False, True, False],
740                         [t.has(n) for n in ['a', 'b', 'c']])
741        self.assertFalse(t.has('a'))
742        self.assertTrue(t.has('b'))
743        self.assertFalse(t.has('c'))
744
745        for name in ['a', 'c', 'd']:
746            self.assertRaises(NoSuchFile, t.delete, name)
747
748        # We should have deleted everything
749        # SftpServer creates control files in the
750        # working directory, so we can just do a
751        # plain "listdir".
752        # self.assertEqual([], os.listdir('.'))
753
754    def test_recommended_page_size(self):
755        """Transports recommend a page size for partial access to files."""
756        t = self.get_transport()
757        self.assertIsInstance(t.recommended_page_size(), int)
758
759    def test_rmdir(self):
760        t = self.get_transport()
761        # Not much to do with a readonly transport
762        if t.is_readonly():
763            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
764            return
765        t.mkdir('adir')
766        t.mkdir('adir/bdir')
767        t.rmdir('adir/bdir')
768        # ftp may not be able to raise NoSuchFile for lack of
769        # details when failing
770        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
771        t.rmdir('adir')
772        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
773
774    def test_rmdir_not_empty(self):
775        """Deleting a non-empty directory raises an exception
776
777        sftp (and possibly others) don't give us a specific "directory not
778        empty" exception -- we can just see that the operation failed.
779        """
780        t = self.get_transport()
781        if t.is_readonly():
782            return
783        t.mkdir('adir')
784        t.mkdir('adir/bdir')
785        self.assertRaises(PathError, t.rmdir, 'adir')
786
787    def test_rmdir_empty_but_similar_prefix(self):
788        """rmdir does not get confused by sibling paths.
789
790        A naive implementation of MemoryTransport would refuse to rmdir
791        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
792        uses "path.startswith(dir)" on all file paths to determine if directory
793        is empty.
794        """
795        t = self.get_transport()
796        if t.is_readonly():
797            return
798        t.mkdir('foo')
799        t.put_bytes('foo-bar', b'')
800        t.mkdir('foo-baz')
801        t.rmdir('foo')
802        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
803        self.assertTrue(t.has('foo-bar'))
804
805    def test_rename_dir_succeeds(self):
806        t = self.get_transport()
807        if t.is_readonly():
808            self.assertRaises((TransportNotPossible, NotImplementedError),
809                              t.rename, 'foo', 'bar')
810            return
811        t.mkdir('adir')
812        t.mkdir('adir/asubdir')
813        t.rename('adir', 'bdir')
814        self.assertTrue(t.has('bdir/asubdir'))
815        self.assertFalse(t.has('adir'))
816
817    def test_rename_dir_nonempty(self):
818        """Attempting to replace a nonemtpy directory should fail"""
819        t = self.get_transport()
820        if t.is_readonly():
821            self.assertRaises((TransportNotPossible, NotImplementedError),
822                              t.rename, 'foo', 'bar')
823            return
824        t.mkdir('adir')
825        t.mkdir('adir/asubdir')
826        t.mkdir('bdir')
827        t.mkdir('bdir/bsubdir')
828        # any kind of PathError would be OK, though we normally expect
829        # DirectoryNotEmpty
830        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
831        # nothing was changed so it should still be as before
832        self.assertTrue(t.has('bdir/bsubdir'))
833        self.assertFalse(t.has('adir/bdir'))
834        self.assertFalse(t.has('adir/bsubdir'))
835
836    def test_rename_across_subdirs(self):
837        t = self.get_transport()
838        if t.is_readonly():
839            raise TestNotApplicable("transport is readonly")
840        t.mkdir('a')
841        t.mkdir('b')
842        ta = t.clone('a')
843        tb = t.clone('b')
844        ta.put_bytes('f', b'aoeu')
845        ta.rename('f', '../b/f')
846        self.assertTrue(tb.has('f'))
847        self.assertFalse(ta.has('f'))
848        self.assertTrue(t.has('b/f'))
849
850    def test_delete_tree(self):
851        t = self.get_transport()
852
853        # Not much to do with a readonly transport
854        if t.is_readonly():
855            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
856            return
857
858        # and does it like listing ?
859        t.mkdir('adir')
860        try:
861            t.delete_tree('adir')
862        except TransportNotPossible:
863            # ok, this transport does not support delete_tree
864            return
865
866        # did it delete that trivial case?
867        self.assertRaises(NoSuchFile, t.stat, 'adir')
868
869        self.build_tree(['adir/',
870                         'adir/file',
871                         'adir/subdir/',
872                         'adir/subdir/file',
873                         'adir/subdir2/',
874                         'adir/subdir2/file',
875                         ], transport=t)
876
877        t.delete_tree('adir')
878        # adir should be gone now.
879        self.assertRaises(NoSuchFile, t.stat, 'adir')
880
881    def test_move(self):
882        t = self.get_transport()
883
884        if t.is_readonly():
885            return
886
887        # TODO: I would like to use os.listdir() to
888        # make sure there are no extra files, but SftpServer
889        # creates control files in the working directory
890        # perhaps all of this could be done in a subdirectory
891
892        t.put_bytes('a', b'a first file\n')
893        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
894
895        t.move('a', 'b')
896        self.assertTrue(t.has('b'))
897        self.assertFalse(t.has('a'))
898
899        self.check_transport_contents(b'a first file\n', t, 'b')
900        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
901
902        # Overwrite a file
903        t.put_bytes('c', b'c this file\n')
904        t.move('c', 'b')
905        self.assertFalse(t.has('c'))
906        self.check_transport_contents(b'c this file\n', t, 'b')
907
908        # TODO: Try to write a test for atomicity
909        # TODO: Test moving into a non-existent subdirectory
910
911    def test_copy(self):
912        t = self.get_transport()
913
914        if t.is_readonly():
915            return
916
917        t.put_bytes('a', b'a file\n')
918        t.copy('a', 'b')
919        self.check_transport_contents(b'a file\n', t, 'b')
920
921        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
922        os.mkdir('c')
923        # What should the assert be if you try to copy a
924        # file over a directory?
925        #self.assertRaises(Something, t.copy, 'a', 'c')
926        t.put_bytes('d', b'text in d\n')
927        t.copy('d', 'b')
928        self.check_transport_contents(b'text in d\n', t, 'b')
929
930    def test_connection_error(self):
931        """ConnectionError is raised when connection is impossible.
932
933        The error should be raised from the first operation on the transport.
934        """
935        try:
936            url = self._server.get_bogus_url()
937        except NotImplementedError:
938            raise TestSkipped("Transport %s has no bogus URL support." %
939                              self._server.__class__)
940        t = _mod_transport.get_transport_from_url(url)
941        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
942
943    def test_stat(self):
944        # TODO: Test stat, just try once, and if it throws, stop testing
945        from stat import S_ISDIR, S_ISREG
946
947        t = self.get_transport()
948
949        try:
950            st = t.stat('.')
951        except TransportNotPossible as e:
952            # This transport cannot stat
953            return
954
955        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
956        sizes = [14, 0, 16, 0, 18]
957        self.build_tree(paths, transport=t, line_endings='binary')
958
959        for path, size in zip(paths, sizes):
960            st = t.stat(path)
961            if path.endswith('/'):
962                self.assertTrue(S_ISDIR(st.st_mode))
963                # directory sizes are meaningless
964            else:
965                self.assertTrue(S_ISREG(st.st_mode))
966                self.assertEqual(size, st.st_size)
967
968        self.assertRaises(NoSuchFile, t.stat, 'q')
969        self.assertRaises(NoSuchFile, t.stat, 'b/a')
970
971        self.build_tree(['subdir/', 'subdir/file'], transport=t)
972        subdir = t.clone('subdir')
973        st = subdir.stat('./file')
974        st = subdir.stat('.')
975
976    def test_hardlink(self):
977        from stat import ST_NLINK
978
979        t = self.get_transport()
980
981        source_name = "original_target"
982        link_name = "target_link"
983
984        self.build_tree([source_name], transport=t)
985
986        try:
987            t.hardlink(source_name, link_name)
988
989            self.assertTrue(t.has(source_name))
990            self.assertTrue(t.has(link_name))
991
992            st = t.stat(link_name)
993            self.assertEqual(st[ST_NLINK], 2)
994        except TransportNotPossible:
995            raise TestSkipped("Transport %s does not support hardlinks." %
996                              self._server.__class__)
997
998    def test_symlink(self):
999        from stat import S_ISLNK
1000
1001        t = self.get_transport()
1002
1003        source_name = "original_target"
1004        link_name = "target_link"
1005
1006        self.build_tree([source_name], transport=t)
1007
1008        try:
1009            t.symlink(source_name, link_name)
1010
1011            self.assertTrue(t.has(source_name))
1012            self.assertTrue(t.has(link_name))
1013
1014            st = t.stat(link_name)
1015            self.assertTrue(S_ISLNK(st.st_mode),
1016                            "expected symlink, got mode %o" % st.st_mode)
1017        except TransportNotPossible:
1018            raise TestSkipped("Transport %s does not support symlinks." %
1019                              self._server.__class__)
1020
1021        self.assertEqual(source_name, t.readlink(link_name))
1022
1023    def test_readlink_nonexistent(self):
1024        t = self.get_transport()
1025        try:
1026            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
1027        except TransportNotPossible:
1028            raise TestSkipped("Transport %s does not support symlinks." %
1029                              self._server.__class__)
1030
1031    def test_list_dir(self):
1032        # TODO: Test list_dir, just try once, and if it throws, stop testing
1033        t = self.get_transport()
1034
1035        if not t.listable():
1036            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1037            return
1038
1039        def sorted_list(d, transport):
1040            l = sorted(transport.list_dir(d))
1041            return l
1042
1043        self.assertEqual([], sorted_list('.', t))
1044        # c2 is precisely one letter longer than c here to test that
1045        # suffixing is not confused.
1046        # a%25b checks that quoting is done consistently across transports
1047        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1048
1049        if not t.is_readonly():
1050            self.build_tree(tree_names, transport=t)
1051        else:
1052            self.build_tree(tree_names)
1053
1054        self.assertEqual(
1055            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1056        self.assertEqual(
1057            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1058        self.assertEqual(['d', 'e'], sorted_list('c', t))
1059
1060        # Cloning the transport produces an equivalent listing
1061        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1062
1063        if not t.is_readonly():
1064            t.delete('c/d')
1065            t.delete('b')
1066        else:
1067            os.unlink('c/d')
1068            os.unlink('b')
1069
1070        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1071        self.assertEqual(['e'], sorted_list('c', t))
1072
1073        self.assertListRaises(PathError, t.list_dir, 'q')
1074        self.assertListRaises(PathError, t.list_dir, 'c/f')
1075        # 'a' is a file, list_dir should raise an error
1076        self.assertListRaises(PathError, t.list_dir, 'a')
1077
1078    def test_list_dir_result_is_url_escaped(self):
1079        t = self.get_transport()
1080        if not t.listable():
1081            raise TestSkipped("transport not listable")
1082
1083        if not t.is_readonly():
1084            self.build_tree(['a/', 'a/%'], transport=t)
1085        else:
1086            self.build_tree(['a/', 'a/%'])
1087
1088        names = list(t.list_dir('a'))
1089        self.assertEqual(['%25'], names)
1090        self.assertIsInstance(names[0], str)
1091
1092    def test_clone_preserve_info(self):
1093        t1 = self.get_transport()
1094        if not isinstance(t1, ConnectedTransport):
1095            raise TestSkipped("not a connected transport")
1096
1097        t2 = t1.clone('subdir')
1098        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1099        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1100        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1101        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1102        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1103
1104    def test__reuse_for(self):
1105        t = self.get_transport()
1106        if not isinstance(t, ConnectedTransport):
1107            raise TestSkipped("not a connected transport")
1108
1109        def new_url(scheme=None, user=None, password=None,
1110                    host=None, port=None, path=None):
1111            """Build a new url from t.base changing only parts of it.
1112
1113            Only the parameters different from None will be changed.
1114            """
1115            if scheme is None:
1116                scheme = t._parsed_url.scheme
1117            if user is None:
1118                user = t._parsed_url.user
1119            if password is None:
1120                password = t._parsed_url.password
1121            if user is None:
1122                user = t._parsed_url.user
1123            if host is None:
1124                host = t._parsed_url.host
1125            if port is None:
1126                port = t._parsed_url.port
1127            if path is None:
1128                path = t._parsed_url.path
1129            return str(urlutils.URL(scheme, user, password, host, port, path))
1130
1131        if t._parsed_url.scheme == 'ftp':
1132            scheme = 'sftp'
1133        else:
1134            scheme = 'ftp'
1135        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1136        if t._parsed_url.user == 'me':
1137            user = 'you'
1138        else:
1139            user = 'me'
1140        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1141        # passwords are not taken into account because:
1142        # - it makes no sense to have two different valid passwords for the
1143        #   same user
1144        # - _password in ConnectedTransport is intended to collect what the
1145        #   user specified from the command-line and there are cases where the
1146        #   new url can contain no password (if the url was built from an
1147        #   existing transport.base for example)
1148        # - password are considered part of the credentials provided at
1149        #   connection creation time and as such may not be present in the url
1150        #   (they may be typed by the user when prompted for example)
1151        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1152        # We will not connect, we can use a invalid host
1153        self.assertIsNot(t, t._reuse_for(
1154            new_url(host=t._parsed_url.host + 'bar')))
1155        if t._parsed_url.port == 1234:
1156            port = 4321
1157        else:
1158            port = 1234
1159        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1160        # No point in trying to reuse a transport for a local URL
1161        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1162
1163    def test_connection_sharing(self):
1164        t = self.get_transport()
1165        if not isinstance(t, ConnectedTransport):
1166            raise TestSkipped("not a connected transport")
1167
1168        c = t.clone('subdir')
1169        # Some transports will create the connection  only when needed
1170        t.has('surely_not')  # Force connection
1171        self.assertIs(t._get_connection(), c._get_connection())
1172
1173        # Temporary failure, we need to create a new dummy connection
1174        new_connection = None
1175        t._set_connection(new_connection)
1176        # Check that both transports use the same connection
1177        self.assertIs(new_connection, t._get_connection())
1178        self.assertIs(new_connection, c._get_connection())
1179
1180    def test_reuse_connection_for_various_paths(self):
1181        t = self.get_transport()
1182        if not isinstance(t, ConnectedTransport):
1183            raise TestSkipped("not a connected transport")
1184
1185        t.has('surely_not')  # Force connection
1186        self.assertIsNot(None, t._get_connection())
1187
1188        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1189        self.assertIsNot(t, subdir)
1190        self.assertIs(t._get_connection(), subdir._get_connection())
1191
1192        home = subdir._reuse_for(t.base + 'home')
1193        self.assertIs(t._get_connection(), home._get_connection())
1194        self.assertIs(subdir._get_connection(), home._get_connection())
1195
1196    def test_clone(self):
1197        # TODO: Test that clone moves up and down the filesystem
1198        t1 = self.get_transport()
1199
1200        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1201
1202        self.assertTrue(t1.has('a'))
1203        self.assertTrue(t1.has('b/c'))
1204        self.assertFalse(t1.has('c'))
1205
1206        t2 = t1.clone('b')
1207        self.assertEqual(t1.base + 'b/', t2.base)
1208
1209        self.assertTrue(t2.has('c'))
1210        self.assertFalse(t2.has('a'))
1211
1212        t3 = t2.clone('..')
1213        self.assertTrue(t3.has('a'))
1214        self.assertFalse(t3.has('c'))
1215
1216        self.assertFalse(t1.has('b/d'))
1217        self.assertFalse(t2.has('d'))
1218        self.assertFalse(t3.has('b/d'))
1219
1220        if t1.is_readonly():
1221            self.build_tree_contents([('b/d', b'newfile\n')])
1222        else:
1223            t2.put_bytes('d', b'newfile\n')
1224
1225        self.assertTrue(t1.has('b/d'))
1226        self.assertTrue(t2.has('d'))
1227        self.assertTrue(t3.has('b/d'))
1228
1229    def test_clone_to_root(self):
1230        orig_transport = self.get_transport()
1231        # Repeatedly go up to a parent directory until we're at the root
1232        # directory of this transport
1233        root_transport = orig_transport
1234        new_transport = root_transport.clone("..")
1235        # as we are walking up directories, the path must be
1236        # growing less, except at the top
1237        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1238                        new_transport.base == root_transport.base)
1239        while new_transport.base != root_transport.base:
1240            root_transport = new_transport
1241            new_transport = root_transport.clone("..")
1242            # as we are walking up directories, the path must be
1243            # growing less, except at the top
1244            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1245                            new_transport.base == root_transport.base)
1246
1247        # Cloning to "/" should take us to exactly the same location.
1248        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1249        # the abspath of "/" from the original transport should be the same
1250        # as the base at the root:
1251        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1252
1253        # At the root, the URL must still end with / as its a directory
1254        self.assertEqual(root_transport.base[-1], '/')
1255
1256    def test_clone_from_root(self):
1257        """At the root, cloning to a simple dir should just do string append."""
1258        orig_transport = self.get_transport()
1259        root_transport = orig_transport.clone('/')
1260        self.assertEqual(root_transport.base + '.bzr/',
1261                         root_transport.clone('.bzr').base)
1262
1263    def test_base_url(self):
1264        t = self.get_transport()
1265        self.assertEqual('/', t.base[-1])
1266
1267    def test_relpath(self):
1268        t = self.get_transport()
1269        self.assertEqual('', t.relpath(t.base))
1270        # base ends with /
1271        self.assertEqual('', t.relpath(t.base[:-1]))
1272        # subdirs which don't exist should still give relpaths.
1273        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1274        # trailing slash should be the same.
1275        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1276
1277    def test_relpath_at_root(self):
1278        t = self.get_transport()
1279        # clone all the way to the top
1280        new_transport = t.clone('..')
1281        while new_transport.base != t.base:
1282            t = new_transport
1283            new_transport = t.clone('..')
1284        # we must be able to get a relpath below the root
1285        self.assertEqual('', t.relpath(t.base))
1286        # and a deeper one should work too
1287        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1288
1289    def test_abspath(self):
1290        # smoke test for abspath. Corner cases for backends like unix fs's
1291        # that have aliasing problems like symlinks should go in backend
1292        # specific test cases.
1293        transport = self.get_transport()
1294
1295        self.assertEqual(transport.base + 'relpath',
1296                         transport.abspath('relpath'))
1297
1298        # This should work without raising an error.
1299        transport.abspath("/")
1300
1301        # the abspath of "/" and "/foo/.." should result in the same location
1302        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1303
1304        self.assertEqual(transport.clone("/").abspath('foo'),
1305                         transport.abspath("/foo"))
1306
1307    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1308    def test_win32_abspath(self):
1309        # Note: we tried to set sys.platform='win32' so we could test on
1310        # other platforms too, but then osutils does platform specific
1311        # things at import time which defeated us...
1312        if sys.platform != 'win32':
1313            raise TestSkipped(
1314                'Testing drive letters in abspath implemented only for win32')
1315
1316        # smoke test for abspath on win32.
1317        # a transport based on 'file:///' never fully qualifies the drive.
1318        transport = _mod_transport.get_transport_from_url("file:///")
1319        self.assertEqual(transport.abspath("/"), "file:///")
1320
1321        # but a transport that starts with a drive spec must keep it.
1322        transport = _mod_transport.get_transport_from_url("file:///C:/")
1323        self.assertEqual(transport.abspath("/"), "file:///C:/")
1324
1325    def test_local_abspath(self):
1326        transport = self.get_transport()
1327        try:
1328            p = transport.local_abspath('.')
1329        except (errors.NotLocalUrl, TransportNotPossible) as e:
1330            # should be formattable
1331            s = str(e)
1332        else:
1333            self.assertEqual(getcwd(), p)
1334
1335    def test_abspath_at_root(self):
1336        t = self.get_transport()
1337        # clone all the way to the top
1338        new_transport = t.clone('..')
1339        while new_transport.base != t.base:
1340            t = new_transport
1341            new_transport = t.clone('..')
1342        # we must be able to get a abspath of the root when we ask for
1343        # t.abspath('..') - this due to our choice that clone('..')
1344        # should return the root from the root, combined with the desire that
1345        # the url from clone('..') and from abspath('..') should be the same.
1346        self.assertEqual(t.base, t.abspath('..'))
1347        # '' should give us the root
1348        self.assertEqual(t.base, t.abspath(''))
1349        # and a path should append to the url
1350        self.assertEqual(t.base + 'foo', t.abspath('foo'))
1351
1352    def test_iter_files_recursive(self):
1353        transport = self.get_transport()
1354        if not transport.listable():
1355            self.assertRaises(TransportNotPossible,
1356                              transport.iter_files_recursive)
1357            return
1358        self.build_tree(['isolated/',
1359                         'isolated/dir/',
1360                         'isolated/dir/foo',
1361                         'isolated/dir/bar',
1362                         'isolated/dir/b%25z',  # make sure quoting is correct
1363                         'isolated/bar'],
1364                        transport=transport)
1365        paths = set(transport.iter_files_recursive())
1366        # nb the directories are not converted
1367        self.assertEqual(paths,
1368                         {'isolated/dir/foo',
1369                          'isolated/dir/bar',
1370                          'isolated/dir/b%2525z',
1371                          'isolated/bar'})
1372        sub_transport = transport.clone('isolated')
1373        paths = set(sub_transport.iter_files_recursive())
1374        self.assertEqual(paths,
1375                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1376
1377    def test_copy_tree(self):
1378        # TODO: test file contents and permissions are preserved. This test was
1379        # added just to ensure that quoting was handled correctly.
1380        # -- David Allouche 2006-08-11
1381        transport = self.get_transport()
1382        if not transport.listable():
1383            self.assertRaises(TransportNotPossible,
1384                              transport.iter_files_recursive)
1385            return
1386        if transport.is_readonly():
1387            return
1388        self.build_tree(['from/',
1389                         'from/dir/',
1390                         'from/dir/foo',
1391                         'from/dir/bar',
1392                         'from/dir/b%25z',  # make sure quoting is correct
1393                         'from/bar'],
1394                        transport=transport)
1395        transport.copy_tree('from', 'to')
1396        paths = set(transport.iter_files_recursive())
1397        self.assertEqual(paths,
1398                         {'from/dir/foo',
1399                          'from/dir/bar',
1400                          'from/dir/b%2525z',
1401                          'from/bar',
1402                          'to/dir/foo',
1403                          'to/dir/bar',
1404                          'to/dir/b%2525z',
1405                          'to/bar', })
1406
1407    def test_copy_tree_to_transport(self):
1408        transport = self.get_transport()
1409        if not transport.listable():
1410            self.assertRaises(TransportNotPossible,
1411                              transport.iter_files_recursive)
1412            return
1413        if transport.is_readonly():
1414            return
1415        self.build_tree(['from/',
1416                         'from/dir/',
1417                         'from/dir/foo',
1418                         'from/dir/bar',
1419                         'from/dir/b%25z',  # make sure quoting is correct
1420                         'from/bar'],
1421                        transport=transport)
1422        from_transport = transport.clone('from')
1423        to_transport = transport.clone('to')
1424        to_transport.ensure_base()
1425        from_transport.copy_tree_to_transport(to_transport)
1426        paths = set(transport.iter_files_recursive())
1427        self.assertEqual(paths,
1428                         {'from/dir/foo',
1429                          'from/dir/bar',
1430                          'from/dir/b%2525z',
1431                          'from/bar',
1432                          'to/dir/foo',
1433                          'to/dir/bar',
1434                          'to/dir/b%2525z',
1435                          'to/bar', })
1436
1437    def test_unicode_paths(self):
1438        """Test that we can read/write files with Unicode names."""
1439        t = self.get_transport()
1440
1441        # With FAT32 and certain encodings on win32
1442        # '\xe5' and '\xe4' actually map to the same file
1443        # adding a suffix kicks in the 'preserving but insensitive'
1444        # route, and maintains the right files
1445        files = [u'\xe5.1',  # a w/ circle iso-8859-1
1446                 u'\xe4.2',  # a w/ dots iso-8859-1
1447                 u'\u017d',  # Z with umlat iso-8859-2
1448                 u'\u062c',  # Arabic j
1449                 u'\u0410',  # Russian A
1450                 u'\u65e5',  # Kanji person
1451                 ]
1452
1453        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1454        if no_unicode_support:
1455            self.knownFailure("test server cannot handle unicode paths")
1456
1457        try:
1458            self.build_tree(files, transport=t, line_endings='binary')
1459        except UnicodeError:
1460            raise TestSkipped(
1461                "cannot handle unicode paths in current encoding")
1462
1463        # A plain unicode string is not a valid url
1464        for fname in files:
1465            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1466
1467        for fname in files:
1468            fname_utf8 = fname.encode('utf-8')
1469            contents = b'contents of %s\n' % (fname_utf8,)
1470            self.check_transport_contents(contents, t, urlutils.escape(fname))
1471
1472    def test_connect_twice_is_same_content(self):
1473        # check that our server (whatever it is) is accessible reliably
1474        # via get_transport and multiple connections share content.
1475        transport = self.get_transport()
1476        if transport.is_readonly():
1477            return
1478        transport.put_bytes('foo', b'bar')
1479        transport3 = self.get_transport()
1480        self.check_transport_contents(b'bar', transport3, 'foo')
1481
1482        # now opening at a relative url should give use a sane result:
1483        transport.mkdir('newdir')
1484        transport5 = self.get_transport('newdir')
1485        transport6 = transport5.clone('..')
1486        self.check_transport_contents(b'bar', transport6, 'foo')
1487
1488    def test_lock_write(self):
1489        """Test transport-level write locks.
1490
1491        These are deprecated and transports may decline to support them.
1492        """
1493        transport = self.get_transport()
1494        if transport.is_readonly():
1495            self.assertRaises(TransportNotPossible,
1496                              transport.lock_write, 'foo')
1497            return
1498        transport.put_bytes('lock', b'')
1499        try:
1500            lock = transport.lock_write('lock')
1501        except TransportNotPossible:
1502            return
1503        # TODO make this consistent on all platforms:
1504        # self.assertRaises(LockError, transport.lock_write, 'lock')
1505        lock.unlock()
1506
1507    def test_lock_read(self):
1508        """Test transport-level read locks.
1509
1510        These are deprecated and transports may decline to support them.
1511        """
1512        transport = self.get_transport()
1513        if transport.is_readonly():
1514            open('lock', 'w').close()
1515        else:
1516            transport.put_bytes('lock', b'')
1517        try:
1518            lock = transport.lock_read('lock')
1519        except TransportNotPossible:
1520            return
1521        # TODO make this consistent on all platforms:
1522        # self.assertRaises(LockError, transport.lock_read, 'lock')
1523        lock.unlock()
1524
1525    def test_readv(self):
1526        transport = self.get_transport()
1527        if transport.is_readonly():
1528            with open('a', 'w') as f:
1529                f.write('0123456789')
1530        else:
1531            transport.put_bytes('a', b'0123456789')
1532
1533        d = list(transport.readv('a', ((0, 1),)))
1534        self.assertEqual(d[0], (0, b'0'))
1535
1536        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1537        self.assertEqual(d[0], (0, b'0'))
1538        self.assertEqual(d[1], (1, b'1'))
1539        self.assertEqual(d[2], (3, b'34'))
1540        self.assertEqual(d[3], (9, b'9'))
1541
1542    def test_readv_out_of_order(self):
1543        transport = self.get_transport()
1544        if transport.is_readonly():
1545            with open('a', 'w') as f:
1546                f.write('0123456789')
1547        else:
1548            transport.put_bytes('a', b'01234567890')
1549
1550        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1551        self.assertEqual(d[0], (1, b'1'))
1552        self.assertEqual(d[1], (9, b'9'))
1553        self.assertEqual(d[2], (0, b'0'))
1554        self.assertEqual(d[3], (3, b'34'))
1555
1556    def test_readv_with_adjust_for_latency(self):
1557        transport = self.get_transport()
1558        # the adjust for latency flag expands the data region returned
1559        # according to a per-transport heuristic, so testing is a little
1560        # tricky as we need more data than the largest combining that our
1561        # transports do. To accomodate this we generate random data and cross
1562        # reference the returned data with the random data. To avoid doing
1563        # multiple large random byte look ups we do several tests on the same
1564        # backing data.
1565        content = osutils.rand_bytes(200 * 1024)
1566        content_size = len(content)
1567        if transport.is_readonly():
1568            self.build_tree_contents([('a', content)])
1569        else:
1570            transport.put_bytes('a', content)
1571
1572        def check_result_data(result_vector):
1573            for item in result_vector:
1574                data_len = len(item[1])
1575                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1576
1577        # start corner case
1578        result = list(transport.readv('a', ((0, 30),),
1579                                      adjust_for_latency=True, upper_limit=content_size))
1580        # we expect 1 result, from 0, to something > 30
1581        self.assertEqual(1, len(result))
1582        self.assertEqual(0, result[0][0])
1583        self.assertTrue(len(result[0][1]) >= 30)
1584        check_result_data(result)
1585        # end of file corner case
1586        result = list(transport.readv('a', ((204700, 100),),
1587                                      adjust_for_latency=True, upper_limit=content_size))
1588        # we expect 1 result, from 204800- its length, to the end
1589        self.assertEqual(1, len(result))
1590        data_len = len(result[0][1])
1591        self.assertEqual(204800 - data_len, result[0][0])
1592        self.assertTrue(data_len >= 100)
1593        check_result_data(result)
1594        # out of order ranges are made in order
1595        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1596                                      adjust_for_latency=True, upper_limit=content_size))
1597        # we expect 2 results, in order, start and end.
1598        self.assertEqual(2, len(result))
1599        # start
1600        data_len = len(result[0][1])
1601        self.assertEqual(0, result[0][0])
1602        self.assertTrue(data_len >= 30)
1603        # end
1604        data_len = len(result[1][1])
1605        self.assertEqual(204800 - data_len, result[1][0])
1606        self.assertTrue(data_len >= 100)
1607        check_result_data(result)
1608        # close ranges get combined (even if out of order)
1609        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1610            result = list(transport.readv('a', request_vector,
1611                                          adjust_for_latency=True, upper_limit=content_size))
1612            self.assertEqual(1, len(result))
1613            data_len = len(result[0][1])
1614            # minimum length is from 400 to 1034 - 634
1615            self.assertTrue(data_len >= 634)
1616            # must contain the region 400 to 1034
1617            self.assertTrue(result[0][0] <= 400)
1618            self.assertTrue(result[0][0] + data_len >= 1034)
1619            check_result_data(result)
1620
1621    def test_readv_with_adjust_for_latency_with_big_file(self):
1622        transport = self.get_transport()
1623        # test from observed failure case.
1624        if transport.is_readonly():
1625            with open('a', 'w') as f:
1626                f.write('a' * 1024 * 1024)
1627        else:
1628            transport.put_bytes('a', b'a' * 1024 * 1024)
1629        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1630                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1631                         (465373, 800), (947422, 800)]
1632        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1633        found_items = [False] * 9
1634        for pos, (start, length) in enumerate(broken_vector):
1635            # check the range is covered by the result
1636            for offset, data in results:
1637                if offset <= start and start + length <= offset + len(data):
1638                    found_items[pos] = True
1639        self.assertEqual([True] * 9, found_items)
1640
1641    def test_get_with_open_write_stream_sees_all_content(self):
1642        t = self.get_transport()
1643        if t.is_readonly():
1644            return
1645        with t.open_write_stream('foo') as handle:
1646            handle.write(b'bcd')
1647            self.assertEqual([(0, b'b'), (2, b'd')], list(
1648                t.readv('foo', ((0, 1), (2, 1)))))
1649
1650    def test_get_smart_medium(self):
1651        """All transports must either give a smart medium, or know they can't.
1652        """
1653        transport = self.get_transport()
1654        try:
1655            client_medium = transport.get_smart_medium()
1656        except errors.NoSmartMedium:
1657            # as long as we got it we're fine
1658            pass
1659        else:
1660            from ..bzr.smart import medium
1661            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1662
1663    def test_readv_short_read(self):
1664        transport = self.get_transport()
1665        if transport.is_readonly():
1666            with open('a', 'w') as f:
1667                f.write('0123456789')
1668        else:
1669            transport.put_bytes('a', b'01234567890')
1670
1671        # This is intentionally reading off the end of the file
1672        # since we are sure that it cannot get there
1673        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1674                               # Can be raised by paramiko
1675                               AssertionError),
1676                              transport.readv, 'a', [(1, 1), (8, 10)])
1677
1678        # This is trying to seek past the end of the file, it should
1679        # also raise a special error
1680        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1681                              transport.readv, 'a', [(12, 2)])
1682
1683    def test_no_segment_parameters(self):
1684        """Segment parameters should be stripped and stored in
1685        transport.segment_parameters."""
1686        transport = self.get_transport("foo")
1687        self.assertEqual({}, transport.get_segment_parameters())
1688
1689    def test_segment_parameters(self):
1690        """Segment parameters should be stripped and stored in
1691        transport.get_segment_parameters()."""
1692        base_url = self._server.get_url()
1693        parameters = {"key1": "val1", "key2": "val2"}
1694        url = urlutils.join_segment_parameters(base_url, parameters)
1695        transport = _mod_transport.get_transport_from_url(url)
1696        self.assertEqual(parameters, transport.get_segment_parameters())
1697
1698    def test_set_segment_parameters(self):
1699        """Segment parameters can be set and show up in base."""
1700        transport = self.get_transport("foo")
1701        orig_base = transport.base
1702        transport.set_segment_parameter("arm", "board")
1703        self.assertEqual("%s,arm=board" % orig_base, transport.base)
1704        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1705        transport.set_segment_parameter("arm", None)
1706        transport.set_segment_parameter("nonexistant", None)
1707        self.assertEqual({}, transport.get_segment_parameters())
1708        self.assertEqual(orig_base, transport.base)
1709
1710    def test_stat_symlink(self):
1711        # if a transport points directly to a symlink (and supports symlinks
1712        # at all) you can tell this.  helps with bug 32669.
1713        t = self.get_transport()
1714        try:
1715            t.symlink('target', 'link')
1716        except TransportNotPossible:
1717            raise TestSkipped("symlinks not supported")
1718        t2 = t.clone('link')
1719        st = t2.stat('')
1720        self.assertTrue(stat.S_ISLNK(st.st_mode))
1721
1722    def test_abspath_url_unquote_unreserved(self):
1723        """URLs from abspath should have unreserved characters unquoted
1724
1725        Need consistent quoting notably for tildes, see lp:842223 for more.
1726        """
1727        t = self.get_transport()
1728        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1729        self.assertEqual(t.base + "-.09AZ_az~",
1730                         t.abspath(needlessly_escaped_dir))
1731
1732    def test_clone_url_unquote_unreserved(self):
1733        """Base URL of a cloned branch needs unreserved characters unquoted
1734
1735        Cloned transports should be prefix comparable for things like the
1736        isolation checking of tests, see lp:842223 for more.
1737        """
1738        t1 = self.get_transport()
1739        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1740        self.build_tree([needlessly_escaped_dir], transport=t1)
1741        t2 = t1.clone(needlessly_escaped_dir)
1742        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1743
1744    def test_hook_post_connection_one(self):
1745        """Fire post_connect hook after a ConnectedTransport is first used"""
1746        log = []
1747        Transport.hooks.install_named_hook("post_connect", log.append, None)
1748        t = self.get_transport()
1749        self.assertEqual([], log)
1750        t.has("non-existant")
1751        if isinstance(t, RemoteTransport):
1752            self.assertEqual([t.get_smart_medium()], log)
1753        elif isinstance(t, ConnectedTransport):
1754            self.assertEqual([t], log)
1755        else:
1756            self.assertEqual([], log)
1757
1758    def test_hook_post_connection_multi(self):
1759        """Fire post_connect hook once per unshared underlying connection"""
1760        log = []
1761        Transport.hooks.install_named_hook("post_connect", log.append, None)
1762        t1 = self.get_transport()
1763        t2 = t1.clone(".")
1764        t3 = self.get_transport()
1765        self.assertEqual([], log)
1766        t1.has("x")
1767        t2.has("x")
1768        t3.has("x")
1769        if isinstance(t1, RemoteTransport):
1770            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1771        elif isinstance(t1, ConnectedTransport):
1772            self.assertEqual([t1, t3], log)
1773        else:
1774            self.assertEqual([], log)
1775