# Copyright (C) 2006-2012, 2016 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""RemoteTransport client for the smart-server.

This module shouldn't be accessed directly.  The classes defined here should be
imported from breezy.bzr.smart.
"""

__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']

from io import BytesIO

from .. import (
    config,
    debug,
    errors,
    trace,
    transport,
    urlutils,
    )
from ..bzr import (
    remote,
    )
from ..bzr.smart import client, medium


def _error_context(args):
    """Build the keyword context passed to _translate_error for a failed call.

    The first argument of a request, if present, is always a path.  It is
    decoded only when it is bytes: RemoteTransport._remote_path returns bytes
    while RemoteHTTPTransport._remote_path returns the result of
    urlutils.relative_url unchanged, and an unconditional ``.decode`` on a
    str would raise AttributeError and hide the real server error.

    :param args: The positional arguments of the failed request.
    :return: ``{'relpath': path}`` or ``{}`` when there are no arguments.
    """
    if not args:
        return {}
    path = args[0]
    if isinstance(path, bytes):
        path = path.decode('utf-8')
    return {'relpath': path}


class _SmartStat(object):
    """Minimal stat result exposing only ``st_size`` and ``st_mode``."""

    def __init__(self, size, mode):
        self.st_size = size
        self.st_mode = mode


class RemoteTransport(transport.ConnectedTransport):
    """Connection to a smart server.

    The connection holds references to the medium that can be used to send
    requests to the server.

    The connection has a notion of the current directory to which it's
    connected; this is incorporated in filenames passed to the server.

    This supports some higher-level RPC operations and can also be treated
    like a Transport to do file-like operations.

    The connection can be made over a tcp socket, an ssh pipe or a series of
    http requests.  There are concrete subclasses for each type:
    RemoteTCPTransport, etc.
    """

    # When making a readv request, cap it at requesting 5MB of data
    _max_readv_bytes = 5 * 1024 * 1024

    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
    # responsibilities: Put those on SmartClient or similar. This is vital for
    # the ability to support multiple versions of the smart protocol over time:
    # RemoteTransport is an adapter from the Transport object model to the
    # SmartClient model, not an encoder.

    # FIXME: the medium parameter should be private, only the tests requires
    # it. It may be even clearer to define a TestRemoteTransport that handles
    # the specific cases of providing a _client and/or a _medium, and leave
    # RemoteTransport as an abstract class.
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
        """Constructor.

        :param _from_transport: Another RemoteTransport instance that this
            one is being cloned from.  Attributes such as the medium will
            be reused.

        :param medium: The medium to use for this RemoteTransport.  If None,
            the medium from the _from_transport is shared.  If both this
            and _from_transport are None, a new medium will be built.
            _from_transport and medium cannot both be specified.

        :param _client: Override the _SmartClient used by this transport.  This
            should only be used for testing purposes; normally this is
            determined from the medium.
        """
        super(RemoteTransport, self).__init__(
            url, _from_transport=_from_transport)

        # The medium is the connection, except when we need to share it with
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
        # what we want to share is really the shared connection.

        if (_from_transport is not None
                and isinstance(_from_transport, RemoteTransport)):
            # Cloned from another RemoteTransport: reuse its client.
            _client = _from_transport._client
        elif _from_transport is None:
            # If no _from_transport is specified, we need to initialize the
            # shared medium.
            credentials = None
            if medium is None:
                medium, credentials = self._build_medium()
                if 'hpss' in debug.debug_flags:
                    trace.mutter('hpss: Built a new medium: %s',
                                 medium.__class__.__name__)
            self._shared_connection = transport._SharedConnection(medium,
                                                                  credentials,
                                                                  self.base)
        elif medium is None:
            # No medium was specified, so share the medium from the
            # _from_transport.
            medium = self._shared_connection.connection
        else:
            raise AssertionError(
                "Both _from_transport (%r) and medium (%r) passed to "
                "RemoteTransport.__init__, but these parameters are mutally "
                "exclusive." % (_from_transport, medium))

        if _client is None:
            self._client = client._SmartClient(medium)
        else:
            self._client = _client

    def _build_medium(self):
        """Create the medium if _from_transport does not provide one.

        The medium is analogous to the connection for ConnectedTransport: it
        allows connection sharing.

        :return: A ``(medium, credentials)`` tuple; this base implementation
            returns ``(None, None)`` and subclasses override it.
        """
        # No credentials
        return None, None

    def _report_activity(self, bytes, direction):
        """See Transport._report_activity.

        Does nothing; the smart medium will report activity triggered by a
        RemoteTransport.
        """
        pass

    def is_readonly(self):
        """Ask the server whether this transport is read-only.

        :return: True if the server answers 'yes', False if it answers 'no'
            or does not know the 'Transport.is_readonly' verb.
        :raises UnexpectedSmartServerResponse: for any other response.
        """
        try:
            resp = self._call2(b'Transport.is_readonly')
        except errors.UnknownSmartMethod:
            # XXX: nasty hack: servers before 0.16 don't have a
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
            # did: assume False.
            return False
        if resp == (b'yes', ):
            return True
        elif resp == (b'no', ):
            return False
        else:
            raise errors.UnexpectedSmartServerResponse(resp)

    def get_smart_client(self):
        """Return the result of ``self._get_connection()``."""
        return self._get_connection()

    def get_smart_medium(self):
        """Return the result of ``self._get_connection()``."""
        return self._get_connection()

    def _remote_path(self, relpath):
        """Return the absolute path for relpath, as bytes.

        The path is built by combining this transport's URL path with
        relpath; a str result is encoded with the default (UTF-8) codec.
        """
        path = urlutils.URL._combine_paths(self._parsed_url.path, relpath)
        if not isinstance(path, bytes):
            path = path.encode()
        return path

    def _call(self, method, *args):
        """Call a remote method and require an 'ok' response.

        :raises UnexpectedSmartServerResponse: if the response does not start
            with b'ok'.
        """
        resp = self._call2(method, *args)
        self._ensure_ok(resp)

    def _call2(self, method, *args):
        """Call a method on the remote server.

        :return: The response tuple from the smart client.
        Errors reported by the server are handed to _translate_error with
        the first argument (the path) as context.
        """
        try:
            return self._client.call(method, *args)
        except errors.ErrorFromSmartServer as err:
            # The first argument, if present, is always a path.
            self._translate_error(err, **_error_context(args))

    def _call_with_body_bytes(self, method, args, body):
        """Call a method on the remote server with body bytes.

        :return: The response tuple from the smart client.
        Errors reported by the server are handed to _translate_error with
        the first argument (the path) as context.
        """
        try:
            return self._client.call_with_body_bytes(method, args, body)
        except errors.ErrorFromSmartServer as err:
            # The first argument, if present, is always a path.
            self._translate_error(err, **_error_context(args))

    def has(self, relpath):
        """Indicate whether a remote file of the given name exists or not.

        :see: Transport.has()
        """
        resp = self._call2(b'has', self._remote_path(relpath))
        if resp == (b'yes', ):
            return True
        elif resp == (b'no', ):
            return False
        else:
            raise errors.UnexpectedSmartServerResponse(resp)

    def get(self, relpath):
        """Return file-like object reading the contents of a remote file.

        :see: Transport.get_bytes()/get_file()
        """
        return BytesIO(self.get_bytes(relpath))

    def get_bytes(self, relpath):
        """Return the full contents of the remote file as bytes.

        :raises UnexpectedSmartServerResponse: if the server's response is
            not ``(b'ok',)``.
        """
        remote = self._remote_path(relpath)
        try:
            resp, response_handler = self._client.call_expecting_body(
                b'get', remote)
        except errors.ErrorFromSmartServer as err:
            self._translate_error(err, relpath)
        if resp != (b'ok', ):
            # Discard the pending body before reporting the bad response.
            response_handler.cancel_read_body()
            raise errors.UnexpectedSmartServerResponse(resp)
        return response_handler.read_body_bytes()

    def _serialise_optional_mode(self, mode):
        """Serialise a mode for the wire: b'' for None, else ASCII decimal."""
        if mode is None:
            return b''
        else:
            return ('%d' % mode).encode('ascii')

    def mkdir(self, relpath, mode=None):
        """Send a 'mkdir' request for relpath; the response is not checked."""
        self._call2(b'mkdir', self._remote_path(relpath),
                    self._serialise_optional_mode(mode))

    def open_write_stream(self, relpath, mode=None):
        """See Transport.open_write_stream."""
        # Create (or truncate) the file first, then append through the stream.
        self.put_bytes(relpath, b"", mode)
        result = transport.AppendBasedFileStream(self, relpath)
        transport._file_streams[self.abspath(relpath)] = result
        return result

    def put_bytes(self, relpath, raw_bytes, mode=None):
        """Send raw_bytes to the server with a 'put' request.

        :return: The number of bytes sent.
        :raises TypeError: if raw_bytes is not a bytes object.
        :raises UnexpectedSmartServerResponse: if the response is not 'ok'.
        """
        if not isinstance(raw_bytes, bytes):
            raise TypeError(
                'raw_bytes must be bytes string, not %s' % type(raw_bytes))
        resp = self._call_with_body_bytes(
            b'put',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
            raw_bytes)
        self._ensure_ok(resp)
        return len(raw_bytes)

    def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
                             create_parent_dir=False,
                             dir_mode=None):
        """See Transport.put_bytes_non_atomic."""
        # FIXME: no encoding in the transport!
        create_parent_str = b'F'
        if create_parent_dir:
            create_parent_str = b'T'

        resp = self._call_with_body_bytes(
            b'put_non_atomic',
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
             create_parent_str, self._serialise_optional_mode(dir_mode)),
            raw_bytes)
        self._ensure_ok(resp)

    def put_file(self, relpath, upload_file, mode=None):
        """Upload the remaining contents of upload_file with put_bytes.

        On any failure the file is rewound to where it was before the read.

        :return: The number of bytes sent.
        """
        # its not ideal to seek back, but currently put_non_atomic_file depends
        # on transports not reading before failing - which is a faulty
        # assumption I think - RBC 20060915
        pos = upload_file.tell()
        try:
            return self.put_bytes(relpath, upload_file.read(), mode)
        except BaseException:
            # Restore the position for the caller, then propagate unchanged.
            upload_file.seek(pos)
            raise

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
                            dir_mode=None):
        """Upload the remaining contents of f with put_bytes_non_atomic."""
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
                                         create_parent_dir=create_parent_dir,
                                         dir_mode=dir_mode)

    def append_file(self, relpath, from_file, mode=None):
        """Append the remaining contents of from_file; see append_bytes."""
        return self.append_bytes(relpath, from_file.read(), mode)

    def append_bytes(self, relpath, bytes, mode=None):
        """Send an 'append' request with the given body.

        :return: The integer the server reports after b'appended'.
        :raises UnexpectedSmartServerResponse: for any other response.
        """
        resp = self._call_with_body_bytes(
            b'append',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
            bytes)
        if resp[0] == b'appended':
            return int(resp[1])
        raise errors.UnexpectedSmartServerResponse(resp)

    def delete(self, relpath):
        """Send a 'delete' request for relpath and require an 'ok' response."""
        resp = self._call2(b'delete', self._remote_path(relpath))
        self._ensure_ok(resp)

    def external_url(self):
        """See breezy.transport.Transport.external_url."""
        # the external path for RemoteTransports is the base
        return self.base

    def recommended_page_size(self):
        """Return the recommended page size for this transport."""
        return 64 * 1024

    def _readv(self, relpath, offsets):
        """Yield ``(offset, data)`` for each requested ``(offset, size)``.

        The offsets are coalesced, grouped into requests of at most
        _max_readv_bytes each, and the results are yielded in the order the
        offsets were requested.

        :param relpath: Path of the file to read from.
        :param offsets: An iterable of ``(offset, size)`` tuples.
        """
        # Materialise first: an empty iterator or generator is truthy, so the
        # emptiness test is only reliable on the list.  Without this,
        # next(offset_stack) below would raise StopIteration inside this
        # generator, which Python turns into RuntimeError (PEP 479).
        offsets = list(offsets)
        if not offsets:
            return

        sorted_offsets = sorted(offsets)
        coalesced = list(self._coalesce_offsets(
            sorted_offsets,
            limit=self._max_readv_combine,
            fudge_factor=self._bytes_to_read_before_seek,
            max_size=self._max_readv_bytes))

        # now that we've coalesced things, avoid making enormous requests
        requests = []
        cur_request = []
        cur_len = 0
        for c in coalesced:
            if c.length + cur_len > self._max_readv_bytes:
                requests.append(cur_request)
                cur_request = [c]
                cur_len = c.length
                continue
            cur_request.append(c)
            cur_len += c.length
        if cur_request:
            requests.append(cur_request)
        if 'hpss' in debug.debug_flags:
            trace.mutter('%s.readv %s offsets => %s coalesced'
                         ' => %s requests (%s)',
                         self.__class__.__name__, len(offsets), len(coalesced),
                         len(requests), sum(map(len, requests)))
        # Cache the results, but only until they have been fulfilled
        data_map = {}
        # turn the list of offsets into a single stack to iterate
        offset_stack = iter(offsets)
        # using a list so it can be modified when passing down and coming back
        next_offset = [next(offset_stack)]
        for cur_request in requests:
            try:
                result = self._client.call_with_body_readv_array(
                    (b'readv', self._remote_path(relpath),),
                    [(c.start, c.length) for c in cur_request])
                resp, response_handler = result
            except errors.ErrorFromSmartServer as err:
                self._translate_error(err, relpath)

            if resp[0] != b'readv':
                # This should raise an exception
                response_handler.cancel_read_body()
                raise errors.UnexpectedSmartServerResponse(resp)

            for res in self._handle_response(offset_stack, cur_request,
                                             response_handler,
                                             data_map,
                                             next_offset,
                                             relpath=relpath):
                yield res

    def _handle_response(self, offset_stack, coalesced, response_handler,
                         data_map, next_offset, relpath=None):
        """Yield ``(offset, data)`` pairs from one readv response body.

        :param offset_stack: Iterator over the remaining requested
            ``(offset, size)`` tuples, in request order.
        :param coalesced: The coalesced offsets covered by this response.
        :param response_handler: Handler whose body holds the data.
        :param data_map: Dict caching out-of-order data keyed by
            ``(offset, size)``; shared between calls.
        :param next_offset: One-element list holding the next
            ``(offset, size)`` to yield; updated in place.
        :param relpath: Path being read, used only when reporting a short
            read.
        :raises ShortReadvError: if the body holds fewer bytes than a
            coalesced range requires.
        """
        cur_offset_and_size = next_offset[0]
        # FIXME: this should know how many bytes are needed, for clarity.
        data = response_handler.read_body_bytes()
        data_offset = 0
        for c_offset in coalesced:
            # Compare against what is left after the ranges already consumed,
            # not the whole body, so a short read on a later range is caught.
            available = len(data) - data_offset
            if available < c_offset.length:
                raise errors.ShortReadvError(relpath, c_offset.start,
                                             c_offset.length,
                                             actual=available)
            for suboffset, subsize in c_offset.ranges:
                key = (c_offset.start + suboffset, subsize)
                this_data = data[data_offset + suboffset:
                                 data_offset + suboffset + subsize]
                # Special case when the data is in-order, rather than packing
                # into a map and then back out again. Benchmarking shows that
                # this has 100% hit rate, but leave in the data_map work just
                # in case.
                # TODO: Could we get away with using buffer() to avoid the
                #       memory copy?  Callers would need to realize they may
                #       not have a real string.
                if key == cur_offset_and_size:
                    yield cur_offset_and_size[0], this_data
                    try:
                        cur_offset_and_size = next_offset[0] = next(
                            offset_stack)
                    except StopIteration:
                        return
                else:
                    data_map[key] = this_data
            data_offset += c_offset.length

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                try:
                    cur_offset_and_size = next_offset[0] = next(offset_stack)
                except StopIteration:
                    return

    def rename(self, rel_from, rel_to):
        """Send a 'rename' request and require an 'ok' response."""
        self._call(b'rename',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def move(self, rel_from, rel_to):
        """Send a 'move' request and require an 'ok' response."""
        self._call(b'move',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def rmdir(self, relpath):
        """Send an 'rmdir' request and require an 'ok' response."""
        self._call(b'rmdir', self._remote_path(relpath))

    def _ensure_ok(self, resp):
        """Raise UnexpectedSmartServerResponse unless resp starts with b'ok'."""
        if resp[0] != b'ok':
            raise errors.UnexpectedSmartServerResponse(resp)

    def _translate_error(self, err, relpath=None):
        """Delegate to remote._translate_error with relpath as the path."""
        remote._translate_error(err, path=relpath)

    def disconnect(self):
        """Disconnect the smart medium, if there is one."""
        m = self.get_smart_medium()
        if m is not None:
            m.disconnect()

    def stat(self, relpath):
        """Return a _SmartStat built from the server's 'stat' response.

        The size is parsed as a decimal integer and the mode as octal.

        :raises UnexpectedSmartServerResponse: if the response does not start
            with b'stat'.
        """
        resp = self._call2(b'stat', self._remote_path(relpath))
        if resp[0] == b'stat':
            return _SmartStat(int(resp[1]), int(resp[2], 8))
        raise errors.UnexpectedSmartServerResponse(resp)

    # def lock_read(self, relpath):
    # """Lock the given file for shared (read) access.
    # :return: A lock object, which should be passed to Transport.unlock()
    # """
    # The old RemoteBranch ignore lock for reading, so we will
    # continue that tradition and return a bogus lock object.
    # class BogusLock(object):
    # def __init__(self, path):
    ##             self.path = path
    # def unlock(self):
    # pass
    # return BogusLock(relpath)

    def listable(self):
        """Return True: this transport supports list_dir."""
        return True

    def list_dir(self, relpath):
        """Return the names the server lists for relpath, decoded as UTF-8.

        :raises UnexpectedSmartServerResponse: if the response does not start
            with b'names'.
        """
        resp = self._call2(b'list_dir', self._remote_path(relpath))
        if resp[0] == b'names':
            return [name.decode('utf-8') for name in resp[1:]]
        raise errors.UnexpectedSmartServerResponse(resp)

    def iter_files_recursive(self):
        """Return the names the server reports for 'iter_files_recursive'.

        The names are decoded as UTF-8 and returned as a list.

        :raises UnexpectedSmartServerResponse: if the response does not start
            with b'names'.
        """
        resp = self._call2(b'iter_files_recursive', self._remote_path(''))
        if resp[0] == b'names':
            return [name.decode('utf-8') for name in resp[1:]]
        raise errors.UnexpectedSmartServerResponse(resp)


