1# Copyright (C) 2006-2012, 2016 Canonical Ltd
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17"""RemoteTransport client for the smart-server.
18
19This module shouldn't be accessed directly.  The classes defined here should be
20imported from breezy.bzr.smart.
21"""
22
23__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
24
25from io import BytesIO
26
27from .. import (
28    config,
29    debug,
30    errors,
31    trace,
32    transport,
33    urlutils,
34    )
35from ..bzr import (
36    remote,
37    )
38from ..bzr.smart import client, medium
39
40
41class _SmartStat(object):
42
43    def __init__(self, size, mode):
44        self.st_size = size
45        self.st_mode = mode
46
47
48class RemoteTransport(transport.ConnectedTransport):
49    """Connection to a smart server.
50
51    The connection holds references to the medium that can be used to send
52    requests to the server.
53
54    The connection has a notion of the current directory to which it's
55    connected; this is incorporated in filenames passed to the server.
56
57    This supports some higher-level RPC operations and can also be treated
58    like a Transport to do file-like operations.
59
60    The connection can be made over a tcp socket, an ssh pipe or a series of
61    http requests.  There are concrete subclasses for each type:
62    RemoteTCPTransport, etc.
63    """
64
65    # When making a readv request, cap it at requesting 5MB of data
66    _max_readv_bytes = 5 * 1024 * 1024
67
68    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
69    # responsibilities: Put those on SmartClient or similar. This is vital for
70    # the ability to support multiple versions of the smart protocol over time:
71    # RemoteTransport is an adapter from the Transport object model to the
72    # SmartClient model, not an encoder.
73
    # FIXME: the medium parameter should be private; only the tests require
    # it. It may be even clearer to define a TestRemoteTransport that handles
    # the specific cases of providing a _client and/or a _medium, and leave
    # RemoteTransport as an abstract class.
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
        """Constructor.

        :param url: The URL of this transport; it is handed to the
            superclass constructor together with _from_transport.

        :param _from_transport: Another RemoteTransport instance that this
            one is being cloned from.  Attributes such as the medium will
            be reused.  When it is a RemoteTransport, its _client is reused
            and any _client argument is ignored.

        :param medium: The medium to use for this RemoteTransport.  If None,
            the medium from the _from_transport is shared.  If both this
            and _from_transport are None, a new medium will be built.
            _from_transport and medium cannot both be specified.

        :param _client: Override the _SmartClient used by this transport.  This
            should only be used for testing purposes; normally this is
            determined from the medium.

        :raises AssertionError: If medium is given together with a
            _from_transport that is not a RemoteTransport.
        """
        super(RemoteTransport, self).__init__(
            url, _from_transport=_from_transport)

        # The medium is the connection, except when we need to share it with
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
        # what we want to share is really the shared connection.

        if (_from_transport is not None
                and isinstance(_from_transport, RemoteTransport)):
            # Cloning from another RemoteTransport: reuse its client as is.
            _client = _from_transport._client
        elif _from_transport is None:
            # If no _from_transport is specified, we need to initialize the
            # shared medium.
            credentials = None
            if medium is None:
                medium, credentials = self._build_medium()
                if 'hpss' in debug.debug_flags:
                    trace.mutter('hpss: Built a new medium: %s',
                                 medium.__class__.__name__)
            self._shared_connection = transport._SharedConnection(medium,
                                                                  credentials,
                                                                  self.base)
        elif medium is None:
            # No medium was specified, so share the medium from the
            # _from_transport.
            # NOTE(review): self._shared_connection is not assigned in this
            # branch; presumably the superclass constructor copies it from
            # _from_transport -- confirm.
            medium = self._shared_connection.connection
        else:
            raise AssertionError(
                "Both _from_transport (%r) and medium (%r) passed to "
                "RemoteTransport.__init__, but these parameters are mutally "
                "exclusive." % (_from_transport, medium))

        # Only build a client around the medium when none was supplied or
        # inherited above.
        if _client is None:
            self._client = client._SmartClient(medium)
        else:
            self._client = _client
