1# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 16 17"""Tests for Transport implementations. 18 19Transport implementations tested here are supplied by 20TransportTestProviderAdapter. 21""" 22 23from io import BytesIO 24import os 25import stat 26import sys 27 28from .. import ( 29 errors, 30 osutils, 31 pyutils, 32 tests, 33 transport as _mod_transport, 34 urlutils, 35 ) 36from ..errors import (ConnectionError, 37 FileExists, 38 NoSuchFile, 39 PathError, 40 TransportNotPossible, 41 ) 42from ..osutils import getcwd 43from . import ( 44 TestSkipped, 45 TestNotApplicable, 46 multiply_tests, 47 ) 48from . import test_server 49from .test_transport import TestTransportImplementation 50from ..transport import ( 51 ConnectedTransport, 52 Transport, 53 _get_transport_modules, 54 ) 55from ..transport.memory import MemoryTransport 56from ..transport.remote import RemoteTransport 57 58 59def get_transport_test_permutations(module): 60 """Get the permutations module wants to have tested.""" 61 if getattr(module, 'get_test_permutations', None) is None: 62 raise AssertionError( 63 "transport module %s doesn't provide get_test_permutations()" 64 % module.__name__) 65 return [] 66 return module.get_test_permutations() 67 68 69def transport_test_permutations(): 70 """Return a list of the klass, server_factory pairs to test.""" 71 result = [] 72 for module in _get_transport_modules(): 73 try: 74 permutations = get_transport_test_permutations( 75 pyutils.get_named_object(module)) 76 for (klass, server_factory) in permutations: 77 scenario = ('%s,%s' % (klass.__name__, server_factory.__name__), 78 {"transport_class": klass, 79 "transport_server": server_factory}) 80 result.append(scenario) 81 except errors.DependencyNotPresent as e: 82 # Continue even if a dependency prevents us 83 # from adding this test 84 pass 85 return result 86 87 88def load_tests(loader, standard_tests, pattern): 89 """Multiply tests for tranport implementations.""" 90 result = loader.suiteClass() 91 scenarios = transport_test_permutations() 92 return multiply_tests(standard_tests, scenarios, result) 93 94 95class TransportTests(TestTransportImplementation): 96 97 def setUp(self): 98 super(TransportTests, self).setUp() 99 self.overrideEnv('BRZ_NO_SMART_VFS', None) 100 101 def check_transport_contents(self, content, transport, relpath): 102 """Check that transport.get_bytes(relpath) == content.""" 103 self.assertEqualDiff(content, transport.get_bytes(relpath)) 104 105 def test_ensure_base_missing(self): 106 """.ensure_base() should create the directory if it doesn't exist""" 107 t = self.get_transport() 108 t_a = t.clone('a') 109 if t_a.is_readonly(): 110 self.assertRaises(TransportNotPossible, 111 t_a.ensure_base) 112 return 113 self.assertTrue(t_a.ensure_base()) 114 self.assertTrue(t.has('a')) 115 116 def test_ensure_base_exists(self): 117 """.ensure_base() should just be happy if it already exists""" 118 t = self.get_transport() 119 if t.is_readonly(): 120 return 121 122 t.mkdir('a') 123 t_a = t.clone('a') 124 # ensure_base returns False if it didn't create the base 125 self.assertFalse(t_a.ensure_base()) 126 127 def test_ensure_base_missing_parent(self): 128 """.ensure_base() will fail if the parent dir doesn't exist""" 129 t = self.get_transport() 130 if t.is_readonly(): 131 return 132 133 t_a = t.clone('a') 134 t_b = t_a.clone('b') 135 self.assertRaises(NoSuchFile, t_b.ensure_base) 136 137 def test_external_url(self): 138 """.external_url either works or raises InProcessTransport.""" 139 t = self.get_transport() 140 try: 141 t.external_url() 142 except errors.InProcessTransport: 143 pass 144 145 def test_has(self): 146 t = self.get_transport() 147 148 files = ['a', 'b', 'e', 'g', '%'] 149 self.build_tree(files, transport=t) 150 self.assertEqual(True, t.has('a')) 151 self.assertEqual(False, t.has('c')) 152 self.assertEqual(True, t.has(urlutils.escape('%'))) 153 self.assertEqual(True, t.has_any(['a', 'b', 'c'])) 154 self.assertEqual(False, t.has_any(['c', 'd', 'f', 155 urlutils.escape('%%')])) 156 self.assertEqual(False, t.has_any(['c', 'c', 'c'])) 157 self.assertEqual(True, t.has_any(['b', 'b', 'b'])) 158 159 def test_has_root_works(self): 160 if self.transport_server is test_server.SmartTCPServer_for_testing: 161 raise TestNotApplicable( 162 "SmartTCPServer_for_testing intentionally does not allow " 163 "access to /.") 164 current_transport = self.get_transport() 165 self.assertTrue(current_transport.has('/')) 166 root = current_transport.clone('/') 167 self.assertTrue(root.has('')) 168 169 def test_get(self): 170 t = self.get_transport() 171 172 files = ['a'] 173 content = b'contents of a\n' 174 self.build_tree(['a'], transport=t, line_endings='binary') 175 self.check_transport_contents(b'contents of a\n', t, 'a') 176 f = t.get('a') 177 self.assertEqual(content, f.read()) 178 179 def test_get_unknown_file(self): 180 t = self.get_transport() 181 files = ['a', 'b'] 182 contents = [b'contents of a\n', 183 b'contents of b\n', 184 ] 185 self.build_tree(files, transport=t, line_endings='binary') 186 self.assertRaises(NoSuchFile, t.get, 'c') 187 188 def iterate_and_close(func, *args): 189 for f in func(*args): 190 # We call f.read() here because things like paramiko actually 191 # spawn a thread to prefetch the content, which we want to 192 # consume before we close the handle. 193 content = f.read() 194 f.close() 195 196 def test_get_directory_read_gives_ReadError(self): 197 """consistent errors for read() on a file returned by get().""" 198 t = self.get_transport() 199 if t.is_readonly(): 200 self.build_tree(['a directory/']) 201 else: 202 t.mkdir('a%20directory') 203 # getting the file must either work or fail with a PathError 204 try: 205 a_file = t.get('a%20directory') 206 except (errors.PathError, errors.RedirectRequested): 207 # early failure return immediately. 208 return 209 # having got a file, read() must either work (i.e. http reading a dir 210 # listing) or fail with ReadError 211 try: 212 a_file.read() 213 except errors.ReadError: 214 pass 215 216 def test_get_bytes(self): 217 t = self.get_transport() 218 219 files = ['a', 'b', 'e', 'g'] 220 contents = [b'contents of a\n', 221 b'contents of b\n', 222 b'contents of e\n', 223 b'contents of g\n', 224 ] 225 self.build_tree(files, transport=t, line_endings='binary') 226 self.check_transport_contents(b'contents of a\n', t, 'a') 227 228 for content, fname in zip(contents, files): 229 self.assertEqual(content, t.get_bytes(fname)) 230 231 def test_get_bytes_unknown_file(self): 232 t = self.get_transport() 233 self.assertRaises(NoSuchFile, t.get_bytes, 'c') 234 235 def test_get_with_open_write_stream_sees_all_content(self): 236 t = self.get_transport() 237 if t.is_readonly(): 238 return 239 with t.open_write_stream('foo') as handle: 240 handle.write(b'b') 241 self.assertEqual(b'b', t.get_bytes('foo')) 242 243 def test_get_bytes_with_open_write_stream_sees_all_content(self): 244 t = self.get_transport() 245 if t.is_readonly(): 246 return 247 with t.open_write_stream('foo') as handle: 248 handle.write(b'b') 249 self.assertEqual(b'b', t.get_bytes('foo')) 250 with t.get('foo') as f: 251 self.assertEqual(b'b', f.read()) 252 253 def test_put_bytes(self): 254 t = self.get_transport() 255 256 if t.is_readonly(): 257 self.assertRaises(TransportNotPossible, 258 t.put_bytes, 'a', b'some text for a\n') 259 return 260 261 t.put_bytes('a', b'some text for a\n') 262 self.assertTrue(t.has('a')) 263 self.check_transport_contents(b'some text for a\n', t, 'a') 264 265 # The contents should be overwritten 266 t.put_bytes('a', b'new text for a\n') 267 self.check_transport_contents(b'new text for a\n', t, 'a') 268 269 self.assertRaises(NoSuchFile, 270 t.put_bytes, 'path/doesnt/exist/c', b'contents') 271 272 def test_put_bytes_non_atomic(self): 273 t = self.get_transport() 274 275 if t.is_readonly(): 276 self.assertRaises(TransportNotPossible, 277 t.put_bytes_non_atomic, 'a', b'some text for a\n') 278 return 279 280 self.assertFalse(t.has('a')) 281 t.put_bytes_non_atomic('a', b'some text for a\n') 282 self.assertTrue(t.has('a')) 283 self.check_transport_contents(b'some text for a\n', t, 'a') 284 # Put also replaces contents 285 t.put_bytes_non_atomic('a', b'new\ncontents for\na\n') 286 self.check_transport_contents(b'new\ncontents for\na\n', t, 'a') 287 288 # Make sure we can create another file 289 t.put_bytes_non_atomic('d', b'contents for\nd\n') 290 # And overwrite 'a' with empty contents 291 t.put_bytes_non_atomic('a', b'') 292 self.check_transport_contents(b'contents for\nd\n', t, 'd') 293 self.check_transport_contents(b'', t, 'a') 294 295 self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path', 296 b'contents\n') 297 # Now test the create_parent flag 298 self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a', 299 b'contents\n') 300 self.assertFalse(t.has('dir/a')) 301 t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n', 302 create_parent_dir=True) 303 self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a') 304 305 # But we still get NoSuchFile if we can't make the parent dir 306 self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a', 307 b'contents\n', 308 create_parent_dir=True) 309 310 def test_put_bytes_permissions(self): 311 t = self.get_transport() 312 313 if t.is_readonly(): 314 return 315 if not t._can_roundtrip_unix_modebits(): 316 # Can't roundtrip, so no need to run this test 317 return 318 t.put_bytes('mode644', b'test text\n', mode=0o644) 319 self.assertTransportMode(t, 'mode644', 0o644) 320 t.put_bytes('mode666', b'test text\n', mode=0o666) 321 self.assertTransportMode(t, 'mode666', 0o666) 322 t.put_bytes('mode600', b'test text\n', mode=0o600) 323 self.assertTransportMode(t, 'mode600', 0o600) 324 # Yes, you can put_bytes a file such that it becomes readonly 325 t.put_bytes('mode400', b'test text\n', mode=0o400) 326 self.assertTransportMode(t, 'mode400', 0o400) 327 328 # The default permissions should be based on the current umask 329 umask = osutils.get_umask() 330 t.put_bytes('nomode', b'test text\n', mode=None) 331 self.assertTransportMode(t, 'nomode', 0o666 & ~umask) 332 333 def test_put_bytes_non_atomic_permissions(self): 334 t = self.get_transport() 335 336 if t.is_readonly(): 337 return 338 if not t._can_roundtrip_unix_modebits(): 339 # Can't roundtrip, so no need to run this test 340 return 341 t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644) 342 self.assertTransportMode(t, 'mode644', 0o644) 343 t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666) 344 self.assertTransportMode(t, 'mode666', 0o666) 345 t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600) 346 self.assertTransportMode(t, 'mode600', 0o600) 347 t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400) 348 self.assertTransportMode(t, 'mode400', 0o400) 349 350 # The default permissions should be based on the current umask 351 umask = osutils.get_umask() 352 t.put_bytes_non_atomic('nomode', b'test text\n', mode=None) 353 self.assertTransportMode(t, 'nomode', 0o666 & ~umask) 354 355 # We should also be able to set the mode for a parent directory 356 # when it is created 357 t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664, 358 dir_mode=0o700, create_parent_dir=True) 359 self.assertTransportMode(t, 'dir700', 0o700) 360 t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664, 361 dir_mode=0o770, create_parent_dir=True) 362 self.assertTransportMode(t, 'dir770', 0o770) 363 t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664, 364 dir_mode=0o777, create_parent_dir=True) 365 self.assertTransportMode(t, 'dir777', 0o777) 366 367 def test_put_file(self): 368 t = self.get_transport() 369 370 if t.is_readonly(): 371 self.assertRaises(TransportNotPossible, 372 t.put_file, 'a', BytesIO(b'some text for a\n')) 373 return 374 375 result = t.put_file('a', BytesIO(b'some text for a\n')) 376 # put_file returns the length of the data written 377 self.assertEqual(16, result) 378 self.assertTrue(t.has('a')) 379 self.check_transport_contents(b'some text for a\n', t, 'a') 380 # Put also replaces contents 381 result = t.put_file('a', BytesIO(b'new\ncontents for\na\n')) 382 self.assertEqual(19, result) 383 self.check_transport_contents(b'new\ncontents for\na\n', t, 'a') 384 self.assertRaises(NoSuchFile, 385 t.put_file, 'path/doesnt/exist/c', 386 BytesIO(b'contents')) 387 388 def test_put_file_non_atomic(self): 389 t = self.get_transport() 390 391 if t.is_readonly(): 392 self.assertRaises(TransportNotPossible, 393 t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n')) 394 return 395 396 self.assertFalse(t.has('a')) 397 t.put_file_non_atomic('a', BytesIO(b'some text for a\n')) 398 self.assertTrue(t.has('a')) 399 self.check_transport_contents(b'some text for a\n', t, 'a') 400 # Put also replaces contents 401 t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n')) 402 self.check_transport_contents(b'new\ncontents for\na\n', t, 'a') 403 404 # Make sure we can create another file 405 t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n')) 406 # And overwrite 'a' with empty contents 407 t.put_file_non_atomic('a', BytesIO(b'')) 408 self.check_transport_contents(b'contents for\nd\n', t, 'd') 409 self.check_transport_contents(b'', t, 'a') 410 411 self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path', 412 BytesIO(b'contents\n')) 413 # Now test the create_parent flag 414 self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a', 415 BytesIO(b'contents\n')) 416 self.assertFalse(t.has('dir/a')) 417 t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'), 418 create_parent_dir=True) 419 self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a') 420 421 # But we still get NoSuchFile if we can't make the parent dir 422 self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a', 423 BytesIO(b'contents\n'), 424 create_parent_dir=True) 425 426 def test_put_file_permissions(self): 427 428 t = self.get_transport() 429 430 if t.is_readonly(): 431 return 432 if not t._can_roundtrip_unix_modebits(): 433 # Can't roundtrip, so no need to run this test 434 return 435 t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644) 436 self.assertTransportMode(t, 'mode644', 0o644) 437 t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666) 438 self.assertTransportMode(t, 'mode666', 0o666) 439 t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600) 440 self.assertTransportMode(t, 'mode600', 0o600) 441 # Yes, you can put a file such that it becomes readonly 442 t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400) 443 self.assertTransportMode(t, 'mode400', 0o400) 444 # The default permissions should be based on the current umask 445 umask = osutils.get_umask() 446 t.put_file('nomode', BytesIO(b'test text\n'), mode=None) 447 self.assertTransportMode(t, 'nomode', 0o666 & ~umask) 448 449 def test_put_file_non_atomic_permissions(self): 450 t = self.get_transport() 451 452 if t.is_readonly(): 453 return 454 if not t._can_roundtrip_unix_modebits(): 455 # Can't roundtrip, so no need to run this test 456 return 457 t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644) 458 self.assertTransportMode(t, 'mode644', 0o644) 459 t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666) 460 self.assertTransportMode(t, 'mode666', 0o666) 461 t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600) 462 self.assertTransportMode(t, 'mode600', 0o600) 463 # Yes, you can put_file_non_atomic a file such that it becomes readonly 464 t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400) 465 self.assertTransportMode(t, 'mode400', 0o400) 466 467 # The default permissions should be based on the current umask 468 umask = osutils.get_umask() 469 t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None) 470 self.assertTransportMode(t, 'nomode', 0o666 & ~umask) 471 472 # We should also be able to set the mode for a parent directory 473 # when it is created 474 sio = BytesIO() 475 t.put_file_non_atomic('dir700/mode664', sio, mode=0o664, 476 dir_mode=0o700, create_parent_dir=True) 477 self.assertTransportMode(t, 'dir700', 0o700) 478 t.put_file_non_atomic('dir770/mode664', sio, mode=0o664, 479 dir_mode=0o770, create_parent_dir=True) 480 self.assertTransportMode(t, 'dir770', 0o770) 481 t.put_file_non_atomic('dir777/mode664', sio, mode=0o664, 482 dir_mode=0o777, create_parent_dir=True) 483 self.assertTransportMode(t, 'dir777', 0o777) 484 485 def test_put_bytes_unicode(self): 486 t = self.get_transport() 487 if t.is_readonly(): 488 return 489 unicode_string = u'\u1234' 490 self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string) 491 492 def test_mkdir(self): 493 t = self.get_transport() 494 495 if t.is_readonly(): 496 # cannot mkdir on readonly transports. We're not testing for 497 # cache coherency because cache behaviour is not currently 498 # defined for the transport interface. 499 self.assertRaises(TransportNotPossible, t.mkdir, '.') 500 self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir') 501 self.assertRaises(TransportNotPossible, 502 t.mkdir, 'path/doesnt/exist') 503 return 504 # Test mkdir 505 t.mkdir('dir_a') 506 self.assertEqual(t.has('dir_a'), True) 507 self.assertEqual(t.has('dir_b'), False) 508 509 t.mkdir('dir_b') 510 self.assertEqual(t.has('dir_b'), True) 511 512 self.assertEqual([t.has(n) for n in 513 ['dir_a', 'dir_b', 'dir_q', 'dir_b']], 514 [True, True, False, True]) 515 516 # we were testing that a local mkdir followed by a transport 517 # mkdir failed thusly, but given that we * in one process * do not 518 # concurrently fiddle with disk dirs and then use transport to do 519 # things, the win here seems marginal compared to the constraint on 520 # the interface. RBC 20051227 521 t.mkdir('dir_g') 522 self.assertRaises(FileExists, t.mkdir, 'dir_g') 523 524 # Test get/put in sub-directories 525 t.put_bytes('dir_a/a', b'contents of dir_a/a') 526 t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b')) 527 self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a') 528 self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b') 529 530 # mkdir of a dir with an absent parent 531 self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir') 532 533 def test_mkdir_permissions(self): 534 t = self.get_transport() 535 if t.is_readonly(): 536 return 537 if not t._can_roundtrip_unix_modebits(): 538 # no sense testing on this transport 539 return 540 # Test mkdir with a mode 541 t.mkdir('dmode755', mode=0o755) 542 self.assertTransportMode(t, 'dmode755', 0o755) 543 t.mkdir('dmode555', mode=0o555) 544 self.assertTransportMode(t, 'dmode555', 0o555) 545 t.mkdir('dmode777', mode=0o777) 546 self.assertTransportMode(t, 'dmode777', 0o777) 547 t.mkdir('dmode700', mode=0o700) 548 self.assertTransportMode(t, 'dmode700', 0o700) 549 t.mkdir('mdmode755', mode=0o755) 550 self.assertTransportMode(t, 'mdmode755', 0o755) 551 552 # Default mode should be based on umask 553 umask = osutils.get_umask() 554 t.mkdir('dnomode', mode=None) 555 self.assertTransportMode(t, 'dnomode', 0o777 & ~umask) 556 557 def test_opening_a_file_stream_creates_file(self): 558 t = self.get_transport() 559 if t.is_readonly(): 560 return 561 handle = t.open_write_stream('foo') 562 try: 563 self.assertEqual(b'', t.get_bytes('foo')) 564 finally: 565 handle.close() 566 567 def test_opening_a_file_stream_can_set_mode(self): 568 t = self.get_transport() 569 if t.is_readonly(): 570 self.assertRaises((TransportNotPossible, NotImplementedError), 571 t.open_write_stream, 'foo') 572 return 573 if not t._can_roundtrip_unix_modebits(): 574 # Can't roundtrip, so no need to run this test 575 return 576 577 def check_mode(name, mode, expected): 578 handle = t.open_write_stream(name, mode=mode) 579 handle.close() 580 self.assertTransportMode(t, name, expected) 581 check_mode('mode644', 0o644, 0o644) 582 check_mode('mode666', 0o666, 0o666) 583 check_mode('mode600', 0o600, 0o600) 584 # The default permissions should be based on the current umask 585 check_mode('nomode', None, 0o666 & ~osutils.get_umask()) 586 587 def test_copy_to(self): 588 # FIXME: test: same server to same server (partly done) 589 # same protocol two servers 590 # and different protocols (done for now except for MemoryTransport. 591 # - RBC 20060122 592 593 def simple_copy_files(transport_from, transport_to): 594 files = ['a', 'b', 'c', 'd'] 595 self.build_tree(files, transport=transport_from) 596 self.assertEqual(4, transport_from.copy_to(files, transport_to)) 597 for f in files: 598 self.check_transport_contents(transport_to.get_bytes(f), 599 transport_from, f) 600 601 t = self.get_transport() 602 if t.__class__.__name__ == "SFTPTransport": 603 self.skipTest("SFTP copy_to currently too flakey to use") 604 temp_transport = MemoryTransport('memory:///') 605 simple_copy_files(t, temp_transport) 606 if not t.is_readonly(): 607 t.mkdir('copy_to_simple') 608 t2 = t.clone('copy_to_simple') 609 simple_copy_files(t, t2) 610 611 # Test that copying into a missing directory raises 612 # NoSuchFile 613 if t.is_readonly(): 614 self.build_tree(['e/', 'e/f']) 615 else: 616 t.mkdir('e') 617 t.put_bytes('e/f', b'contents of e') 618 self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport) 619 temp_transport.mkdir('e') 620 t.copy_to(['e/f'], temp_transport) 621 622 del temp_transport 623 temp_transport = MemoryTransport('memory:///') 624 625 files = ['a', 'b', 'c', 'd'] 626 t.copy_to(iter(files), temp_transport) 627 for f in files: 628 self.check_transport_contents(temp_transport.get_bytes(f), 629 t, f) 630 del temp_transport 631 632 for mode in (0o666, 0o644, 0o600, 0o400): 633 temp_transport = MemoryTransport("memory:///") 634 t.copy_to(files, temp_transport, mode=mode) 635 for f in files: 636 self.assertTransportMode(temp_transport, f, mode) 637 638 def test_create_prefix(self): 639 t = self.get_transport() 640 sub = t.clone('foo').clone('bar') 641 try: 642 sub.create_prefix() 643 except TransportNotPossible: 644 self.assertTrue(t.is_readonly()) 645 else: 646 self.assertTrue(t.has('foo/bar')) 647 648 def test_append_file(self): 649 t = self.get_transport() 650 651 if t.is_readonly(): 652 self.assertRaises(TransportNotPossible, 653 t.append_file, 'a', 'add\nsome\nmore\ncontents\n') 654 return 655 t.put_bytes('a', b'diff\ncontents for\na\n') 656 t.put_bytes('b', b'contents\nfor b\n') 657 658 self.assertEqual(20, 659 t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n'))) 660 661 self.check_transport_contents( 662 b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n', 663 t, 'a') 664 665 # a file with no parent should fail.. 666 self.assertRaises(NoSuchFile, 667 t.append_file, 'missing/path', BytesIO(b'content')) 668 669 # And we can create new files, too 670 self.assertEqual(0, 671 t.append_file('c', BytesIO(b'some text\nfor a missing file\n'))) 672 self.check_transport_contents(b'some text\nfor a missing file\n', 673 t, 'c') 674 675 def test_append_bytes(self): 676 t = self.get_transport() 677 678 if t.is_readonly(): 679 self.assertRaises(TransportNotPossible, 680 t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n') 681 return 682 683 self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n')) 684 self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n')) 685 686 self.assertEqual(20, 687 t.append_bytes('a', b'add\nsome\nmore\ncontents\n')) 688 689 self.check_transport_contents( 690 b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n', 691 t, 'a') 692 693 # a file with no parent should fail.. 694 self.assertRaises(NoSuchFile, 695 t.append_bytes, 'missing/path', b'content') 696 697 def test_append_file_mode(self): 698 """Check that append accepts a mode parameter""" 699 # check append accepts a mode 700 t = self.get_transport() 701 if t.is_readonly(): 702 self.assertRaises(TransportNotPossible, 703 t.append_file, 'f', BytesIO(b'f'), mode=None) 704 return 705 t.append_file('f', BytesIO(b'f'), mode=None) 706 707 def test_append_bytes_mode(self): 708 # check append_bytes accepts a mode 709 t = self.get_transport() 710 if t.is_readonly(): 711 self.assertRaises(TransportNotPossible, 712 t.append_bytes, 'f', b'f', mode=None) 713 return 714 t.append_bytes('f', b'f', mode=None) 715 716 def test_delete(self): 717 # TODO: Test Transport.delete 718 t = self.get_transport() 719 720 # Not much to do with a readonly transport 721 if t.is_readonly(): 722 self.assertRaises(TransportNotPossible, t.delete, 'missing') 723 return 724 725 t.put_bytes('a', b'a little bit of text\n') 726 self.assertTrue(t.has('a')) 727 t.delete('a') 728 self.assertFalse(t.has('a')) 729 730 self.assertRaises(NoSuchFile, t.delete, 'a') 731 732 t.put_bytes('a', b'a text\n') 733 t.put_bytes('b', b'b text\n') 734 t.put_bytes('c', b'c text\n') 735 self.assertEqual([True, True, True], 736 [t.has(n) for n in ['a', 'b', 'c']]) 737 t.delete('a') 738 t.delete('c') 739 self.assertEqual([False, True, False], 740 [t.has(n) for n in ['a', 'b', 'c']]) 741 self.assertFalse(t.has('a')) 742 self.assertTrue(t.has('b')) 743 self.assertFalse(t.has('c')) 744 745 for name in ['a', 'c', 'd']: 746 self.assertRaises(NoSuchFile, t.delete, name) 747 748 # We should have deleted everything 749 # SftpServer creates control files in the 750 # working directory, so we can just do a 751 # plain "listdir". 752 # self.assertEqual([], os.listdir('.')) 753 754 def test_recommended_page_size(self): 755 """Transports recommend a page size for partial access to files.""" 756 t = self.get_transport() 757 self.assertIsInstance(t.recommended_page_size(), int) 758 759 def test_rmdir(self): 760 t = self.get_transport() 761 # Not much to do with a readonly transport 762 if t.is_readonly(): 763 self.assertRaises(TransportNotPossible, t.rmdir, 'missing') 764 return 765 t.mkdir('adir') 766 t.mkdir('adir/bdir') 767 t.rmdir('adir/bdir') 768 # ftp may not be able to raise NoSuchFile for lack of 769 # details when failing 770 self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir') 771 t.rmdir('adir') 772 self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir') 773 774 def test_rmdir_not_empty(self): 775 """Deleting a non-empty directory raises an exception 776 777 sftp (and possibly others) don't give us a specific "directory not 778 empty" exception -- we can just see that the operation failed. 779 """ 780 t = self.get_transport() 781 if t.is_readonly(): 782 return 783 t.mkdir('adir') 784 t.mkdir('adir/bdir') 785 self.assertRaises(PathError, t.rmdir, 'adir') 786 787 def test_rmdir_empty_but_similar_prefix(self): 788 """rmdir does not get confused by sibling paths. 789 790 A naive implementation of MemoryTransport would refuse to rmdir 791 ".bzr/branch" if there is a ".bzr/branch-format" directory, because it 792 uses "path.startswith(dir)" on all file paths to determine if directory 793 is empty. 794 """ 795 t = self.get_transport() 796 if t.is_readonly(): 797 return 798 t.mkdir('foo') 799 t.put_bytes('foo-bar', b'') 800 t.mkdir('foo-baz') 801 t.rmdir('foo') 802 self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo') 803 self.assertTrue(t.has('foo-bar')) 804 805 def test_rename_dir_succeeds(self): 806 t = self.get_transport() 807 if t.is_readonly(): 808 self.assertRaises((TransportNotPossible, NotImplementedError), 809 t.rename, 'foo', 'bar') 810 return 811 t.mkdir('adir') 812 t.mkdir('adir/asubdir') 813 t.rename('adir', 'bdir') 814 self.assertTrue(t.has('bdir/asubdir')) 815 self.assertFalse(t.has('adir')) 816 817 def test_rename_dir_nonempty(self): 818 """Attempting to replace a nonemtpy directory should fail""" 819 t = self.get_transport() 820 if t.is_readonly(): 821 self.assertRaises((TransportNotPossible, NotImplementedError), 822 t.rename, 'foo', 'bar') 823 return 824 t.mkdir('adir') 825 t.mkdir('adir/asubdir') 826 t.mkdir('bdir') 827 t.mkdir('bdir/bsubdir') 828 # any kind of PathError would be OK, though we normally expect 829 # DirectoryNotEmpty 830 self.assertRaises(PathError, t.rename, 'bdir', 'adir') 831 # nothing was changed so it should still be as before 832 self.assertTrue(t.has('bdir/bsubdir')) 833 self.assertFalse(t.has('adir/bdir')) 834 self.assertFalse(t.has('adir/bsubdir')) 835 836 def test_rename_across_subdirs(self): 837 t = self.get_transport() 838 if t.is_readonly(): 839 raise TestNotApplicable("transport is readonly") 840 t.mkdir('a') 841 t.mkdir('b') 842 ta = t.clone('a') 843 tb = t.clone('b') 844 ta.put_bytes('f', b'aoeu') 845 ta.rename('f', '../b/f') 846 self.assertTrue(tb.has('f')) 847 self.assertFalse(ta.has('f')) 848 self.assertTrue(t.has('b/f')) 849 850 def test_delete_tree(self): 851 t = self.get_transport() 852 853 # Not much to do with a readonly transport 854 if t.is_readonly(): 855 self.assertRaises(TransportNotPossible, t.delete_tree, 'missing') 856 return 857 858 # and does it like listing ? 859 t.mkdir('adir') 860 try: 861 t.delete_tree('adir') 862 except TransportNotPossible: 863 # ok, this transport does not support delete_tree 864 return 865 866 # did it delete that trivial case? 867 self.assertRaises(NoSuchFile, t.stat, 'adir') 868 869 self.build_tree(['adir/', 870 'adir/file', 871 'adir/subdir/', 872 'adir/subdir/file', 873 'adir/subdir2/', 874 'adir/subdir2/file', 875 ], transport=t) 876 877 t.delete_tree('adir') 878 # adir should be gone now. 879 self.assertRaises(NoSuchFile, t.stat, 'adir') 880 881 def test_move(self): 882 t = self.get_transport() 883 884 if t.is_readonly(): 885 return 886 887 # TODO: I would like to use os.listdir() to 888 # make sure there are no extra files, but SftpServer 889 # creates control files in the working directory 890 # perhaps all of this could be done in a subdirectory 891 892 t.put_bytes('a', b'a first file\n') 893 self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']]) 894 895 t.move('a', 'b') 896 self.assertTrue(t.has('b')) 897 self.assertFalse(t.has('a')) 898 899 self.check_transport_contents(b'a first file\n', t, 'b') 900 self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']]) 901 902 # Overwrite a file 903 t.put_bytes('c', b'c this file\n') 904 t.move('c', 'b') 905 self.assertFalse(t.has('c')) 906 self.check_transport_contents(b'c this file\n', t, 'b') 907 908 # TODO: Try to write a test for atomicity 909 # TODO: Test moving into a non-existent subdirectory 910 911 def test_copy(self): 912 t = self.get_transport() 913 914 if t.is_readonly(): 915 return 916 917 t.put_bytes('a', b'a file\n') 918 t.copy('a', 'b') 919 self.check_transport_contents(b'a file\n', t, 'b') 920 921 self.assertRaises(NoSuchFile, t.copy, 'c', 'd') 922 os.mkdir('c') 923 # What should the assert be if you try to copy a 924 # file over a directory? 925 #self.assertRaises(Something, t.copy, 'a', 'c') 926 t.put_bytes('d', b'text in d\n') 927 t.copy('d', 'b') 928 self.check_transport_contents(b'text in d\n', t, 'b') 929 930 def test_connection_error(self): 931 """ConnectionError is raised when connection is impossible. 932 933 The error should be raised from the first operation on the transport. 934 """ 935 try: 936 url = self._server.get_bogus_url() 937 except NotImplementedError: 938 raise TestSkipped("Transport %s has no bogus URL support." % 939 self._server.__class__) 940 t = _mod_transport.get_transport_from_url(url) 941 self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch') 942 943 def test_stat(self): 944 # TODO: Test stat, just try once, and if it throws, stop testing 945 from stat import S_ISDIR, S_ISREG 946 947 t = self.get_transport() 948 949 try: 950 st = t.stat('.') 951 except TransportNotPossible as e: 952 # This transport cannot stat 953 return 954 955 paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e'] 956 sizes = [14, 0, 16, 0, 18] 957 self.build_tree(paths, transport=t, line_endings='binary') 958 959 for path, size in zip(paths, sizes): 960 st = t.stat(path) 961 if path.endswith('/'): 962 self.assertTrue(S_ISDIR(st.st_mode)) 963 # directory sizes are meaningless 964 else: 965 self.assertTrue(S_ISREG(st.st_mode)) 966 self.assertEqual(size, st.st_size) 967 968 self.assertRaises(NoSuchFile, t.stat, 'q') 969 self.assertRaises(NoSuchFile, t.stat, 'b/a') 970 971 self.build_tree(['subdir/', 'subdir/file'], transport=t) 972 subdir = t.clone('subdir') 973 st = subdir.stat('./file') 974 st = subdir.stat('.') 975 976 def test_hardlink(self): 977 from stat import ST_NLINK 978 979 t = self.get_transport() 980 981 source_name = "original_target" 982 link_name = "target_link" 983 984 self.build_tree([source_name], transport=t) 985 986 try: 987 t.hardlink(source_name, link_name) 988 989 self.assertTrue(t.has(source_name)) 990 self.assertTrue(t.has(link_name)) 991 992 st = t.stat(link_name) 993 self.assertEqual(st[ST_NLINK], 2) 994 except TransportNotPossible: 995 raise TestSkipped("Transport %s does not support hardlinks." % 996 self._server.__class__) 997 998 def test_symlink(self): 999 from stat import S_ISLNK 1000 1001 t = self.get_transport() 1002 1003 source_name = "original_target" 1004 link_name = "target_link" 1005 1006 self.build_tree([source_name], transport=t) 1007 1008 try: 1009 t.symlink(source_name, link_name) 1010 1011 self.assertTrue(t.has(source_name)) 1012 self.assertTrue(t.has(link_name)) 1013 1014 st = t.stat(link_name) 1015 self.assertTrue(S_ISLNK(st.st_mode), 1016 "expected symlink, got mode %o" % st.st_mode) 1017 except TransportNotPossible: 1018 raise TestSkipped("Transport %s does not support symlinks." % 1019 self._server.__class__) 1020 1021 self.assertEqual(source_name, t.readlink(link_name)) 1022 1023 def test_readlink_nonexistent(self): 1024 t = self.get_transport() 1025 try: 1026 self.assertRaises(NoSuchFile, t.readlink, 'nonexistent') 1027 except TransportNotPossible: 1028 raise TestSkipped("Transport %s does not support symlinks." % 1029 self._server.__class__) 1030 1031 def test_list_dir(self): 1032 # TODO: Test list_dir, just try once, and if it throws, stop testing 1033 t = self.get_transport() 1034 1035 if not t.listable(): 1036 self.assertRaises(TransportNotPossible, t.list_dir, '.') 1037 return 1038 1039 def sorted_list(d, transport): 1040 l = sorted(transport.list_dir(d)) 1041 return l 1042 1043 self.assertEqual([], sorted_list('.', t)) 1044 # c2 is precisely one letter longer than c here to test that 1045 # suffixing is not confused. 1046 # a%25b checks that quoting is done consistently across transports 1047 tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/'] 1048 1049 if not t.is_readonly(): 1050 self.build_tree(tree_names, transport=t) 1051 else: 1052 self.build_tree(tree_names) 1053 1054 self.assertEqual( 1055 ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t)) 1056 self.assertEqual( 1057 ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t)) 1058 self.assertEqual(['d', 'e'], sorted_list('c', t)) 1059 1060 # Cloning the transport produces an equivalent listing 1061 self.assertEqual(['d', 'e'], sorted_list('', t.clone('c'))) 1062 1063 if not t.is_readonly(): 1064 t.delete('c/d') 1065 t.delete('b') 1066 else: 1067 os.unlink('c/d') 1068 os.unlink('b') 1069 1070 self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t)) 1071 self.assertEqual(['e'], sorted_list('c', t)) 1072 1073 self.assertListRaises(PathError, t.list_dir, 'q') 1074 self.assertListRaises(PathError, t.list_dir, 'c/f') 1075 # 'a' is a file, list_dir should raise an error 1076 self.assertListRaises(PathError, t.list_dir, 'a') 1077 1078 def test_list_dir_result_is_url_escaped(self): 1079 t = self.get_transport() 1080 if not t.listable(): 1081 raise TestSkipped("transport not listable") 1082 1083 if not t.is_readonly(): 1084 self.build_tree(['a/', 'a/%'], transport=t) 1085 else: 1086 self.build_tree(['a/', 'a/%']) 1087 1088 names = list(t.list_dir('a')) 1089 self.assertEqual(['%25'], names) 1090 self.assertIsInstance(names[0], str) 1091 1092 def test_clone_preserve_info(self): 1093 t1 = self.get_transport() 1094 if not isinstance(t1, ConnectedTransport): 1095 raise TestSkipped("not a connected transport") 1096 1097 t2 = t1.clone('subdir') 1098 self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme) 1099 self.assertEqual(t1._parsed_url.user, t2._parsed_url.user) 1100 self.assertEqual(t1._parsed_url.password, t2._parsed_url.password) 1101 self.assertEqual(t1._parsed_url.host, t2._parsed_url.host) 1102 self.assertEqual(t1._parsed_url.port, t2._parsed_url.port) 1103 1104 def test__reuse_for(self): 1105 t = self.get_transport() 1106 if not isinstance(t, ConnectedTransport): 1107 raise TestSkipped("not a connected transport") 1108 1109 def new_url(scheme=None, user=None, password=None, 1110 host=None, port=None, path=None): 1111 """Build a new url from t.base changing only parts of it. 1112 1113 Only the parameters different from None will be changed. 1114 """ 1115 if scheme is None: 1116 scheme = t._parsed_url.scheme 1117 if user is None: 1118 user = t._parsed_url.user 1119 if password is None: 1120 password = t._parsed_url.password 1121 if user is None: 1122 user = t._parsed_url.user 1123 if host is None: 1124 host = t._parsed_url.host 1125 if port is None: 1126 port = t._parsed_url.port 1127 if path is None: 1128 path = t._parsed_url.path 1129 return str(urlutils.URL(scheme, user, password, host, port, path)) 1130 1131 if t._parsed_url.scheme == 'ftp': 1132 scheme = 'sftp' 1133 else: 1134 scheme = 'ftp' 1135 self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme))) 1136 if t._parsed_url.user == 'me': 1137 user = 'you' 1138 else: 1139 user = 'me' 1140 self.assertIsNot(t, t._reuse_for(new_url(user=user))) 1141 # passwords are not taken into account because: 1142 # - it makes no sense to have two different valid passwords for the 1143 # same user 1144 # - _password in ConnectedTransport is intended to collect what the 1145 # user specified from the command-line and there are cases where the 1146 # new url can contain no password (if the url was built from an 1147 # existing transport.base for example) 1148 # - password are considered part of the credentials provided at 1149 # connection creation time and as such may not be present in the url 1150 # (they may be typed by the user when prompted for example) 1151 self.assertIs(t, t._reuse_for(new_url(password='from space'))) 1152 # We will not connect, we can use a invalid host 1153 self.assertIsNot(t, t._reuse_for( 1154 new_url(host=t._parsed_url.host + 'bar'))) 1155 if t._parsed_url.port == 1234: 1156 port = 4321 1157 else: 1158 port = 1234 1159 self.assertIsNot(t, t._reuse_for(new_url(port=port))) 1160 # No point in trying to reuse a transport for a local URL 1161 self.assertIs(None, t._reuse_for('/valid_but_not_existing')) 1162 1163 def test_connection_sharing(self): 1164 t = self.get_transport() 1165 if not isinstance(t, ConnectedTransport): 1166 raise TestSkipped("not a connected transport") 1167 1168 c = t.clone('subdir') 1169 # Some transports will create the connection only when needed 1170 t.has('surely_not') # Force connection 1171 self.assertIs(t._get_connection(), c._get_connection()) 1172 1173 # Temporary failure, we need to create a new dummy connection 1174 new_connection = None 1175 t._set_connection(new_connection) 1176 # Check that both transports use the same connection 1177 self.assertIs(new_connection, t._get_connection()) 1178 self.assertIs(new_connection, c._get_connection()) 1179 1180 def test_reuse_connection_for_various_paths(self): 1181 t = self.get_transport() 1182 if not isinstance(t, ConnectedTransport): 1183 raise TestSkipped("not a connected transport") 1184 1185 t.has('surely_not') # Force connection 1186 self.assertIsNot(None, t._get_connection()) 1187 1188 subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path') 1189 self.assertIsNot(t, subdir) 1190 self.assertIs(t._get_connection(), subdir._get_connection()) 1191 1192 home = subdir._reuse_for(t.base + 'home') 1193 self.assertIs(t._get_connection(), home._get_connection()) 1194 self.assertIs(subdir._get_connection(), home._get_connection()) 1195 1196 def test_clone(self): 1197 # TODO: Test that clone moves up and down the filesystem 1198 t1 = self.get_transport() 1199 1200 self.build_tree(['a', 'b/', 'b/c'], transport=t1) 1201 1202 self.assertTrue(t1.has('a')) 1203 self.assertTrue(t1.has('b/c')) 1204 self.assertFalse(t1.has('c')) 1205 1206 t2 = t1.clone('b') 1207 self.assertEqual(t1.base + 'b/', t2.base) 1208 1209 self.assertTrue(t2.has('c')) 1210 self.assertFalse(t2.has('a')) 1211 1212 t3 = t2.clone('..') 1213 self.assertTrue(t3.has('a')) 1214 self.assertFalse(t3.has('c')) 1215 1216 self.assertFalse(t1.has('b/d')) 1217 self.assertFalse(t2.has('d')) 1218 self.assertFalse(t3.has('b/d')) 1219 1220 if t1.is_readonly(): 1221 self.build_tree_contents([('b/d', b'newfile\n')]) 1222 else: 1223 t2.put_bytes('d', b'newfile\n') 1224 1225 self.assertTrue(t1.has('b/d')) 1226 self.assertTrue(t2.has('d')) 1227 self.assertTrue(t3.has('b/d')) 1228 1229 def test_clone_to_root(self): 1230 orig_transport = self.get_transport() 1231 # Repeatedly go up to a parent directory until we're at the root 1232 # directory of this transport 1233 root_transport = orig_transport 1234 new_transport = root_transport.clone("..") 1235 # as we are walking up directories, the path must be 1236 # growing less, except at the top 1237 self.assertTrue(len(new_transport.base) < len(root_transport.base) or 1238 new_transport.base == root_transport.base) 1239 while new_transport.base != root_transport.base: 1240 root_transport = new_transport 1241 new_transport = root_transport.clone("..") 1242 # as we are walking up directories, the path must be 1243 # growing less, except at the top 1244 self.assertTrue(len(new_transport.base) < len(root_transport.base) or 1245 new_transport.base == root_transport.base) 1246 1247 # Cloning to "/" should take us to exactly the same location. 1248 self.assertEqual(root_transport.base, orig_transport.clone("/").base) 1249 # the abspath of "/" from the original transport should be the same 1250 # as the base at the root: 1251 self.assertEqual(orig_transport.abspath("/"), root_transport.base) 1252 1253 # At the root, the URL must still end with / as its a directory 1254 self.assertEqual(root_transport.base[-1], '/') 1255 1256 def test_clone_from_root(self): 1257 """At the root, cloning to a simple dir should just do string append.""" 1258 orig_transport = self.get_transport() 1259 root_transport = orig_transport.clone('/') 1260 self.assertEqual(root_transport.base + '.bzr/', 1261 root_transport.clone('.bzr').base) 1262 1263 def test_base_url(self): 1264 t = self.get_transport() 1265 self.assertEqual('/', t.base[-1]) 1266 1267 def test_relpath(self): 1268 t = self.get_transport() 1269 self.assertEqual('', t.relpath(t.base)) 1270 # base ends with / 1271 self.assertEqual('', t.relpath(t.base[:-1])) 1272 # subdirs which don't exist should still give relpaths. 1273 self.assertEqual('foo', t.relpath(t.base + 'foo')) 1274 # trailing slash should be the same. 1275 self.assertEqual('foo', t.relpath(t.base + 'foo/')) 1276 1277 def test_relpath_at_root(self): 1278 t = self.get_transport() 1279 # clone all the way to the top 1280 new_transport = t.clone('..') 1281 while new_transport.base != t.base: 1282 t = new_transport 1283 new_transport = t.clone('..') 1284 # we must be able to get a relpath below the root 1285 self.assertEqual('', t.relpath(t.base)) 1286 # and a deeper one should work too 1287 self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar')) 1288 1289 def test_abspath(self): 1290 # smoke test for abspath. Corner cases for backends like unix fs's 1291 # that have aliasing problems like symlinks should go in backend 1292 # specific test cases. 1293 transport = self.get_transport() 1294 1295 self.assertEqual(transport.base + 'relpath', 1296 transport.abspath('relpath')) 1297 1298 # This should work without raising an error. 1299 transport.abspath("/") 1300 1301 # the abspath of "/" and "/foo/.." should result in the same location 1302 self.assertEqual(transport.abspath("/"), transport.abspath("/foo/..")) 1303 1304 self.assertEqual(transport.clone("/").abspath('foo'), 1305 transport.abspath("/foo")) 1306 1307 # GZ 2011-01-26: Test in per_transport but not using self.get_transport? 1308 def test_win32_abspath(self): 1309 # Note: we tried to set sys.platform='win32' so we could test on 1310 # other platforms too, but then osutils does platform specific 1311 # things at import time which defeated us... 1312 if sys.platform != 'win32': 1313 raise TestSkipped( 1314 'Testing drive letters in abspath implemented only for win32') 1315 1316 # smoke test for abspath on win32. 1317 # a transport based on 'file:///' never fully qualifies the drive. 1318 transport = _mod_transport.get_transport_from_url("file:///") 1319 self.assertEqual(transport.abspath("/"), "file:///") 1320 1321 # but a transport that starts with a drive spec must keep it. 1322 transport = _mod_transport.get_transport_from_url("file:///C:/") 1323 self.assertEqual(transport.abspath("/"), "file:///C:/") 1324 1325 def test_local_abspath(self): 1326 transport = self.get_transport() 1327 try: 1328 p = transport.local_abspath('.') 1329 except (errors.NotLocalUrl, TransportNotPossible) as e: 1330 # should be formattable 1331 s = str(e) 1332 else: 1333 self.assertEqual(getcwd(), p) 1334 1335 def test_abspath_at_root(self): 1336 t = self.get_transport() 1337 # clone all the way to the top 1338 new_transport = t.clone('..') 1339 while new_transport.base != t.base: 1340 t = new_transport 1341 new_transport = t.clone('..') 1342 # we must be able to get a abspath of the root when we ask for 1343 # t.abspath('..') - this due to our choice that clone('..') 1344 # should return the root from the root, combined with the desire that 1345 # the url from clone('..') and from abspath('..') should be the same. 1346 self.assertEqual(t.base, t.abspath('..')) 1347 # '' should give us the root 1348 self.assertEqual(t.base, t.abspath('')) 1349 # and a path should append to the url 1350 self.assertEqual(t.base + 'foo', t.abspath('foo')) 1351 1352 def test_iter_files_recursive(self): 1353 transport = self.get_transport() 1354 if not transport.listable(): 1355 self.assertRaises(TransportNotPossible, 1356 transport.iter_files_recursive) 1357 return 1358 self.build_tree(['isolated/', 1359 'isolated/dir/', 1360 'isolated/dir/foo', 1361 'isolated/dir/bar', 1362 'isolated/dir/b%25z', # make sure quoting is correct 1363 'isolated/bar'], 1364 transport=transport) 1365 paths = set(transport.iter_files_recursive()) 1366 # nb the directories are not converted 1367 self.assertEqual(paths, 1368 {'isolated/dir/foo', 1369 'isolated/dir/bar', 1370 'isolated/dir/b%2525z', 1371 'isolated/bar'}) 1372 sub_transport = transport.clone('isolated') 1373 paths = set(sub_transport.iter_files_recursive()) 1374 self.assertEqual(paths, 1375 {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'}) 1376 1377 def test_copy_tree(self): 1378 # TODO: test file contents and permissions are preserved. This test was 1379 # added just to ensure that quoting was handled correctly. 1380 # -- David Allouche 2006-08-11 1381 transport = self.get_transport() 1382 if not transport.listable(): 1383 self.assertRaises(TransportNotPossible, 1384 transport.iter_files_recursive) 1385 return 1386 if transport.is_readonly(): 1387 return 1388 self.build_tree(['from/', 1389 'from/dir/', 1390 'from/dir/foo', 1391 'from/dir/bar', 1392 'from/dir/b%25z', # make sure quoting is correct 1393 'from/bar'], 1394 transport=transport) 1395 transport.copy_tree('from', 'to') 1396 paths = set(transport.iter_files_recursive()) 1397 self.assertEqual(paths, 1398 {'from/dir/foo', 1399 'from/dir/bar', 1400 'from/dir/b%2525z', 1401 'from/bar', 1402 'to/dir/foo', 1403 'to/dir/bar', 1404 'to/dir/b%2525z', 1405 'to/bar', }) 1406 1407 def test_copy_tree_to_transport(self): 1408 transport = self.get_transport() 1409 if not transport.listable(): 1410 self.assertRaises(TransportNotPossible, 1411 transport.iter_files_recursive) 1412 return 1413 if transport.is_readonly(): 1414 return 1415 self.build_tree(['from/', 1416 'from/dir/', 1417 'from/dir/foo', 1418 'from/dir/bar', 1419 'from/dir/b%25z', # make sure quoting is correct 1420 'from/bar'], 1421 transport=transport) 1422 from_transport = transport.clone('from') 1423 to_transport = transport.clone('to') 1424 to_transport.ensure_base() 1425 from_transport.copy_tree_to_transport(to_transport) 1426 paths = set(transport.iter_files_recursive()) 1427 self.assertEqual(paths, 1428 {'from/dir/foo', 1429 'from/dir/bar', 1430 'from/dir/b%2525z', 1431 'from/bar', 1432 'to/dir/foo', 1433 'to/dir/bar', 1434 'to/dir/b%2525z', 1435 'to/bar', }) 1436 1437 def test_unicode_paths(self): 1438 """Test that we can read/write files with Unicode names.""" 1439 t = self.get_transport() 1440 1441 # With FAT32 and certain encodings on win32 1442 # '\xe5' and '\xe4' actually map to the same file 1443 # adding a suffix kicks in the 'preserving but insensitive' 1444 # route, and maintains the right files 1445 files = [u'\xe5.1', # a w/ circle iso-8859-1 1446 u'\xe4.2', # a w/ dots iso-8859-1 1447 u'\u017d', # Z with umlat iso-8859-2 1448 u'\u062c', # Arabic j 1449 u'\u0410', # Russian A 1450 u'\u65e5', # Kanji person 1451 ] 1452 1453 no_unicode_support = getattr(self._server, 'no_unicode_support', False) 1454 if no_unicode_support: 1455 self.knownFailure("test server cannot handle unicode paths") 1456 1457 try: 1458 self.build_tree(files, transport=t, line_endings='binary') 1459 except UnicodeError: 1460 raise TestSkipped( 1461 "cannot handle unicode paths in current encoding") 1462 1463 # A plain unicode string is not a valid url 1464 for fname in files: 1465 self.assertRaises(urlutils.InvalidURL, t.get, fname) 1466 1467 for fname in files: 1468 fname_utf8 = fname.encode('utf-8') 1469 contents = b'contents of %s\n' % (fname_utf8,) 1470 self.check_transport_contents(contents, t, urlutils.escape(fname)) 1471 1472 def test_connect_twice_is_same_content(self): 1473 # check that our server (whatever it is) is accessible reliably 1474 # via get_transport and multiple connections share content. 1475 transport = self.get_transport() 1476 if transport.is_readonly(): 1477 return 1478 transport.put_bytes('foo', b'bar') 1479 transport3 = self.get_transport() 1480 self.check_transport_contents(b'bar', transport3, 'foo') 1481 1482 # now opening at a relative url should give use a sane result: 1483 transport.mkdir('newdir') 1484 transport5 = self.get_transport('newdir') 1485 transport6 = transport5.clone('..') 1486 self.check_transport_contents(b'bar', transport6, 'foo') 1487 1488 def test_lock_write(self): 1489 """Test transport-level write locks. 1490 1491 These are deprecated and transports may decline to support them. 1492 """ 1493 transport = self.get_transport() 1494 if transport.is_readonly(): 1495 self.assertRaises(TransportNotPossible, 1496 transport.lock_write, 'foo') 1497 return 1498 transport.put_bytes('lock', b'') 1499 try: 1500 lock = transport.lock_write('lock') 1501 except TransportNotPossible: 1502 return 1503 # TODO make this consistent on all platforms: 1504 # self.assertRaises(LockError, transport.lock_write, 'lock') 1505 lock.unlock() 1506 1507 def test_lock_read(self): 1508 """Test transport-level read locks. 1509 1510 These are deprecated and transports may decline to support them. 1511 """ 1512 transport = self.get_transport() 1513 if transport.is_readonly(): 1514 open('lock', 'w').close() 1515 else: 1516 transport.put_bytes('lock', b'') 1517 try: 1518 lock = transport.lock_read('lock') 1519 except TransportNotPossible: 1520 return 1521 # TODO make this consistent on all platforms: 1522 # self.assertRaises(LockError, transport.lock_read, 'lock') 1523 lock.unlock() 1524 1525 def test_readv(self): 1526 transport = self.get_transport() 1527 if transport.is_readonly(): 1528 with open('a', 'w') as f: 1529 f.write('0123456789') 1530 else: 1531 transport.put_bytes('a', b'0123456789') 1532 1533 d = list(transport.readv('a', ((0, 1),))) 1534 self.assertEqual(d[0], (0, b'0')) 1535 1536 d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1)))) 1537 self.assertEqual(d[0], (0, b'0')) 1538 self.assertEqual(d[1], (1, b'1')) 1539 self.assertEqual(d[2], (3, b'34')) 1540 self.assertEqual(d[3], (9, b'9')) 1541 1542 def test_readv_out_of_order(self): 1543 transport = self.get_transport() 1544 if transport.is_readonly(): 1545 with open('a', 'w') as f: 1546 f.write('0123456789') 1547 else: 1548 transport.put_bytes('a', b'01234567890') 1549 1550 d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2)))) 1551 self.assertEqual(d[0], (1, b'1')) 1552 self.assertEqual(d[1], (9, b'9')) 1553 self.assertEqual(d[2], (0, b'0')) 1554 self.assertEqual(d[3], (3, b'34')) 1555 1556 def test_readv_with_adjust_for_latency(self): 1557 transport = self.get_transport() 1558 # the adjust for latency flag expands the data region returned 1559 # according to a per-transport heuristic, so testing is a little 1560 # tricky as we need more data than the largest combining that our 1561 # transports do. To accomodate this we generate random data and cross 1562 # reference the returned data with the random data. To avoid doing 1563 # multiple large random byte look ups we do several tests on the same 1564 # backing data. 1565 content = osutils.rand_bytes(200 * 1024) 1566 content_size = len(content) 1567 if transport.is_readonly(): 1568 self.build_tree_contents([('a', content)]) 1569 else: 1570 transport.put_bytes('a', content) 1571 1572 def check_result_data(result_vector): 1573 for item in result_vector: 1574 data_len = len(item[1]) 1575 self.assertEqual(content[item[0]:item[0] + data_len], item[1]) 1576 1577 # start corner case 1578 result = list(transport.readv('a', ((0, 30),), 1579 adjust_for_latency=True, upper_limit=content_size)) 1580 # we expect 1 result, from 0, to something > 30 1581 self.assertEqual(1, len(result)) 1582 self.assertEqual(0, result[0][0]) 1583 self.assertTrue(len(result[0][1]) >= 30) 1584 check_result_data(result) 1585 # end of file corner case 1586 result = list(transport.readv('a', ((204700, 100),), 1587 adjust_for_latency=True, upper_limit=content_size)) 1588 # we expect 1 result, from 204800- its length, to the end 1589 self.assertEqual(1, len(result)) 1590 data_len = len(result[0][1]) 1591 self.assertEqual(204800 - data_len, result[0][0]) 1592 self.assertTrue(data_len >= 100) 1593 check_result_data(result) 1594 # out of order ranges are made in order 1595 result = list(transport.readv('a', ((204700, 100), (0, 50)), 1596 adjust_for_latency=True, upper_limit=content_size)) 1597 # we expect 2 results, in order, start and end. 1598 self.assertEqual(2, len(result)) 1599 # start 1600 data_len = len(result[0][1]) 1601 self.assertEqual(0, result[0][0]) 1602 self.assertTrue(data_len >= 30) 1603 # end 1604 data_len = len(result[1][1]) 1605 self.assertEqual(204800 - data_len, result[1][0]) 1606 self.assertTrue(data_len >= 100) 1607 check_result_data(result) 1608 # close ranges get combined (even if out of order) 1609 for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]: 1610 result = list(transport.readv('a', request_vector, 1611 adjust_for_latency=True, upper_limit=content_size)) 1612 self.assertEqual(1, len(result)) 1613 data_len = len(result[0][1]) 1614 # minimum length is from 400 to 1034 - 634 1615 self.assertTrue(data_len >= 634) 1616 # must contain the region 400 to 1034 1617 self.assertTrue(result[0][0] <= 400) 1618 self.assertTrue(result[0][0] + data_len >= 1034) 1619 check_result_data(result) 1620 1621 def test_readv_with_adjust_for_latency_with_big_file(self): 1622 transport = self.get_transport() 1623 # test from observed failure case. 1624 if transport.is_readonly(): 1625 with open('a', 'w') as f: 1626 f.write('a' * 1024 * 1024) 1627 else: 1628 transport.put_bytes('a', b'a' * 1024 * 1024) 1629 broken_vector = [(465219, 800), (225221, 800), (445548, 800), 1630 (225037, 800), (221357, 800), (437077, 800), (947670, 800), 1631 (465373, 800), (947422, 800)] 1632 results = list(transport.readv('a', broken_vector, True, 1024 * 1024)) 1633 found_items = [False] * 9 1634 for pos, (start, length) in enumerate(broken_vector): 1635 # check the range is covered by the result 1636 for offset, data in results: 1637 if offset <= start and start + length <= offset + len(data): 1638 found_items[pos] = True 1639 self.assertEqual([True] * 9, found_items) 1640 1641 def test_get_with_open_write_stream_sees_all_content(self): 1642 t = self.get_transport() 1643 if t.is_readonly(): 1644 return 1645 with t.open_write_stream('foo') as handle: 1646 handle.write(b'bcd') 1647 self.assertEqual([(0, b'b'), (2, b'd')], list( 1648 t.readv('foo', ((0, 1), (2, 1))))) 1649 1650 def test_get_smart_medium(self): 1651 """All transports must either give a smart medium, or know they can't. 1652 """ 1653 transport = self.get_transport() 1654 try: 1655 client_medium = transport.get_smart_medium() 1656 except errors.NoSmartMedium: 1657 # as long as we got it we're fine 1658 pass 1659 else: 1660 from ..bzr.smart import medium 1661 self.assertIsInstance(client_medium, medium.SmartClientMedium) 1662 1663 def test_readv_short_read(self): 1664 transport = self.get_transport() 1665 if transport.is_readonly(): 1666 with open('a', 'w') as f: 1667 f.write('0123456789') 1668 else: 1669 transport.put_bytes('a', b'01234567890') 1670 1671 # This is intentionally reading off the end of the file 1672 # since we are sure that it cannot get there 1673 self.assertListRaises((errors.ShortReadvError, errors.InvalidRange, 1674 # Can be raised by paramiko 1675 AssertionError), 1676 transport.readv, 'a', [(1, 1), (8, 10)]) 1677 1678 # This is trying to seek past the end of the file, it should 1679 # also raise a special error 1680 self.assertListRaises((errors.ShortReadvError, errors.InvalidRange), 1681 transport.readv, 'a', [(12, 2)]) 1682 1683 def test_no_segment_parameters(self): 1684 """Segment parameters should be stripped and stored in 1685 transport.segment_parameters.""" 1686 transport = self.get_transport("foo") 1687 self.assertEqual({}, transport.get_segment_parameters()) 1688 1689 def test_segment_parameters(self): 1690 """Segment parameters should be stripped and stored in 1691 transport.get_segment_parameters().""" 1692 base_url = self._server.get_url() 1693 parameters = {"key1": "val1", "key2": "val2"} 1694 url = urlutils.join_segment_parameters(base_url, parameters) 1695 transport = _mod_transport.get_transport_from_url(url) 1696 self.assertEqual(parameters, transport.get_segment_parameters()) 1697 1698 def test_set_segment_parameters(self): 1699 """Segment parameters can be set and show up in base.""" 1700 transport = self.get_transport("foo") 1701 orig_base = transport.base 1702 transport.set_segment_parameter("arm", "board") 1703 self.assertEqual("%s,arm=board" % orig_base, transport.base) 1704 self.assertEqual({"arm": "board"}, transport.get_segment_parameters()) 1705 transport.set_segment_parameter("arm", None) 1706 transport.set_segment_parameter("nonexistant", None) 1707 self.assertEqual({}, transport.get_segment_parameters()) 1708 self.assertEqual(orig_base, transport.base) 1709 1710 def test_stat_symlink(self): 1711 # if a transport points directly to a symlink (and supports symlinks 1712 # at all) you can tell this. helps with bug 32669. 1713 t = self.get_transport() 1714 try: 1715 t.symlink('target', 'link') 1716 except TransportNotPossible: 1717 raise TestSkipped("symlinks not supported") 1718 t2 = t.clone('link') 1719 st = t2.stat('') 1720 self.assertTrue(stat.S_ISLNK(st.st_mode)) 1721 1722 def test_abspath_url_unquote_unreserved(self): 1723 """URLs from abspath should have unreserved characters unquoted 1724 1725 Need consistent quoting notably for tildes, see lp:842223 for more. 1726 """ 1727 t = self.get_transport() 1728 needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/" 1729 self.assertEqual(t.base + "-.09AZ_az~", 1730 t.abspath(needlessly_escaped_dir)) 1731 1732 def test_clone_url_unquote_unreserved(self): 1733 """Base URL of a cloned branch needs unreserved characters unquoted 1734 1735 Cloned transports should be prefix comparable for things like the 1736 isolation checking of tests, see lp:842223 for more. 1737 """ 1738 t1 = self.get_transport() 1739 needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/" 1740 self.build_tree([needlessly_escaped_dir], transport=t1) 1741 t2 = t1.clone(needlessly_escaped_dir) 1742 self.assertEqual(t1.base + "-.09AZ_az~/", t2.base) 1743 1744 def test_hook_post_connection_one(self): 1745 """Fire post_connect hook after a ConnectedTransport is first used""" 1746 log = [] 1747 Transport.hooks.install_named_hook("post_connect", log.append, None) 1748 t = self.get_transport() 1749 self.assertEqual([], log) 1750 t.has("non-existant") 1751 if isinstance(t, RemoteTransport): 1752 self.assertEqual([t.get_smart_medium()], log) 1753 elif isinstance(t, ConnectedTransport): 1754 self.assertEqual([t], log) 1755 else: 1756 self.assertEqual([], log) 1757 1758 def test_hook_post_connection_multi(self): 1759 """Fire post_connect hook once per unshared underlying connection""" 1760 log = [] 1761 Transport.hooks.install_named_hook("post_connect", log.append, None) 1762 t1 = self.get_transport() 1763 t2 = t1.clone(".") 1764 t3 = self.get_transport() 1765 self.assertEqual([], log) 1766 t1.has("x") 1767 t2.has("x") 1768 t3.has("x") 1769 if isinstance(t1, RemoteTransport): 1770 self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log) 1771 elif isinstance(t1, ConnectedTransport): 1772 self.assertEqual([t1, t3], log) 1773 else: 1774 self.assertEqual([], log) 1775