class RemoteTCPTransport(RemoteTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get 'RemoteTransport(url,
        SmartTCPClientMedium)'.
    """

    def _build_medium(self):
        """Return a SmartTCPClientMedium for this URL and no credentials."""
        client_medium = medium.SmartTCPClientMedium(
            self._parsed_url.host, self._parsed_url.port, self.base)
        return client_medium, None


class RemoteTCPTransportV2Only(RemoteTransport):
    """Connection to smart server over plain tcp with the client hard-coded to
    assume protocol v2 and remote server version <= 1.6.

    This should only be used for testing.
    """

    def _build_medium(self):
        """Return a TCP medium pinned to protocol v2, and no credentials."""
        client_medium = medium.SmartTCPClientMedium(
            self._parsed_url.host, self._parsed_url.port, self.base)
        client_medium._protocol_version = 2
        client_medium._remember_remote_is_before((1, 6))
        return client_medium, None


class RemoteSSHTransport(RemoteTransport):
    """Connection to smart server over SSH.

    This is essentially just a factory to get 'RemoteTransport(url,
        SmartSSHClientMedium)'.
    """

    def _build_medium(self):
        """Return a SmartSSHClientMedium and ``(user, password)`` credentials.

        When the URL carries no user, one is looked up in the authentication
        configuration for the 'ssh' scheme.
        """
        location_config = config.LocationConfig(self.base)
        bzr_remote_path = location_config.get_bzr_remote_path()
        user = self._parsed_url.user
        if user is None:
            auth = config.AuthenticationConfig()
            user = auth.get_user('ssh', self._parsed_url.host,
                                 self._parsed_url.port)
        ssh_params = medium.SSHParams(self._parsed_url.host,
                                      self._parsed_url.port, user,
                                      self._parsed_url.password,
                                      bzr_remote_path)
        client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
        return client_medium, (user, self._parsed_url.password)


class RemoteHTTPTransport(RemoteTransport):
    """Just a way to connect between a bzr+http:// url and http://.

    This connection operates slightly differently than the RemoteSSHTransport.
    It uses a plain http:// transport underneath, which defines what remote
    .bzr/smart URL we are connected to. From there, all paths that are sent are
    sent as relative paths, this way, the remote side can properly
    de-reference them, since it is likely doing rewrite rules to translate an
    HTTP path into a local path.
    """

    def __init__(self, base, _from_transport=None, http_transport=None):
        """Constructor.

        :param base: The bzr+http(s) URL of this transport.
        :param _from_transport: Transport this one is being cloned from.
        :param http_transport: The underlying http transport to use; when
            None, one is created from base with the 'bzr+' prefix removed.
        """
        # _http_transport must be set before the base constructor runs,
        # because that constructor may call _build_medium, which reads it.
        if http_transport is None:
            # FIXME: the password may be lost here because it appears in the
            # url only for an initial construction (when the url came from the
            # command-line).
            http_url = base[len('bzr+'):]
            self._http_transport = transport.get_transport_from_url(http_url)
        else:
            self._http_transport = http_transport
        super(RemoteHTTPTransport, self).__init__(
            base, _from_transport=_from_transport)

    def _build_medium(self):
        """Return the http transport's smart medium and no credentials."""
        # We let http_transport take care of the credentials
        return self._http_transport.get_smart_medium(), None

    def _remote_path(self, relpath):
        """After connecting, HTTP Transport only deals in relative URLs."""
        # Adjust the relpath based on which URL this smart transport is
        # connected to.
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
        url = urlutils.join(self.base[len('bzr+'):], relpath)
        url = urlutils.normalize_url(url)
        return urlutils.relative_url(http_base, url)

    def clone(self, relative_url):
        """Make a new RemoteHTTPTransport related to me.

        This is re-implemented rather than using the default
        RemoteTransport.clone() because we must be careful about the underlying
        http transport.

        Also, the cloned smart transport will POST to the same .bzr/smart
        location as this transport (although obviously the relative paths in the
        smart requests may be different).  This is so that the server doesn't
        have to handle .bzr/smart requests at arbitrary places inside .bzr
        directories, just at the initial URL the user uses.
        """
        if relative_url:
            abs_url = self.abspath(relative_url)
        else:
            abs_url = self.base
        return RemoteHTTPTransport(abs_url,
                                   _from_transport=self,
                                   http_transport=self._http_transport)

    def _redirected_to(self, source, target):
        """See transport._redirected_to"""
        redirected = self._http_transport._redirected_to(source, target)
        if (redirected is not None
                and isinstance(redirected, type(self._http_transport))):
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
                                       http_transport=redirected)
        else:
            # Either None or a transport for a different protocol
            return redirected


class HintingSSHTransport(transport.Transport):
    """Simple transport that handles ssh:// and points out bzr+ssh:// and git+ssh://."""

    # TODO(jelmer): Implement support for detecting whether the repository at the
    # other end is a git or bzr repository.

    def __init__(self, url):
        """Always raise UnsupportedProtocol, suggesting bzr+ssh or git+ssh."""
        raise errors.UnsupportedProtocol(
            url, 'Use bzr+ssh for Bazaar operations over SSH, e.g. "bzr+%s". '
            'Use git+ssh for Git operations over SSH, e.g. "git+%s".' % (url, url))


def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    # We may need a little more test framework support to construct an
    # appropriate RemoteTransport in the future.
    from ..tests import test_server
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]