130
131    def _build_medium(self):
132        """Create the medium if _from_transport does not provide one.
133
134        The medium is analogous to the connection for ConnectedTransport: it
135        allows connection sharing.
136        """
137        # No credentials
138        return None, None
139
140    def _report_activity(self, bytes, direction):
141        """See Transport._report_activity.
142
143        Does nothing; the smart medium will report activity triggered by a
144        RemoteTransport.
145        """
146        pass
147
148    def is_readonly(self):
149        """Smart server transport can do read/write file operations."""
150        try:
151            resp = self._call2(b'Transport.is_readonly')
152        except errors.UnknownSmartMethod:
153            # XXX: nasty hack: servers before 0.16 don't have a
154            # 'Transport.is_readonly' verb, so we do what clients before 0.16
155            # did: assume False.
156            return False
157        if resp == (b'yes', ):
158            return True
159        elif resp == (b'no', ):
160            return False
161        else:
162            raise errors.UnexpectedSmartServerResponse(resp)
163
164    def get_smart_client(self):
165        return self._get_connection()
166
167    def get_smart_medium(self):
168        return self._get_connection()
169
170    def _remote_path(self, relpath):
171        """Returns the Unicode version of the absolute path for relpath."""
172        path = urlutils.URL._combine_paths(self._parsed_url.path, relpath)
173        if not isinstance(path, bytes):
174            path = path.encode()
175        return path
176
177    def _call(self, method, *args):
178        resp = self._call2(method, *args)
179        self._ensure_ok(resp)
180
181    def _call2(self, method, *args):
182        """Call a method on the remote server."""
183        try:
184            return self._client.call(method, *args)
185        except errors.ErrorFromSmartServer as err:
186            # The first argument, if present, is always a path.
187            if args:
188                context = {'relpath': args[0].decode('utf-8')}
189            else:
190                context = {}
191            self._translate_error(err, **context)
192
193    def _call_with_body_bytes(self, method, args, body):
194        """Call a method on the remote server with body bytes."""
195        try:
196            return self._client.call_with_body_bytes(method, args, body)
197        except errors.ErrorFromSmartServer as err:
198            # The first argument, if present, is always a path.
199            if args:
200                context = {'relpath': args[0]}
201            else:
202                context = {}
203            self._translate_error(err, **context)
204
205    def has(self, relpath):
206        """Indicate whether a remote file of the given name exists or not.
207
208        :see: Transport.has()
209        """
210        resp = self._call2(b'has', self._remote_path(relpath))
211        if resp == (b'yes', ):
212            return True
213        elif resp == (b'no', ):
214            return False
215        else:
216            raise errors.UnexpectedSmartServerResponse(resp)
217
218    def get(self, relpath):
219        """Return file-like object reading the contents of a remote file.
220
221        :see: Transport.get_bytes()/get_file()
222        """
223        return BytesIO(self.get_bytes(relpath))
224
225    def get_bytes(self, relpath):
226        remote = self._remote_path(relpath)
227        try:
228            resp, response_handler = self._client.call_expecting_body(
229                b'get', remote)
230        except errors.ErrorFromSmartServer as err:
231            self._translate_error(err, relpath)
232        if resp != (b'ok', ):
233            response_handler.cancel_read_body()
234            raise errors.UnexpectedSmartServerResponse(resp)
235        return response_handler.read_body_bytes()
236
237    def _serialise_optional_mode(self, mode):
238        if mode is None:
239            return b''
240        else:
241            return ('%d' % mode).encode('ascii')
242
243    def mkdir(self, relpath, mode=None):
244        resp = self._call2(b'mkdir', self._remote_path(relpath),
245                           self._serialise_optional_mode(mode))
246
247    def open_write_stream(self, relpath, mode=None):
248        """See Transport.open_write_stream."""
249        self.put_bytes(relpath, b"", mode)
250        result = transport.AppendBasedFileStream(self, relpath)
251        transport._file_streams[self.abspath(relpath)] = result
252        return result
253
254    def put_bytes(self, relpath, raw_bytes, mode=None):
255        if not isinstance(raw_bytes, bytes):
256            raise TypeError(
257                'raw_bytes must be bytes string, not %s' % type(raw_bytes))
258        resp = self._call_with_body_bytes(
259            b'put',
260            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
261            raw_bytes)
262        self._ensure_ok(resp)
263        return len(raw_bytes)
264
265    def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
266                             create_parent_dir=False,
267                             dir_mode=None):
268        """See Transport.put_bytes_non_atomic."""
269        # FIXME: no encoding in the transport!
270        create_parent_str = b'F'
271        if create_parent_dir:
272            create_parent_str = b'T'
273
274        resp = self._call_with_body_bytes(
275            b'put_non_atomic',
276            (self._remote_path(relpath), self._serialise_optional_mode(mode),
277             create_parent_str, self._serialise_optional_mode(dir_mode)),
278            raw_bytes)
279        self._ensure_ok(resp)
280
281    def put_file(self, relpath, upload_file, mode=None):
282        # its not ideal to seek back, but currently put_non_atomic_file depends
283        # on transports not reading before failing - which is a faulty
284        # assumption I think - RBC 20060915
285        pos = upload_file.tell()
286        try:
287            return self.put_bytes(relpath, upload_file.read(), mode)
288        except:
289            upload_file.seek(pos)
290            raise
291
292    def put_file_non_atomic(self, relpath, f, mode=None,
293                            create_parent_dir=False,
294                            dir_mode=None):
295        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
296                                         create_parent_dir=create_parent_dir,
297                                         dir_mode=dir_mode)
298
299    def append_file(self, relpath, from_file, mode=None):
300        return self.append_bytes(relpath, from_file.read(), mode)
301
302    def append_bytes(self, relpath, bytes, mode=None):
303        resp = self._call_with_body_bytes(
304            b'append',
305            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
306            bytes)
307        if resp[0] == b'appended':
308            return int(resp[1])
309        raise errors.UnexpectedSmartServerResponse(resp)
310
311    def delete(self, relpath):
312        resp = self._call2(b'delete', self._remote_path(relpath))
313        self._ensure_ok(resp)
314
315    def external_url(self):
316        """See breezy.transport.Transport.external_url."""
317        # the external path for RemoteTransports is the base
318        return self.base
319
320    def recommended_page_size(self):
321        """Return the recommended page size for this transport."""
322        return 64 * 1024
323
324    def _readv(self, relpath, offsets):
325        if not offsets:
326            return
327
328        offsets = list(offsets)
329
330        sorted_offsets = sorted(offsets)
331        coalesced = list(self._coalesce_offsets(sorted_offsets,
332                                                limit=self._max_readv_combine,
333                                                fudge_factor=self._bytes_to_read_before_seek,
334                                                max_size=self._max_readv_bytes))
335
336        # now that we've coallesced things, avoid making enormous requests
337        requests = []
338        cur_request = []
339        cur_len = 0
340        for c in coalesced:
341            if c.length + cur_len > self._max_readv_bytes:
342                requests.append(cur_request)
343                cur_request = [c]
344                cur_len = c.length
345                continue
346            cur_request.append(c)
347            cur_len += c.length
348        if cur_request:
349            requests.append(cur_request)
350        if 'hpss' in debug.debug_flags:
351            trace.mutter('%s.readv %s offsets => %s coalesced'
352                         ' => %s requests (%s)',
353                         self.__class__.__name__, len(offsets), len(coalesced),
354                         len(requests), sum(map(len, requests)))
355        # Cache the results, but only until they have been fulfilled
356        data_map = {}
357        # turn the list of offsets into a single stack to iterate
358        offset_stack = iter(offsets)
359        # using a list so it can be modified when passing down and coming back
360        next_offset = [next(offset_stack)]
361        for cur_request in requests:
362            try:
363                result = self._client.call_with_body_readv_array(
364                    (b'readv', self._remote_path(relpath),),
365                    [(c.start, c.length) for c in cur_request])
366                resp, response_handler = result
367            except errors.ErrorFromSmartServer as err:
368                self._translate_error(err, relpath)
369
370            if resp[0] != b'readv':
371                # This should raise an exception
372                response_handler.cancel_read_body()
373                raise errors.UnexpectedSmartServerResponse(resp)
374
375            for res in self._handle_response(offset_stack, cur_request,
376                                             response_handler,
377                                             data_map,
378                                             next_offset):
379                yield res
380
381    def _handle_response(self, offset_stack, coalesced, response_handler,
382                         data_map, next_offset):
383        cur_offset_and_size = next_offset[0]
384        # FIXME: this should know how many bytes are needed, for clarity.
385        data = response_handler.read_body_bytes()
386        data_offset = 0
387        for c_offset in coalesced:
388            if len(data) < c_offset.length:
389                raise errors.ShortReadvError(relpath, c_offset.start,
390                                             c_offset.length, actual=len(data))
391            for suboffset, subsize in c_offset.ranges:
392                key = (c_offset.start + suboffset, subsize)
393                this_data = data[data_offset + suboffset:
394                                 data_offset + suboffset + subsize]
395                # Special case when the data is in-order, rather than packing
396                # into a map and then back out again. Benchmarking shows that
397                # this has 100% hit rate, but leave in the data_map work just
398                # in case.
399                # TODO: Could we get away with using buffer() to avoid the
400                #       memory copy?  Callers would need to realize they may
401                #       not have a real string.
402                if key == cur_offset_and_size:
403                    yield cur_offset_and_size[0], this_data
404                    try:
405                        cur_offset_and_size = next_offset[0] = next(
406                            offset_stack)
407                    except StopIteration:
408                        return
409                else:
410                    data_map[key] = this_data
411            data_offset += c_offset.length
412
413            # Now that we've read some data, see if we can yield anything back
414            while cur_offset_and_size in data_map:
415                this_data = data_map.pop(cur_offset_and_size)
416                yield cur_offset_and_size[0], this_data
417                try:
418                    cur_offset_and_size = next_offset[0] = next(offset_stack)
419                except StopIteration:
420                    return
421
422    def rename(self, rel_from, rel_to):
423        self._call(b'rename',
424                   self._remote_path(rel_from),
425                   self._remote_path(rel_to))
426
427    def move(self, rel_from, rel_to):
428        self._call(b'move',
429                   self._remote_path(rel_from),
430                   self._remote_path(rel_to))
431
432    def rmdir(self, relpath):
433        resp = self._call(b'rmdir', self._remote_path(relpath))
434
435    def _ensure_ok(self, resp):
436        if resp[0] != b'ok':
437            raise errors.UnexpectedSmartServerResponse(resp)
438
439    def _translate_error(self, err, relpath=None):
440        remote._translate_error(err, path=relpath)
441
442    def disconnect(self):
443        m = self.get_smart_medium()
444        if m is not None:
445            m.disconnect()
446
447    def stat(self, relpath):
448        resp = self._call2(b'stat', self._remote_path(relpath))
449        if resp[0] == b'stat':
450            return _SmartStat(int(resp[1]), int(resp[2], 8))
451        raise errors.UnexpectedSmartServerResponse(resp)
452
453    # def lock_read(self, relpath):
454    # """Lock the given file for shared (read) access.
455    # :return: A lock object, which should be passed to Transport.unlock()
456    # """
457    # The old RemoteBranch ignore lock for reading, so we will
458    # continue that tradition and return a bogus lock object.
459    # class BogusLock(object):
460    # def __init__(self, path):
461    ##             self.path = path
462    # def unlock(self):
463    # pass
464    # return BogusLock(relpath)
465
466    def listable(self):
467        return True
468
469    def list_dir(self, relpath):
470        resp = self._call2(b'list_dir', self._remote_path(relpath))
471        if resp[0] == b'names':
472            return [name.decode('utf-8') for name in resp[1:]]
473        raise errors.UnexpectedSmartServerResponse(resp)
474
475    def iter_files_recursive(self):
476        resp = self._call2(b'iter_files_recursive', self._remote_path(''))
477        if resp[0] == b'names':
478            return [name.decode('utf-8') for name in resp[1:]]
479        raise errors.UnexpectedSmartServerResponse(resp)
480
481
class RemoteTCPTransport(RemoteTransport):
    """Connection to smart server over plain tcp.

    Effectively a RemoteTransport whose medium is a SmartTCPClientMedium.
    """

    def _build_medium(self):
        """Return a TCP client medium for this URL's host and port.

        No credentials accompany the medium.
        """
        parsed = self._parsed_url
        tcp_medium = medium.SmartTCPClientMedium(
            parsed.host, parsed.port, self.base)
        return tcp_medium, None
493
494
class RemoteTCPTransportV2Only(RemoteTransport):
    """Connection to smart server over plain tcp with the client hard-coded to
    assume protocol v2 and remote server version <= 1.6.

    This should only be used for testing.
    """

    def _build_medium(self):
        """Return a TCP client medium pinned to protocol version 2.

        The medium is also told that the remote side is older than 1.6.
        No credentials accompany the medium.
        """
        parsed = self._parsed_url
        tcp_medium = medium.SmartTCPClientMedium(
            parsed.host, parsed.port, self.base)
        tcp_medium._protocol_version = 2
        tcp_medium._remember_remote_is_before((1, 6))
        return tcp_medium, None
508
509
class RemoteSSHTransport(RemoteTransport):
    """Connection to smart server over SSH.

    Effectively a RemoteTransport whose medium is a SmartSSHClientMedium.
    """

    def _build_medium(self):
        """Return an SSH client medium together with its credentials.

        The user comes from the URL or, when the URL has none, from the
        authentication configuration.

        :return: ``(medium, (user, password))``.
        """
        parsed = self._parsed_url
        bzr_remote_path = config.LocationConfig(
            self.base).get_bzr_remote_path()
        user = parsed.user
        if user is None:
            user = config.AuthenticationConfig().get_user(
                'ssh', parsed.host, parsed.port)
        ssh_params = medium.SSHParams(
            parsed.host, parsed.port, user, parsed.password, bzr_remote_path)
        ssh_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
        return ssh_medium, (user, parsed.password)
530
531
class RemoteHTTPTransport(RemoteTransport):
    """Just a way to connect between a bzr+http:// url and http://.

    This connection operates slightly differently than the RemoteSSHTransport.
    It uses a plain http:// transport underneath, which defines what remote
    .bzr/smart URL we are connected to. From there, all paths that are sent are
    sent as relative paths, this way, the remote side can properly
    de-reference them, since it is likely doing rewrite rules to translate an
    HTTP path into a local path.
    """

    def __init__(self, base, _from_transport=None, http_transport=None):
        """Wrap http_transport, or open one for base minus its 'bzr+' prefix.

        :param base: The bzr+http URL of this transport.
        :param _from_transport: Passed through to RemoteTransport.__init__.
        :param http_transport: Existing http transport to reuse, if any.
        """
        if http_transport is not None:
            self._http_transport = http_transport
        else:
            # FIXME: the password may be lost here because it appears in the
            # url only for an initial construction (when the url came from the
            # command-line).
            http_url = base[len('bzr+'):]
            self._http_transport = transport.get_transport_from_url(http_url)
        super(RemoteHTTPTransport, self).__init__(
            base, _from_transport=_from_transport)

    def _build_medium(self):
        """Return the http transport's smart medium, without credentials."""
        # We let http_transport take care of the credentials
        smart_medium = self._http_transport.get_smart_medium()
        return smart_medium, None

    def _remote_path(self, relpath):
        """After connecting, HTTP Transport only deals in relative URLs."""
        # Adjust the relpath based on which URL this smart transport is
        # connected to.
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
        target_url = urlutils.normalize_url(
            urlutils.join(self.base[len('bzr+'):], relpath))
        return urlutils.relative_url(http_base, target_url)

    def clone(self, relative_url):
        """Make a new RemoteHTTPTransport related to me.

        This is re-implemented rather than using the default
        RemoteTransport.clone() because we must be careful about the underlying
        http transport.

        Also, the cloned smart transport will POST to the same .bzr/smart
        location as this transport (although obviously the relative paths in the
        smart requests may be different).  This is so that the server doesn't
        have to handle .bzr/smart requests at arbitrary places inside .bzr
        directories, just at the initial URL the user uses.
        """
        abs_url = self.abspath(relative_url) if relative_url else self.base
        return RemoteHTTPTransport(abs_url,
                                   _from_transport=self,
                                   http_transport=self._http_transport)

    def _redirected_to(self, source, target):
        """See transport._redirected_to"""
        redirected = self._http_transport._redirected_to(source, target)
        if (redirected is None
                or not isinstance(redirected, type(self._http_transport))):
            # Either None or a transport for a different protocol
            return redirected
        return RemoteHTTPTransport('bzr+' + redirected.external_url(),
                                   http_transport=redirected)
599
600
class HintingSSHTransport(transport.Transport):
    """Simple transport that handles ssh:// and points out bzr+ssh:// and git+ssh://."""

    # TODO(jelmer): Implement support for detecting whether the repository at the
    # other end is a git or bzr repository.

    def __init__(self, url):
        """Always raise UnsupportedProtocol, hinting at bzr+ssh and git+ssh."""
        hint = (
            'Use bzr+ssh for Bazaar operations over SSH, e.g. "bzr+%s". '
            'Use git+ssh for Git operations over SSH, e.g. "git+%s".'
            % (url, url))
        raise errors.UnsupportedProtocol(url, hint)
611
612
def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    # We may need a little more test framework support to construct an
    # appropriate RemoteTransport in the future.
    from ..tests import test_server
    permutation = (RemoteTCPTransport, test_server.SmartTCPServer_for_testing)
    return [permutation]
619